Update hyper to 0.12
This commit is contained in:
parent
55e1f29906
commit
14d468cc7d
8 changed files with 169 additions and 217 deletions
|
@ -54,6 +54,7 @@ impl AsRef<[u8]> for ClusterHead {
|
|||
}
|
||||
}
|
||||
|
||||
/// A chunk of WebM data
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum Chunk {
|
||||
Headers {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use std::error::Error;
|
||||
use std::io::{
|
||||
Cursor,
|
||||
Error as IoError,
|
||||
ErrorKind,
|
||||
stdin,
|
||||
Stdin
|
||||
};
|
||||
|
@ -10,7 +9,7 @@ use futures::{
|
|||
prelude::*,
|
||||
stream::MapErr
|
||||
};
|
||||
use hyper::Error as HyperError;
|
||||
use hyper::body::Payload;
|
||||
use tokio_io::{
|
||||
io::AllowStdIo,
|
||||
codec::{
|
||||
|
@ -18,7 +17,10 @@ use tokio_io::{
|
|||
FramedRead
|
||||
}
|
||||
};
|
||||
use webmetro::error::WebmetroError;
|
||||
use webmetro::{
|
||||
chunk::Chunk,
|
||||
error::WebmetroError,
|
||||
};
|
||||
|
||||
pub mod dump;
|
||||
pub mod filter;
|
||||
|
@ -33,9 +35,14 @@ pub fn stdin_stream() -> MapErr<FramedRead<AllowStdIo<Stdin>, BytesCodec>, fn(Io
|
|||
.map_err(WebmetroError::IoError)
|
||||
}
|
||||
|
||||
pub fn to_hyper_error(err: WebmetroError) -> HyperError {
|
||||
match err {
|
||||
WebmetroError::IoError(io_err) => io_err.into(),
|
||||
err => IoError::new(ErrorKind::InvalidData, err.description()).into()
|
||||
/// A wrapper to make a Stream of Webm chunks work as a payload for Hyper
|
||||
pub struct WebmPayload<S: Send + 'static>(pub S);
|
||||
|
||||
impl<S: Stream<Item = Chunk, Error = WebmetroError> + Send + 'static> Payload for WebmPayload<S> {
|
||||
type Data = Cursor<Chunk>;
|
||||
type Error = S::Error;
|
||||
|
||||
fn poll_data(&mut self) -> Poll<Option<Cursor<Chunk>>, WebmetroError> {
|
||||
self.0.poll().map(|async| async.map(|option| option.map(Cursor::new)))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -16,19 +16,22 @@ use futures::{
|
|||
},
|
||||
stream::empty
|
||||
};
|
||||
use hyper::{
|
||||
Error as HyperError,
|
||||
Get,
|
||||
Head,
|
||||
Post,
|
||||
Put,
|
||||
use http::{
|
||||
request::Parts,
|
||||
StatusCode,
|
||||
};
|
||||
use hyper::{
|
||||
Body,
|
||||
Method,
|
||||
Request,
|
||||
Response,
|
||||
rt,
|
||||
Server,
|
||||
service::Service,
|
||||
header::{
|
||||
CacheControl,
|
||||
CacheDirective,
|
||||
ContentType
|
||||
},
|
||||
server::{Http, Request, Response, Service}
|
||||
CACHE_CONTROL,
|
||||
CONTENT_TYPE
|
||||
}
|
||||
};
|
||||
use webmetro::{
|
||||
channel::{
|
||||
|
@ -42,9 +45,7 @@ use webmetro::{
|
|||
stream_parser::StreamEbml
|
||||
};
|
||||
|
||||
use super::to_hyper_error;
|
||||
|
||||
header! { (XAccelBuffering, "X-Accel-Buffering") => [String] }
|
||||
use super::WebmPayload;
|
||||
|
||||
const BUFFER_LIMIT: usize = 2 * 1024 * 1024;
|
||||
|
||||
|
@ -55,15 +56,15 @@ impl RelayServer {
|
|||
self.0.clone()
|
||||
}
|
||||
|
||||
fn get_stream(&self) -> impl Stream<Item = Chunk, Error = HyperError> {
|
||||
fn get_stream(&self) -> impl Stream<Item = Chunk, Error = WebmetroError> {
|
||||
Listener::new(self.get_channel())
|
||||
.fix_timecodes()
|
||||
.find_starting_point()
|
||||
.map_err(|err| match err {})
|
||||
}
|
||||
|
||||
fn post_stream<I: AsRef<[u8]>, S: Stream<Item = I> + 'static>(&self, stream: S) -> impl Stream<Item = Chunk, Error = HyperError>
|
||||
where S::Error: Error + Send {
|
||||
fn post_stream<I: AsRef<[u8]>, S: Stream<Item = I> + Send + 'static>(&self, stream: S) -> impl Stream<Item = Chunk, Error = WebmetroError>
|
||||
where S::Error: Error + Send + Sync {
|
||||
let source = stream
|
||||
.map_err(WebmetroError::from_err)
|
||||
.parse_ebml().with_soft_limit(BUFFER_LIMIT)
|
||||
|
@ -75,46 +76,49 @@ impl RelayServer {
|
|||
.map(|_| empty())
|
||||
.map_err(|err| {
|
||||
println!("[Warning] {}", err);
|
||||
to_hyper_error(err)
|
||||
err
|
||||
})
|
||||
.flatten()
|
||||
}
|
||||
}
|
||||
|
||||
type BoxedBodyStream = Box<Stream<Item = Chunk, Error = HyperError>>;
|
||||
type BoxedBodyStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send + 'static>;
|
||||
|
||||
impl Service for RelayServer {
|
||||
type Request = Request;
|
||||
type Response = Response<BoxedBodyStream>;
|
||||
type Error = HyperError;
|
||||
type Future = FutureResult<Self::Response, HyperError>;
|
||||
type ReqBody = Body;
|
||||
type ResBody = WebmPayload<BoxedBodyStream>;
|
||||
type Error = WebmetroError;
|
||||
type Future = FutureResult<Response<WebmPayload<BoxedBodyStream>>, WebmetroError>;
|
||||
|
||||
fn call(&self, request: Request) -> Self::Future {
|
||||
let (method, uri, _http_version, _headers, request_body) = request.deconstruct();
|
||||
|
||||
//TODO: log equiv to: eprintln!("New {} Request: {}", method, uri.path());
|
||||
fn call(&mut self, request: Request<Body>) -> Self::Future {
|
||||
let (Parts {method, uri, ..}, request_body) = request.into_parts();
|
||||
|
||||
ok(match (method, uri.path()) {
|
||||
(Head, "/live") => {
|
||||
Response::new()
|
||||
.with_header(ContentType("video/webm".parse().unwrap()))
|
||||
.with_header(XAccelBuffering("no".to_string()))
|
||||
.with_header(CacheControl(vec![CacheDirective::NoCache, CacheDirective::NoStore]))
|
||||
(Method::HEAD, "/live") => {
|
||||
Response::builder()
|
||||
.header(CONTENT_TYPE, "video/webm")
|
||||
.header("X-Accel-Buffering", "no")
|
||||
.header(CACHE_CONTROL, "no-cache, no-store")
|
||||
.body(WebmPayload(Box::new(empty()) as BoxedBodyStream))
|
||||
.unwrap()
|
||||
},
|
||||
(Get, "/live") => {
|
||||
Response::new()
|
||||
.with_header(ContentType("video/webm".parse().unwrap()))
|
||||
.with_header(XAccelBuffering("no".to_string()))
|
||||
.with_header(CacheControl(vec![CacheDirective::NoCache, CacheDirective::NoStore]))
|
||||
.with_body(Box::new(self.get_stream()) as BoxedBodyStream)
|
||||
(Method::GET, "/live") => {
|
||||
Response::builder()
|
||||
.header(CONTENT_TYPE, "video/webm")
|
||||
.header("X-Accel-Buffering", "no")
|
||||
.header(CACHE_CONTROL, "no-cache, no-store")
|
||||
.body(WebmPayload(Box::new(self.get_stream()) as BoxedBodyStream))
|
||||
.unwrap()
|
||||
},
|
||||
(Post, "/live") | (Put, "/live") => {
|
||||
Response::new()
|
||||
.with_body(Box::new(self.post_stream(request_body)) as BoxedBodyStream)
|
||||
(Method::POST, "/live") | (Method::PUT, "/live") => {
|
||||
println!("[Info] New source on {}", uri.path());
|
||||
Response::new(WebmPayload(Box::new(self.post_stream(request_body)) as BoxedBodyStream))
|
||||
},
|
||||
_ => {
|
||||
Response::new()
|
||||
.with_status(StatusCode::NotFound)
|
||||
Response::builder()
|
||||
.status(StatusCode::NOT_FOUND)
|
||||
.body(WebmPayload(Box::new(empty()) as BoxedBodyStream))
|
||||
.unwrap()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -134,13 +138,14 @@ pub fn run(args: &ArgMatches) -> Result<(), WebmetroError> {
|
|||
let addr_str = args.value_of("listen").ok_or("Listen address wasn't provided")?;
|
||||
let addr = addr_str.to_socket_addrs()?.next().ok_or("Listen address didn't resolve")?;
|
||||
|
||||
Http::new()
|
||||
.bind(&addr, move || {
|
||||
Ok(RelayServer(single_channel.clone()))
|
||||
rt::run(Server::bind(&addr)
|
||||
.serve(move || {
|
||||
ok::<_, WebmetroError>(RelayServer(single_channel.clone()))
|
||||
})
|
||||
.map_err(|err| WebmetroError::Unknown(Box::new(err)))?
|
||||
.run()
|
||||
.map_err(|err| WebmetroError::Unknown(Box::new(err)))?;
|
||||
.map_err(|err| {
|
||||
println!("[Error] {}", err);
|
||||
})
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -4,12 +4,9 @@ use futures::{
|
|||
prelude::*
|
||||
};
|
||||
use hyper::{
|
||||
Error as HyperError,
|
||||
Method,
|
||||
client::{
|
||||
Config,
|
||||
Request
|
||||
}
|
||||
Client,
|
||||
client::HttpConnector,
|
||||
Request
|
||||
};
|
||||
use tokio_core::reactor::{
|
||||
Handle
|
||||
|
@ -17,7 +14,7 @@ use tokio_core::reactor::{
|
|||
|
||||
use super::{
|
||||
stdin_stream,
|
||||
to_hyper_error
|
||||
WebmPayload
|
||||
};
|
||||
use webmetro::{
|
||||
chunk::{
|
||||
|
@ -40,10 +37,9 @@ pub fn options() -> App<'static, 'static> {
|
|||
.help("Slow down upload to \"real time\" speed as determined by the timestamps (useful for streaming static files)"))
|
||||
}
|
||||
|
||||
type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError>>;
|
||||
type BoxedHyperStream = Box<Stream<Item = Chunk, Error = HyperError>>;
|
||||
type BoxedChunkStream = Box<Stream<Item = Chunk, Error = WebmetroError> + Send>;
|
||||
|
||||
pub fn run(handle: Handle, args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError>> {
|
||||
pub fn run(_handle: Handle, args: &ArgMatches) -> Box<Future<Item=(), Error=WebmetroError>> {
|
||||
let mut chunk_stream: BoxedChunkStream = Box::new(
|
||||
stdin_stream()
|
||||
.parse_ebml()
|
||||
|
@ -60,24 +56,20 @@ pub fn run(handle: Handle, args: &ArgMatches) -> Box<Future<Item=(), Error=Webme
|
|||
chunk_stream = Box::new(chunk_stream.throttle());
|
||||
}
|
||||
|
||||
let request_body_stream = Box::new(chunk_stream.map_err(|err| {
|
||||
let request_payload = WebmPayload(chunk_stream.map_err(|err| {
|
||||
eprintln!("{}", &err);
|
||||
to_hyper_error(err)
|
||||
})) as BoxedHyperStream;
|
||||
err
|
||||
}));
|
||||
|
||||
Box::new(future::lazy(move || {
|
||||
url_str.parse().map_err(WebmetroError::from_err)
|
||||
}).and_then(move |uri| {
|
||||
let client = Config::default()
|
||||
.body::<BoxedHyperStream>()
|
||||
.build(&handle);
|
||||
|
||||
let mut request: Request<BoxedHyperStream> = Request::new(Method::Put, uri);
|
||||
request.set_body(request_body_stream);
|
||||
|
||||
Request::put(url_str)
|
||||
.body(request_payload)
|
||||
.map_err(WebmetroError::from_err)
|
||||
}).and_then(|request| {
|
||||
let client = Client::builder().build(HttpConnector::new(1));
|
||||
client.request(request)
|
||||
.and_then(|response| {
|
||||
response.body().for_each(|_chunk| {
|
||||
response.into_body().for_each(|_chunk| {
|
||||
Ok(())
|
||||
})
|
||||
})
|
||||
|
|
|
@ -15,7 +15,7 @@ pub enum WebmetroError {
|
|||
ResourcesExceeded,
|
||||
EbmlError(EbmlError),
|
||||
IoError(IoError),
|
||||
Unknown(Box<Error + Send>)
|
||||
Unknown(Box<Error + Send + Sync>)
|
||||
}
|
||||
|
||||
impl WebmetroError {
|
||||
|
@ -23,7 +23,7 @@ impl WebmetroError {
|
|||
string.into()
|
||||
}
|
||||
|
||||
pub fn from_err<E: Error + Send + 'static>(err: E) -> WebmetroError {
|
||||
pub fn from_err<E: Error + Send + Sync + 'static>(err: E) -> WebmetroError {
|
||||
WebmetroError::Unknown(Box::new(err))
|
||||
}
|
||||
}
|
||||
|
@ -61,8 +61,8 @@ impl From<IoError> for WebmetroError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<Box<Error + Send>> for WebmetroError {
|
||||
fn from(err: Box<Error + Send>) -> WebmetroError {
|
||||
impl From<Box<Error + Send + Sync>> for WebmetroError {
|
||||
fn from(err: Box<Error + Send + Sync>) -> WebmetroError {
|
||||
WebmetroError::Unknown(err)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#[macro_use] extern crate clap;
|
||||
extern crate futures;
|
||||
#[macro_use] extern crate hyper;
|
||||
extern crate http;
|
||||
extern crate hyper;
|
||||
extern crate tokio;
|
||||
extern crate tokio_core;
|
||||
extern crate tokio_io;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue