Add network link to ue-server implementation #13

Open
dkanus wants to merge 23 commits from feature_link into master
Showing only changes of commit af3341c1e7 - Show all commits

View File

@ -41,11 +41,11 @@ pub struct MessageReader {
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
buffer: [u8; 4],
dkanus marked this conversation as resolved Outdated

buffer => length_buffer, since it's used only for storing length bytes

buffer => length_buffer, since it's used only for storing length bytes

Agreed

Agreed
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
next_received_bytes: u32,
received_bytes: u64,
ue_received_bytes: u64,
}
/// For converting byte stream that is expected from the ue-server into actual messages.
@ -65,12 +65,12 @@ impl MessageReader {
is_broken: false,
reading_state: ReadingState::Head,
read_bytes: 0,
buffer: [0; 4],
current_message_length: 0,
Review

Why?

Why?
// Will be recreated with `with_capacity` in `push_byte()`
current_message: Vec::new(),
read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES),
next_received_bytes: 0,
received_bytes: 0,
ue_received_bytes: 0,
}
}
Ggg_123 marked this conversation as resolved Outdated

<< 8
wtf

> << 8 wtf
@ -81,32 +81,28 @@ impl MessageReader {
match &self.reading_state {
ReadingState::Head => {
if input == HEAD_UE_RECEIVED {
self.reading_state = ReadingState::ReceivedBytes;
self.change_state(ReadingState::ReceivedBytes);
} else if input == HEAD_UE_MESSAGE {
self.reading_state = ReadingState::Length;
self.change_state(ReadingState::Length);
} else {
self.is_broken = true;
return Err(ReadingStreamError::InvalidHead { input });
}
}
Review

Why is it a separate function, and not inside send/flush?
When should I call it?

Why is it a separate function, and not inside send/flush? When should I call it?
ReadingState::ReceivedBytes => {
self.next_received_bytes = self.next_received_bytes << 8;
self.next_received_bytes += input as u32;
self.buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE {
self.received_bytes += self.next_received_bytes as u64;
self.next_received_bytes = 0;
self.read_bytes = 0;
self.reading_state = ReadingState::Head;
self.ue_received_bytes += array_of_u8_to_u32(self.buffer) as u64;
self.change_state(ReadingState::Head);
}
}
ReadingState::Length => {
self.current_message_length = self.current_message_length << 8;
self.current_message_length += input as usize;
self.buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
Review

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
self.read_bytes = 0;
self.reading_state = ReadingState::Payload;
self.current_message_length = array_of_u8_to_u32(self.buffer) as usize;
dkanus marked this conversation as resolved Outdated

"1 as usize" => "1usize"
Or just remove.

"1 as usize" => "1usize" Or just remove.
self.change_state(ReadingState::Payload);
dkanus marked this conversation as resolved Outdated

Move this line below error checking, since it contains early exit

Move this line below error checking, since it contains early exit

Agreed.

Agreed.
if self.current_message_length > MAX_UE_MESSAGE_LENGTH {
self.is_broken = true;
return Err(ReadingStreamError::MessageTooLong {
@ -118,7 +114,7 @@ impl MessageReader {
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1 as usize;
self.read_bytes += 1;
if self.read_bytes >= self.current_message_length {
match str::from_utf8(&self.current_message) {
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
@ -129,8 +125,7 @@ impl MessageReader {
};
self.current_message.clear();
self.current_message_length = 0;
self.read_bytes = 0;
self.reading_state = ReadingState::Head;
self.change_state(ReadingState::Head);
}
}
}
@ -148,13 +143,25 @@ impl MessageReader {
self.read_messages.pop_back()
}
pub fn received_bytes(&self) -> u64 {
self.received_bytes
pub fn ue_received_bytes(&self) -> u64 {
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
self.is_broken
Ggg_123 marked this conversation as resolved Outdated

0x48 => b'H'

0x48 => b'H'
}
Ggg_123 marked this conversation as resolved Outdated

0x65 => b'e'
etc

0x65 => b'e' etc
fn change_state(&mut self, next_state: ReadingState) {
self.read_bytes = 0;
self.reading_state = next_state;
}
}
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u64 {
(u64::from(bytes[0]) << 24)
+ (u64::from(bytes[1]) << 16)
+ (u64::from(bytes[2]) << 8)
+ (u64::from(bytes[3]))
}
#[test]
@ -199,18 +206,18 @@ fn received_push_byte() {
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(243).unwrap();
assert_eq!(reader.received_bytes(), 243);
assert_eq!(reader.ue_received_bytes(), 243);
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(65).unwrap();
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.ue_received_bytes(), 1092203255);
dkanus marked this conversation as resolved Outdated

Replace with:
reader.push_byte(0xf3).unwrap();
...
reader.push_byte(0x41).unwrap();
reader.push_byte(0x19).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3
Because otherwise this is "magic numbers wtf".

Replace with: reader.push_byte(0xf3).unwrap(); ... reader.push_byte(0x41).unwrap(); reader.push_byte(0x19).unwrap(); reader.push_byte(0xb2).unwrap(); reader.push_byte(0x04).unwrap(); assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3 Because otherwise this is "magic numbers wtf".

Woa, that's neat. Agreed.

Woa, that's neat. Agreed.
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(231).unwrap();
reader.push_byte(34).unwrap();
reader.push_byte(154).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.ue_received_bytes(), 1092203255);
}
#[test]
@ -234,7 +241,7 @@ fn mixed_push_byte() {
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.ue_received_bytes(), 1092203255);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
@ -264,7 +271,7 @@ fn pushing_many_bytes_at_once() {
4,
])
.unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.ue_received_bytes(), 1092203255);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}