Browse Source

Add `MessageReader` implementation

I've decided that it would be a good idea to separete
compoments that transform text messages into byte chunks to send to
Unreal Engine and back. This means that each of them will have only one
responsibility and using them will be easier in case we decide to read
and write via tcp/ip in separate threads. The only interesting to them
both parameter is the amount of bytes that were confirmed to be received
from the Unreal Engine side, which can be easily shared "by hand".

This patch implements "reader" that accepts byte stream expected from
the game server and converts it into:
1. separate string messages;
2. amount of bytes game server reported to already have received.
feature_link
Anton Tarasenko 3 years ago
parent
commit
0a86852197
  1. 10
      Cargo.lock
  2. 1
      Cargo.toml
  3. 300
      src/link/mod.rs
  4. 1
      src/main.rs

10
Cargo.lock generated

@ -3,4 +3,14 @@
[[package]] [[package]]
name = "avarice" name = "avarice"
version = "0.1.0" version = "0.1.0"
dependencies = [
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "custom_error"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"

1
Cargo.toml

@ -7,3 +7,4 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
custom_error="1.9.2"

300
src/link/mod.rs

@ -0,0 +1,300 @@
use std::collections::VecDeque;
use std::str;
extern crate custom_error;
use custom_error::custom_error;
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
const MAX_MESSAGE_LENGTH: usize = 0xA00000;
custom_error! { pub ReadingStreamError
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}",
MessageTooLong{length: usize} = "Message to receive is too long: {length}",
InvalidUnicode = "Invalid utf-8 was received",
BrokenStream = "Used stream is broken"
}
enum ReadingState {
Head,
ReceivedBytes,
Length,
Payload,
}
pub struct MessageReader {
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
next_received_bytes: u32,
received_bytes: u64,
}
/// For converting byte stream expected to be generated by Acedia mod from the game server into
/// actual messages. Expected format is a sequence of either:
/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes]
/// [HEAD_MESSAGE: 1 byte] [length of the message: 4 bytes] [utf8-encoded string: ??? bytes]
/// On any invalid input enters a failure state (can be checked by `is_broken()`) and
/// never recovers from it.
/// Use either `push_byte()` or `push()` to input byte stream from game server and `pop()` to
/// retrieve resulting messages.
impl MessageReader {
pub fn new() -> MessageReader {
MessageReader {
is_broken: false,
reading_state: ReadingState::Head,
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
read_messages: VecDeque::new(),
next_received_bytes: 0,
received_bytes: 0,
}
}
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> {
if self.is_broken {
return Err(ReadingStreamError::BrokenStream);
}
match &self.reading_state {
ReadingState::Head => {
if input == HEAD_RECEIVED {
self.reading_state = ReadingState::ReceivedBytes;
} else if input == HEAD_MESSAGE {
self.reading_state = ReadingState::Length;
} else {
self.is_broken = true;
return Err(ReadingStreamError::InvalidHead { input });
}
}
ReadingState::ReceivedBytes => {
self.next_received_bytes = self.next_received_bytes << 8;
self.next_received_bytes += input as u32;
self.read_bytes += 1;
if self.read_bytes >= RECEIVED_FIELD_SIZE {
self.received_bytes += self.next_received_bytes as u64;
self.next_received_bytes = 0;
self.read_bytes = 0;
self.reading_state = ReadingState::Head;
}
}
ReadingState::Length => {
self.current_message_length = self.current_message_length << 8;
self.current_message_length += input as usize;
self.read_bytes += 1;
if self.read_bytes >= LENGTH_FIELD_SIZE {
self.read_bytes = 0;
self.reading_state = ReadingState::Payload;
if self.current_message_length > MAX_MESSAGE_LENGTH {
self.is_broken = true;
return Err(ReadingStreamError::MessageTooLong {
length: self.current_message_length,
});
}
self.current_message = Vec::with_capacity(self.current_message_length);
}
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1 as usize;
if self.read_bytes >= self.current_message_length {
match str::from_utf8(&self.current_message) {
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
_ => {
self.is_broken = true;
return Err(ReadingStreamError::InvalidUnicode);
}
};
self.current_message.clear();
self.current_message_length = 0;
self.read_bytes = 0;
self.reading_state = ReadingState::Head;
}
}
}
Ok(())
}
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> {
for &byte in input {
self.push_byte(byte)?;
}
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
self.read_messages.pop_back()
}
pub fn received_bytes(&self) -> u64 {
self.received_bytes
}
pub fn is_broken(&self) -> bool {
self.is_broken
}
}
#[test]
fn message_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
reader.push_byte(0x65).unwrap(); // e
reader.push_byte(0x6c).unwrap(); // l
reader.push_byte(0x6c).unwrap(); // l
reader.push_byte(0x6f).unwrap(); // o
reader.push_byte(0x2c).unwrap(); // ,
reader.push_byte(0x20).unwrap(); // <space>
reader.push_byte(0x77).unwrap(); // w
reader.push_byte(0x6f).unwrap(); // o
reader.push_byte(0x72).unwrap(); // r
reader.push_byte(0x6c).unwrap(); // l
reader.push_byte(0x64).unwrap(); // d
reader.push_byte(0x21).unwrap(); // <exclamation mark>
reader.push_byte(HEAD_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(0x59).unwrap(); // Y
reader.push_byte(0x6f).unwrap(); // o
reader.push_byte(0x21).unwrap(); // <exclamation mark>
assert_eq!(reader.pop().unwrap(), "Hello, world!");
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn received_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(243).unwrap();
assert_eq!(reader.received_bytes(), 243);
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(65).unwrap();
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(231).unwrap();
reader.push_byte(34).unwrap();
reader.push_byte(154).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
}
#[test]
fn mixed_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(243).unwrap();
reader.push_byte(HEAD_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(0x59).unwrap(); // Y
reader.push_byte(0x6f).unwrap(); // o
reader.push_byte(0x21).unwrap(); // <exclamation mark>
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(65).unwrap();
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn pushing_many_bytes_at_once() {
let mut reader = MessageReader::new();
reader
.push(&[
HEAD_RECEIVED,
0,
0,
0,
243,
HEAD_MESSAGE,
0,
0,
0,
3,
0x59, // Y
0x6f, // o
0x21, // <exclamation mark>
HEAD_RECEIVED,
65,
25,
178,
4,
])
.unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn generates_error_invalid_head() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(243).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(25)
.expect_err("Testing failing on incorrect HEAD");
assert!(reader.is_broken());
}
#[test]
fn generates_error_message_too_long() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_MESSAGE).unwrap();
// Max length right now is `0xA00000`
reader.push_byte(0).unwrap();
reader.push_byte(0xA0).unwrap();
reader.push_byte(0).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(1)
.expect_err("Testing failing on exceeding allowed message length");
assert!(reader.is_broken());
}
#[test]
fn generates_error_invalid_unicode() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(2).unwrap();
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
assert!(!reader.is_broken());
// Bytes inside multi-byte code point have to have `1` for their high bit
reader
.push_byte(0b01010011)
.expect_err("Testing failing on incorrect unicode");
assert!(reader.is_broken());
}

1
src/main.rs

@ -1,5 +1,6 @@
use std::env; use std::env;
use std::path::Path; use std::path::Path;
mod link;
mod unreal_config; mod unreal_config;
fn main() { fn main() {

Loading…
Cancel
Save