From c422a1c3f3dbb379f79f93373fbae52ea4e5a57a Mon Sep 17 00:00:00 2001 From: Tangent Wantwight Date: Fri, 8 May 2020 21:15:18 -0400 Subject: [PATCH] Simplify/sanify some types --- src/chunk.rs | 4 +++- src/commands/filter.rs | 36 +++++++++++++----------------------- src/commands/mod.rs | 15 +++------------ src/commands/send.rs | 7 +------ src/fixers.rs | 9 ++++----- src/stream_parser.rs | 35 ++++++++++++++++------------------- 6 files changed, 40 insertions(+), 66 deletions(-) diff --git a/src/chunk.rs b/src/chunk.rs index f33e73e..64bcc1a 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -141,7 +141,9 @@ fn encode(element: WebmElement, buffer: &mut Cursor>, limit: Option> + Unpin> Stream for WebmChunker +impl> + Unpin> Stream for WebmChunker +where + WebmetroError: From, { type Item = Result; diff --git a/src/commands/filter.rs b/src/commands/filter.rs index b9bf3a4..6b57326 100644 --- a/src/commands/filter.rs +++ b/src/commands/filter.rs @@ -1,23 +1,14 @@ -use std::{ - io, - io::prelude::* -}; +use std::{io, io::prelude::*}; use clap::{App, Arg, ArgMatches, SubCommand}; use futures::prelude::*; use super::stdin_stream; use webmetro::{ - chunk::{ - Chunk, - WebmStream - }, + chunk::{Chunk, WebmStream}, error::WebmetroError, - fixers::{ - ChunkTimecodeFixer, - Throttle, - }, - stream_parser::StreamEbml + fixers::{ChunkTimecodeFixer, Throttle}, + stream_parser::StreamEbml, }; pub fn options() -> App<'static, 'static> { @@ -31,21 +22,20 @@ pub fn options() -> App<'static, 'static> { #[tokio::main] pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { let mut timecode_fixer = ChunkTimecodeFixer::new(); - let mut chunk_stream: Box, Ok = Chunk, Error = WebmetroError> + Send + Unpin> = Box::new( - stdin_stream() - .parse_ebml() - .chunk_webm() - .map_ok(move |chunk| timecode_fixer.process(chunk)) - ); + let mut chunk_stream: Box> + Send + Unpin> = + Box::new( + stdin_stream() + .parse_ebml() + .chunk_webm() + .map_ok(move |chunk| timecode_fixer.process(chunk)), + ); if args.is_present("throttle") { chunk_stream = Box::new(Throttle::new(chunk_stream)); } while let Some(chunk) = chunk_stream.next().await { - chunk?.try_for_each(|buffer| - io::stdout().write_all(&buffer) - )?; - }; + chunk?.try_for_each(|buffer| io::stdout().write_all(&buffer))?; + } Ok(()) } diff --git a/src/commands/mod.rs b/src/commands/mod.rs index 00a3d7a..c442c07 100644 --- a/src/commands/mod.rs +++ b/src/commands/mod.rs @@ -1,9 +1,6 @@ -use std::io::Cursor; - use bytes::Bytes; -use futures::{TryStream, TryStreamExt}; +use futures::{Stream, TryStreamExt}; use tokio_util::codec::{BytesCodec, FramedRead}; -use webmetro::error::WebmetroError; pub mod dump; pub mod filter; @@ -13,13 +10,7 @@ pub mod send; /// An adapter that makes chunks of bytes from stdin available as a Stream; /// is NOT actually async, and just uses blocking read. Don't use more than /// one at once, who knows who gets which bytes. -pub fn stdin_stream() -> impl TryStream< - Item = Result, WebmetroError>, - Ok = Cursor, - Error = WebmetroError, -> + Sized - + Unpin { +pub fn stdin_stream() -> impl Stream> + Sized + Unpin { FramedRead::new(tokio::io::stdin(), BytesCodec::new()) - .map_ok(|bytes| Cursor::new(bytes.freeze())) - .map_err(WebmetroError::from) + .map_ok(|bytes| bytes.freeze()) } diff --git a/src/commands/send.rs b/src/commands/send.rs index 1919939..8917aeb 100644 --- a/src/commands/send.rs +++ b/src/commands/send.rs @@ -22,12 +22,7 @@ pub fn options() -> App<'static, 'static> { .help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)")) } -type BoxedChunkStream = Box< - dyn TryStream, Ok = Chunk, Error = WebmetroError> - + Send - + Sync - + Unpin, ->; +type BoxedChunkStream = Box> + Send + Sync + Unpin>; #[tokio::main] pub async fn run(args: &ArgMatches) -> Result<(), WebmetroError> { diff --git a/src/fixers.rs b/src/fixers.rs index f91046e..bdb3606 100644 --- a/src/fixers.rs +++ b/src/fixers.rs @@ -13,7 +13,6 @@ use tokio::time::{ }; use crate::chunk::Chunk; -use crate::error::WebmetroError; pub struct ChunkTimecodeFixer { current_offset: u64, @@ -29,7 +28,7 @@ impl ChunkTimecodeFixer { assumed_duration: 33 } } - pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk { + pub fn process(&mut self, mut chunk: Chunk) -> Chunk { match chunk { Chunk::ClusterHead(ref mut cluster_head) => { let start = cluster_head.start; @@ -111,11 +110,11 @@ impl Throttle { } } -impl + Unpin> Stream for Throttle +impl + Unpin> Stream for Throttle { - type Item = Result; + type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll>> { match self.sleep.poll_unpin(cx) { Poll::Pending => return Poll::Pending, Poll::Ready(()) => { /* can continue */ }, diff --git a/src/stream_parser.rs b/src/stream_parser.rs index 0697ff6..4e9b433 100644 --- a/src/stream_parser.rs +++ b/src/stream_parser.rs @@ -1,5 +1,5 @@ use bytes::{Buf, BufMut, Bytes, BytesMut}; -use futures::stream::{Stream, StreamExt, TryStream}; +use futures::stream::{Stream, StreamExt}; use std::task::{Context, Poll}; use crate::ebml::FromEbml; @@ -22,11 +22,7 @@ impl EbmlStreamingParser { } } -pub trait StreamEbml: Sized + TryStream + Unpin -where - Self: Sized + TryStream + Unpin, - Self::Ok: Buf, -{ +pub trait StreamEbml: Sized { fn parse_ebml(self) -> EbmlStreamingParser { EbmlStreamingParser { stream: self, @@ -37,9 +33,13 @@ where } } -impl> + Unpin> StreamEbml for S {} +impl> + Unpin> StreamEbml for S where WebmetroError: From +{} -impl> + Unpin> EbmlStreamingParser { +impl> + Unpin> EbmlStreamingParser +where + WebmetroError: From, +{ pub fn poll_event<'a, T: FromEbml<'a>>( &'a mut self, cx: &mut Context, @@ -53,10 +53,9 @@ impl> + Unpin> EbmlStreamingPa let mut bytes = self.buffer.split_to(info.element_len).freeze(); bytes.advance(info.body_offset); self.borrowed = bytes; - return Poll::Ready(Some(T::decode( - info.element_id, - &self.borrowed, - ).map_err(Into::into))); + return Poll::Ready(Some( + T::decode(info.element_id, &self.borrowed).map_err(Into::into), + )); } } @@ -77,9 +76,7 @@ impl> + Unpin> EbmlStreamingPa } } } -} -impl> + Unpin> EbmlStreamingParser { pub async fn next<'a, T: FromEbml<'a>>(&'a mut self) -> Result, WebmetroError> { loop { if let Some(info) = T::check_space(&self.buffer)? { @@ -130,7 +127,7 @@ mod tests { ]; let mut stream_parser = futures::stream::iter(pieces.iter()) - .map(|bytes| Ok(&bytes[..])) + .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..])) .parse_ebml(); assert_matches!( @@ -182,7 +179,7 @@ mod tests { async { let mut parser = futures::stream::iter(pieces.iter()) - .map(|bytes| Ok(&bytes[..])) + .map(|bytes| Ok::<&[u8], WebmetroError>(&bytes[..])) .parse_ebml(); assert_matches!(parser.next().await?, Some(WebmElement::EbmlHead)); @@ -196,8 +193,8 @@ mod tests { Result::<(), WebmetroError>::Ok(()) } - .now_or_never() - .expect("Test tried to block on I/O") - .expect("Parse failed"); + .now_or_never() + .expect("Test tried to block on I/O") + .expect("Parse failed"); } }