Use core futures for the fixers; try different approach to timecode fixer
This commit is contained in:
parent
cfb56f1281
commit
3a6ca629ca
6 changed files with 371 additions and 58 deletions
src
100
src/fixers.rs
100
src/fixers.rs
|
@ -1,27 +1,36 @@
|
|||
use std::pin::Pin;
|
||||
use std::task::{
|
||||
Context,
|
||||
Poll
|
||||
};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use futures::prelude::*;
|
||||
use tokio::timer::Delay;
|
||||
use futures3::prelude::*;
|
||||
use tokio2::timer::{
|
||||
delay,
|
||||
Delay
|
||||
};
|
||||
|
||||
use crate::chunk::Chunk;
|
||||
use crate::error::WebmetroError;
|
||||
|
||||
pub struct ChunkTimecodeFixer<S> {
|
||||
stream: S,
|
||||
pub struct ChunkTimecodeFixer {
|
||||
current_offset: u64,
|
||||
last_observed_timecode: u64,
|
||||
assumed_duration: u64
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
|
||||
{
|
||||
type Item = S::Item;
|
||||
type Error = S::Error;
|
||||
|
||||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
||||
let mut poll_chunk = self.stream.poll();
|
||||
match poll_chunk {
|
||||
Ok(Async::Ready(Some(Chunk::ClusterHead(ref mut cluster_head)))) => {
|
||||
impl ChunkTimecodeFixer {
|
||||
pub fn new() -> ChunkTimecodeFixer {
|
||||
ChunkTimecodeFixer {
|
||||
current_offset: 0,
|
||||
last_observed_timecode: 0,
|
||||
assumed_duration: 33
|
||||
}
|
||||
}
|
||||
pub fn process<'a>(&mut self, mut chunk: Chunk) -> Chunk {
|
||||
match chunk {
|
||||
Chunk::ClusterHead(ref mut cluster_head) => {
|
||||
let start = cluster_head.start;
|
||||
if start < self.last_observed_timecode {
|
||||
let next_timecode = self.last_observed_timecode + self.assumed_duration;
|
||||
|
@ -30,10 +39,10 @@ impl<S: Stream<Item = Chunk>> Stream for ChunkTimecodeFixer<S>
|
|||
|
||||
cluster_head.update_timecode(start + self.current_offset);
|
||||
self.last_observed_timecode = cluster_head.end;
|
||||
},
|
||||
}
|
||||
_ => {}
|
||||
};
|
||||
poll_chunk
|
||||
}
|
||||
chunk
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -43,33 +52,32 @@ pub struct StartingPointFinder<S> {
|
|||
seen_keyframe: bool
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Chunk>> Stream for StartingPointFinder<S>
|
||||
impl<S: TryStream<Ok = Chunk> + Unpin> Stream for StartingPointFinder<S>
|
||||
{
|
||||
type Item = S::Item;
|
||||
type Error = S::Error;
|
||||
type Item = Result<Chunk, S::Error>;
|
||||
|
||||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, S::Error>>> {
|
||||
loop {
|
||||
return match self.stream.poll() {
|
||||
Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head)))) => {
|
||||
return match self.stream.try_poll_next_unpin(cx) {
|
||||
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head)))) => {
|
||||
if cluster_head.keyframe {
|
||||
self.seen_keyframe = true;
|
||||
}
|
||||
|
||||
if self.seen_keyframe {
|
||||
Ok(Async::Ready(Some(Chunk::ClusterHead(cluster_head))))
|
||||
Poll::Ready(Some(Ok(Chunk::ClusterHead(cluster_head))))
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
},
|
||||
chunk @ Ok(Async::Ready(Some(Chunk::ClusterBody {..}))) => {
|
||||
chunk @ Poll::Ready(Some(Ok(Chunk::ClusterBody {..}))) => {
|
||||
if self.seen_keyframe {
|
||||
chunk
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
},
|
||||
chunk @ Ok(Async::Ready(Some(Chunk::Headers {..}))) => {
|
||||
chunk @ Poll::Ready(Some(Ok(Chunk::Headers {..}))) => {
|
||||
if self.seen_header {
|
||||
// new stream starting, we don't need a new header but should wait for a safe spot to resume
|
||||
self.seen_keyframe = false;
|
||||
|
@ -91,37 +99,35 @@ pub struct Throttle<S> {
|
|||
sleep: Delay
|
||||
}
|
||||
|
||||
impl<S: Stream<Item = Chunk, Error = WebmetroError>> Stream for Throttle<S>
|
||||
impl<S: TryStream<Ok = Chunk, Error = WebmetroError> + Unpin> Stream for Throttle<S>
|
||||
{
|
||||
type Item = S::Item;
|
||||
type Error = WebmetroError;
|
||||
type Item = Result<Chunk, WebmetroError>;
|
||||
|
||||
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, WebmetroError> {
|
||||
match self.sleep.poll() {
|
||||
Err(err) => return Err(err.into()),
|
||||
Ok(Async::NotReady) => return Ok(Async::NotReady),
|
||||
Ok(Async::Ready(())) => { /* can continue */ }
|
||||
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Result<Chunk, WebmetroError>>> {
|
||||
match self.sleep.poll_unpin(cx) {
|
||||
Poll::Pending => return Poll::Pending,
|
||||
Poll::Ready(()) => { /* can continue */ },
|
||||
}
|
||||
|
||||
let next_chunk = self.stream.poll();
|
||||
if let Ok(Async::Ready(Some(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
|
||||
let next_chunk = self.stream.try_poll_next_unpin(cx);
|
||||
if let Poll::Ready(Some(Ok(Chunk::ClusterHead(ref cluster_head)))) = next_chunk {
|
||||
// snooze until real time has "caught up" to the stream
|
||||
let offset = Duration::from_millis(cluster_head.end);
|
||||
self.sleep.reset(self.start_time + offset);
|
||||
let sleep_until = self.start_time + offset;
|
||||
self.sleep.reset(sleep_until);
|
||||
}
|
||||
next_chunk
|
||||
}
|
||||
}
|
||||
|
||||
pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
|
||||
fn fix_timecodes(self) -> ChunkTimecodeFixer<Self> {
|
||||
ChunkTimecodeFixer {
|
||||
stream: self,
|
||||
current_offset: 0,
|
||||
last_observed_timecode: 0,
|
||||
assumed_duration: 33
|
||||
}
|
||||
}
|
||||
pub trait ChunkStream where Self : Sized + TryStream<Ok = Chunk> {
|
||||
/*fn fix_timecodes(self) -> Map<_> {
|
||||
let fixer = ;
|
||||
self.map(move |chunk| {
|
||||
fixer.process(chunk);
|
||||
chunk
|
||||
})
|
||||
}*/
|
||||
|
||||
fn find_starting_point(self) -> StartingPointFinder<Self> {
|
||||
StartingPointFinder {
|
||||
|
@ -136,9 +142,9 @@ pub trait ChunkStream where Self : Sized + Stream<Item = Chunk> {
|
|||
Throttle {
|
||||
stream: self,
|
||||
start_time: now,
|
||||
sleep: Delay::new(now)
|
||||
sleep: delay(now)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Stream<Item = Chunk>> ChunkStream for T {}
|
||||
impl<T: TryStream<Ok = Chunk>> ChunkStream for T {}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue