From 0a868521974fd84859506a3f9ef4d427bf05c2b9 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Wed, 21 Jul 2021 03:40:29 +0700 Subject: [PATCH 01/23] Add `MessageReader` implementation I've decided that it would be a good idea to separete compoments that transform text messages into byte chunks to send to Unreal Engine and back. This means that each of them will have only one responsibility and using them will be easier in case we decide to read and write via tcp/ip in separate threads. The only interesting to them both parameter is the amount of bytes that were confirmed to be received from the Unreal Engine side, which can be easily shared "by hand". This patch implements "reader" that accepts byte stream expected from the game server and converts it into: 1. separate string messages; 2. amount of bytes game server reported to already have received. --- Cargo.lock | 10 ++ Cargo.toml | 1 + src/link/mod.rs | 300 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + 4 files changed, 312 insertions(+) create mode 100644 src/link/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b0eb9bf..11949f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,4 +3,14 @@ [[package]] name = "avarice" version = "0.1.0" +dependencies = [ + "custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)", +] +[[package]] +name = "custom_error" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[metadata] +"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6" diff --git a/Cargo.toml b/Cargo.toml index e8facc0..58d91ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +custom_error="1.9.2" \ No newline at end of file diff --git a/src/link/mod.rs b/src/link/mod.rs new file mode 100644 index 0000000..0a317a6 --- /dev/null +++ b/src/link/mod.rs @@ -0,0 +1,300 @@ +use std::collections::VecDeque; +use std::str; + +extern crate custom_error; +use custom_error::custom_error; + +const RECEIVED_FIELD_SIZE: usize = 4; +const LENGTH_FIELD_SIZE: usize = 4; +const HEAD_RECEIVED: u8 = 85; +const HEAD_MESSAGE: u8 = 42; +const MAX_MESSAGE_LENGTH: usize = 0xA00000; + +custom_error! { pub ReadingStreamError + InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", + MessageTooLong{length: usize} = "Message to receive is too long: {length}", + InvalidUnicode = "Invalid utf-8 was received", + BrokenStream = "Used stream is broken" +} + +enum ReadingState { + Head, + ReceivedBytes, + Length, + Payload, +} + +pub struct MessageReader { + is_broken: bool, + reading_state: ReadingState, + read_bytes: usize, + current_message_length: usize, + current_message: Vec, + read_messages: VecDeque, + next_received_bytes: u32, + received_bytes: u64, +} + +/// For converting byte stream expected to be generated by Acedia mod from the game server into +/// actual messages. Expected format is a sequence of either: +/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes] +/// [HEAD_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes] +/// On any invalid input enters a failure state (can be checked by `is_broken()`) and +/// never recovers from it. +/// Use either `push_byte()` or `push()` to input byte stream from game server and `pop()` to +/// retrieve resulting messages. +impl MessageReader { + pub fn new() -> MessageReader { + MessageReader { + is_broken: false, + reading_state: ReadingState::Head, + read_bytes: 0, + current_message_length: 0, + current_message: Vec::new(), + read_messages: VecDeque::new(), + next_received_bytes: 0, + received_bytes: 0, + } + } + + pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { + if self.is_broken { + return Err(ReadingStreamError::BrokenStream); + } + match &self.reading_state { + ReadingState::Head => { + if input == HEAD_RECEIVED { + self.reading_state = ReadingState::ReceivedBytes; + } else if input == HEAD_MESSAGE { + self.reading_state = ReadingState::Length; + } else { + self.is_broken = true; + return Err(ReadingStreamError::InvalidHead { input }); + } + } + ReadingState::ReceivedBytes => { + self.next_received_bytes = self.next_received_bytes << 8; + self.next_received_bytes += input as u32; + self.read_bytes += 1; + if self.read_bytes >= RECEIVED_FIELD_SIZE { + self.received_bytes += self.next_received_bytes as u64; + self.next_received_bytes = 0; + self.read_bytes = 0; + self.reading_state = ReadingState::Head; + } + } + ReadingState::Length => { + self.current_message_length = self.current_message_length << 8; + self.current_message_length += input as usize; + self.read_bytes += 1; + if self.read_bytes >= LENGTH_FIELD_SIZE { + self.read_bytes = 0; + self.reading_state = ReadingState::Payload; + if self.current_message_length > MAX_MESSAGE_LENGTH { + self.is_broken = true; + return Err(ReadingStreamError::MessageTooLong { + length: self.current_message_length, + }); + } + self.current_message = Vec::with_capacity(self.current_message_length); + } + } + ReadingState::Payload => { + self.current_message.push(input); + self.read_bytes += 1 as usize; + if self.read_bytes >= self.current_message_length { + match str::from_utf8(&self.current_message) { + Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), + _ => { + self.is_broken = true; + return Err(ReadingStreamError::InvalidUnicode); + } + }; + self.current_message.clear(); + self.current_message_length = 0; + self.read_bytes = 0; + self.reading_state = ReadingState::Head; + } + } + } + Ok(()) + } + + pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { + for &byte in input { + self.push_byte(byte)?; + } + Ok(()) + } + + pub fn pop(&mut self) -> Option { + self.read_messages.pop_back() + } + + pub fn received_bytes(&self) -> u64 { + self.received_bytes + } + + pub fn is_broken(&self) -> bool { + self.is_broken + } +} + +#[test] +fn message_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(13).unwrap(); + reader.push_byte(0x48).unwrap(); // H + reader.push_byte(0x65).unwrap(); // e + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x2c).unwrap(); // , + reader.push_byte(0x20).unwrap(); // + reader.push_byte(0x77).unwrap(); // w + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x72).unwrap(); // r + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x64).unwrap(); // d + reader.push_byte(0x21).unwrap(); // + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(0x59).unwrap(); // Y + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x21).unwrap(); // + assert_eq!(reader.pop().unwrap(), "Hello, world!"); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn received_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + assert_eq!(reader.received_bytes(), 243); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(65).unwrap(); + reader.push_byte(25).unwrap(); + reader.push_byte(178).unwrap(); + reader.push_byte(4).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(231).unwrap(); + reader.push_byte(34).unwrap(); + reader.push_byte(154).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); +} + +#[test] +fn mixed_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(0x59).unwrap(); // Y + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x21).unwrap(); // + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(65).unwrap(); + reader.push_byte(25).unwrap(); + reader.push_byte(178).unwrap(); + reader.push_byte(4).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn pushing_many_bytes_at_once() { + let mut reader = MessageReader::new(); + reader + .push(&[ + HEAD_RECEIVED, + 0, + 0, + 0, + 243, + HEAD_MESSAGE, + 0, + 0, + 0, + 3, + 0x59, // Y + 0x6f, // o + 0x21, // + HEAD_RECEIVED, + 65, + 25, + 178, + 4, + ]) + .unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn generates_error_invalid_head() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(25) + .expect_err("Testing failing on incorrect HEAD"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_message_too_long() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + // Max length right now is `0xA00000` + reader.push_byte(0).unwrap(); + reader.push_byte(0xA0).unwrap(); + reader.push_byte(0).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(1) + .expect_err("Testing failing on exceeding allowed message length"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_invalid_unicode() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(2).unwrap(); + reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence + assert!(!reader.is_broken()); + // Bytes inside multi-byte code point have to have `1` for their high bit + reader + .push_byte(0b01010011) + .expect_err("Testing failing on incorrect unicode"); + assert!(reader.is_broken()); +} diff --git a/src/main.rs b/src/main.rs index 8f664c9..604053c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::path::Path; +mod link; mod unreal_config; fn main() { -- 2.20.1 From ba3ac088dd1ec9fa27b7322062ef3422e1e28f16 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:03:48 +0700 Subject: [PATCH 02/23] Refactor numeric constants for `MessageReader` --- src/link/mod.rs | 74 ++++++++++++++++++++++++++++--------------------- 1 file changed, 43 insertions(+), 31 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 0a317a6..e54c9cd 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -4,11 +4,21 @@ use std::str; extern crate custom_error; use custom_error::custom_error; -const RECEIVED_FIELD_SIZE: usize = 4; -const LENGTH_FIELD_SIZE: usize = 4; -const HEAD_RECEIVED: u8 = 85; -const HEAD_MESSAGE: u8 = 42; -const MAX_MESSAGE_LENGTH: usize = 0xA00000; +// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about +// amount of bytes it received since the last update +const UE_RECEIVED_FIELD_SIZE: usize = 4; +// Defines how many bytes is used to encode "LENGTH" field, describing length of +// next JSON message from ue-server +const UE_LENGTH_FIELD_SIZE: usize = 4; +// Value indicating that next byte sequence from ue-server reports amount of bytes received by +// that server so far. Value itself is arbitrary. + +const HEAD_UE_RECEIVED: u8 = 85; +// Value indicating that next byte sequence from ue-server contains JSON message. +// Value itself is arbitrary. +const HEAD_UE_MESSAGE: u8 = 42; +// Maximum allowed size of JSON message sent from ue-server. +const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; custom_error! { pub ReadingStreamError InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", @@ -37,8 +47,8 @@ pub struct MessageReader { /// For converting byte stream expected to be generated by Acedia mod from the game server into /// actual messages. Expected format is a sequence of either: -/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes] -/// [HEAD_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes] +/// [HEAD_UE_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes] +/// [HEAD_UE_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes] /// On any invalid input enters a failure state (can be checked by `is_broken()`) and /// never recovers from it. /// Use either `push_byte()` or `push()` to input byte stream from game server and `pop()` to @@ -63,9 +73,9 @@ impl MessageReader { } match &self.reading_state { ReadingState::Head => { - if input == HEAD_RECEIVED { + if input == HEAD_UE_RECEIVED { self.reading_state = ReadingState::ReceivedBytes; - } else if input == HEAD_MESSAGE { + } else if input == HEAD_UE_MESSAGE { self.reading_state = ReadingState::Length; } else { self.is_broken = true; @@ -76,7 +86,7 @@ impl MessageReader { self.next_received_bytes = self.next_received_bytes << 8; self.next_received_bytes += input as u32; self.read_bytes += 1; - if self.read_bytes >= RECEIVED_FIELD_SIZE { + if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { self.received_bytes += self.next_received_bytes as u64; self.next_received_bytes = 0; self.read_bytes = 0; @@ -87,10 +97,10 @@ impl MessageReader { self.current_message_length = self.current_message_length << 8; self.current_message_length += input as usize; self.read_bytes += 1; - if self.read_bytes >= LENGTH_FIELD_SIZE { + if self.read_bytes >= UE_LENGTH_FIELD_SIZE { self.read_bytes = 0; self.reading_state = ReadingState::Payload; - if self.current_message_length > MAX_MESSAGE_LENGTH { + if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { length: self.current_message_length, @@ -143,7 +153,7 @@ impl MessageReader { #[test] fn message_push_byte() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); @@ -161,7 +171,7 @@ fn message_push_byte() { reader.push_byte(0x6c).unwrap(); // l reader.push_byte(0x64).unwrap(); // d reader.push_byte(0x21).unwrap(); // - reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); @@ -177,19 +187,19 @@ fn message_push_byte() { #[test] fn received_push_byte() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); assert_eq!(reader.received_bytes(), 243); - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); assert_eq!(reader.received_bytes(), 1092203255); - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); @@ -199,12 +209,12 @@ fn received_push_byte() { #[test] fn mixed_push_byte() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); - reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); @@ -212,7 +222,7 @@ fn mixed_push_byte() { reader.push_byte(0x59).unwrap(); // Y reader.push_byte(0x6f).unwrap(); // o reader.push_byte(0x21).unwrap(); // - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); @@ -227,12 +237,12 @@ fn pushing_many_bytes_at_once() { let mut reader = MessageReader::new(); reader .push(&[ - HEAD_RECEIVED, + HEAD_UE_RECEIVED, 0, 0, 0, 243, - HEAD_MESSAGE, + HEAD_UE_MESSAGE, 0, 0, 0, @@ -240,7 +250,7 @@ fn pushing_many_bytes_at_once() { 0x59, // Y 0x6f, // o 0x21, // - HEAD_RECEIVED, + HEAD_UE_RECEIVED, 65, 25, 178, @@ -255,7 +265,7 @@ fn pushing_many_bytes_at_once() { #[test] fn generates_error_invalid_head() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); @@ -270,14 +280,16 @@ fn generates_error_invalid_head() { #[test] fn generates_error_message_too_long() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_MESSAGE).unwrap(); - // Max length right now is `0xA00000` - reader.push_byte(0).unwrap(); - reader.push_byte(0xA0).unwrap(); - reader.push_byte(0).unwrap(); + let huge_length = MAX_UE_MESSAGE_LENGTH + 1; + let bytes = (huge_length as u32).to_be_bytes(); + + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(bytes[0]).unwrap(); + reader.push_byte(bytes[1]).unwrap(); + reader.push_byte(bytes[2]).unwrap(); assert!(!reader.is_broken()); reader - .push_byte(1) + .push_byte(bytes[3]) .expect_err("Testing failing on exceeding allowed message length"); assert!(reader.is_broken()); } @@ -285,7 +297,7 @@ fn generates_error_message_too_long() { #[test] fn generates_error_invalid_unicode() { let mut reader = MessageReader::new(); - reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); -- 2.20.1 From b846fcf55b0477a8642f5f985b9fe4ee4ea2c801 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:11:44 +0700 Subject: [PATCH 03/23] Fix `MessageReader` documentation --- src/link/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index e54c9cd..eeba2fe 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -45,13 +45,16 @@ pub struct MessageReader { received_bytes: u64, } -/// For converting byte stream expected to be generated by Acedia mod from the game server into -/// actual messages. Expected format is a sequence of either: -/// [HEAD_UE_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes] -/// [HEAD_UE_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes] -/// On any invalid input enters a failure state (can be checked by `is_broken()`) and +/// For converting byte stream that is expected from the ue-server into actual messages. +/// Expected format is a sequence of either: +/// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] +/// [AMOUNT: amount of bytes received by ue-server since last update | 4 bytes: u32 BE] +/// 2. [HEAD_UE_MESSAGE: marker byte | 1 byte] +/// [LENGTH: length of the JSON message in utf8 encoding | 4 bytes: u32 BE] +/// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] +/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and /// never recovers from it. -/// Use either `push_byte()` or `push()` to input byte stream from game server and `pop()` to +/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to /// retrieve resulting messages. impl MessageReader { pub fn new() -> MessageReader { -- 2.20.1 From 0dedd1d1f1386e7b48757499fa8bfad89e9127f8 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:18:29 +0700 Subject: [PATCH 04/23] Change `MessageReader` to use `with_capacity` Some of the collections inside `MessageReader` were created with `new()` instead of `with_capacity()` call. This patch fixes that or comments why it was not done in some places. --- src/link/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index eeba2fe..db57956 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -20,6 +20,9 @@ const HEAD_UE_MESSAGE: u8 = 42; // Maximum allowed size of JSON message sent from ue-server. const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; +// We do not expect to receive more that this much messages at once from ue-server +const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100; + custom_error! { pub ReadingStreamError InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", MessageTooLong{length: usize} = "Message to receive is too long: {length}", @@ -63,8 +66,9 @@ impl MessageReader { reading_state: ReadingState::Head, read_bytes: 0, current_message_length: 0, + // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), - read_messages: VecDeque::new(), + read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES), next_received_bytes: 0, received_bytes: 0, } -- 2.20.1 From af3341c1e77b776eb9ec23aaf6fa252c010e3916 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:48:09 +0700 Subject: [PATCH 05/23] Refactor `MessageReader` for clarity Rename `received_bytes` into `ue_received_bytes` and get rid of `next_received_bytes` by reading relevant data into new byte buffer instead. --- src/link/mod.rs | 59 +++++++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 26 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index db57956..b761827 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -41,11 +41,11 @@ pub struct MessageReader { is_broken: bool, reading_state: ReadingState, read_bytes: usize, + buffer: [u8; 4], current_message_length: usize, current_message: Vec, read_messages: VecDeque, - next_received_bytes: u32, - received_bytes: u64, + ue_received_bytes: u64, } /// For converting byte stream that is expected from the ue-server into actual messages. @@ -65,12 +65,12 @@ impl MessageReader { is_broken: false, reading_state: ReadingState::Head, read_bytes: 0, + buffer: [0; 4], current_message_length: 0, // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES), - next_received_bytes: 0, - received_bytes: 0, + ue_received_bytes: 0, } } @@ -81,32 +81,28 @@ impl MessageReader { match &self.reading_state { ReadingState::Head => { if input == HEAD_UE_RECEIVED { - self.reading_state = ReadingState::ReceivedBytes; + self.change_state(ReadingState::ReceivedBytes); } else if input == HEAD_UE_MESSAGE { - self.reading_state = ReadingState::Length; + self.change_state(ReadingState::Length); } else { self.is_broken = true; return Err(ReadingStreamError::InvalidHead { input }); } } ReadingState::ReceivedBytes => { - self.next_received_bytes = self.next_received_bytes << 8; - self.next_received_bytes += input as u32; + self.buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { - self.received_bytes += self.next_received_bytes as u64; - self.next_received_bytes = 0; - self.read_bytes = 0; - self.reading_state = ReadingState::Head; + self.ue_received_bytes += array_of_u8_to_u32(self.buffer) as u64; + self.change_state(ReadingState::Head); } } ReadingState::Length => { - self.current_message_length = self.current_message_length << 8; - self.current_message_length += input as usize; + self.buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { - self.read_bytes = 0; - self.reading_state = ReadingState::Payload; + self.current_message_length = array_of_u8_to_u32(self.buffer) as usize; + self.change_state(ReadingState::Payload); if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { @@ -118,7 +114,7 @@ impl MessageReader { } ReadingState::Payload => { self.current_message.push(input); - self.read_bytes += 1 as usize; + self.read_bytes += 1; if self.read_bytes >= self.current_message_length { match str::from_utf8(&self.current_message) { Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), @@ -129,8 +125,7 @@ impl MessageReader { }; self.current_message.clear(); self.current_message_length = 0; - self.read_bytes = 0; - self.reading_state = ReadingState::Head; + self.change_state(ReadingState::Head); } } } @@ -148,13 +143,25 @@ impl MessageReader { self.read_messages.pop_back() } - pub fn received_bytes(&self) -> u64 { - self.received_bytes + pub fn ue_received_bytes(&self) -> u64 { + self.ue_received_bytes } pub fn is_broken(&self) -> bool { self.is_broken } + + fn change_state(&mut self, next_state: ReadingState) { + self.read_bytes = 0; + self.reading_state = next_state; + } +} + +fn array_of_u8_to_u32(bytes: [u8; 4]) -> u64 { + (u64::from(bytes[0]) << 24) + + (u64::from(bytes[1]) << 16) + + (u64::from(bytes[2]) << 8) + + (u64::from(bytes[3])) } #[test] @@ -199,18 +206,18 @@ fn received_push_byte() { reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(243).unwrap(); - assert_eq!(reader.received_bytes(), 243); + assert_eq!(reader.ue_received_bytes(), 243); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); } #[test] @@ -234,7 +241,7 @@ fn mixed_push_byte() { reader.push_byte(25).unwrap(); reader.push_byte(178).unwrap(); reader.push_byte(4).unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } @@ -264,7 +271,7 @@ fn pushing_many_bytes_at_once() { 4, ]) .unwrap(); - assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 1092203255); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } -- 2.20.1 From bb1f73a755753221159eb247553b5b4d0c9c9042 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 22 Jul 2021 18:53:22 +0700 Subject: [PATCH 06/23] Change character byte definitions into b'X' form --- src/link/mod.rs | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index b761827..87d8533 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -172,27 +172,27 @@ fn message_push_byte() { reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(13).unwrap(); - reader.push_byte(0x48).unwrap(); // H - reader.push_byte(0x65).unwrap(); // e - reader.push_byte(0x6c).unwrap(); // l - reader.push_byte(0x6c).unwrap(); // l - reader.push_byte(0x6f).unwrap(); // o - reader.push_byte(0x2c).unwrap(); // , - reader.push_byte(0x20).unwrap(); // - reader.push_byte(0x77).unwrap(); // w - reader.push_byte(0x6f).unwrap(); // o - reader.push_byte(0x72).unwrap(); // r - reader.push_byte(0x6c).unwrap(); // l - reader.push_byte(0x64).unwrap(); // d - reader.push_byte(0x21).unwrap(); // + reader.push_byte(b'H').unwrap(); + reader.push_byte(b'e').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b',').unwrap(); + reader.push_byte(b' ').unwrap(); + reader.push_byte(b'w').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'r').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'd').unwrap(); + reader.push_byte(b'!').unwrap(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); - reader.push_byte(0x59).unwrap(); // Y - reader.push_byte(0x6f).unwrap(); // o - reader.push_byte(0x21).unwrap(); // + reader.push_byte(b'Y').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'!').unwrap(); assert_eq!(reader.pop().unwrap(), "Hello, world!"); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); @@ -233,9 +233,9 @@ fn mixed_push_byte() { reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(3).unwrap(); - reader.push_byte(0x59).unwrap(); // Y - reader.push_byte(0x6f).unwrap(); // o - reader.push_byte(0x21).unwrap(); // + reader.push_byte(b'Y').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'!').unwrap(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(65).unwrap(); reader.push_byte(25).unwrap(); @@ -261,9 +261,9 @@ fn pushing_many_bytes_at_once() { 0, 0, 3, - 0x59, // Y - 0x6f, // o - 0x21, // + b'Y', + b'o', + b'!', HEAD_UE_RECEIVED, 65, 25, -- 2.20.1 From 202fa39a42e6340ccf7ed7fc4363b65bce382129 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:22:29 +0700 Subject: [PATCH 07/23] Fix comments for some `MessageReader` contants --- src/link/mod.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 87d8533..d10f93a 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -10,12 +10,10 @@ const UE_RECEIVED_FIELD_SIZE: usize = 4; // Defines how many bytes is used to encode "LENGTH" field, describing length of // next JSON message from ue-server const UE_LENGTH_FIELD_SIZE: usize = 4; -// Value indicating that next byte sequence from ue-server reports amount of bytes received by -// that server so far. Value itself is arbitrary. - +// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes +// received by that server so far. const HEAD_UE_RECEIVED: u8 = 85; -// Value indicating that next byte sequence from ue-server contains JSON message. -// Value itself is arbitrary. +// Arbitrary value indicating that next byte sequence from ue-server contains JSON message. const HEAD_UE_MESSAGE: u8 = 42; // Maximum allowed size of JSON message sent from ue-server. const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; -- 2.20.1 From f118767e3faddad55cdc4751f64a0c4209be7156 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:23:13 +0700 Subject: [PATCH 08/23] Rename `buffer` into `length_buffer` --- src/link/mod.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index d10f93a..c58ce13 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -39,7 +39,7 @@ pub struct MessageReader { is_broken: bool, reading_state: ReadingState, read_bytes: usize, - buffer: [u8; 4], + length_buffer: [u8; 4], current_message_length: usize, current_message: Vec, read_messages: VecDeque, @@ -63,7 +63,7 @@ impl MessageReader { is_broken: false, reading_state: ReadingState::Head, read_bytes: 0, - buffer: [0; 4], + length_buffer: [0; 4], current_message_length: 0, // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), @@ -88,18 +88,18 @@ impl MessageReader { } } ReadingState::ReceivedBytes => { - self.buffer[self.read_bytes] = input; + self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { - self.ue_received_bytes += array_of_u8_to_u32(self.buffer) as u64; + self.ue_received_bytes += array_of_u8_to_u32(self.length_buffer) as u64; self.change_state(ReadingState::Head); } } ReadingState::Length => { - self.buffer[self.read_bytes] = input; + self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { - self.current_message_length = array_of_u8_to_u32(self.buffer) as usize; + self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize; self.change_state(ReadingState::Payload); if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; -- 2.20.1 From 9ca00c5b8cbf480b24aae0753a327f4965307c8e Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:28:33 +0700 Subject: [PATCH 09/23] Refactor tests for clarity --- src/link/mod.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index c58ce13..0fce536 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -203,19 +203,19 @@ fn received_push_byte() { reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); - reader.push_byte(243).unwrap(); - assert_eq!(reader.ue_received_bytes(), 243); + reader.push_byte(0xf3).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xf3); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); - reader.push_byte(65).unwrap(); - reader.push_byte(25).unwrap(); - reader.push_byte(178).unwrap(); - reader.push_byte(4).unwrap(); - assert_eq!(reader.ue_received_bytes(), 1092203255); + reader.push_byte(0x41).unwrap(); + reader.push_byte(0x19).unwrap(); + reader.push_byte(0xb2).unwrap(); + reader.push_byte(0x04).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3 reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); reader.push_byte(34).unwrap(); reader.push_byte(154).unwrap(); - assert_eq!(reader.ue_received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); } #[test] -- 2.20.1 From e3f554218acdf2582176d52684fc5c983bb6cb97 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:31:40 +0700 Subject: [PATCH 10/23] Refactor `push_byte`'s code Move logic below error checking with early exit. --- src/link/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 0fce536..f9b69a6 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -100,7 +100,6 @@ impl MessageReader { self.read_bytes += 1; if self.read_bytes >= UE_LENGTH_FIELD_SIZE { self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize; - self.change_state(ReadingState::Payload); if self.current_message_length > MAX_UE_MESSAGE_LENGTH { self.is_broken = true; return Err(ReadingStreamError::MessageTooLong { @@ -108,6 +107,7 @@ impl MessageReader { }); } self.current_message = Vec::with_capacity(self.current_message_length); + self.change_state(ReadingState::Payload); } } ReadingState::Payload => { -- 2.20.1 From 978a5c4182296ae45ca7a2aa780e53a5278819d4 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:48:42 +0700 Subject: [PATCH 11/23] Change `UE_RECEIVED_FIELD_SIZE` to 2 ue-server is only supposed to receive up to 4095 bytes at once and is expected to report after reading them, therefore 4 bytes for the amount of received bytes is excessive and 2 will suffice. --- src/link/mod.rs | 58 ++++++++++++++++++++----------------------------- 1 file changed, 23 insertions(+), 35 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index f9b69a6..9cb8637 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -6,7 +6,7 @@ use custom_error::custom_error; // Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about // amount of bytes it received since the last update -const UE_RECEIVED_FIELD_SIZE: usize = 4; +const UE_RECEIVED_FIELD_SIZE: usize = 2; // Defines how many bytes is used to encode "LENGTH" field, describing length of // next JSON message from ue-server const UE_LENGTH_FIELD_SIZE: usize = 4; @@ -49,7 +49,7 @@ pub struct MessageReader { /// For converting byte stream that is expected from the ue-server into actual messages. /// Expected format is a sequence of either: /// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] -/// [AMOUNT: amount of bytes received by ue-server since last update | 4 bytes: u32 BE] +/// [AMOUNT: amount of bytes received by ue-server since last update | 2 bytes: u16 BE] /// 2. [HEAD_UE_MESSAGE: marker byte | 1 byte] /// [LENGTH: length of the JSON message in utf8 encoding | 4 bytes: u32 BE] /// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] @@ -91,7 +91,7 @@ impl MessageReader { self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { - self.ue_received_bytes += array_of_u8_to_u32(self.length_buffer) as u64; + self.ue_received_bytes += array_of_u8_to_u16(self.length_buffer) as u64; self.change_state(ReadingState::Head); } } @@ -155,11 +155,15 @@ impl MessageReader { } } -fn array_of_u8_to_u32(bytes: [u8; 4]) -> u64 { - (u64::from(bytes[0]) << 24) - + (u64::from(bytes[1]) << 16) - + (u64::from(bytes[2]) << 8) - + (u64::from(bytes[3])) +fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 { + (u16::from(bytes[0]) << 8) + u16::from(bytes[1]) +} + +fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { + (u32::from(bytes[0]) << 24) + + (u32::from(bytes[1]) << 16) + + (u32::from(bytes[2]) << 8) + + (u32::from(bytes[3])) } #[test] @@ -201,21 +205,15 @@ fn received_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); reader.push_byte(0xf3).unwrap(); assert_eq!(reader.ue_received_bytes(), 0xf3); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); - reader.push_byte(0x41).unwrap(); - reader.push_byte(0x19).unwrap(); reader.push_byte(0xb2).unwrap(); reader.push_byte(0x04).unwrap(); - assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3 + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3 reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(231).unwrap(); - reader.push_byte(34).unwrap(); - reader.push_byte(154).unwrap(); - assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); } #[test] @@ -223,9 +221,7 @@ fn mixed_push_byte() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); - reader.push_byte(243).unwrap(); + reader.push_byte(0xf3).unwrap(); reader.push_byte(HEAD_UE_MESSAGE).unwrap(); reader.push_byte(0).unwrap(); reader.push_byte(0).unwrap(); @@ -235,11 +231,9 @@ fn mixed_push_byte() { reader.push_byte(b'o').unwrap(); reader.push_byte(b'!').unwrap(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); - reader.push_byte(65).unwrap(); - reader.push_byte(25).unwrap(); - reader.push_byte(178).unwrap(); - reader.push_byte(4).unwrap(); - assert_eq!(reader.ue_received_bytes(), 1092203255); + reader.push_byte(0xb2).unwrap(); + reader.push_byte(0x04).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3 assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } @@ -251,9 +245,7 @@ fn pushing_many_bytes_at_once() { .push(&[ HEAD_UE_RECEIVED, 0, - 0, - 0, - 243, + 0xf3, HEAD_UE_MESSAGE, 0, 0, @@ -263,13 +255,11 @@ fn pushing_many_bytes_at_once() { b'o', b'!', HEAD_UE_RECEIVED, - 65, - 25, - 178, - 4, + 0xb2, + 0x04, ]) .unwrap(); - assert_eq!(reader.ue_received_bytes(), 1092203255); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); assert_eq!(reader.pop().unwrap(), "Yo!"); assert_eq!(reader.pop(), None); } @@ -279,9 +269,7 @@ fn generates_error_invalid_head() { let mut reader = MessageReader::new(); reader.push_byte(HEAD_UE_RECEIVED).unwrap(); reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); - reader.push_byte(0).unwrap(); - reader.push_byte(243).unwrap(); + reader.push_byte(0xf3).unwrap(); assert!(!reader.is_broken()); reader .push_byte(25) -- 2.20.1 From 5ce511c5a72224898c5c20d82807152d1833fcf8 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 01:50:47 +0700 Subject: [PATCH 12/23] Refactor `main.rs` to use `if let` construction --- src/main.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main.rs b/src/main.rs index 604053c..7f15ba3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,8 +7,7 @@ fn main() { let args: Vec = env::args().collect(); let filename = &args[1]; let config = unreal_config::load_file(Path::new(filename)); - match config { - Ok(config) => print!("{}", config), - _ => (), + if let Ok(config) = config { + print!("{}", config); } } -- 2.20.1 From 3f660f54d5476cb9db6bd495526f91ca4fe25d5f Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 02:01:17 +0700 Subject: [PATCH 13/23] Remove `EXPECTED_LIMIT_TO_UE_MESSAGES` constant --- src/link/mod.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 9cb8637..9bc53c5 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -18,9 +18,6 @@ const HEAD_UE_MESSAGE: u8 = 42; // Maximum allowed size of JSON message sent from ue-server. const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; -// We do not expect to receive more that this much messages at once from ue-server -const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100; - custom_error! { pub ReadingStreamError InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", MessageTooLong{length: usize} = "Message to receive is too long: {length}", @@ -67,7 +64,8 @@ impl MessageReader { current_message_length: 0, // Will be recreated with `with_capacity` in `push_byte()` current_message: Vec::new(), - read_messages: VecDeque::with_capacity(EXPECTED_LIMIT_TO_UE_MESSAGES), + // This value should be more than enough for typical use + read_messages: VecDeque::with_capacity(100), ue_received_bytes: 0, } } -- 2.20.1 From b187041d9e053989957f20df3f8589dd7165eb33 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 02:02:05 +0700 Subject: [PATCH 14/23] Move documentation for `MessageReader` --- src/link/mod.rs | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 9bc53c5..511473d 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -32,17 +32,6 @@ enum ReadingState { Payload, } -pub struct MessageReader { - is_broken: bool, - reading_state: ReadingState, - read_bytes: usize, - length_buffer: [u8; 4], - current_message_length: usize, - current_message: Vec, - read_messages: VecDeque, - ue_received_bytes: u64, -} - /// For converting byte stream that is expected from the ue-server into actual messages. /// Expected format is a sequence of either: /// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] @@ -54,6 +43,17 @@ pub struct MessageReader { /// never recovers from it. /// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to /// retrieve resulting messages. +pub struct MessageReader { + is_broken: bool, + reading_state: ReadingState, + read_bytes: usize, + length_buffer: [u8; 4], + current_message_length: usize, + current_message: Vec, + read_messages: VecDeque, + ue_received_bytes: u64, +} + impl MessageReader { pub fn new() -> MessageReader { MessageReader { -- 2.20.1 From 816454f4cf0e2a3d598190c8d3fc5a8a46732fb5 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 02:36:05 +0700 Subject: [PATCH 15/23] Move `MessageReader` into a separate file --- src/link/{mod.rs => reader.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/link/{mod.rs => reader.rs} (100%) diff --git a/src/link/mod.rs b/src/link/reader.rs similarity index 100% rename from src/link/mod.rs rename to src/link/reader.rs -- 2.20.1 From f310febe62b4472cb367c22c952ba611d01a715e Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 02:36:40 +0700 Subject: [PATCH 16/23] Add new `mod.rs` file with for `link` module --- src/link/mod.rs | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 src/link/mod.rs diff --git a/src/link/mod.rs b/src/link/mod.rs new file mode 100644 index 0000000..26ec5ad --- /dev/null +++ b/src/link/mod.rs @@ -0,0 +1,2 @@ +mod reader; +pub use reader::MessageReader; -- 2.20.1 From 5bfdd612482b02b7f3edf89132948e50369c7135 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Fri, 23 Jul 2021 05:53:26 +0700 Subject: [PATCH 17/23] Add `MessageWriter` implementation This is WIP implementation, buggy, untested and not properly commented --- src/link/mod.rs | 2 + src/link/writer.rs | 169 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 171 insertions(+) create mode 100644 src/link/writer.rs diff --git a/src/link/mod.rs b/src/link/mod.rs index 26ec5ad..70b5e9e 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -1,2 +1,4 @@ mod reader; +mod writer; pub use reader::MessageReader; +pub use writer::MessageWriter; diff --git a/src/link/writer.rs b/src/link/writer.rs new file mode 100644 index 0000000..ff7f9c6 --- /dev/null +++ b/src/link/writer.rs @@ -0,0 +1,169 @@ +use std::cmp::{max, min}; +use std::collections::VecDeque; +use std::convert::TryFrom; + +const LENGTH_FIELD_SIZE: usize = 4; +const UE_INPUT_BUFFER: usize = 4095; + +struct PendingMessage { + contents: Vec, + bytes_sent: usize, +} + +pub struct MessageWriter { + sent_bytes: u64, + ue_received_bytes: u64, + pending_messages: VecDeque, +} + +impl PendingMessage { + fn bytes_left(&self) -> usize { + max(0, self.contents.len() - self.bytes_sent) + } +} + +impl MessageWriter { + pub fn new() -> MessageWriter { + MessageWriter { + sent_bytes: 0, + ue_received_bytes: 0, + pending_messages: VecDeque::with_capacity(100), + } + } + + pub fn push(&mut self, message: &str) { + let message_as_utf8 = message.as_bytes(); + self.pending_messages.push_back(PendingMessage { + contents: [ + &(message_as_utf8.len() as u32).to_be_bytes(), + message_as_utf8, + ] + .concat(), + bytes_sent: 0, + }); + } + + pub fn try_pop(&mut self) -> Option> { + if !self.should_send_now() { + return None; + } + let max_output_bytes = self.available_ue_buffer_capacity(); + let next_payload_size = max_output_bytes - 2; + let next_message = match self.pending_messages.get_mut(0) { + Some(message) => message, + _ => return None, + }; + let first_index = next_message.bytes_sent; + let last_index = min( + next_message.bytes_sent + next_payload_size - 1, + next_message.contents.len() - 1, + ); + next_message.bytes_sent = last_index + 1; + let chunk_length = last_index - first_index + 1; + let bytes_to_send = [ + &(chunk_length as u16).to_be_bytes(), + &next_message.contents[first_index..last_index + 1], + ] + .concat(); + self.sent_bytes += bytes_to_send.len() as u64; + if next_message.bytes_sent >= next_message.contents.len() { + self.pending_messages.pop_front(); + } + Some(bytes_to_send) + } + + pub fn is_empty(&self) -> bool { + self.pending_messages.is_empty() + } + + pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { + self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes); + } + + fn available_ue_buffer_capacity(&self) -> usize { + match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() { + Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes), + _ => 0, + } + } + + fn should_send_now(&self) -> bool { + if self.pending_messages.is_empty() { + return false; + } + let bytes_left_in_message = match self.pending_messages.get(0) { + Some(message) => message.bytes_left(), + _ => return false, + }; + let max_output_bytes = self.available_ue_buffer_capacity(); + if bytes_left_in_message <= max_output_bytes { + return true; + }; + let max_bytes_in_payload = UE_INPUT_BUFFER - LENGTH_FIELD_SIZE; + let next_payload_size = max_output_bytes.checked_sub(4).unwrap_or_default(); + let chunks_in_message = (bytes_left_in_message / max_bytes_in_payload) + 1; + let chunks_in_message_after_sending = + ((bytes_left_in_message - next_payload_size) / max_bytes_in_payload) + 1; + chunks_in_message_after_sending < chunks_in_message + } +} + +#[test] +fn new_writer_is_empty() { + let mut writer = MessageWriter::new(); + assert_eq!(writer.is_empty(), true); +} + +#[test] +fn single_short_message() { + let mut writer = MessageWriter::new(); + writer.push("Hello, world!"); + assert_eq!(writer.is_empty(), false); + let resulting_bytes = writer.try_pop().unwrap(); + let expected_bytes = [ + 0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes) + 0, 0, 0, 13, // Bytes in the message + b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', + ]; + assert_eq!(writer.is_empty(), true); + assert_eq!(resulting_bytes, expected_bytes); + assert_eq!(writer.sent_bytes, expected_bytes.len() as u64); +} + +#[test] +fn single_long_message() { + let mut writer = MessageWriter::new(); + // Because we also have to pass lengths, this will go over the sending limit + let long_message = "Q".repeat(UE_INPUT_BUFFER); + writer.push(&long_message); + assert_eq!(writer.is_empty(), false); + let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!(writer.is_empty(), false); + assert_eq!(resulting_bytes.len(), 4095); + // Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd + assert_eq!(resulting_bytes[0], 0x0f); + assert_eq!(resulting_bytes[1], 0x0fd); + // Bytes in message = 4095 = 0x0fff + assert_eq!(resulting_bytes[2], 0); + assert_eq!(resulting_bytes[3], 0); + assert_eq!(resulting_bytes[4], 0x0f); + assert_eq!(resulting_bytes[5], 0x0ff); + for &byte in resulting_bytes[6..].iter() { + assert_eq!(byte, b'Q'); + } + + assert_eq!(writer.try_pop(), None); + assert_eq!(writer.is_empty(), false); + writer.update_ue_received_bytes(UE_INPUT_BUFFER as u64); + let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!(writer.is_empty(), true); + // Last time we have popped all but 6 bytes from `long_message` + // Bytes in the chunk = 6 = 0x06 + assert_eq!(resulting_bytes[0], 0); + assert_eq!(resulting_bytes[1], 0x06); + assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q']) +} + +// TODO: test that `update_ue_received_bytes()` cannot reduce that value +// TODO: test several messages, message only partially fitting into remaining buffer +// TODO: test giving just barely enough free buffer space -- 2.20.1 From 034b7860d6612703c8b7ba31537f6ee684214132 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Sat, 24 Jul 2021 02:26:43 +0700 Subject: [PATCH 18/23] Refactor `MessageWriter` --- src/link/writer.rs | 208 +++++++++++++++++++++++++++------------------ 1 file changed, 127 insertions(+), 81 deletions(-) diff --git a/src/link/writer.rs b/src/link/writer.rs index ff7f9c6..70ae84b 100644 --- a/src/link/writer.rs +++ b/src/link/writer.rs @@ -1,79 +1,86 @@ use std::cmp::{max, min}; use std::collections::VecDeque; use std::convert::TryFrom; +use std::iter::Extend; -const LENGTH_FIELD_SIZE: usize = 4; +// Defines how many bytes is used to encode "LENGTH" field in the chunk sent to ue-server +const CHUNK_LENGTH_FIELD: usize = 2; +// Maximum amount of bytes ue-server is able to receive at once const UE_INPUT_BUFFER: usize = 4095; - -struct PendingMessage { - contents: Vec, - bytes_sent: usize, -} +// Minimal payload size (in bytes) to send, unless there is not enough data left +const MIN_PAYLOAD_SIZE: usize = 50; pub struct MessageWriter { sent_bytes: u64, ue_received_bytes: u64, - pending_messages: VecDeque, -} - -impl PendingMessage { - fn bytes_left(&self) -> usize { - max(0, self.contents.len() - self.bytes_sent) - } + pending_data: VecDeque, } +/// For converting byte stream that is expected from the ue-server into actual messages. +/// Conversion process has two steps: +/// 1. Every string message is converted into it's utf8 representation and is pre-pended with +/// it's own length in format: +/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE] +/// [MESSAGE: utf8-encoded string | `MESSAGE_LENGTH` bytes] +/// Resulting byte sequences from all the messages then concatenated, in order, into +/// a single data stream. +/// 2. Resulting data stream is then separated into "chunks" that can be accepted by +/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format: +/// [LENGTH: length of the chunk | 2 bytes: u16 BE] +/// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] +/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server +/// can accept it. Call `update_ue_received_bytes()` to update `MessageWriter`'s information about +/// how many bytes ue-server has received so far. +/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred, +/// in case ue-server's buffer does not have enough space. Use `is_empty()` call to check for +/// whether `MessageWriter` has depleted all it's data. impl MessageWriter { pub fn new() -> MessageWriter { MessageWriter { sent_bytes: 0, ue_received_bytes: 0, - pending_messages: VecDeque::with_capacity(100), + // This value should be more than enough for typical use: + // it will take at least one second to send this much data to a 30 tick rate ue-server. + pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER), } } pub fn push(&mut self, message: &str) { let message_as_utf8 = message.as_bytes(); - self.pending_messages.push_back(PendingMessage { - contents: [ - &(message_as_utf8.len() as u32).to_be_bytes(), - message_as_utf8, - ] - .concat(), - bytes_sent: 0, - }); + let message_as_utf8 = [ + &(message_as_utf8.len() as u32).to_be_bytes(), + message_as_utf8, + ] + .concat(); + self.pending_data.extend(message_as_utf8.into_iter()); } + /// This method will always return chunk if all remaining data will fit inside it, otherwise it + /// will wait until ue-server's buffer has enough space for at least `MIN_PAYLOAD_SIZE` bytes. pub fn try_pop(&mut self) -> Option> { - if !self.should_send_now() { + if self.is_empty() { return None; } - let max_output_bytes = self.available_ue_buffer_capacity(); - let next_payload_size = max_output_bytes - 2; - let next_message = match self.pending_messages.get_mut(0) { - Some(message) => message, - _ => return None, - }; - let first_index = next_message.bytes_sent; - let last_index = min( - next_message.bytes_sent + next_payload_size - 1, - next_message.contents.len() - 1, - ); - next_message.bytes_sent = last_index + 1; - let chunk_length = last_index - first_index + 1; - let bytes_to_send = [ - &(chunk_length as u16).to_be_bytes(), - &next_message.contents[first_index..last_index + 1], - ] - .concat(); - self.sent_bytes += bytes_to_send.len() as u64; - if next_message.bytes_sent >= next_message.contents.len() { - self.pending_messages.pop_front(); + let required_payload_size = min(self.pending_data.len(), MIN_PAYLOAD_SIZE); + let available_payload_space = self + .available_ue_buffer_capacity() + .checked_sub(CHUNK_LENGTH_FIELD) + .unwrap_or_default(); + if required_payload_size > available_payload_space { + return None; + } + let payload_size = min(available_payload_space, self.pending_data.len()); + let mut bytes_to_send = Vec::with_capacity(CHUNK_LENGTH_FIELD + payload_size); + bytes_to_send.extend((payload_size as u16).to_be_bytes().iter()); + for next_byte in self.pending_data.drain(..payload_size) { + bytes_to_send.push(next_byte); } + self.sent_bytes += bytes_to_send.len() as u64; Some(bytes_to_send) } pub fn is_empty(&self) -> bool { - self.pending_messages.is_empty() + self.pending_data.is_empty() } pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { @@ -86,39 +93,27 @@ impl MessageWriter { _ => 0, } } - - fn should_send_now(&self) -> bool { - if self.pending_messages.is_empty() { - return false; - } - let bytes_left_in_message = match self.pending_messages.get(0) { - Some(message) => message.bytes_left(), - _ => return false, - }; - let max_output_bytes = self.available_ue_buffer_capacity(); - if bytes_left_in_message <= max_output_bytes { - return true; - }; - let max_bytes_in_payload = UE_INPUT_BUFFER - LENGTH_FIELD_SIZE; - let next_payload_size = max_output_bytes.checked_sub(4).unwrap_or_default(); - let chunks_in_message = (bytes_left_in_message / max_bytes_in_payload) + 1; - let chunks_in_message_after_sending = - ((bytes_left_in_message - next_payload_size) / max_bytes_in_payload) + 1; - chunks_in_message_after_sending < chunks_in_message - } } #[test] -fn new_writer_is_empty() { - let mut writer = MessageWriter::new(); +fn writer_contents_after_creation() { + let writer = MessageWriter::new(); assert_eq!(writer.is_empty(), true); + assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); } #[test] -fn single_short_message() { +fn writer_content_after_push() { let mut writer = MessageWriter::new(); writer.push("Hello, world!"); assert_eq!(writer.is_empty(), false); + assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); +} + +#[test] +fn writing_single_short_message() { + let mut writer = MessageWriter::new(); + writer.push("Hello, world!"); let resulting_bytes = writer.try_pop().unwrap(); let expected_bytes = [ 0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes) @@ -126,44 +121,95 @@ fn single_short_message() { b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', ]; assert_eq!(writer.is_empty(), true); + assert_eq!( + writer.available_ue_buffer_capacity(), + UE_INPUT_BUFFER - "Hello, world!".len() - 2 - 4 + ); assert_eq!(resulting_bytes, expected_bytes); assert_eq!(writer.sent_bytes, expected_bytes.len() as u64); } #[test] -fn single_long_message() { +fn writing_first_chunk_of_single_long_message() { let mut writer = MessageWriter::new(); // Because we also have to pass lengths, this will go over the sending limit let long_message = "Q".repeat(UE_INPUT_BUFFER); writer.push(&long_message); - assert_eq!(writer.is_empty(), false); let resulting_bytes = writer.try_pop().unwrap(); assert_eq!(writer.is_empty(), false); - assert_eq!(resulting_bytes.len(), 4095); + assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER); + assert_eq!(writer.available_ue_buffer_capacity(), 0); // Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd assert_eq!(resulting_bytes[0], 0x0f); - assert_eq!(resulting_bytes[1], 0x0fd); + assert_eq!(resulting_bytes[1], 0xfd); // Bytes in message = 4095 = 0x0fff assert_eq!(resulting_bytes[2], 0); assert_eq!(resulting_bytes[3], 0); assert_eq!(resulting_bytes[4], 0x0f); - assert_eq!(resulting_bytes[5], 0x0ff); + assert_eq!(resulting_bytes[5], 0xff); for &byte in resulting_bytes[6..].iter() { assert_eq!(byte, b'Q'); } - assert_eq!(writer.try_pop(), None); assert_eq!(writer.is_empty(), false); - writer.update_ue_received_bytes(UE_INPUT_BUFFER as u64); +} + +#[test] +fn writing_second_chunk_of_single_long_message() { + let mut writer = MessageWriter::new(); + // Because we also have to pass lengths, this will go over the sending limit + let long_message = "Q".repeat(UE_INPUT_BUFFER); + writer.push(&long_message); + // This pops all but 6 bytes of `long_message`, that were required to encode lengths of + // message and first chunk + let first_bytes = writer.try_pop().unwrap(); + writer.update_ue_received_bytes(first_bytes.len() as u64); let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!( + writer.available_ue_buffer_capacity(), + UE_INPUT_BUFFER - resulting_bytes.len() + ); assert_eq!(writer.is_empty(), true); - // Last time we have popped all but 6 bytes from `long_message` - // Bytes in the chunk = 6 = 0x06 + // Bytes in the chunk = 6 assert_eq!(resulting_bytes[0], 0); - assert_eq!(resulting_bytes[1], 0x06); + assert_eq!(resulting_bytes[1], 6); assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q']) } -// TODO: test that `update_ue_received_bytes()` cannot reduce that value -// TODO: test several messages, message only partially fitting into remaining buffer -// TODO: test giving just barely enough free buffer space +#[test] +fn will_write_small_chunks_if_no_more_data() { + let mut writer = MessageWriter::new(); + // Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`), + // sending this will leave us with exactly 10 free bytes in the buffer + let long_message = "Q".repeat(UE_INPUT_BUFFER / 2); + writer.push(&long_message); + writer.try_pop(); + let short_message = "Hello, world!"; + writer.push(&short_message); + let expected_bytes = [ + 0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes) + 0, 0, 0, 13, // Bytes in the message + b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', + ]; + // There should be enough space in the ue-server buffer to send `short_message` + let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!(resulting_bytes, expected_bytes); + assert_eq!(writer.try_pop(), None); + assert_eq!(writer.is_empty(), true); +} + +#[test] +fn will_not_write_small_chunks_if_more_data_remains() { + let mut writer = MessageWriter::new(); + // Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`), + // sending this will leave us with exactly 10 free bytes in the buffer + let long_message = "Q".repeat(UE_INPUT_BUFFER - CHUNK_LENGTH_FIELD - 4 - 10); + writer.push(&long_message); + writer.try_pop(); + let short_message = "Hello, world!"; + writer.push(&short_message); + // `MessageWriter` can neither send full message, nor a chunk of size 10 + // (because it is too short) + assert_eq!(writer.try_pop(), None); + assert_eq!(writer.is_empty(), false); +} -- 2.20.1 From 65ef791f00536c6b6d90de84ef1a20bc63ddc596 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Sat, 24 Jul 2021 02:36:39 +0700 Subject: [PATCH 19/23] Fix clippy warnings --- src/link/reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/link/reader.rs b/src/link/reader.rs index 511473d..03bb4ce 100644 --- a/src/link/reader.rs +++ b/src/link/reader.rs @@ -89,7 +89,7 @@ impl MessageReader { self.length_buffer[self.read_bytes] = input; self.read_bytes += 1; if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { - self.ue_received_bytes += array_of_u8_to_u16(self.length_buffer) as u64; + self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer)); self.change_state(ReadingState::Head); } } -- 2.20.1 From 26b7d1dd91722496ed6313ecbf9842596a310bca Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Sat, 24 Jul 2021 04:46:19 +0700 Subject: [PATCH 20/23] Add network prototype This prototype is untested, since there is currently no client to test it with. Related to #2 --- src/link/mod.rs | 56 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 14 +++++-------- 2 files changed, 61 insertions(+), 9 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 70b5e9e..610c1a0 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -1,4 +1,60 @@ +use std::error::Error; +use std::io::{Read, Write}; +use std::net::{SocketAddr, TcpListener}; +use std::sync::Arc; +use std::{str, thread}; + mod reader; mod writer; pub use reader::MessageReader; pub use writer::MessageWriter; + +pub struct Link { + reader: MessageReader, + writer: MessageWriter, +} + +impl Link { + pub fn run(port: u16, handler: F) -> Result<(), Box> + where + F: Fn(&mut Link, &str) -> () + Send + Sync + 'static, + { + let address = SocketAddr::from(([0, 0, 0, 0], port)); + let listener = TcpListener::bind(address)?; + let handler = Arc::new(handler); + loop { + // Listen to new (multiple) connection + let mut reading_stream = listener.accept()?.0; + let mut writing_stream = reading_stream.try_clone()?; + let mut avarice_link = Link { + reader: MessageReader::new(), + writer: MessageWriter::new(), + }; + let handler_clone = handler.clone(); + // On connection - spawn a new thread + thread::spawn(move || loop { + let mut buffer = [0; 1024]; + // Reading cycle + match reading_stream.read(&mut buffer) { + Ok(n) => avarice_link.reader.push(&buffer[..n]).unwrap(), + _ => panic!("Connection issue!"), + }; + // Handling cycle + while let Some(message) = avarice_link.reader.pop() { + handler_clone(&mut avarice_link, &message); + } + // Writing + avarice_link + .writer + .update_ue_received_bytes(avarice_link.reader.ue_received_bytes()); + if let Some(bytes) = avarice_link.writer.try_pop() { + writing_stream.write_all(&bytes).unwrap(); + } + }); + } + } + + pub fn write(&mut self, message: &str) { + self.writer.push(message); + } +} diff --git a/src/main.rs b/src/main.rs index 7f15ba3..9378c87 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,9 @@ -use std::env; -use std::path::Path; mod link; -mod unreal_config; +use link::Link; fn main() { - let args: Vec = env::args().collect(); - let filename = &args[1]; - let config = unreal_config::load_file(Path::new(filename)); - if let Ok(config) = config { - print!("{}", config); - } + match Link::run(1234, |link, message| { link.write(message);}) { + Ok(_) => print!("Connect!"), + _ => print!("Connection error!"), + }; } -- 2.20.1 From 4f67a7b72ccfcb12668ca5d669f39d6b80557786 Mon Sep 17 00:00:00 2001 From: g Date: Sat, 24 Jul 2021 14:40:32 +0700 Subject: [PATCH 21/23] fix docs --- src/link/mod.rs | 2 ++ src/link/reader.rs | 29 ++++++++++++++++++--------- src/link/writer.rs | 50 +++++++++++++++++++++++++++++----------------- 3 files changed, 54 insertions(+), 27 deletions(-) diff --git a/src/link/mod.rs b/src/link/mod.rs index 610c1a0..6c4d781 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -1,3 +1,5 @@ +//! Implements reader and writer to use when talking to UE2 game server. + use std::error::Error; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpListener}; diff --git a/src/link/reader.rs b/src/link/reader.rs index 03bb4ce..f2f49b5 100644 --- a/src/link/reader.rs +++ b/src/link/reader.rs @@ -1,3 +1,5 @@ +//! Implements reader that receives data from UE2 game server. + use std::collections::VecDeque; use std::str; @@ -32,16 +34,25 @@ enum ReadingState { Payload, } -/// For converting byte stream that is expected from the ue-server into actual messages. +/// For converting byte stream that is expected from the ue-server into actual messages. +/// /// Expected format is a sequence of either: -/// 1. [HEAD_UE_RECEIVED: marker byte | 1 byte] -/// [AMOUNT: amount of bytes received by ue-server since last update | 2 bytes: u16 BE] -/// 2. [HEAD_UE_MESSAGE: marker byte | 1 byte] -/// [LENGTH: length of the JSON message in utf8 encoding | 4 bytes: u32 BE] -/// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] -/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and -/// never recovers from it. -/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to +/// +/// | Data | Length | +/// |---------|---------| +/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte | +/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE| +/// +/// or +/// +/// | Data | Length | +/// |---------|---------| +/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte| +/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE| +/// | UTF8-encoded string | `LENGTH` bytes| +/// +/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it. +/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to /// retrieve resulting messages. pub struct MessageReader { is_broken: bool, diff --git a/src/link/writer.rs b/src/link/writer.rs index 70ae84b..adfddb2 100644 --- a/src/link/writer.rs +++ b/src/link/writer.rs @@ -1,3 +1,5 @@ +//! Implements writer that sends data to UE2 game server. + use std::cmp::{max, min}; use std::collections::VecDeque; use std::convert::TryFrom; @@ -10,30 +12,42 @@ const UE_INPUT_BUFFER: usize = 4095; // Minimal payload size (in bytes) to send, unless there is not enough data left const MIN_PAYLOAD_SIZE: usize = 50; +/// For converting byte stream that is expected from the ue-server into actual messages. +/// Conversion process has two steps: +/// 1. Every string message is converted into it's utf8 representation and is pre-pended with +/// it's own length in format: +/// +/// | Data | Length | +/// |---------|---------| +/// | Message `LENGTH` | 4 bytes: u32 BE | +/// | UTF8-encoded string | `LENGTH` bytes| +/// +/// Resulting byte sequences from all the messages then concatenated, in order, into +/// a single data stream. +/// +/// 2. Resulting data stream is then separated into "chunks" that can be accepted by +/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format: +/// +/// | Data | Length | +/// |---------|---------| +/// | Chunk `LENGTH` | 2 bytes: u16 BE | +/// | UTF8-encoded string | `LENGTH` bytes| +/// +/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server +/// can accept it. +/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred, +/// in case ue-server's buffer does not have enough space. +/// +/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about +/// how many bytes ue-server has received so far. +/// +/// Use `is_empty()` call to check for whether `MessageWriter` has depleted all it's data. pub struct MessageWriter { sent_bytes: u64, ue_received_bytes: u64, pending_data: VecDeque, } -/// For converting byte stream that is expected from the ue-server into actual messages. -/// Conversion process has two steps: -/// 1. Every string message is converted into it's utf8 representation and is pre-pended with -/// it's own length in format: -/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE] -/// [MESSAGE: utf8-encoded string | `MESSAGE_LENGTH` bytes] -/// Resulting byte sequences from all the messages then concatenated, in order, into -/// a single data stream. -/// 2. Resulting data stream is then separated into "chunks" that can be accepted by -/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format: -/// [LENGTH: length of the chunk | 2 bytes: u16 BE] -/// [PAYLOAD: utf8-encoded string | `LENGTH` bytes] -/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server -/// can accept it. Call `update_ue_received_bytes()` to update `MessageWriter`'s information about -/// how many bytes ue-server has received so far. -/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred, -/// in case ue-server's buffer does not have enough space. Use `is_empty()` call to check for -/// whether `MessageWriter` has depleted all it's data. impl MessageWriter { pub fn new() -> MessageWriter { MessageWriter { -- 2.20.1 From a6ac0b7a170db343a54b9965b8ed6eb3c47f3753 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Thu, 29 Jul 2021 17:41:28 +0700 Subject: [PATCH 22/23] Change `MessageWriter`'s protocol There is no need to send length of each data chunk. This patch simplifies protocol by only sending length of each message. --- src/link/writer.rs | 122 ++++++++++++--------------------------------- 1 file changed, 31 insertions(+), 91 deletions(-) diff --git a/src/link/writer.rs b/src/link/writer.rs index adfddb2..e769cf1 100644 --- a/src/link/writer.rs +++ b/src/link/writer.rs @@ -5,43 +5,34 @@ use std::collections::VecDeque; use std::convert::TryFrom; use std::iter::Extend; -// Defines how many bytes is used to encode "LENGTH" field in the chunk sent to ue-server -const CHUNK_LENGTH_FIELD: usize = 2; // Maximum amount of bytes ue-server is able to receive at once const UE_INPUT_BUFFER: usize = 4095; -// Minimal payload size (in bytes) to send, unless there is not enough data left -const MIN_PAYLOAD_SIZE: usize = 50; -/// For converting byte stream that is expected from the ue-server into actual messages. -/// Conversion process has two steps: -/// 1. Every string message is converted into it's utf8 representation and is pre-pended with -/// it's own length in format: +/// For converting text messages into chunks of bytes that can be sent to the ue-server. +/// +/// Every string message is converted into a length-prefixed array of utf8 bytes: /// /// | Data | Length | /// |---------|---------| /// | Message `LENGTH` | 4 bytes: u32 BE | /// | UTF8-encoded string | `LENGTH` bytes| /// -/// Resulting byte sequences from all the messages then concatenated, in order, into -/// a single data stream. -/// -/// 2. Resulting data stream is then separated into "chunks" that can be accepted by -/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format: -/// -/// | Data | Length | -/// |---------|---------| -/// | Chunk `LENGTH` | 2 bytes: u16 BE | -/// | UTF8-encoded string | `LENGTH` bytes| +/// Resulting byte sequences from all messages are then concatenated (in the same order as they were +/// "written") into a single data stream. Bytes from the data stream are returned in chunks of +/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows +/// that ue-server's buffer has enough space to accept it. /// -/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server -/// can accept it. -/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred, -/// in case ue-server's buffer does not have enough space. +/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk. +/// NOTE: `try_pop()` can return `None` even if not all message data has been returned, +/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space. /// /// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about -/// how many bytes ue-server has received so far. +/// how many bytes ue-server has received so far. This can signal that its buffer has enough +/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent +/// to ue-server. /// -/// Use `is_empty()` call to check for whether `MessageWriter` has depleted all it's data. +/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return +/// (possibly after ``update_ue_received_bytes()`). pub struct MessageWriter { sent_bytes: u64, ue_received_bytes: u64, @@ -69,24 +60,16 @@ impl MessageWriter { self.pending_data.extend(message_as_utf8.into_iter()); } - /// This method will always return chunk if all remaining data will fit inside it, otherwise it - /// will wait until ue-server's buffer has enough space for at least `MIN_PAYLOAD_SIZE` bytes. pub fn try_pop(&mut self) -> Option> { if self.is_empty() { return None; } - let required_payload_size = min(self.pending_data.len(), MIN_PAYLOAD_SIZE); - let available_payload_space = self - .available_ue_buffer_capacity() - .checked_sub(CHUNK_LENGTH_FIELD) - .unwrap_or_default(); - if required_payload_size > available_payload_space { + let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len()); + if chunk_size == 0 { return None; } - let payload_size = min(available_payload_space, self.pending_data.len()); - let mut bytes_to_send = Vec::with_capacity(CHUNK_LENGTH_FIELD + payload_size); - bytes_to_send.extend((payload_size as u16).to_be_bytes().iter()); - for next_byte in self.pending_data.drain(..payload_size) { + let mut bytes_to_send = Vec::with_capacity(chunk_size); + for next_byte in self.pending_data.drain(..chunk_size) { bytes_to_send.push(next_byte); } self.sent_bytes += bytes_to_send.len() as u64; @@ -97,6 +80,8 @@ impl MessageWriter { self.pending_data.is_empty() } + /// Takes total amount of bytes received so far by the ue-server, not just bytes received after + /// the last `update_ue_received_bytes()` call. pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes); } @@ -130,14 +115,13 @@ fn writing_single_short_message() { writer.push("Hello, world!"); let resulting_bytes = writer.try_pop().unwrap(); let expected_bytes = [ - 0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes) 0, 0, 0, 13, // Bytes in the message b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', ]; assert_eq!(writer.is_empty(), true); assert_eq!( writer.available_ue_buffer_capacity(), - UE_INPUT_BUFFER - "Hello, world!".len() - 2 - 4 + UE_INPUT_BUFFER - "Hello, world!".len() - 4 ); assert_eq!(resulting_bytes, expected_bytes); assert_eq!(writer.sent_bytes, expected_bytes.len() as u64); @@ -146,22 +130,19 @@ fn writing_single_short_message() { #[test] fn writing_first_chunk_of_single_long_message() { let mut writer = MessageWriter::new(); - // Because we also have to pass lengths, this will go over the sending limit + // Because we also have to pass message length, this will go over the sending limit let long_message = "Q".repeat(UE_INPUT_BUFFER); writer.push(&long_message); let resulting_bytes = writer.try_pop().unwrap(); assert_eq!(writer.is_empty(), false); assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER); assert_eq!(writer.available_ue_buffer_capacity(), 0); - // Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd - assert_eq!(resulting_bytes[0], 0x0f); - assert_eq!(resulting_bytes[1], 0xfd); // Bytes in message = 4095 = 0x0fff - assert_eq!(resulting_bytes[2], 0); - assert_eq!(resulting_bytes[3], 0); - assert_eq!(resulting_bytes[4], 0x0f); - assert_eq!(resulting_bytes[5], 0xff); - for &byte in resulting_bytes[6..].iter() { + assert_eq!(resulting_bytes[0], 0); + assert_eq!(resulting_bytes[1], 0); + assert_eq!(resulting_bytes[2], 0x0f); + assert_eq!(resulting_bytes[3], 0xff); + for &byte in resulting_bytes[4..].iter() { assert_eq!(byte, b'Q'); } assert_eq!(writer.try_pop(), None); @@ -174,8 +155,7 @@ fn writing_second_chunk_of_single_long_message() { // Because we also have to pass lengths, this will go over the sending limit let long_message = "Q".repeat(UE_INPUT_BUFFER); writer.push(&long_message); - // This pops all but 6 bytes of `long_message`, that were required to encode lengths of - // message and first chunk + // This pops all but 4 bytes of `long_message`, that were required to encode message length let first_bytes = writer.try_pop().unwrap(); writer.update_ue_received_bytes(first_bytes.len() as u64); let resulting_bytes = writer.try_pop().unwrap(); @@ -184,46 +164,6 @@ fn writing_second_chunk_of_single_long_message() { UE_INPUT_BUFFER - resulting_bytes.len() ); assert_eq!(writer.is_empty(), true); - // Bytes in the chunk = 6 - assert_eq!(resulting_bytes[0], 0); - assert_eq!(resulting_bytes[1], 6); - assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q']) -} - -#[test] -fn will_write_small_chunks_if_no_more_data() { - let mut writer = MessageWriter::new(); - // Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`), - // sending this will leave us with exactly 10 free bytes in the buffer - let long_message = "Q".repeat(UE_INPUT_BUFFER / 2); - writer.push(&long_message); - writer.try_pop(); - let short_message = "Hello, world!"; - writer.push(&short_message); - let expected_bytes = [ - 0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes) - 0, 0, 0, 13, // Bytes in the message - b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', - ]; - // There should be enough space in the ue-server buffer to send `short_message` - let resulting_bytes = writer.try_pop().unwrap(); - assert_eq!(resulting_bytes, expected_bytes); - assert_eq!(writer.try_pop(), None); - assert_eq!(writer.is_empty(), true); -} - -#[test] -fn will_not_write_small_chunks_if_more_data_remains() { - let mut writer = MessageWriter::new(); - // Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`), - // sending this will leave us with exactly 10 free bytes in the buffer - let long_message = "Q".repeat(UE_INPUT_BUFFER - CHUNK_LENGTH_FIELD - 4 - 10); - writer.push(&long_message); - writer.try_pop(); - let short_message = "Hello, world!"; - writer.push(&short_message); - // `MessageWriter` can neither send full message, nor a chunk of size 10 - // (because it is too short) - assert_eq!(writer.try_pop(), None); - assert_eq!(writer.is_empty(), false); + // Bytes left for the next chunk = 4 + assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q']) } -- 2.20.1 From cfe680d7702c7fad3801726024d46128fa0175ba Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Wed, 4 Aug 2021 04:29:20 +0700 Subject: [PATCH 23/23] Refactor `link` module for use in one thread While `link` module is still spawning new threads to handle new connections and reading data from connected clients - it now collects all the received messages in one thread, simplifying their handling. --- Cargo.lock | 31 ++++++++++ Cargo.toml | 4 +- src/link/message.rs | 38 ++++++++++++ src/link/mod.rs | 139 ++++++++++++++++++++++++++++++-------------- src/link/network.rs | 106 +++++++++++++++++++++++++++++++++ src/main.rs | 15 +++-- 6 files changed, 282 insertions(+), 51 deletions(-) create mode 100644 src/link/message.rs create mode 100644 src/link/network.rs diff --git a/Cargo.lock b/Cargo.lock index 11949f5..2f72667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,6 +5,8 @@ name = "avarice" version = "0.1.0" dependencies = [ "custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -12,5 +14,34 @@ name = "custom_error" version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde_json" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", +] + [metadata] "checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6" +"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" diff --git a/Cargo.toml b/Cargo.toml index 58d91ba..af73fa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -custom_error="1.9.2" \ No newline at end of file +custom_error="1.*" +serde ="1.*" +serde_json="1.*" \ No newline at end of file diff --git a/src/link/message.rs b/src/link/message.rs new file mode 100644 index 0000000..03049ac --- /dev/null +++ b/src/link/message.rs @@ -0,0 +1,38 @@ +//! Implements Avarice message: target service, message type, json parameters +use serde_json; +use serde_json::json; +use std::fmt; + +pub struct AvariceMessage { + pub service: String, + pub message_type: String, + pub parameters: serde_json::Value, +} + +impl AvariceMessage { + /// Parses JSON form of a message into `AvariceMessage` struct + pub fn from(message_str: &str) -> Option { + let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); + let message_json = message_json.as_object_mut()?; + Some(AvariceMessage { + service: message_json.remove("s")?.as_str()?.to_owned(), + message_type: message_json.remove("t")?.as_str()?.to_owned(), + parameters: message_json.remove("p")?, + }) + } +} + +impl fmt::Display for AvariceMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + json!({ + "s": self.service, + "t": self.message_type, + "p": self.parameters, + }) + .to_string() + ) + } +} diff --git a/src/link/mod.rs b/src/link/mod.rs index 6c4d781..72ba9b4 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -1,62 +1,111 @@ -//! Implements reader and writer to use when talking to UE2 game server. - +//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between +//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and +//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about +//! ue-servers' connections using `Receiver`. +use std::collections::HashMap; use std::error::Error; -use std::io::{Read, Write}; -use std::net::{SocketAddr, TcpListener}; -use std::sync::Arc; -use std::{str, thread}; +use std::io::Write; +use std::net::{SocketAddr, TcpStream}; +use std::sync::mpsc::{channel, Receiver}; +pub use writer::MessageWriter; +mod message; +mod network; mod reader; mod writer; -pub use reader::MessageReader; -pub use writer::MessageWriter; +pub use message::AvariceMessage; +pub use network::{run_server, NetworkMessage}; + +/// For collecting messages from all connected ue-servers and providing a way to reply back. +pub struct AvariceServer { + connected_links: HashMap, + receiver: Receiver, +} +/// For representing a link to one of the connected ue-servers, can be used to send messages back. +/// To receive messages use `AvariceServer`'s `next()` method instead. pub struct Link { - reader: MessageReader, + ue_server_address: SocketAddr, writer: MessageWriter, + writing_stream: TcpStream, } -impl Link { - pub fn run(port: u16, handler: F) -> Result<(), Box> - where - F: Fn(&mut Link, &str) -> () + Send + Sync + 'static, - { - let address = SocketAddr::from(([0, 0, 0, 0], port)); - let listener = TcpListener::bind(address)?; - let handler = Arc::new(handler); +impl AvariceServer { + /// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`, + /// corresponding to the ue-server that sent next message and `AvariceMessage`, + /// representing that message. + pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> { loop { - // Listen to new (multiple) connection - let mut reading_stream = listener.accept()?.0; - let mut writing_stream = reading_stream.try_clone()?; - let mut avarice_link = Link { - reader: MessageReader::new(), - writer: MessageWriter::new(), - }; - let handler_clone = handler.clone(); - // On connection - spawn a new thread - thread::spawn(move || loop { - let mut buffer = [0; 1024]; - // Reading cycle - match reading_stream.read(&mut buffer) { - Ok(n) => avarice_link.reader.push(&buffer[..n]).unwrap(), - _ => panic!("Connection issue!"), - }; - // Handling cycle - while let Some(message) = avarice_link.reader.pop() { - handler_clone(&mut avarice_link, &message); + match self.receiver.recv() { + Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => { + // If `ue_server_address` was already present in `self.connected_links` + // hash map, then it means we have failed to clean it up after it + // has disconnected. We can just throw away the old value here. + self.connected_links.insert( + ue_server_address, + Link { + ue_server_address, + writing_stream, + writer: MessageWriter::new(), + }, + ); + continue; + } + Ok(NetworkMessage::ConnectionLost(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; + } + Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; } - // Writing - avarice_link - .writer - .update_ue_received_bytes(avarice_link.reader.ue_received_bytes()); - if let Some(bytes) = avarice_link.writer.try_pop() { - writing_stream.write_all(&bytes).unwrap(); + Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => { + if let Some(link) = self.connected_links.get_mut(&ue_server_address) { + link.update_ue_received_bytes(ue_received_bytes) + } + continue; } - }); + Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => { + // Not having a link with key `ue_server_address` should be impossible here + return self + .connected_links + .get_mut(&ue_server_address) + .and_then(|x| Some((x, message))); + } + _ => return None, + } } } +} + +impl Link { + pub fn send(&mut self, message: AvariceMessage) { + self.writer.push(&message.to_string()); + self.flush(); + } + + pub fn socket_address(&self) -> SocketAddr { + self.ue_server_address + } - pub fn write(&mut self, message: &str) { - self.writer.push(message); + fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { + self.writer.update_ue_received_bytes(ue_received_bytes); + self.flush(); } + + fn flush(&mut self) { + if let Some(bytes) = self.writer.try_pop() { + self.writing_stream.write_all(&bytes).unwrap(); + } + } +} + +/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port. +pub fn start_avarice(port: u16) -> Result> { + let (sender, receiver) = channel(); + run_server(port, sender)?; + Ok(AvariceServer { + connected_links: HashMap::new(), + receiver, + }) } diff --git a/src/link/network.rs b/src/link/network.rs new file mode 100644 index 0000000..9424240 --- /dev/null +++ b/src/link/network.rs @@ -0,0 +1,106 @@ +//! Implements a network model where messages from all the ue-servers are collected in a single +//! main thread. For that we spawn a new thread that listens for new connections, which in turn +//! spawns a new thread for every connected ue-server to handle reading data from it. +//! Since all reading is handled in ue-servers' own threads, to collect messages they have +//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to +//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread +//! by the same `std::sync::mpsc::Sender` object. +use super::message::AvariceMessage; +pub use super::reader::MessageReader; +use std::error::Error; +use std::io::Read; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::sync::mpsc::Sender; +use std::thread; + +pub struct UEConnection { + pub address: SocketAddr, + pub reader: MessageReader, + pub reading_stream: TcpStream, + pub message_sender: Sender, +} + +/// Possible messages to the main thread +pub enum NetworkMessage { + ConnectionEstablished(SocketAddr, TcpStream), + InvalidDataReceived(SocketAddr), + ConnectionLost(SocketAddr), + MessageReceived(SocketAddr, AvariceMessage), + UEReceivedUpdate(SocketAddr, u64), +} + +pub fn run_server(port: u16, message_sender: Sender) -> Result<(), Box> { + let address = SocketAddr::from(([0, 0, 0, 0], port)); + let listener = TcpListener::bind(address)?; + thread::spawn(move || loop { + // Listen to new (multiple) connection + let (reading_stream, address) = listener.accept().unwrap(); + let writing_stream = reading_stream.try_clone().unwrap(); + message_sender + .send(NetworkMessage::ConnectionEstablished( + address, + writing_stream, + )) + .unwrap(); + // On connection - spawn a new thread + let sender_clone = message_sender.clone(); + thread::spawn(move || { + manage_connection(UEConnection { + reader: MessageReader::new(), + message_sender: sender_clone, + reading_stream, + address, + }) + }); + }); + Ok(()) +} + +fn manage_connection(mut connection: UEConnection) { + let mut buffer = [0; 1024]; + loop { + // Reading cycle + match connection.reading_stream.read(&mut buffer) { + Ok(n) => connection.reader.push(&buffer[..n]).unwrap(), + _ => { + connection + .message_sender + .send(NetworkMessage::ConnectionLost(connection.address)) + .unwrap(); + return; + } + }; + if connection.reader.is_broken() { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + // Decoding cycle + while let Some(text_message) = connection.reader.pop() { + if let Some(avarice_message) = AvariceMessage::from(&text_message) { + connection + .message_sender + .send(NetworkMessage::MessageReceived( + connection.address, + avarice_message, + )) + .unwrap(); + } else { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + } + connection + .message_sender + .send(NetworkMessage::UEReceivedUpdate( + connection.address, + connection.reader.ue_received_bytes(), + )) + .unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index 9378c87..59c341b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,14 @@ +// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator. mod link; -use link::Link; +use link::start_avarice; fn main() { - match Link::run(1234, |link, message| { link.write(message);}) { - Ok(_) => print!("Connect!"), - _ => print!("Connection error!"), - }; + let mut server = start_avarice(1234).unwrap(); + while let Some((link, message)) = server.next() { + println!("{}: {}", link.socket_address(), message.to_string()); + if message.message_type != "end" { + link.send(message); + } + } + println!("Avarice has shut down!"); } -- 2.20.1