ensure Throttle never waits before the first chunk of data, even if it's timestamped after zero
This commit is contained in:
parent
43acf93fa9
commit
18fb8390a0
1 changed files with 4 additions and 3 deletions
|
@ -115,10 +115,11 @@ impl<S: TryStream<Ok = Chunk> + Unpin> Stream for Throttle<S>
|
||||||
|
|
||||||
let next_chunk = self.stream.try_poll_next_unpin(cx);
|
let next_chunk = self.stream.try_poll_next_unpin(cx);
|
||||||
if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk {
|
if let Poll::Ready(Some(Ok(Chunk::Cluster(ref cluster_head, _)))) = next_chunk {
|
||||||
// we have actual data, so start the clock if we haven't yet
|
|
||||||
let start_time = self.start_time.get_or_insert_with(Instant::now);
|
|
||||||
// snooze until real time has "caught up" to the stream
|
|
||||||
let offset = Duration::from_millis(cluster_head.end);
|
let offset = Duration::from_millis(cluster_head.end);
|
||||||
|
// we have actual data, so start the clock if we haven't yet;
|
||||||
|
// if we're starting the clock now, though, don't insert delays if the first chunk happens to start after zero
|
||||||
|
let start_time = self.start_time.get_or_insert_with(|| Instant::now() - offset);
|
||||||
|
// snooze until real time has "caught up" to the stream
|
||||||
let sleep_until = *start_time + offset;
|
let sleep_until = *start_time + offset;
|
||||||
self.sleep.reset(sleep_until);
|
self.sleep.reset(sleep_until);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue