From af3341c1e77b776eb9ec23aaf6fa252c010e3916 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:48:09 +0700 Subject: [PATCH] Refactor `MessageReader` for clarity Rename `received_bytes` into `ue_received_bytes` and get rid of `next_received_bytes` by reading relevant data into new byte buffer instead. --- src/link/mod.rs | 59 +++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index db57956..b761827 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -41,11 +41,11 @@ pub struct MessageReader { is_broken: bool, reading_state: ReadingState, read_bytes: usize, + buffer: [u8; 4], current_message_length: usize, current_message: Vec, read_messages: VecDeque, - next_received_bytes: u32, - received_bytes: u64, + ue_received_bytes: u64, } /// For converting byte stream that is expected from the ue-server into actual messages. @@ -65,12 +65,12 @@ impl MessageReader { is_broken: false, reading_state: ReadingState::Head, read_bytes: 0, + buffer: [0; 4], current_message_length: 0, // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES), - next_received_bytes: 0, - received_bytes: 0, + ue_received_bytes: 0, } } @@ -81,32 +81,28 @@ impl MessageReader { match &self.reading_state { ReadingState::Head => { if input == HEAD_UE_RECEIVED { - self.reading_state = ReadingState::ReceivedBytes; + self.change_state(ReadingState::ReceivedBytes); } else if input == HEAD_UE_MESSAGE { - self.reading_state = ReadingState::Length; + self.change_state(ReadingState::Length); } else { self.is_broken = true; return Err(ReadingStreamError::InvalidHead { input }); } } ReadingState::ReceivedBytes => { - self.next_received_bytes = self.next_received_bytes << 8; - self.next_received_bytes += input as u32; + self.buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { - self.received_bytes += self.next_received_bytes as u64; - self.next_received_bytes = 0; - self.read_bytes = 0; - self.reading_state = ReadingState::Head; + self.ue_received_bytes += array_of_u8_to_u32(self.buffer) as u64; + self.change_state(ReadingState::Head); } } ReadingState::Length => { - self.current_message_length = self.current_message_length << 8; - self.current_message_length += input as usize; + self.buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { - self.read_bytes = 0; - self.reading_state = ReadingState::Payload; + self.current_message_length = array_of_u8_to_u32(self.buffer) as usize; + self.change_state(ReadingState::Payload); if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { @@ -118,7 +114,7 @@ impl MessageReader { } ReadingState::Payload => { self.current_message.push(input); - self.read_bytes += 1 as usize; + self.read_bytes += 1; if self.read_bytes >= self.current_message_length { match str::from_utf8(&self.current_message) { Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), @@ -129,8 +125,7 @@ impl MessageReader { }; self.current_message.clear(); self.current_message_length = 0; - self.read_bytes = 0; - self.reading_state = ReadingState::Head; + self.change_state(ReadingState::Head); } } } @@ -148,13 +143,25 @@ impl MessageReader { self.read_messages.pop_back() } - pub fn received_bytes(&self) -> u64 { - self.received_bytes + pub fn ue_received_bytes(&self) -> u64 { + self.ue_received_bytes } pub fn is_broken(&self) -> bool { self.is_broken } + + fn change_state(&mut self, next_state: ReadingState) { + self.read_bytes = 0; + self.reading_state = next_state; + } +} + +fn array_of_u8_to_u32(bytes: [u8; 4]) -> u64 { + (u64::from(bytes[0]) << 24) + + (u64::from(bytes[1]) << 16) + + (u64::from(bytes[2]) << 8) + + (u64::from(bytes[3])) } #[test] @@ -199,18 +206,18 @@ fn received_push_byte() { reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); - assert_eq!(reader.received_bytes(), 243); + assert_eq!(reader.ue_received_bytes(), 243); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); } #[test] @@ -234,7 +241,7 @@ fn mixed_push_byte() { reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } @@ -264,7 +271,7 @@ fn pushing_many_bytes_at_once() { 4, ]) .unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); }