From 6434db0f82bacd0e466315831a6f8b3a2914555f Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Thu, 12 Apr 2018 23:29:12 -0400
Subject: [PATCH] Stub out throttle filter

---
 src/commands/filter.rs | 13 ++++++++++---
 src/fixers.rs          | 24 ++++++++++++++++++++++++
 2 files changed, 34 insertions(+), 3 deletions(-)

diff --git a/src/commands/filter.rs b/src/commands/filter.rs
index 2e85c53..c3a4839 100644
--- a/src/commands/filter.rs
+++ b/src/commands/filter.rs
@@ -4,7 +4,7 @@ use std::{
     io::prelude::*
 };
 
-use clap::{App, ArgMatches, SubCommand};
+use clap::{App, Arg, ArgMatches, SubCommand};
 use futures::Stream;
 
 use super::StdinStream;
@@ -20,12 +20,15 @@ use webmetro::{
 pub fn options() -> App<'static, 'static> {
     SubCommand::with_name("filter")
         .about("Copies WebM from stdin to stdout, applying the same cleanup & stripping the relay server does.")
+        .arg(Arg::with_name("throttle")
+            .long("throttle")
+            .help("Slow down output to \"realtime\" speed as determined by the timestamps (useful for streaming)"))
 }
 
-pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> {
+pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
 
     let stdin = io::stdin();
-    let chunk_stream: Box<Stream<Item = Chunk, Error = Box<Error>>> = Box::new(
+    let mut chunk_stream: Box<Stream<Item = Chunk, Error = Box<Error>>> = Box::new(
         StdinStream::new(stdin.lock())
         .parse_ebml()
         .chunk_webm()
@@ -33,6 +36,10 @@ pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> {
         .fix_timecodes()
     );
 
+    if args.is_present("throttle") {
+        chunk_stream = Box::new(chunk_stream.throttle());
+    }
+
     let stdout = io::stdout();
     let mut stdout_writer = stdout.lock();
     for chunk in chunk_stream.wait() {
diff --git a/src/fixers.rs b/src/fixers.rs
index 4957781..5925c26 100644
--- a/src/fixers.rs
+++ b/src/fixers.rs
@@ -1,3 +1,5 @@
+use std::time::Instant;
+
 use futures::Async;
 use futures::stream::Stream;
 
@@ -82,6 +84,21 @@ impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
     }
 }
 
+pub struct Throttle<S> {
+    stream: S,
+    start_time: Instant
+}
+
+impl<S: Stream<Item = Chunk>> Stream for Throttle<S>
+{
+    type Item = S::Item;
+    type Error = S::Error;
+
+    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
+        self.stream.poll()
+    }
+}
+
 pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
     fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> {
         ChunkTimecodeFixer {
@@ -99,6 +116,13 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
             seen_keyframe: false
         }
     }
+
+    fn throttle(self) -> Throttle<Self> {
+        Throttle {
+            stream: self,
+            start_time: Instant::now()
+        }
+    }
 }
 
 impl<T: Stream<Item = Chunk>> ChunkStream for T {}