From bc8e45936bd1976e35fec75e055c19fdb04b67c9 Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Mon, 21 Oct 2019 03:18:51 -0400
Subject: [PATCH] Convert parser & chunker APIs to work with Futures 0.3-style
 streams

---
 src/chunk.rs           |  50 ++++++++++--------
 src/commands/dump.rs   |  24 +++++----
 src/commands/filter.rs |  21 ++++----
 src/commands/mod.rs    |  27 +++++-----
 src/commands/relay.rs  |   7 +--
 src/commands/send.rs   |  14 ++---
 src/stream_parser.rs   | 114 +++++++++++++++++++++++------------------
 7 files changed, 139 insertions(+), 118 deletions(-)

diff --git a/src/chunk.rs b/src/chunk.rs
index 2813ce3..bbd5df4 100644
--- a/src/chunk.rs
+++ b/src/chunk.rs
@@ -1,8 +1,11 @@
 use bytes::{Buf, Bytes};
-use futures::{Async, Stream};
+use futures::{Async};
+use futures3::prelude::*;
 use std::{
     io::Cursor,
-    mem
+    mem,
+    pin::Pin,
+    task::{Context, Poll, Poll::*},
 };
 use crate::stream_parser::EbmlStreamingParser;
 use crate::error::WebmetroError;
@@ -128,22 +131,22 @@ fn encode(element: WebmElement, buffer: &mut Cursor<Vec<u8>>, limit: Option<usiz
     encode_webm_element(element, buffer).map_err(|err| err.into())
 }
 
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<S>
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> Stream for WebmChunker<S>
 {
-    type Item = Chunk;
-    type Error = WebmetroError;
+    type Item = Result<Chunk, WebmetroError>;
 
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
+        let mut chunker = self.get_mut();
         loop {
             let mut return_value = None;
             let mut new_state = None;
 
-            match self.state {
+            match chunker.state {
                 ChunkerState::BuildingHeader(ref mut buffer) => {
-                    match self.source.poll_event() {
-                        Err(passthru) => return Err(passthru.into()),
-                        Ok(Async::NotReady) => return Ok(Async::NotReady),
-                        Ok(Async::Ready(None)) => return Ok(Async::Ready(None)),
+                    match chunker.source.poll_event(cx) {
+                        Err(passthru) => return Ready(Some(Err(passthru))),
+                        Ok(Async::NotReady) => return Pending,
+                        Ok(Async::Ready(None)) => return Ready(None),
                         Ok(Async::Ready(Some(WebmElement::Cluster))) => {
                             let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
                             let header_chunk = Chunk::Headers {bytes: Bytes::from(liberated_buffer.into_inner())};
@@ -158,7 +161,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
                         Ok(Async::Ready(Some(WebmElement::Void))) => {},
                         Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
                         Ok(Async::Ready(Some(element))) => {
-                            encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
+                            encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
                                 return_value = Some(Err(err));
                                 new_state = Some(ChunkerState::End);
                             });
@@ -166,16 +169,16 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
                     }
                 },
                 ChunkerState::BuildingCluster(ref mut cluster_head, ref mut buffer) => {
-                    match self.source.poll_event() {
-                        Err(passthru) => return Err(passthru.into()),
-                        Ok(Async::NotReady) => return Ok(Async::NotReady),
+                    match chunker.source.poll_event(cx) {
+                        Err(passthru) => return Ready(Some(Err(passthru))),
+                        Ok(Async::NotReady) => return Pending,
                         Ok(Async::Ready(Some(element @ WebmElement::EbmlHead)))
                         | Ok(Async::Ready(Some(element @ WebmElement::Segment))) => {
                             let liberated_cluster_head = mem::replace(cluster_head, ClusterHead::new(0));
                             let liberated_buffer = mem::replace(buffer, Cursor::new(Vec::new()));
 
                             let mut new_header_cursor = Cursor::new(Vec::new());
-                            match encode(element, &mut new_header_cursor, self.buffer_size_limit) {
+                            match encode(element, &mut new_header_cursor, chunker.buffer_size_limit) {
                                 Ok(_) => {
                                     return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterHead(liberated_cluster_head)))));
                                     new_state = Some(ChunkerState::EmittingClusterBodyBeforeNewHeader{
@@ -205,7 +208,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
                                 cluster_head.keyframe = true;
                             }
                             cluster_head.observe_simpleblock_timecode(block.timecode);
-                            encode(WebmElement::SimpleBlock(*block), buffer, self.buffer_size_limit).unwrap_or_else(|err| {
+                            encode(WebmElement::SimpleBlock(*block), buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
                                 return_value = Some(Err(err));
                                 new_state = Some(ChunkerState::End);
                             });
@@ -214,7 +217,7 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
                         Ok(Async::Ready(Some(WebmElement::Void))) => {},
                         Ok(Async::Ready(Some(WebmElement::Unknown(_)))) => {},
                         Ok(Async::Ready(Some(element))) => {
-                            encode(element, buffer, self.buffer_size_limit).unwrap_or_else(|err| {
+                            encode(element, buffer, chunker.buffer_size_limit).unwrap_or_else(|err| {
                                 return_value = Some(Err(err));
                                 new_state = Some(ChunkerState::End);
                             });
@@ -252,14 +255,19 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> Stream for WebmChunker<
                     return_value = Some(Ok(Async::Ready(Some(Chunk::ClusterBody {bytes: Bytes::from(liberated_buffer)}))));
                     new_state = Some(ChunkerState::End);
                 },
-                ChunkerState::End => return Ok(Async::Ready(None))
+                ChunkerState::End => return Ready(None)
             };
 
             if let Some(new_state) = new_state {
-                self.state = new_state;
+                chunker.state = new_state;
             }
             if let Some(return_value) = return_value {
-                return return_value;
+                return match return_value {
+                    Ok(Async::Ready(Some(chunk))) => Ready(Some(Ok(chunk))),
+                    Ok(Async::Ready(None)) => Ready(None),
+                    Ok(Async::NotReady) => Pending,
+                    Err(err) => Ready(Some(Err(err))),
+                };
             }
         }
     }
diff --git a/src/commands/dump.rs b/src/commands/dump.rs
index 8b691e6..55b8d69 100644
--- a/src/commands/dump.rs
+++ b/src/commands/dump.rs
@@ -1,5 +1,7 @@
 use clap::{App, AppSettings, ArgMatches, SubCommand};
-use futures::prelude::*;
+use futures::Async;
+use futures3::future::{FutureExt, poll_fn};
+use std::task::Poll;
 
 use super::stdin_stream;
 use webmetro::{
@@ -21,15 +23,17 @@ pub fn run(_args: &ArgMatches) -> Result<(), WebmetroError> {
 
     let mut events = stdin_stream().parse_ebml();
 
-    // stdin is sync so Async::NotReady will never happen
-    while let Ok(Async::Ready(Some(element))) = events.poll_event() {
-        match element {
-            // suppress printing byte arrays
-            Tracks(slice) => println!("Tracks[{}]", slice.len()),
-            SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
-            other => println!("{:?}", other)
+    Ok(poll_fn(|cx| {
+        // stdin is sync so Async::NotReady will never happen on this tokio version
+        while let Ok(Async::Ready(Some(element))) = events.poll_event(cx) {
+            match element {
+                // suppress printing byte arrays
+                Tracks(slice) => println!("Tracks[{}]", slice.len()),
+                SimpleBlock(SimpleBlock {timecode, ..}) => println!("SimpleBlock@{}", timecode),
+                other => println!("{:?}", other)
+            }
         }
-    }
 
-    Ok(())
+        Poll::Ready(())
+    }).now_or_never().expect("Stdin should never go async"))
 }
diff --git a/src/commands/filter.rs b/src/commands/filter.rs
index c5d907d..40c271f 100644
--- a/src/commands/filter.rs
+++ b/src/commands/filter.rs
@@ -4,12 +4,9 @@ use std::{
 };
 
 use clap::{App, Arg, ArgMatches, SubCommand};
-use futures::prelude::*;
-use futures3::compat::{
-    Compat,
-    Compat01As03
-};
-use tokio::runtime::Runtime;
+use futures3::prelude::*;
+use futures3::future::ready;
+use tokio2::runtime::Runtime;
 
 use super::stdin_stream;
 use webmetro::{
@@ -19,8 +16,8 @@ use webmetro::{
     },
     error::WebmetroError,
     fixers::{
-        ChunkStream,
         ChunkTimecodeFixer,
+        Throttle,
     },
     stream_parser::StreamEbml
 };
@@ -35,18 +32,18 @@ pub fn options() -> App<'static, 'static> {
 
 pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
     let mut timecode_fixer = ChunkTimecodeFixer::new();
-    let mut chunk_stream: Box<dyn Stream<Item = Chunk, Error = WebmetroError> + Send> = Box::new(
+    let mut chunk_stream: Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new(
         stdin_stream()
         .parse_ebml()
         .chunk_webm()
-        .map(move |chunk| timecode_fixer.process(chunk))
+        .map_ok(move |chunk| timecode_fixer.process(chunk))
     );
 
     if args.is_present("throttle") {
-        chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle()));
+        chunk_stream = Box::new(Throttle::new(chunk_stream));
     }
 
-    Runtime::new().unwrap().block_on(chunk_stream.for_each(|chunk| {
-        io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from)
+    Runtime::new().unwrap().block_on(chunk_stream.try_for_each(|chunk| {
+        ready(io::stdout().write_all(chunk.as_ref()).map_err(WebmetroError::from))
     }))
 }
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 3acf871..f59349e 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -1,15 +1,7 @@
-use std::io::stdin;
+use std::io::Cursor;
 
-use bytes::{
-    Buf,
-    IntoBuf
-};
-use futures::prelude::*;
-use tokio_io::io::AllowStdIo;
-use tokio_codec::{
-    BytesCodec,
-    FramedRead
-};
+use bytes::Bytes;
+use futures3::TryStreamExt;
 use webmetro::error::WebmetroError;
 
 pub mod dump;
@@ -20,8 +12,13 @@ pub mod send;
 /// An adapter that makes chunks of bytes from stdin available as a Stream;
 /// is NOT actually async, and just uses blocking read. Don't use more than
 /// one at once, who knows who gets which bytes.
-pub fn stdin_stream() -> impl Stream<Item = impl Buf, Error = WebmetroError> {
-    FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
-    .map(|bytes| bytes.into_buf())
-    .map_err(WebmetroError::from)
+pub fn stdin_stream() -> impl futures3::TryStream<
+    Item = Result<Cursor<Bytes>, WebmetroError>,
+    Ok = Cursor<Bytes>,
+    Error = WebmetroError,
+> + Sized
+       + Unpin {
+    tokio2::codec::FramedRead::new(tokio2::io::stdin(), tokio2::codec::BytesCodec::new())
+        .map_ok(|bytes| Cursor::new(bytes.freeze()))
+        .map_err(WebmetroError::from)
 }
diff --git a/src/commands/relay.rs b/src/commands/relay.rs
index ae4820c..fd1c4bc 100644
--- a/src/commands/relay.rs
+++ b/src/commands/relay.rs
@@ -17,6 +17,7 @@ use futures3::{
     compat::{
         Compat,
         CompatSink,
+        Compat01As03,
     },
     Never,
     prelude::*,
@@ -66,13 +67,13 @@ fn get_stream(channel: Handle) -> impl Stream<Item = Bytes, Error = WebmetroErro
 }
 
 fn post_stream(channel: Handle, stream: impl Stream<Item = impl Buf, Error = warp::Error>) -> impl Stream<Item = Bytes, Error = WebmetroError> {
-    let source = stream
-        .map_err(WebmetroError::from)
+    let source = Compat01As03::new(stream
+        .map_err(WebmetroError::from))
         .parse_ebml().with_soft_limit(BUFFER_LIMIT)
         .chunk_webm().with_soft_limit(BUFFER_LIMIT);
     let sink = CompatSink::new(Transmitter::new(channel));
 
-    source.forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
+    Compat::new(source).forward(sink.sink_map_err(|err| -> WebmetroError {match err {}}))
     .into_stream()
     .map(|_| empty())
     .map_err(|err| {
diff --git a/src/commands/send.rs b/src/commands/send.rs
index 4bf9fa3..eb02d7a 100644
--- a/src/commands/send.rs
+++ b/src/commands/send.rs
@@ -2,9 +2,9 @@ use clap::{App, Arg, ArgMatches, SubCommand};
 use futures::{
     prelude::*
 };
+use futures3::prelude::*;
 use futures3::compat::{
     Compat,
-    Compat01As03
 };
 use hyper::{
     Body,
@@ -24,8 +24,8 @@ use webmetro::{
     },
     error::WebmetroError,
     fixers::{
-        ChunkStream,
         ChunkTimecodeFixer,
+        Throttle,
     },
     stream_parser::StreamEbml
 };
@@ -41,7 +41,7 @@ pub fn options() -> App<'static, 'static> {
             .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
 }
 
-type BoxedChunkStream = Box<dyn Stream<Item = Chunk, Error = WebmetroError> + Send>;
+type BoxedChunkStream = Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin>;
 
 pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
     let mut timecode_fixer = ChunkTimecodeFixer::new();
@@ -49,7 +49,7 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
         stdin_stream()
         .parse_ebml()
         .chunk_webm()
-        .map(move |chunk| timecode_fixer.process(chunk))
+        .map_ok(move |chunk| timecode_fixer.process(chunk))
     );
 
     let url_str = match args.value_of("url") {
@@ -58,15 +58,15 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
     };
 
     if args.is_present("throttle") {
-        chunk_stream = Box::new(Compat::new(Compat01As03::new(chunk_stream).throttle()));
+        chunk_stream = Box::new(Throttle::new(chunk_stream));
     }
 
-    let request_payload = Body::wrap_stream(chunk_stream.map(
+    let request_payload = Body::wrap_stream(Compat::new(chunk_stream.map_ok(
         |webm_chunk| webm_chunk.into_bytes()
     ).map_err(|err| {
         eprintln!("{}", &err);
         err
-    }));
+    })));
 
     
     let request = Request::put(url_str)
diff --git a/src/stream_parser.rs b/src/stream_parser.rs
index b0bfe20..ed4a0a4 100644
--- a/src/stream_parser.rs
+++ b/src/stream_parser.rs
@@ -1,5 +1,7 @@
 use bytes::{Buf, BufMut, Bytes, BytesMut};
-use futures::{stream::Stream, Async};
+use futures::Async;
+use futures3::stream::{Stream, StreamExt, TryStream};
+use std::task::{Context, Poll};
 
 use crate::ebml::FromEbml;
 use crate::error::WebmetroError;
@@ -21,10 +23,10 @@ impl<S> EbmlStreamingParser<S> {
     }
 }
 
-pub trait StreamEbml
+pub trait StreamEbml: Sized + TryStream + Unpin
 where
-    Self: Sized + Stream,
-    Self::Item: Buf,
+    Self: Sized + TryStream + Unpin,
+    Self::Ok: Buf,
 {
     fn parse_ebml(self) -> EbmlStreamingParser<Self> {
         EbmlStreamingParser {
@@ -36,11 +38,12 @@ where
     }
 }
 
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> StreamEbml for S {}
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> StreamEbml for S {}
 
-impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S> {
+impl<I: Buf, S: Stream<Item = Result<I, WebmetroError>> + Unpin> EbmlStreamingParser<S> {
     pub fn poll_event<'a, T: FromEbml<'a>>(
         &'a mut self,
+        cx: &mut Context,
     ) -> Result<Async<Option<T>>, WebmetroError> {
         loop {
             match T::check_space(&self.buffer)? {
@@ -64,13 +67,14 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S>
                 }
             }
 
-            match self.stream.poll()? {
-                Async::Ready(Some(buf)) => {
+            match self.stream.poll_next_unpin(cx)? {
+                Poll::Ready(Some(buf)) => {
                     self.buffer.reserve(buf.remaining());
                     self.buffer.put(buf);
                     // ok can retry decoding now
                 }
-                other => return Ok(other.map(|_| None)),
+                Poll::Ready(None) => return Ok(Async::Ready(None)),
+                Poll::Pending => return Ok(Async::NotReady),
             }
         }
     }
@@ -79,8 +83,12 @@ impl<I: Buf, S: Stream<Item = I, Error = WebmetroError>> EbmlStreamingParser<S>
 #[cfg(test)]
 mod tests {
     use bytes::IntoBuf;
-    use futures::prelude::*;
     use futures::Async::*;
+    use futures3::{
+        future::poll_fn,
+        stream::StreamExt,
+        FutureExt,
+    };
     use matches::assert_matches;
 
     use crate::stream_parser::*;
@@ -89,47 +97,53 @@ mod tests {
 
     #[test]
     fn stream_webm_test() {
-        let pieces = vec![
-            &ENCODE_WEBM_TEST_FILE[0..20],
-            &ENCODE_WEBM_TEST_FILE[20..40],
-            &ENCODE_WEBM_TEST_FILE[40..],
-        ];
+        poll_fn(|cx| {
+            let pieces = vec![
+                &ENCODE_WEBM_TEST_FILE[0..20],
+                &ENCODE_WEBM_TEST_FILE[20..40],
+                &ENCODE_WEBM_TEST_FILE[40..],
+            ];
 
-        let mut stream_parser = futures::stream::iter_ok(pieces.iter())
-            .map(|bytes| bytes.into_buf())
-            .parse_ebml();
+            let mut stream_parser = futures3::stream::iter(pieces.iter())
+                .map(|bytes| Ok(bytes.into_buf()))
+                .parse_ebml();
 
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::EbmlHead)))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Segment)))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Tracks(_))))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Cluster)))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Timecode(0))))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::SimpleBlock(_))))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Cluster)))
-        );
-        assert_matches!(
-            stream_parser.poll_event(),
-            Ok(Ready(Some(WebmElement::Timecode(1000))))
-        );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::EbmlHead)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Segment)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Tracks(_))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Cluster)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Timecode(0))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::SimpleBlock(_))))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Cluster)))
+            );
+            assert_matches!(
+                stream_parser.poll_event(cx),
+                Ok(Ready(Some(WebmElement::Timecode(1000))))
+            );
+
+            std::task::Poll::Ready(())
+        })
+        .now_or_never()
+        .expect("Test succeeded without blocking");
     }
 }