From 710af7e1d7a02573e63fd91b7d0ec5e0269ef2c4 Mon Sep 17 00:00:00 2001
From: Tangent 128 <Tangent128@gmail.com>
Date: Sat, 19 Oct 2019 16:37:56 -0400
Subject: [PATCH] feeder-based parser proof-of-concept adjusted to work with
 the lifetime FromEbml trait

---
 Cargo.lock          |   1 +
 Cargo.toml          |   1 +
 src/async_parser.rs | 108 ++++++++++++++++++++++++++++++++++++++++++++
 src/ebml.rs         |   6 +--
 src/lib.rs          |   2 +
 5 files changed, 115 insertions(+), 3 deletions(-)
 create mode 100644 src/async_parser.rs

diff --git a/Cargo.lock b/Cargo.lock
index 80164a3..20eb6ee 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1635,6 +1635,7 @@ dependencies = [
  "futures-preview 0.3.0-alpha.19 (registry+https://github.com/rust-lang/crates.io-index)",
  "http 0.1.18 (registry+https://github.com/rust-lang/crates.io-index)",
  "hyper 0.12.35 (registry+https://github.com/rust-lang/crates.io-index)",
+ "matches 0.1.8 (registry+https://github.com/rust-lang/crates.io-index)",
  "odds 0.3.1 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.1.22 (registry+https://github.com/rust-lang/crates.io-index)",
  "tokio 0.2.0-alpha.6 (registry+https://github.com/rust-lang/crates.io-index)",
diff --git a/Cargo.toml b/Cargo.toml
index ccbfe32..28195e5 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ futures = "0.1.29"
 futures3 = { package = "futures-preview", version="0.3.0-alpha", features = ["compat"] }
 http = "0.1.18"
 hyper = "0.12.35"
+matches = "0.1.8"
 odds = { version = "0.3.1", features = ["std-vec"] }
 tokio = "0.1.22"
 tokio2 = { package = "tokio", version="0.2.0-alpha.6" }
diff --git a/src/async_parser.rs b/src/async_parser.rs
new file mode 100644
index 0000000..e650ddb
--- /dev/null
+++ b/src/async_parser.rs
@@ -0,0 +1,108 @@
+use bytes::{Bytes, BytesMut};
+use std::future::Future;
+
+use crate::ebml::FromEbml;
+use crate::error::WebmetroError;
+
+#[derive(Default)]
+pub struct EbmlParser {
+    buffer: BytesMut,
+    buffer_size_limit: Option<usize>,
+    borrowed: Bytes
+}
+
+impl EbmlParser {
+    /// add a "soft" buffer size limit; if the input buffer exceeds this size,
+    /// error the stream instead of resuming. It's still possible for the buffer
+    /// to exceed this size *after* a fill, so ensure input sizes are reasonable.
+    pub fn with_soft_limit(mut self, limit: usize) -> Self {
+        self.buffer_size_limit = Some(limit);
+        self
+    }
+
+    pub fn feed(&mut self, bytes: impl AsRef<[u8]>) {
+        self.buffer.extend_from_slice(bytes.as_ref())
+    }
+
+    pub fn next_element<'a, T: FromEbml<'a>>(&'a mut self) -> Result<Option<T>, WebmetroError> {
+        Ok(match T::check_space(&self.buffer)? {
+            None => None,
+            Some(info) => {
+                let mut bytes = self.buffer.split_to(info.element_len).freeze();
+                bytes.advance(info.body_offset);
+                self.borrowed = bytes;
+                Some(T::decode(info.element_id, &self.borrowed)?)
+            }
+        })
+    }
+
+    pub async fn next_element_with_feeder<
+        'a,
+        T: FromEbml<'a>,
+        F: FnMut() -> Fut,
+        Fut: Future<Output = Result<Bytes, WebmetroError>>,
+    >(
+        &'a mut self,
+        mut feeder: F,
+    ) -> Result<Option<T>, WebmetroError> {
+        loop {
+            if let Some(_) = T::check_space(&self.buffer)? {
+                return self.next_element();
+            }
+
+            if let Some(limit) = self.buffer_size_limit {
+                if limit <= self.buffer.len() {
+                    // hit our buffer limit and still nothing parsed
+                    return Err(WebmetroError::ResourcesExceeded);
+                }
+            }
+
+            self.buffer.extend(feeder().await?);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use matches::assert_matches;
+
+    use crate::async_parser::*;
+    use crate::tests::ENCODE_WEBM_TEST_FILE;
+    use crate::webm::*;
+
+    #[test]
+    fn async_webm_test() {
+        let pieces = vec![
+            &ENCODE_WEBM_TEST_FILE[0..20],
+            &ENCODE_WEBM_TEST_FILE[20..40],
+            &ENCODE_WEBM_TEST_FILE[40..],
+        ];
+
+        let mut piece_iter = pieces.iter();
+
+        let result: Result<_, WebmetroError> = futures3::executor::block_on(async {
+            let mut next = || {
+                let result = if let Some(bytes) = piece_iter.next() {
+                    Ok(Bytes::from(*bytes))
+                } else {
+                    Err("End of input".into())
+                };
+                async { result }
+            };
+
+            let mut parser = EbmlParser::default();
+
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::EbmlHead));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Segment));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Tracks(_)));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(0)));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::SimpleBlock(_)));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Cluster));
+            assert_matches!(parser.next_element_with_feeder(&mut next).await?, Some(WebmElement::Timecode(1000)));
+
+            Ok(())
+        });
+        result.unwrap();
+    }
+}
diff --git a/src/ebml.rs b/src/ebml.rs
index 5815591..8844a5f 100644
--- a/src/ebml.rs
+++ b/src/ebml.rs
@@ -198,9 +198,9 @@ pub fn encode_integer<T: Write>(tag: u64, value: u64, output: &mut T) -> IoResul
 }
 
 pub struct EbmlLayout {
-    element_id: u64,
-    body_offset: usize,
-    element_len: usize,
+    pub element_id: u64,
+    pub body_offset: usize,
+    pub element_len: usize,
 }
 
 pub trait FromEbml<'a>: Sized {
diff --git a/src/lib.rs b/src/lib.rs
index 64e234c..84a89d0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,6 +1,8 @@
 
 pub mod ebml;
 pub mod error;
+
+pub mod async_parser;
 pub mod iterator;
 pub mod stream_parser;