From e77a3d0e985e79b0a83dd2b004fb7584fdd56033 Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Sat, 14 Apr 2018 04:45:35 -0400
Subject: [PATCH] Implement Throttle filter (fails because not executed on
 runtime)

---
 src/commands/filter.rs | 11 ++++++-----
 src/fixers.rs          | 34 +++++++++++++++++++++++++---------
 src/lib.rs             |  1 +
 src/main.rs            |  1 +
 4 files changed, 33 insertions(+), 14 deletions(-)

diff --git a/src/commands/filter.rs b/src/commands/filter.rs
index 663a2bb..4e6d7bd 100644
--- a/src/commands/filter.rs
+++ b/src/commands/filter.rs
@@ -13,6 +13,7 @@ use webmetro::{
         Chunk,
         WebmStream
     },
+    error::WebmetroError,
     fixers::ChunkStream,
     stream_parser::StreamEbml
 };
@@ -28,11 +29,10 @@ pub fn options() -> App<'static, 'static> {
 pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
 
     let stdin = io::stdin();
-    let mut chunk_stream: Box<Stream<Item = Chunk, Error = Box<Error>>> = Box::new(
+    let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError>> = Box::new(
         StdinStream::new(stdin.lock())
         .parse_ebml()
         .chunk_webm()
-        .map_err(|err| Box::new(err) as Box<Error>)
         .fix_timecodes()
     );
 
@@ -40,8 +40,9 @@ pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
         chunk_stream = Box::new(chunk_stream.throttle());
     }
 
-    let result = chunk_stream.fold((), |_, chunk| {
+    chunk_stream.fold((), |_, chunk| {
         io::stdout().write_all(chunk.as_ref())
-    }).wait();
-    result
+    }).wait()?;
+
+    Ok(())
 }
diff --git a/src/fixers.rs b/src/fixers.rs
index 5925c26..dc6fd24 100644
--- a/src/fixers.rs
+++ b/src/fixers.rs
@@ -1,9 +1,10 @@
-use std::time::Instant;
+use std::time::{Duration, Instant};
 
-use futures::Async;
-use futures::stream::Stream;
+use futures::prelude::*;
+use tokio::timer::Delay;
 
 use chunk::Chunk;
+use error::WebmetroError;
 
 pub struct ChunkTimecodeFixer<S> {
     stream: S,
@@ -86,16 +87,29 @@ impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
 
 pub struct Throttle<S> {
     stream: S,
-    start_time: Instant
+    start_time: Instant,
+    sleep: Delay
 }
 
-impl<S: Stream<Item = Chunk>> Stream for Throttle<S>
+impl<S: Stream<Item = Chunk, Error = WebmetroError>> Stream for Throttle<S>
 {
     type Item = S::Item;
-    type Error = S::Error;
+    type Error = WebmetroError;
 
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
-        self.stream.poll()
+    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
+        match self.sleep.poll() {
+            Err(err) => return Err(WebmetroError::Unknown(Box::new(err))),
+            Ok(Async::NotReady) => return Ok(Async::NotReady),
+            Ok(Async::Ready(())) => { /* can continue */ }
+        }
+
+        let next_chunk = self.stream.poll();
+        if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
+            // snooze until real time has "caught up" to the stream
+            let offset = Duration::from_millis(cluster_head.end);
+            self.sleep.reset(self.start_time + offset);
+        }
+        next_chunk
     }
 }
 
@@ -118,9 +132,11 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
     }
 
     fn throttle(self) -> Throttle<Self> {
+        let now = Instant::now();
         Throttle {
             stream: self,
-            start_time: Instant::now()
+            start_time: now,
+            sleep: Delay::new(now)
         }
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 1f064cb..b15d184 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -2,6 +2,7 @@
 extern crate bytes;
 extern crate futures;
 extern crate odds;
+extern crate tokio;
 
 pub mod ebml;
 pub mod error;
diff --git a/src/main.rs b/src/main.rs
index d2f70d8..eb6d5ab 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,6 +1,7 @@
 #[macro_use] extern crate clap;
 extern crate futures;
 extern crate hyper;
+extern crate tokio;
 extern crate webmetro;
 
 mod commands;