From 9b9b6beb54f1e2914a77fb50ce4d1cf998c583de Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Sun, 21 Oct 2018 02:25:24 -0400
Subject: [PATCH] Use hyper::Body instead of a custom Payload type

---
 src/commands/mod.rs   | 19 +------------------
 src/commands/relay.rs | 26 +++++++++++---------------
 src/commands/send.rs  |  8 +++++---
 src/main.rs           |  1 +
 4 files changed, 18 insertions(+), 36 deletions(-)

diff --git a/src/commands/mod.rs b/src/commands/mod.rs
index 2628da7..7e1abfd 100644
--- a/src/commands/mod.rs
+++ b/src/commands/mod.rs
@@ -1,5 +1,4 @@
 use std::io::{
-    Cursor,
     Error as IoError,
     stdin,
     Stdin
@@ -9,16 +8,12 @@ use futures::{
     prelude::*,
     stream::MapErr
 };
-use hyper::body::Payload;
 use tokio_io::io::AllowStdIo;
 use tokio_codec::{
     BytesCodec,
     FramedRead
 };
-use webmetro::{
-    chunk::Chunk,
-    error::WebmetroError,
-};
+use webmetro::error::WebmetroError;
 
 pub mod dump;
 pub mod filter;
@@ -32,15 +27,3 @@ pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(Io
     FramedRead::new(AllowStdIo::new(stdin()), BytesCodec::new())
     .map_err(WebmetroError::IoError)
 }
-
-/// A wrapper to make a Stream of Webm chunks work as a payload for Hyper
-pub struct WebmPayload<S: Send + 'static>(pub S);
-
-impl<S: Stream<Item = Chunk, Error = WebmetroError> + Send + 'static> Payload for WebmPayload<S> {
-    type Data = Cursor<Chunk>;
-    type Error = S::Error;
-
-    fn poll_data(&mut self) -> Poll<Option<Cursor<Chunk>>, WebmetroError> {
-        self.0.poll().map(|async| async.map(|option| option.map(Cursor::new)))
-    }
-}
diff --git a/src/commands/relay.rs b/src/commands/relay.rs
index ff11087..232cfc6 100644
--- a/src/commands/relay.rs
+++ b/src/commands/relay.rs
@@ -1,10 +1,10 @@
-use std::error::Error;
 use std::net::ToSocketAddrs;
 use std::sync::{
     Arc,
     Mutex
 };
 
+use bytes::Bytes;
 use clap::{App, Arg, ArgMatches, SubCommand};
 use futures::{
     Future,
@@ -39,14 +39,12 @@ use webmetro::{
         Listener,
         Transmitter
     },
-    chunk::{Chunk, WebmStream},
+    chunk::WebmStream,
     error::WebmetroError,
     fixers::ChunkStream,
     stream_parser::StreamEbml
 };
 
-use super::WebmPayload;
-
 const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
 
 struct RelayServer(Arc<Mutex<Channel>>);
@@ -56,15 +54,15 @@ impl RelayServer {
         self.0.clone()
     }
 
-    fn get_stream(&self) -> impl Stream<Item = Chunk, Error = WebmetroError> {
+    fn get_stream(&self) -> impl Stream<Item = Bytes, Error = WebmetroError> {
         Listener::new(self.get_channel())
         .fix_timecodes()
         .find_starting_point()
+        .map(|webm_chunk| webm_chunk.into_bytes())
         .map_err(|err| match err {})
     }
 
-    fn post_stream<I: AsRef<[u8]>, S: Stream<Item = I> + Send + 'static>(&self, stream: S) -> impl Stream<Item = Chunk, Error = WebmetroError>
-    where S::Error: Error + Send + Sync {
+    fn post_stream(&self, stream: Body) -> impl Stream<Item = Bytes, Error = WebmetroError> {
         let source = stream
             .map_err(WebmetroError::from_err)
             .parse_ebml().with_soft_limit(BUFFER_LIMIT)
@@ -82,13 +80,11 @@ impl RelayServer {
     }
 }
 
-type BoxedBodyStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send + 'static>;
-
 impl Service for RelayServer {
     type ReqBody = Body;
-    type ResBody = WebmPayload<BoxedBodyStream>;
+    type ResBody = Body;
     type Error = WebmetroError;
-    type Future = FutureResult<Response<WebmPayload<BoxedBodyStream>>, WebmetroError>;
+    type Future = FutureResult<Response<Body>, WebmetroError>;
 
     fn call(&mut self, request: Request<Body>) -> Self::Future {
         let (Parts {method, uri, ..}, request_body) = request.into_parts();
@@ -99,7 +95,7 @@ impl Service for RelayServer {
                     .header(CONTENT_TYPE, "video/webm")
                     .header("X-Accel-Buffering", "no")
                     .header(CACHE_CONTROL, "no-cache, no-store")
-                    .body(WebmPayload(Box::new(empty()) as BoxedBodyStream))
+                    .body(Body::empty())
                     .unwrap()
             },
             (Method::GET, "/live") => {
@@ -107,17 +103,17 @@ impl Service for RelayServer {
                     .header(CONTENT_TYPE, "video/webm")
                     .header("X-Accel-Buffering", "no")
                     .header(CACHE_CONTROL, "no-cache, no-store")
-                    .body(WebmPayload(Box::new(self.get_stream()) as BoxedBodyStream))
+                    .body(Body::wrap_stream(self.get_stream()))
                     .unwrap()
             },
             (Method::POST, "/live") | (Method::PUT, "/live") => {
                 println!("[Info] New source on {}", uri.path());
-                Response::new(WebmPayload(Box::new(self.post_stream(request_body)) as BoxedBodyStream))
+                Response::new(Body::wrap_stream(self.post_stream(request_body)))
             },
             _ => {
                 Response::builder()
                     .status(StatusCode::NOT_FOUND)
-                    .body(WebmPayload(Box::new(empty()) as BoxedBodyStream))
+                    .body(Body::empty())
                     .unwrap()
             }
         })
diff --git a/src/commands/send.rs b/src/commands/send.rs
index 03467f3..148e9ad 100644
--- a/src/commands/send.rs
+++ b/src/commands/send.rs
@@ -3,6 +3,7 @@ use futures::{
     prelude::*
 };
 use hyper::{
+    Body,
     Client,
     client::HttpConnector,
     Request
@@ -10,8 +11,7 @@ use hyper::{
 use tokio::runtime::Runtime;
 
 use super::{
-    stdin_stream,
-    WebmPayload
+    stdin_stream
 };
 use webmetro::{
     chunk::{
@@ -53,7 +53,9 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
         chunk_stream = Box::new(chunk_stream.throttle());
     }
 
-    let request_payload = WebmPayload(chunk_stream.map_err(|err| {
+    let request_payload = Body::wrap_stream(chunk_stream.map(
+        |webm_chunk| webm_chunk.into_bytes()
+    ).map_err(|err| {
         eprintln!("{}", &err);
         err
     }));
diff --git a/src/main.rs b/src/main.rs
index e2616cb..4f45e54 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,4 @@
+extern crate bytes;
 #[macro_use] extern crate clap;
 extern crate futures;
 extern crate http;