diff options
| -rw-r--r-- | src/eio_delimited.rs | 50 | ||||
| -rw-r--r-- | src/lib.rs | 147 |
2 files changed, 92 insertions, 105 deletions
diff --git a/src/eio_delimited.rs b/src/eio_delimited.rs index 59cb4bf..7b679ff 100644 --- a/src/eio_delimited.rs +++ b/src/eio_delimited.rs @@ -59,19 +59,6 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { loop { - // if let StateProj::Data(data) = self.as_mut().project().state.project() { - // let data = data.take().unwrap(); - // - // let (tx, rx) = async_channel::bounded(1); - // - // self.as_mut().project().state.set(EioState::Rx(rx)); - // - // break Poll::Ready(Some(StreamWrap { - // data: Some(data), - // tx, - // })); - // } - let result = match self.as_mut().project().state.project() { StateProj::Data(Some(data)) if data.read_complete && data.fragment.is_empty() => { break Poll::Ready(None); @@ -84,34 +71,14 @@ where self.as_mut().project().state.set(EioState::Rx(rx)); break Poll::Ready(Some(StreamWrap { - data: Some(data), + data: Some(EioDelimited { + saw_sentinel: false, + ..data + }), tx, })); }, - // StateProj::Data(_) => { - // let (tx, rx) = async_channel::bounded(1); - // let mut state = EioState::Rx(rx); - // - // core::mem::swap(&mut self.get_mut().state, &mut state); - // - // let EioState::Data(data) = state else { - // unreachable!(); - // }; - // - // break Poll::Ready(Some(StreamWrap { - // data: Some(data), - // tx, - // })); - // }, - // StateProj::Rx(rx) => match rx.poll_next(cx) { - // Poll::Ready(Some(data)) => { - // core::mem::swap(&mut self.state, &mut EioState::Data(data)); - // }, - // Poll::Ready(None) => unreachable!(), - // Poll::Pending => break Poll::Pending, - // }, StateProj::Rx(rx) => rx.poll_next(cx), - _ => continue, }; match result { @@ -187,6 +154,8 @@ where sentinel: u8, fragment: heapless::Vec<u8, MAX_SIZE>, + + saw_sentinel: bool, } impl<'r, R> EioDelimited<'r, R> @@ -199,6 +168,7 @@ where read: Some(r), read_complete: false, sentinel, + saw_sentinel: false, fut: None, fragment: heapless::Vec::new(), @@ -218,6 +188,10 @@ where type Item = ReadResult<R>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + if self.saw_sentinel { + return Poll::Ready(None); + } + if let Some(val) = self.as_mut().get_mut().try_produce_frame() { return val; }; @@ -277,6 +251,8 @@ where self.fragment.copy_within(i + 1.., 0); self.fragment.truncate(self.fragment.len() - i - 1); + self.saw_sentinel = true; + Some(Poll::Ready(Some(Ok(result)))) } else { self.fragment_max() @@ -4,7 +4,10 @@ extern crate alloc; -use futures::TryStream; +use futures::{ + Stream, + TryStream, +}; use itertools::Itertools; mod eio_delimited; @@ -41,21 +44,6 @@ pub fn encode(buf: &[u8]) -> impl Iterator<Item = [u8; BLOCK_DELIMITED_SIZE]> + Some(msg2) }) - - // let chunks = chunks1.into_iter(); - // - // chunks.map(|mut chunk| { - // - // msg.fill_with(|| chunk.next().unwrap_or(0)); - // msg2[..rs::DATA_SIZE].fill(0); - // - // rs::encode(&mut msg[..rs::BLOCK_SIZE]); - // - // // cobs overhead is at most 1 byte - // corncobs::encode_buf(&msg[..rs::BLOCK_SIZE], &mut msg2); - // - // msg2 - // }) } #[derive(Debug)] @@ -85,22 +73,41 @@ impl<E> From<rs::Error> for Error<E> { pub fn decode<S>( s: S, -) -> impl TryStream<Ok = heapless::Vec<u8, { rs::DATA_SIZE - 1 }>, Error = Error<S::Error>> +) -> impl Stream< + Item = impl TryStream< + Ok = heapless::Vec<u8, { rs::DATA_SIZE - 1 }>, + Error = Error<<S::Item as TryStream>::Error>, + >, +> where - S: TryStream<Ok = heapless::Vec<u8, BLOCK_ENCODED_SIZE>>, + S: Stream, + S::Item: TryStream<Ok = heapless::Vec<u8, BLOCK_DELIMITED_SIZE>>, { - use futures::TryStreamExt; + use futures::{ + StreamExt, + TryStreamExt, + }; - s.map_err(Error::StreamErr).and_then(|mut b| async move { - let x = corncobs::decode_in_place(&mut b)?; - rs::correct_errors(&mut b[..x])?; - let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?; + s.map(|pkt_stream| { + pkt_stream.map_err(Error::StreamErr).and_then(|mut b| async move { + let x = corncobs::decode_in_place(&mut b)?; + rs::correct_errors(&mut b[..x])?; + let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?; - Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() }) + Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() }) + }) }) + + // s.map(|mut b| async move { + // s.map_err(Error::StreamErr).and_then(|mut b| async move { + // let x = corncobs::decode_in_place(&mut b)?; + // rs::correct_errors(&mut b[..x])?; + // let n = corncobs::decode_in_place(&mut b[..rs::DATA_SIZE + 1])?; + // + // Ok(unsafe { heapless::Vec::from_slice(&b[..n]).unwrap_unchecked() }) + // }) } -#[cfg(any())] #[cfg(test)] mod test { extern crate std; @@ -109,60 +116,64 @@ mod test { use proptest::prelude::*; use super::*; - use eio_delimited::EioDelimited; + use eio_delimited::EioParent; #[async_std::test] async fn basic_test_transcode() { - use futures::TryStreamExt; + use futures::StreamExt; - let v = encode(b"abc").collect::<std::vec::Vec<_>>(); - std::println!("{v:?}"); + let v = encode(b"abc\0asldkfjalskdjf;alsdkfj;alsdkfj;alsdjf;alskdfj;laskjdf;laskjdfl;askjdf;laskdjf;laskjdfl;akjsdf;lkasjdf;lkjasdfl;kjasdf;lkjas;ldfkjaskofoqwieoruqoweiruqwer\0]]0]0]0]\0\0\0\0ooaoisdufoaisduf\0\0alksjkhdfkl;ajsdfl;kjasdfklhashfdhasdfjkladsfjkldsfaadsfhjklhjkfadshljkladsfhjkhdsklhjkladfshjkladsfhkjladsfhkjladsfhkjadsfhkjladsfhjklshjklafhkljsfhkdhkjlfdshjklhjkladsfhjkldfk").collect::<std::vec::Vec<_>>(); + std::println!("{v:#?}"); let v = v.flatten(); + std::println!("{v:#?}"); - let s = EioDelimited::new(v, 0); + let s = EioParent::new(v, 0); decode(s) - .try_for_each(|x| async move { - std::println!("{x:?}"); - - Ok(()) - }) - .await - .unwrap(); - } - - proptest_async::proptest! { - #[test] - async fn fuzz_bytestring_root(mut x in any::<std::vec::Vec<u8>>()) { - std::println!("src: {x:?}"); - - let x_dup = x.clone(); - const EXTEND_FACTOR: usize = 3; + .for_each(|x| async move { + x.try_for_each(|x| async move { + std::println!("{x:?}"); - for _ in 0..EXTEND_FACTOR { - x.extend_from_slice(&x_dup); - } - - let v = encode(&x).collect::<std::vec::Vec<_>>(); - let v = v.flatten(); - std::println!("flattened bytes: {v:?}"); - - let fs = EioDelimited::new(v, 0); - - decode(fs) - .try_for_each(|result| { - let x_slice = x.as_slice(); - - async move { - std::println!("{result:?}"); - - assert_eq!(x_slice, result.as_slice()); - - Ok(()) - }}) + Ok(()) + }) .await .unwrap(); - } + }) + .await; } + + // proptest_async::proptest! { + // #[test] + // async fn fuzz_bytestring_root(mut x in any::<std::vec::Vec<u8>>()) { + // std::println!("src: {x:?}"); + // + // let x_dup = x.clone(); + // const EXTEND_FACTOR: usize = 3; + // + // for _ in 0..EXTEND_FACTOR { + // x.extend_from_slice(&x_dup); + // } + // + // let v = encode(&x).collect::<std::vec::Vec<_>>(); + // let v = v.flatten(); + // std::println!("flattened bytes: {v:?}"); + // + // let fs = EioParent::new(v, 0); + // + // decode(fs) + // .try_for_each(|result| { + // let x_slice = x.as_slice(); + // + // async move { + // std::println!("{result:?}"); + // + // assert_eq!(x_slice, result.as_slice()); + // + // Ok(()) + // }}) + // .await + // .unwrap(); + // } + // } } |