diff --git a/Cargo.toml b/Cargo.toml index 280bd11..7c6aff3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ authors = ["Tangent 128 "] bytes = "0.4" futures = "0.1.20" hyper = "0.11.24" +odds = { version = "0.3.1", features = ["std-vec"] } diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..961b441 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,102 @@ +use std::sync::{ + Arc, + Mutex +}; + +use futures::{ + Async, + AsyncSink, + Sink, + Stream, + sync::mpsc::{ + channel as mpsc_channel, + Sender, + Receiver + } +}; +use odds::vec::VecExt; + +use chunk::Chunk; + +/// A collection of listeners to a stream of WebM chunks. +/// Sending a chunk may fail due to a client being disconnected, +/// or simply failing to keep up with the stream buffer. In either +/// case, there's nothing practical the server can do to recover, +/// so the failing client is just dropped from the listener list. +pub struct Channel { + header_chunk: Option, + listeners: Vec> +} + +impl Channel { + pub fn new() -> Arc> { + Arc::new(Mutex::new(Channel { + header_chunk: None, + listeners: Vec::new() + })) + } +} + +pub struct Transmitter { + channel: Arc> +} + +impl Sink for Transmitter { + type SinkItem = Chunk; + type SinkError = (); // never errors, slow clients are simply dropped + + fn start_send(&mut self, chunk: Chunk) -> Result, ()> { + let mut channel = self.channel.lock().expect("Locking channel"); + + if let Chunk::Headers { .. } = chunk { + channel.header_chunk = Some(chunk.clone()); + } + + channel.listeners.retain_mut(|listener| listener.start_send(chunk.clone()).is_ok()); + + Ok(AsyncSink::Ready) + } + fn poll_complete(&mut self) -> Result, ()> { + let mut channel = self.channel.lock().expect("Locking channel"); + + channel.listeners.retain_mut(|listener| listener.poll_complete().is_ok()); + + Ok(Async::Ready(())) + } +} + +pub struct Listener { + /// not used in operation, but its refcount keeps the channel alive when there's no Transmitter + _channel: Arc>, + receiver: Receiver +} + +impl Listener { + pub fn new(channel_arc: Arc>) -> Self { + let (mut sender, receiver) = mpsc_channel(5); + + { + let mut channel = channel_arc.lock().expect("Locking channel"); + + if let Some(ref chunk) = channel.header_chunk { + sender.start_send(chunk.clone()).expect("Queuing existing header chunk"); + } + + channel.listeners.push(sender); + } + + Listener { + _channel: channel_arc, + receiver: receiver + } + } +} + +impl Stream for Listener { + type Item = Chunk; + type Error = (); // no transmitter errors are exposed to the listeners + + fn poll(&mut self) -> Result>, ()> { + self.receiver.poll() + } +} diff --git a/src/lib.rs b/src/lib.rs index 2a0ff22..b7cde9f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ extern crate bytes; extern crate futures; +extern crate odds; pub mod ebml; pub mod iterator; @@ -11,6 +12,8 @@ pub mod chunk; pub mod fixers; pub mod webm; +pub mod channel; + pub use ebml::{EbmlError, FromEbml}; #[cfg(test)]