use std::collections::VecDeque; use std::str; extern crate custom_error; use custom_error::custom_error; // Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about // amount of bytes it received since the last update const UE_RECEIVED_FIELD_SIZE: usize = 4; // Defines how many bytes is used to encode "LENGTH" field, describing length of // next JSON message from ue-server const UE_LENGTH_FIELD_SIZE: usize = 4; // Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes // received by that server so far. const HEAD_UE_RECEIVED: u8 = 85; // Arbitrary value indicating that next byte sequence from ue-server contains JSON message. const HEAD_UE_MESSAGE: u8 = 42; // Maximum allowed size of JSON message sent from ue-server. const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; // We do not expect to receive more that this much messages at once from ue-server const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100; custom_error! { pub ReadingStreamError InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", MessageTooLong{length: usize} = "Message to receive is too long: {length}", InvalidUnicode = "Invalid utf-8 was received", BrokenStream = "Used stream is broken" } enum ReadingState { Head, ReceivedBytes, Length, Payload, } pub struct MessageReader { is_broken: bool, reading_state: ReadingState, read_bytes: usize, length_buffer: [u8; 4], current_message_length: usize, current_message: Vec, read_messages: VecDeque, ue_received_bytes: u64, } /// For converting byte stream that is expected from the ue-server into actual messages. /// Expected format is a sequence of either: /// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] /// [AMOUNT: amount of bytes received by ue-server since last update | 4 bytes: u32 BE] /// 2. [HEAD_UE_MESSAGE: marker byte | 1 byte] /// [LENGTH: length of the JSON message in utf8 encoding | 4 bytes: u32 BE] /// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] /// On any invalid input enters into a failure state (can be checked by `is_broken()`) and /// never recovers from it. /// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to /// retrieve resulting messages. impl MessageReader { pub fn new() -> MessageReader { MessageReader { is_broken: false, reading_state: ReadingState::Head, read_bytes: 0, length_buffer: [0; 4], current_message_length: 0, // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES), ue_received_bytes: 0, } } pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { if self.is_broken { return Err(ReadingStreamError::BrokenStream); } match &self.reading_state { ReadingState::Head => { if input == HEAD_UE_RECEIVED { self.change_state(ReadingState::ReceivedBytes); } else if input == HEAD_UE_MESSAGE { self.change_state(ReadingState::Length); } else { self.is_broken = true; return Err(ReadingStreamError::InvalidHead { input }); } } ReadingState::ReceivedBytes => { self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { self.ue_received_bytes += array_of_u8_to_u32(self.length_buffer) as u64; self.change_state(ReadingState::Head); } } ReadingState::Length => { self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize; if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { length: self.current_message_length, }); } self.current_message = Vec::with_capacity(self.current_message_length); self.change_state(ReadingState::Payload); } } ReadingState::Payload => { self.current_message.push(input); self.read_bytes += 1; if self.read_bytes >= self.current_message_length { match str::from_utf8(&self.current_message) { Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), _ => { self.is_broken = true; return Err(ReadingStreamError::InvalidUnicode); } }; self.current_message.clear(); self.current_message_length = 0; self.change_state(ReadingState::Head); } } } Ok(()) } pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { for &byte in input { self.push_byte(byte)?; } Ok(()) } pub fn pop(&mut self) -> Option { self.read_messages.pop_back() } pub fn ue_received_bytes(&self) -> u64 { self.ue_received_bytes } pub fn is_broken(&self) -> bool { self.is_broken } fn change_state(&mut self, next_state: ReadingState) { self.read_bytes = 0; self.reading_state = next_state; } } fn array_of_u8_to_u32(bytes: [u8; 4]) -> u64 { (u64::from(bytes[0]) << 24) + (u64::from(bytes[1]) << 16) + (u64::from(bytes[2]) << 8) + (u64::from(bytes[3])) } #[test] fn message_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(13).unwrap(); reader.push_byte(b'H').unwrap(); reader.push_byte(b'e').unwrap(); reader.push_byte(b'l').unwrap(); reader.push_byte(b'l').unwrap(); reader.push_byte(b'o').unwrap(); reader.push_byte(b',').unwrap(); reader.push_byte(b' ').unwrap(); reader.push_byte(b'w').unwrap(); reader.push_byte(b'o').unwrap(); reader.push_byte(b'r').unwrap(); reader.push_byte(b'l').unwrap(); reader.push_byte(b'd').unwrap(); reader.push_byte(b'!').unwrap(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); reader.push_byte(b'Y').unwrap(); reader.push_byte(b'o').unwrap(); reader.push_byte(b'!').unwrap(); assert_eq!(reader.pop().unwrap(), "Hello, world!"); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn received_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0xf3).unwrap(); assert_eq!(reader.ue_received_bytes(), 0xf3); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0x41).unwrap(); reader.push_byte(0x19).unwrap(); reader.push_byte(0xb2).unwrap(); reader.push_byte(0x04).unwrap(); assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3 reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); } #[test] fn mixed_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); reader.push_byte(b'Y').unwrap(); reader.push_byte(b'o').unwrap(); reader.push_byte(b'!').unwrap(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn pushing_many_bytes_at_once() { let mut reader = MessageReader::new(); reader .push(&[ HEAD_UE_RECEIVED, 0, 0, 0, 243, HEAD_UE_MESSAGE, 0, 0, 0, 3, b'Y', b'o', b'!', HEAD_UE_RECEIVED, 65, 25, 178, 4, ]) .unwrap(); assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } #[test] fn generates_error_invalid_head() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); assert!(!reader.is_broken()); reader .push_byte(25) .expect_err("Testing failing on incorrect HEAD"); assert!(reader.is_broken()); } #[test] fn generates_error_message_too_long() { let mut reader = MessageReader::new(); let huge_length = MAX_UE_MESSAGE_LENGTH + 1; let bytes = (huge_length as u32).to_be_bytes(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(bytes[0]).unwrap(); reader.push_byte(bytes[1]).unwrap(); reader.push_byte(bytes[2]).unwrap(); assert!(!reader.is_broken()); reader .push_byte(bytes[3]) .expect_err("Testing failing on exceeding allowed message length"); assert!(reader.is_broken()); } #[test] fn generates_error_invalid_unicode() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(2).unwrap(); reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence assert!(!reader.is_broken()); // Bytes inside multi-byte code point have to have `1` for their high bit reader .push_byte(0b01010011) .expect_err("Testing failing on incorrect unicode"); assert!(reader.is_broken()); }