Implement Throttle filter (fails because not executed on runtime)

This commit is contained in:
Tangent 2018-04-14 04:45:35 -04:00
parent 9123d63343
commit e77a3d0e98
4 changed files with 33 additions and 14 deletions

View file

@ -1,9 +1,10 @@
use std::time::Instant;
use std::time::{Duration, Instant};
use futures::Async;
use futures::stream::Stream;
use futures::prelude::*;
use tokio::timer::Delay;
use chunk::Chunk;
use error::WebmetroError;
pub struct ChunkTimecodeFixer<S> {
stream: S,
@ -86,16 +87,29 @@ impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
pub struct Throttle<S> {
stream: S,
start_time: Instant
start_time: Instant,
sleep: Delay
}
impl<S: Stream<Item = Chunk>> Stream for Throttle<S>
impl<S: Stream<Item = Chunk, Error = WebmetroError>> Stream for Throttle<S>
{
type Item = S::Item;
type Error = S::Error;
type Error = WebmetroError;
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
self.stream.poll()
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
match self.sleep.poll() {
Err(err) => return Err(WebmetroError::Unknown(Box::new(err))),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(())) => { /* can continue */ }
}
let next_chunk = self.stream.poll();
if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
// snooze until real time has "caught up" to the stream
let offset = Duration::from_millis(cluster_head.end);
self.sleep.reset(self.start_time + offset);
}
next_chunk
}
}
@ -118,9 +132,11 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
}
fn throttle(self) -> Throttle<Self> {
let now = Instant::now();
Throttle {
stream: self,
start_time: Instant::now()
start_time: now,
sleep: Delay::new(now)
}
}
}