From 32c72e1ee848acec0f3666c99857abae372bc0a2 Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Wed, 16 Oct 2019 00:15:44 -0400
Subject: [PATCH] Port send subcommand to core futures & alpha hyper (fixes
 --throttle)

---
 Cargo.lock           | 89 ++++++++++++++++++++++++++++++++++++++++++++
 Cargo.toml           |  1 +
 src/commands/send.rs | 83 +++++++++++++++++------------------------
 src/error.rs         |  1 +
 4 files changed, 125 insertions(+), 49 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 20eb6ee..8479993 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -337,6 +337,26 @@ dependencies = [
  "tokio-io 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "h2"
+version = "0.2.0-alpha.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "fnv 1.0.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-sink-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "indexmap 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "slab 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "string 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-codec 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "headers"
 version = "0.2.3"
@@ -382,6 +402,15 @@ dependencies = [
  "tokio-buf 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "http-body"
+version = "0.2.0-alpha.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "httparse"
 version = "1.3.4"
@@ -416,6 +445,36 @@ dependencies = [
  "want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "hyper"
+version = "0.13.0-alpha.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-channel-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-core-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "futures-util-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
+ "h2 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
+ "http-body 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)",
+ "httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "iovec 0.1.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "itoa 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "net2 0.2.33 (registry+https://github.com/rust-lang/crates.io-index)",
+ "pin-project 0.4.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "time 0.1.42 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-executor 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-net 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-sync 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tokio-timer 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tower-make 0.3.0-alpha.2a (registry+https://github.com/rust-lang/crates.io-index)",
+ "tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)",
+ "want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "idna"
 version = "0.2.0"
@@ -1436,6 +1495,20 @@ dependencies = [
  "tokio-reactor 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "tower-make"
+version = "0.3.0-alpha.2a"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "tokio-io 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
+ "tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
+[[package]]
+name = "tower-service"
+version = "0.3.0-alpha.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+
 [[package]]
 name = "tracing"
 version = "0.1.9"
@@ -1589,6 +1662,15 @@ dependencies = [
  "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
 ]
 
+[[package]]
+name = "want"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+dependencies = [
+ "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
+ "try-lock 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)",
+]
+
 [[package]]
 name = "warp"
 version = "0.1.20"
@@ -1635,6 +1717,7 @@ dependencies = [
  "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
  "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)",
+ "hyper 0.13.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)",
  "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1726,12 +1809,15 @@ dependencies = [
 "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
 "checksum getrandom 0.1.12 (registry+https://github.com/rust-lang/crates.io-index)" = "473a1265acc8ff1e808cd0a1af8cee3c2ee5200916058a2ca113c29f2d903571"
 "checksum h2 0.1.26 (registry+https://github.com/rust-lang/crates.io-index)" = "a5b34c246847f938a410a03c5458c7fee2274436675e76d8b903c08efc29c462"
+"checksum h2 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "0f107db1419ef8271686187b1a5d47c6431af4a7f4d98b495e7b7fc249bb0a78"
 "checksum headers 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "882ca7d8722f33ce2c2db44f95425d6267ed59ca96ce02acbe58320054ceb642"
 "checksum headers-core 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "967131279aaa9f7c20c7205b45a391638a83ab118e6509b2d0ccbe08de044237"
 "checksum http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)" = "372bcb56f939e449117fb0869c2e8fd8753a8223d92a172c6e808cf123a5b6e4"
 "checksum http-body 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "6741c859c1b2463a423a1dbce98d418e6c3c3fc720fb0d45528657320920292d"
+"checksum http-body 0.2.0-alpha.3 (registry+https://github.com/rust-lang/crates.io-index)" = "1f3aef6f3de2bd8585f5b366f3f550b5774500b4764d00cf00f903c95749eec3"
 "checksum httparse 1.3.4 (registry+https://github.com/rust-lang/crates.io-index)" = "cd179ae861f0c2e53da70d892f5f3029f9594be0c41dc5269cd371691b1dc2f9"
 "checksum hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)" = "9dbe6ed1438e1f8ad955a4701e9a944938e9519f6888d12d8558b645e247d5f6"
+"checksum hyper 0.13.0-alpha.4 (registry+https://github.com/rust-lang/crates.io-index)" = "2d05aa523087ac0b9d8b93dd80d5d482a697308ed3b0dca7b0667511a7fa7cdc"
 "checksum idna 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "02e2673c30ee86b5b96a9cb52ad15718aa1f966f5ab9ad54a8b95d5ca33120a9"
 "checksum indexmap 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a61202fbe46c4a951e9404a720a0180bcf3212c750d735cb5c4ba4dc551299f3"
 "checksum input_buffer 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "8e1b822cc844905551931d6f81608ed5f50a79c1078a4e2b4d42dbc7c1eedfbf"
@@ -1842,6 +1928,8 @@ dependencies = [
 "checksum tokio-timer 0.3.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)" = "b97c1587fe71018eb245a4a9daa13a5a3b681bbc1f7fdadfe24720e141472c13"
 "checksum tokio-udp 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "f02298505547f73e60f568359ef0d016d5acd6e830ab9bc7c4a5b3403440121b"
 "checksum tokio-uds 0.2.5 (registry+https://github.com/rust-lang/crates.io-index)" = "037ffc3ba0e12a0ab4aca92e5234e0dedeb48fddf6ccd260f1f150a36a9f2445"
+"checksum tower-make 0.3.0-alpha.2a (registry+https://github.com/rust-lang/crates.io-index)" = "316d47dd40cde4ac5d88110eaf9a10a4e2a68612d9c056cd2aa24e37dcb484cd"
+"checksum tower-service 0.3.0-alpha.2 (registry+https://github.com/rust-lang/crates.io-index)" = "63ff37396cd966ce43bea418bfa339f802857495f797dafa00bea5b7221ebdfa"
 "checksum tracing 0.1.9 (registry+https://github.com/rust-lang/crates.io-index)" = "c21ff9457accc293386c20e8f754d0b059e67e325edf2284f04230d125d7e5ff"
 "checksum tracing-attributes 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)" = "3ff978fd9c9afe2cc9c671e247713421c6406b3422305cbdce5de695d3ab4c3c"
 "checksum tracing-core 0.1.6 (registry+https://github.com/rust-lang/crates.io-index)" = "528c8ebaaa16cdac34795180b046c031775b0d56402704d98c096788f33d646a"
@@ -1862,6 +1950,7 @@ dependencies = [
 "checksum vec_map 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)" = "05c78687fb1a80548ae3250346c3db86a80a7cdd77bda190189f2d0a0987c81a"
 "checksum version_check 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)" = "914b1a6776c4c929a602fafd8bc742e06365d4bcbe48c30f9cca5824f70dc9dd"
 "checksum want 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6395efa4784b027708f7451087e647ec73cc74f5d9bc2e418404248d679a230"
+"checksum want 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
 "checksum warp 0.1.20 (registry+https://github.com/rust-lang/crates.io-index)" = "3921463c44f680d24f1273ea55efd985f31206a22a02dee207a2ec72684285ca"
 "checksum wasi 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b89c3ce4ce14bdc6fb6beaf9ec7928ca331de5df7e5ea278375642a2f478570d"
 "checksum weak-table 0.2.3 (registry+https://github.com/rust-lang/crates.io-index)" = "6a5862bb244c852a56c6f3c39668ff181271bda44513ef30d2073a3eedd9898d"
diff --git a/Cargo.toml b/Cargo.toml
index 28195e5..8132c9e 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ futures = "0.1.29"
 futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
 http = "0.1.18"
 hyper = "0.12.35"
+hyper13 = { package = "hyper", version="0.13.0-alpha.4", features = ["unstable-stream"] }
 matches = "0.1.8"
 odds = { version = "0.3.1", features = ["std-vec"] }
 tokio = "0.1.22"
diff --git a/src/commands/send.rs b/src/commands/send.rs
index eb02d7a..e06b03c 100644
--- a/src/commands/send.rs
+++ b/src/commands/send.rs
@@ -1,33 +1,15 @@
 use clap::{App, Arg, ArgMatches, SubCommand};
-use futures::{
-    prelude::*
-};
 use futures3::prelude::*;
-use futures3::compat::{
-    Compat,
-};
-use hyper::{
-    Body,
-    Client,
-    client::HttpConnector,
-    Request
-};
-use tokio::runtime::Runtime;
+use hyper13::{client::HttpConnector, Body, Client, Request};
+use std::io::{stdout, Write};
+use tokio2::runtime::Runtime;
 
-use super::{
-    stdin_stream
-};
+use super::stdin_stream;
 use webmetro::{
-    chunk::{
-        Chunk,
-        WebmStream
-    },
+    chunk::{Chunk, WebmStream},
     error::WebmetroError,
-    fixers::{
-        ChunkTimecodeFixer,
-        Throttle,
-    },
-    stream_parser::StreamEbml
+    fixers::{ChunkTimecodeFixer, Throttle},
+    stream_parser::StreamEbml,
 };
 
 pub fn options() -> App<'static, 'static> {
@@ -41,46 +23,49 @@ pub fn options() -> App<'static, 'static> {
             .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
 }
 
-type BoxedChunkStream = Box<dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError> + Send + Unpin>;
+type BoxedChunkStream = Box<
+    dyn TryStream<Item = Result<Chunk, WebmetroError>, Ok = Chunk, Error = WebmetroError>
+        + Send
+        + Sync
+        + Unpin,
+>;
 
 pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
     let mut timecode_fixer = ChunkTimecodeFixer::new();
     let mut chunk_stream: BoxedChunkStream = Box::new(
         stdin_stream()
-        .parse_ebml()
-        .chunk_webm()
-        .map_ok(move |chunk| timecode_fixer.process(chunk))
+            .parse_ebml()
+            .chunk_webm()
+            .map_ok(move |chunk| timecode_fixer.process(chunk)),
     );
 
     let url_str = match args.value_of("url") {
         Some(url) => String::from(url),
-        _ => return Err("Listen address wasn't provided".into())
+        _ => return Err("Listen address wasn't provided".into()),
     };
 
     if args.is_present("throttle") {
         chunk_stream = Box::new(Throttle::new(chunk_stream));
     }
 
-    let request_payload = Body::wrap_stream(Compat::new(chunk_stream.map_ok(
-        |webm_chunk| webm_chunk.into_bytes()
-    ).map_err(|err| {
-        eprintln!("{}", &err);
-        err
-    })));
+    let chunk_stream = chunk_stream
+        .map_ok(|webm_chunk| webm_chunk.into_bytes())
+        .map_err(|err| {
+            eprintln!("{}", &err);
+            err
+        });
 
-    
-    let request = Request::put(url_str)
-    .body(request_payload)
-    .map_err(WebmetroError::from)?;
+    let request_payload = Body::wrap_stream(chunk_stream);
 
-    let client = Client::builder().build(HttpConnector::new(1));
-    let future = client.request(request)
-    .and_then(|response| {
-        response.into_body().for_each(|_chunk| {
-            Ok(())
-        })
+    let request = Request::put(url_str).body(request_payload)?;
+    let client = Client::builder().build(HttpConnector::new());
+
+    Runtime::new().unwrap().block_on(async {
+        let response = client.request(request).await?;
+        let mut response_stream = response.into_body();
+        while let Some(response_chunk) = response_stream.next().await.transpose()? {
+            stdout().write_all(&response_chunk)?;
+        }
+        Ok(())
     })
-    .map_err(WebmetroError::from);
-
-    Runtime::new().unwrap().block_on(future)
 }
diff --git a/src/error.rs b/src/error.rs
index 2e40750..b54bc5a 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -6,6 +6,7 @@ custom_error!{pub WebmetroError
     EbmlError{source: crate::ebml::EbmlError} = "EBML error: {source}",
     HttpError{source: http::Error} = "HTTP error: {source}",
     HyperError{source: hyper::Error} = "Hyper error: {source}",
+    Hyper13Error{source: hyper13::Error} = "Hyper error: {source}",
     IoError{source: std::io::Error} = "IO error: {source}",
     TimerError{source: tokio::timer::Error} = "Timer error: {source}",
     WarpError{source: warp::Error} = "Warp error: {source}",