summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/eio_delimited.rs 50
-rw-r--r-- src/lib.rs 147
2 files changed, 92 insertions, 105 deletions
diff --git a/src/eio_delimited.rs b/src/eio_delimited.rs
index 59cb4bf..7b679ff 100644
--- a/src/eio_delimited.rs
+++ b/src/eio_delimited.rs
@@ -59,19 +59,6 @@ where
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
- // if let StateProj::Data(data) = self.as_mut().project().state.project() {
- // let data = data.take().unwrap();
- //
- // let (tx, rx) = async_channel::bounded(1);
- //
- // self.as_mut().project().state.set(EioState::Rx(rx));
- //
- // break Poll::Ready(Some(StreamWrap {
- // data: Some(data),
- // tx,
- // }));
- // }
-
let result = match self.as_mut().project().state.project() {
StateProj::Data(Some(data)) if data.read_complete && data.fragment.is_empty() => {
break Poll::Ready(None);
@@ -84,34 +71,14 @@ where
self.as_mut().project().state.set(EioState::Rx(rx));
break Poll::Ready(Some(StreamWrap {
- data: Some(data),
+ data: Some(EioDelimited {
+ saw_sentinel: false,
+ ..data
+ }),
tx,
}));
},
- // StateProj::Data(_) => {
- // let (tx, rx) = async_channel::bounded(1);
- // let mut state = EioState::Rx(rx);
- //
- // core::mem::swap(&mut self.get_mut().state, &mut state);
- //
- // let EioState::Data(data) = state else {
- // unreachable!();
- // };
- //
- // break Poll::Ready(Some(StreamWrap {
- // data: Some(data),
- // tx,
- // }));
- // },
- // StateProj::Rx(rx) => match rx.poll_next(cx) {
- // Poll::Ready(Some(data)) => {
- // core::mem::swap(&mut self.state, &mut EioState::Data(data));
- // },
- // Poll::Ready(None) => unreachable!(),
- // Poll::Pending => break Poll::Pending,
- // },
StateProj::Rx(rx) => rx.poll_next(cx),
- _ => continue,
};
match result {
@@ -187,6 +154,8 @@ where
sentinel: u8,
fragment: heapless::Vec<u8, MAX_SIZE>,
+
+ saw_sentinel: bool,
}
impl<'r, R> EioDelimited<'r, R>
@@ -199,6 +168,7 @@ where
read: Some(r),
read_complete: false,
sentinel,
+ saw_sentinel: false,
fut: None,
fragment: heapless::Vec::new(),
@@ -218,6 +188,10 @@ where
type Item = ReadResult<R>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ if self.saw_sentinel {
+ return Poll::Ready(None);
+ }
+
if let Some(val) = self.as_mut().get_mut().try_produce_frame() {
return val;
};
@@ -277,6 +251,8 @@ where
self.fragment.copy_within(i + 1.., 0);
self.fragment.truncate(self.fragment.len() - i - 1);
+ self.saw_sentinel = true;
+
Some(Poll::Ready(Some(Ok(result))))
} else {
self.fragment_max()
diff --git a/src/lib.rs b/src/lib.rs
index 0244a90..b6014c0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,7 +4,10 @@
extern crate alloc;
-use futures::TryStream;
+use futures::{
+ Stream,
+ TryStream,
+};
use itertools::Itertools;
mod eio_delimited;
@@ -41,21 +44,6 @@ pub fn encode(buf: &[u8]) -> impl Iterator<Item = [u8; BLOCK_DELIMITED_SIZE]> +
Some(msg2)
})
-
- // let chunks = chunks1.into_iter();
- //
- // chunks.map(|mut chunk| {
- //
- // msg.fill_with(|| chunk.next().unwrap_or(0));
- // msg2[..rs::DATA_SIZE].fill(0);
- //
- // rs::encode(&mut msg[..rs::BLOCK_SIZE]);
- //
- // // cobs overhead is at most 1 byte
- // corncobs::encode_buf(&msg[..rs::BLOCK_SIZE], &mut msg2);
- //
- // msg2
- // })
}
#[derive(Debug)]
@@ -85,22 +73,41 @@ impl<E> From<rs::Error> for Error<E> {
pub fn decode<S>(
s: S,
-) -> impl TryStream<Ok = heapless::Vec<u8, { rs::DATA_SIZE - 1 }>, Error = Error<S::Error>>
+) -> impl Stream<
+ Item = impl TryStream<
+ Ok = heapless::Vec<u8, { rs::DATA_SIZE - 1 }>,
+ Error = Error<<S::Item as TryStream>::Error>,
+ >,
+>
where
- S: TryStream<Ok = heapless::Vec<u8, BLOCK_ENCODED_SIZE>>,
+ S: Stream,
+ S::Item: TryStream<Ok = heapless::Vec<u8, BLOCK_DELIMITED_SIZE>>,
{
- use futures::TryStreamExt;
+ use futures::{
+ StreamExt,
+ TryStreamExt,
+ };
- s.map_err(Error::StreamErr).and_then(|mut b| async move {
- let x = corncobs::decode_in_place(&mut b)?;
- rs::correct_errors(&mut b[..x])?;
- let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?;
+ s.map(|pkt_stream| {
+ pkt_stream.map_err(Error::StreamErr).and_then(|mut b| async move {
+ let x = corncobs::decode_in_place(&mut b)?;
+ rs::correct_errors(&mut b[..x])?;
+ let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?;
- Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() })
+ Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() })
+ })
})
+
+ // s.map(|mut b| async move {
+ // s.map_err(Error::StreamErr).and_then(|mut b| async move {
+ // let x = corncobs::decode_in_place(&mut b)?;
+ // rs::correct_errors(&mut b[..x])?;
+ // let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?;
+ //
+ // Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() })
+ // })
}
-#[cfg(any())]
#[cfg(test)]
mod test {
extern crate std;
@@ -109,60 +116,64 @@ mod test {
use proptest::prelude::*;
use super::*;
- use eio_delimited::EioDelimited;
+ use eio_delimited::EioParent;
#[async_std::test]
async fn basic_test_transcode() {
- use futures::TryStreamExt;
+ use futures::StreamExt;
- let v = encode(b"abc").collect::<std::vec::Vec<_>>();
- std::println!("{v:?}");
+ let v = encode(b"abc\0asldkfjalskdjf;alsdkfj;alsdkfj;alsdjf;alskdfj;laskjdf;laskjdfl;askjdf;laskdjf;laskjdfl;akjsdf;lkasjdf;lkjasdfl;kjasdf;lkjas;ldfkjaskofoqwieoruqoweiruqwer\0]]0]0]0]\0\0\0\0ooaoisdufoaisduf\0\0alksjkhdfkl;ajsdfl;kjasdfklhashfdhasdfjkladsfjkldsfaadsfhjklhjkfadshljkladsfhjkhdsklhjkladfshjkladsfhkjladsfhkjladsfhkjadsfhkjladsfhjklshjklafhkljsfhkdhkjlfdshjklhjkladsfhjkldfk").collect::<std::vec::Vec<_>>();
+ std::println!("{v:#?}");
let v = v.flatten();
+ std::println!("{v:#?}");
- let s = EioDelimited::new(v, 0);
+ let s = EioParent::new(v, 0);
decode(s)
- .try_for_each(|x| async move {
- std::println!("{x:?}");
-
- Ok(())
- })
- .await
- .unwrap();
- }
-
- proptest_async::proptest! {
- #[test]
- async fn fuzz_bytestring_root(mut x in any::<std::vec::Vec<u8>>()) {
- std::println!("src: {x:?}");
-
- let x_dup = x.clone();
- const EXTEND_FACTOR: usize = 3;
+ .for_each(|x| async move {
+ x.try_for_each(|x| async move {
+ std::println!("{x:?}");
- for _ in 0..EXTEND_FACTOR {
- x.extend_from_slice(&x_dup);
- }
-
- let v = encode(&x).collect::<std::vec::Vec<_>>();
- let v = v.flatten();
- std::println!("flattened bytes: {v:?}");
-
- let fs = EioDelimited::new(v, 0);
-
- decode(fs)
- .try_for_each(|result| {
- let x_slice = x.as_slice();
-
- async move {
- std::println!("{result:?}");
-
- assert_eq!(x_slice, result.as_slice());
-
- Ok(())
- }})
+ Ok(())
+ })
.await
.unwrap();
- }
+ })
+ .await;
}
+
+ // proptest_async::proptest! {
+ // #[test]
+ // async fn fuzz_bytestring_root(mut x in any::<std::vec::Vec<u8>>()) {
+ // std::println!("src: {x:?}");
+ //
+ // let x_dup = x.clone();
+ // const EXTEND_FACTOR: usize = 3;
+ //
+ // for _ in 0..EXTEND_FACTOR {
+ // x.extend_from_slice(&x_dup);
+ // }
+ //
+ // let v = encode(&x).collect::<std::vec::Vec<_>>();
+ // let v = v.flatten();
+ // std::println!("flattened bytes: {v:?}");
+ //
+ // let fs = EioParent::new(v, 0);
+ //
+ // decode(fs)
+ // .try_for_each(|result| {
+ // let x_slice = x.as_slice();
+ //
+ // async move {
+ // std::println!("{result:?}");
+ //
+ // assert_eq!(x_slice, result.as_slice());
+ //
+ // Ok(())
+ // }})
+ // .await
+ // .unwrap();
+ // }
+ // }
}