From a847d62b34d4eb714e873dcf17992a48293135b0 Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Sat, 14 Apr 2018 18:18:50 -0400
Subject: [PATCH] Have utilities read stdin via tokio_io wrapper

---
 Cargo.lock             |  1 +
 Cargo.toml             |  1 +
 src/commands/dump.rs   | 10 +++-----
 src/commands/filter.rs |  5 ++--
 src/commands/mod.rs    | 55 ++++++++++++++----------------------------
 src/main.rs            |  1 +
 6 files changed, 26 insertions(+), 47 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index e679726..559258a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -598,6 +598,7 @@ dependencies = [
  "hyper 0.11.25 (registry+https://github.com/rust-lang/crates.io-index)",
  "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
 [[package]]
diff --git a/Cargo.toml b/Cargo.toml
index f2d5d16..99d0239 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -10,3 +10,4 @@ futures = "0.1.20"
 hyper = "0.11.25"
 odds = { version = "0.3.1", features = ["std-vec"] }
 tokio = "0.1.5"
+tokio-io = "0.1.6"
diff --git a/src/commands/dump.rs b/src/commands/dump.rs
index fa2fb40..7a83a69 100644
--- a/src/commands/dump.rs
+++ b/src/commands/dump.rs
@@ -1,12 +1,9 @@
-use std::{
-    error::Error,
-    io
-};
+use std::error::Error;
 
 use clap::{App, AppSettings, ArgMatches, SubCommand};
 use futures::Async;
 
-use super::StdinStream;
+use super::stdin_stream;
 use webmetro::{
     stream_parser::StreamEbml,
     webm::{
@@ -23,8 +20,7 @@ pub fn options() -> App<'static, 'static> {
 
 pub fn run(_args: &ArgMatches) -> Result<(), Box<Error>> {
 
-    let stdin = io::stdin();
-    let mut events = StdinStream::new(stdin.lock()).parse_ebml();
+    let mut events = stdin_stream().parse_ebml();
 
     // stdin is sync so Async::NotReady will never happen
     while let Ok(Async::Ready(Some(element))) = events.poll_event() {
diff --git a/src/commands/filter.rs b/src/commands/filter.rs
index 4e6d7bd..5025afc 100644
--- a/src/commands/filter.rs
+++ b/src/commands/filter.rs
@@ -7,7 +7,7 @@ use std::{
 use clap::{App, Arg, ArgMatches, SubCommand};
 use futures::prelude::*;
 
-use super::StdinStream;
+use super::stdin_stream;
 use webmetro::{
     chunk::{
         Chunk,
@@ -28,9 +28,8 @@ pub fn options() -> App<'static, 'static> {
 
 pub fn run(args: &ArgMatches) -> Result<(), Box<Error>> {
 
-    let stdin = io::stdin();
     let mut chunk_stream: Box<Stream<Item = Chunk, Error = WebmetroError>> = Box::new(
-        StdinStream::new(stdin.lock())
+        stdin_stream()
         .parse_ebml()
         .chunk_webm()
         .fix_timecodes()
diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 46f739b..a7654ce 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -1,11 +1,19 @@
 use std::io::{
-    StdinLock,
-    prelude::*
+    Error as IoError,
+    stdin,
+    Stdin
 };
 
 use futures::{
-    Async,
-    stream::Stream
+    prelude::*,
+    stream::MapErr
+};
+use tokio_io::{
+    io::AllowStdIo,
+    codec::{
+        BytesCodec,
+        FramedRead
+    }
 };
 use webmetro::error::WebmetroError;
 
@@ -13,37 +21,10 @@ pub mod dump;
 pub mod filter;
 pub mod relay;
 
-/// A hackish adapter that makes chunks of bytes from stdin available as a Stream;
-/// is NOT actually async, and just uses blocking read. Buffers aren't optimized either
-/// and copy more than necessary.
-pub struct StdinStream<'a> {
-    buf_reader: StdinLock<'a>,
-    read_bytes: usize
-}
-
-impl<'a> StdinStream<'a> {
-    pub fn new(lock: StdinLock<'a>) -> Self {
-        StdinStream {
-            buf_reader: lock,
-            read_bytes: 0
-        }
-    }
-}
-
-impl<'a> Stream for StdinStream<'a> {
-    type Item = Vec<u8>;
-    type Error = WebmetroError;
-
-    fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
-        self.buf_reader.consume(self.read_bytes);
-        let read_bytes = &mut self.read_bytes;
-        self.buf_reader.fill_buf().map(|slice| {
-            *read_bytes = slice.len();
-            if *read_bytes > 0 {
-                Async::Ready(Some(Into::<Vec<u8>>::into(slice)))
-            } else {
-                Async::Ready(None)
-            }
-        }).map_err(WebmetroError::IoError)
-    }
+/// An adapter that makes chunks of bytes from stdin available as a Stream;
+/// is NOT actually async, and just uses blocking read. Don't use more than
+/// one at once, who knows who gets which bytes.
+pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(IoError) -> WebmetroError> {
+    FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
+    .map_err(WebmetroError::IoError)
 }
diff --git a/src/main.rs b/src/main.rs
index eb6d5ab..3e76ad8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@
 extern crate futures;
 extern crate hyper;
 extern crate tokio;
+extern crate tokio_io;
 extern crate webmetro;
 
 mod commands;