From 0a868521974fd84859506a3f9ef4d427bf05c2b9 Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Wed, 21 Jul 2021 03:40:29 +0700 Subject: [PATCH] Add `MessageReader` implementation I've decided that it would be a good idea to separete compoments that transform text messages into byte chunks to send to Unreal Engine and back. This means that each of them will have only one responsibility and using them will be easier in case we decide to read and write via tcp/ip in separate threads. The only interesting to them both parameter is the amount of bytes that were confirmed to be received from the Unreal Engine side, which can be easily shared "by hand". This patch implements "reader" that accepts byte stream expected from the game server and converts it into: 1. separate string messages; 2. amount of bytes game server reported to already have received. --- Cargo.lock | 10 ++ Cargo.toml | 1 + src/link/mod.rs | 300 ++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 1 + 4 files changed, 312 insertions(+) create mode 100644 src/link/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b0eb9bf..11949f5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,4 +3,14 @@ [[package]] name = "avarice" version = "0.1.0" +dependencies = [ + "custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)", +] +[[package]] +name = "custom_error" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[metadata] +"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6" diff --git a/Cargo.toml b/Cargo.toml index e8facc0..58d91ba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,4 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +custom_error="1.9.2" \ No newline at end of file diff --git a/src/link/mod.rs b/src/link/mod.rs new file mode 100644 index 0000000..0a317a6 --- /dev/null +++ b/src/link/mod.rs @@ -0,0 +1,300 @@ +use std::collections::VecDeque; +use std::str; + +extern crate custom_error; +use custom_error::custom_error; + +const RECEIVED_FIELD_SIZE: usize = 4; +const LENGTH_FIELD_SIZE: usize = 4; +const HEAD_RECEIVED: u8 = 85; +const HEAD_MESSAGE: u8 = 42; +const MAX_MESSAGE_LENGTH: usize = 0xA00000; + +custom_error! { pub ReadingStreamError + InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", + MessageTooLong{length: usize} = "Message to receive is too long: {length}", + InvalidUnicode = "Invalid utf-8 was received", + BrokenStream = "Used stream is broken" +} + +enum ReadingState { + Head, + ReceivedBytes, + Length, + Payload, +} + +pub struct MessageReader { + is_broken: bool, + reading_state: ReadingState, + read_bytes: usize, + current_message_length: usize, + current_message: Vec, + read_messages: VecDeque, + next_received_bytes: u32, + received_bytes: u64, +} + +/// For converting byte stream expected to be generated by Acedia mod from the game server into +/// actual messages. Expected format is a sequence of either: +/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes] +/// [HEAD_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes] +/// On any invalid input enters a failure state (can be checked by `is_broken()`) and +/// never recovers from it. +/// Use either `push_byte()` or `push()` to input byte stream from game server and `pop()` to +/// retrieve resulting messages. +impl MessageReader { + pub fn new() -> MessageReader { + MessageReader { + is_broken: false, + reading_state: ReadingState::Head, + read_bytes: 0, + current_message_length: 0, + current_message: Vec::new(), + read_messages: VecDeque::new(), + next_received_bytes: 0, + received_bytes: 0, + } + } + + pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { + if self.is_broken { + return Err(ReadingStreamError::BrokenStream); + } + match &self.reading_state { + ReadingState::Head => { + if input == HEAD_RECEIVED { + self.reading_state = ReadingState::ReceivedBytes; + } else if input == HEAD_MESSAGE { + self.reading_state = ReadingState::Length; + } else { + self.is_broken = true; + return Err(ReadingStreamError::InvalidHead { input }); + } + } + ReadingState::ReceivedBytes => { + self.next_received_bytes = self.next_received_bytes << 8; + self.next_received_bytes += input as u32; + self.read_bytes += 1; + if self.read_bytes >= RECEIVED_FIELD_SIZE { + self.received_bytes += self.next_received_bytes as u64; + self.next_received_bytes = 0; + self.read_bytes = 0; + self.reading_state = ReadingState::Head; + } + } + ReadingState::Length => { + self.current_message_length = self.current_message_length << 8; + self.current_message_length += input as usize; + self.read_bytes += 1; + if self.read_bytes >= LENGTH_FIELD_SIZE { + self.read_bytes = 0; + self.reading_state = ReadingState::Payload; + if self.current_message_length > MAX_MESSAGE_LENGTH { + self.is_broken = true; + return Err(ReadingStreamError::MessageTooLong { + length: self.current_message_length, + }); + } + self.current_message = Vec::with_capacity(self.current_message_length); + } + } + ReadingState::Payload => { + self.current_message.push(input); + self.read_bytes += 1 as usize; + if self.read_bytes >= self.current_message_length { + match str::from_utf8(&self.current_message) { + Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), + _ => { + self.is_broken = true; + return Err(ReadingStreamError::InvalidUnicode); + } + }; + self.current_message.clear(); + self.current_message_length = 0; + self.read_bytes = 0; + self.reading_state = ReadingState::Head; + } + } + } + Ok(()) + } + + pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { + for &byte in input { + self.push_byte(byte)?; + } + Ok(()) + } + + pub fn pop(&mut self) -> Option { + self.read_messages.pop_back() + } + + pub fn received_bytes(&self) -> u64 { + self.received_bytes + } + + pub fn is_broken(&self) -> bool { + self.is_broken + } +} + +#[test] +fn message_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(13).unwrap(); + reader.push_byte(0x48).unwrap(); // H + reader.push_byte(0x65).unwrap(); // e + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x2c).unwrap(); // , + reader.push_byte(0x20).unwrap(); // + reader.push_byte(0x77).unwrap(); // w + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x72).unwrap(); // r + reader.push_byte(0x6c).unwrap(); // l + reader.push_byte(0x64).unwrap(); // d + reader.push_byte(0x21).unwrap(); // + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(0x59).unwrap(); // Y + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x21).unwrap(); // + assert_eq!(reader.pop().unwrap(), "Hello, world!"); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn received_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + assert_eq!(reader.received_bytes(), 243); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(65).unwrap(); + reader.push_byte(25).unwrap(); + reader.push_byte(178).unwrap(); + reader.push_byte(4).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(231).unwrap(); + reader.push_byte(34).unwrap(); + reader.push_byte(154).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); +} + +#[test] +fn mixed_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(0x59).unwrap(); // Y + reader.push_byte(0x6f).unwrap(); // o + reader.push_byte(0x21).unwrap(); // + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(65).unwrap(); + reader.push_byte(25).unwrap(); + reader.push_byte(178).unwrap(); + reader.push_byte(4).unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn pushing_many_bytes_at_once() { + let mut reader = MessageReader::new(); + reader + .push(&[ + HEAD_RECEIVED, + 0, + 0, + 0, + 243, + HEAD_MESSAGE, + 0, + 0, + 0, + 3, + 0x59, // Y + 0x6f, // o + 0x21, // + HEAD_RECEIVED, + 65, + 25, + 178, + 4, + ]) + .unwrap(); + assert_eq!(reader.received_bytes(), 1092203255); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn generates_error_invalid_head() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(243).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(25) + .expect_err("Testing failing on incorrect HEAD"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_message_too_long() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + // Max length right now is `0xA00000` + reader.push_byte(0).unwrap(); + reader.push_byte(0xA0).unwrap(); + reader.push_byte(0).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(1) + .expect_err("Testing failing on exceeding allowed message length"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_invalid_unicode() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(2).unwrap(); + reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence + assert!(!reader.is_broken()); + // Bytes inside multi-byte code point have to have `1` for their high bit + reader + .push_byte(0b01010011) + .expect_err("Testing failing on incorrect unicode"); + assert!(reader.is_broken()); +} diff --git a/src/main.rs b/src/main.rs index 8f664c9..604053c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,6 @@ use std::env; use std::path::Path; +mod link; mod unreal_config; fn main() {