Compare commits

..

23 Commits

Author SHA1 Message Date
cfe680d770 Refactor link module for use in one thread
While `link` module is still spawning new threads to handle new
connections and reading data from connected clients - it now collects
all the received messages in one thread, simplifying their handling.
2021-08-04 04:29:20 +07:00
a6ac0b7a17 Change MessageWriter's protocol
There is no need to send length of each data chunk. This patch
simplifies protocol by only sending length of each message.
2021-07-29 17:41:28 +07:00
g
4f67a7b72c fix docs 2021-07-24 14:40:32 +07:00
26b7d1dd91 Add network prototype
This prototype is untested, since there is currently no client to test it with.

Related to #2
2021-07-24 04:46:19 +07:00
65ef791f00 Fix clippy warnings 2021-07-24 02:36:39 +07:00
034b7860d6 Refactor MessageWriter 2021-07-24 02:26:43 +07:00
5bfdd61248 Add MessageWriter implementation
This is WIP implementation, buggy, untested and not properly commented
2021-07-23 05:53:26 +07:00
f310febe62 Add new mod.rs file with for link module 2021-07-23 02:38:36 +07:00
816454f4cf Move MessageReader into a separate file 2021-07-23 02:36:05 +07:00
b187041d9e Move documentation for MessageReader 2021-07-23 02:02:05 +07:00
3f660f54d5 Remove EXPECTED_LIMIT_TO_UE_MESSAGES constant 2021-07-23 02:01:17 +07:00
5ce511c5a7 Refactor main.rs to use if let construction 2021-07-23 01:50:47 +07:00
978a5c4182 Change UE_RECEIVED_FIELD_SIZE to 2
ue-server is only supposed to receive up to 4095 bytes at once and is
expected to report after reading them, therefore 4 bytes for the amount
of received bytes is excessive and 2 will suffice.
2021-07-23 01:48:42 +07:00
e3f554218a Refactor push_byte's code
Move logic below error checking with early exit.
2021-07-23 01:31:40 +07:00
9ca00c5b8c Refactor tests for clarity 2021-07-23 01:28:33 +07:00
f118767e3f Rename buffer into length_buffer 2021-07-23 01:23:13 +07:00
202fa39a42 Fix comments for some MessageReader contants 2021-07-23 01:22:29 +07:00
bb1f73a755 Change character byte definitions into b'X' form 2021-07-22 18:54:12 +07:00
af3341c1e7 Refactor MessageReader for clarity
Rename `received_bytes` into `ue_received_bytes` and get rid of
`next_received_bytes` by reading relevant data into new byte buffer
instead.
2021-07-22 18:48:09 +07:00
0dedd1d1f1 Change MessageReader to use with_capacity
Some of the collections inside `MessageReader` were created with `new()`
instead of `with_capacity()` call. This patch fixes that or comments why
it was not done in some places.
2021-07-22 18:18:29 +07:00
b846fcf55b Fix MessageReader documentation 2021-07-22 18:11:44 +07:00
ba3ac088dd Refactor numeric constants for MessageReader 2021-07-22 18:03:48 +07:00
0a86852197 Add MessageReader implementation
I've decided that it would be a good idea to separete
compoments that transform text messages into byte chunks to send to
Unreal Engine and back. This means that each of them will have only one
responsibility and using them will be easier in case we decide to read
and write via tcp/ip in separate threads. The only interesting to them
both parameter is the amount of bytes that were confirmed to be received
from the Unreal Engine side, which can be easily shared "by hand".

This patch implements "reader" that accepts byte stream expected from
the game server and converts it into:
1. separate string messages;
2. amount of bytes game server reported to already have received.
2021-07-21 03:40:29 +07:00
8 changed files with 799 additions and 9 deletions

41
Cargo.lock generated
View File

@ -3,4 +3,45 @@
[[package]]
name = "avarice"
version = "0.1.0"
dependencies = [
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "custom_error"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_json"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"

View File

@ -7,3 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"

38
src/link/message.rs Normal file
View File

@ -0,0 +1,38 @@
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
pub service: String,
pub message_type: String,
pub parameters: serde_json::Value,
}
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
message_type: message_json.remove("t")?.as_str()?.to_owned(),
parameters: message_json.remove("p")?,
})
}
}
impl fmt::Display for AvariceMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
json!({
"s": self.service,
"t": self.message_type,
"p": self.parameters,
})
.to_string()
)
}
}

111
src/link/mod.rs Normal file
View File

@ -0,0 +1,111 @@
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
//! ue-servers' connections using `Receiver`.
use std::collections::HashMap;
use std::error::Error;
use std::io::Write;
use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc::{channel, Receiver};
pub use writer::MessageWriter;
mod message;
mod network;
mod reader;
mod writer;
pub use message::AvariceMessage;
pub use network::{run_server, NetworkMessage};
/// For collecting messages from all connected ue-servers and providing a way to reply back.
pub struct AvariceServer {
connected_links: HashMap<SocketAddr, Link>,
receiver: Receiver<NetworkMessage>,
}
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
/// To receive messages use `AvariceServer`'s `next()` method instead.
pub struct Link {
ue_server_address: SocketAddr,
writer: MessageWriter,
writing_stream: TcpStream,
}
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
/// representing that message.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
loop {
match self.receiver.recv() {
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
// If `ue_server_address` was already present in `self.connected_links`
// hash map, then it means we have failed to clean it up after it
// has disconnected. We can just throw away the old value here.
self.connected_links.insert(
ue_server_address,
Link {
ue_server_address,
writing_stream,
writer: MessageWriter::new(),
},
);
continue;
}
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
link.update_ue_received_bytes(ue_received_bytes)
}
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
return self
.connected_links
.get_mut(&ue_server_address)
.and_then(|x| Some((x, message)));
}
_ => return None,
}
}
}
}
impl Link {
pub fn send(&mut self, message: AvariceMessage) {
self.writer.push(&message.to_string());
self.flush();
}
pub fn socket_address(&self) -> SocketAddr {
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.writer.update_ue_received_bytes(ue_received_bytes);
self.flush();
}
fn flush(&mut self) {
if let Some(bytes) = self.writer.try_pop() {
self.writing_stream.write_all(&bytes).unwrap();
}
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
let (sender, receiver) = channel();
run_server(port, sender)?;
Ok(AvariceServer {
connected_links: HashMap::new(),
receiver,
})
}

106
src/link/network.rs Normal file
View File

@ -0,0 +1,106 @@
//! Implements a network model where messages from all the ue-servers are collected in a single
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
//! by the same `std::sync::mpsc::Sender` object.
use super::message::AvariceMessage;
pub use super::reader::MessageReader;
use std::error::Error;
use std::io::Read;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::Sender;
use std::thread;
pub struct UEConnection {
pub address: SocketAddr,
pub reader: MessageReader,
pub reading_stream: TcpStream,
pub message_sender: Sender<NetworkMessage>,
}
/// Possible messages to the main thread
pub enum NetworkMessage {
ConnectionEstablished(SocketAddr, TcpStream),
InvalidDataReceived(SocketAddr),
ConnectionLost(SocketAddr),
MessageReceived(SocketAddr, AvariceMessage),
UEReceivedUpdate(SocketAddr, u64),
}
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
let address = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(address)?;
thread::spawn(move || loop {
// Listen to new (multiple) connection
let (reading_stream, address) = listener.accept().unwrap();
let writing_stream = reading_stream.try_clone().unwrap();
message_sender
.send(NetworkMessage::ConnectionEstablished(
address,
writing_stream,
))
.unwrap();
// On connection - spawn a new thread
let sender_clone = message_sender.clone();
thread::spawn(move || {
manage_connection(UEConnection {
reader: MessageReader::new(),
message_sender: sender_clone,
reading_stream,
address,
})
});
});
Ok(())
}
fn manage_connection(mut connection: UEConnection) {
let mut buffer = [0; 1024];
loop {
// Reading cycle
match connection.reading_stream.read(&mut buffer) {
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
_ => {
connection
.message_sender
.send(NetworkMessage::ConnectionLost(connection.address))
.unwrap();
return;
}
};
if connection.reader.is_broken() {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
// Decoding cycle
while let Some(text_message) = connection.reader.pop() {
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
connection
.message_sender
.send(NetworkMessage::MessageReceived(
connection.address,
avarice_message,
))
.unwrap();
} else {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
}
connection
.message_sender
.send(NetworkMessage::UEReceivedUpdate(
connection.address,
connection.reader.ue_received_bytes(),
))
.unwrap();
}
}

321
src/link/reader.rs Normal file
View File

@ -0,0 +1,321 @@
//! Implements reader that receives data from UE2 game server.
use std::collections::VecDeque;
use std::str;
extern crate custom_error;
use custom_error::custom_error;
// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about
// amount of bytes it received since the last update
const UE_RECEIVED_FIELD_SIZE: usize = 2;
// Defines how many bytes is used to encode "LENGTH" field, describing length of
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes
// received by that server so far.
const HEAD_UE_RECEIVED: u8 = 85;
// Arbitrary value indicating that next byte sequence from ue-server contains JSON message.
const HEAD_UE_MESSAGE: u8 = 42;
// Maximum allowed size of JSON message sent from ue-server.
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
custom_error! { pub ReadingStreamError
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}",
MessageTooLong{length: usize} = "Message to receive is too long: {length}",
InvalidUnicode = "Invalid utf-8 was received",
BrokenStream = "Used stream is broken"
}
enum ReadingState {
Head,
ReceivedBytes,
Length,
Payload,
}
/// For converting byte stream that is expected from the ue-server into actual messages.
///
/// Expected format is a sequence of either:
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte |
/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE|
///
/// or
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte|
/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE|
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it.
/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to
/// retrieve resulting messages.
pub struct MessageReader {
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
length_buffer: [u8; 4],
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
ue_received_bytes: u64,
}
impl MessageReader {
pub fn new() -> MessageReader {
MessageReader {
is_broken: false,
reading_state: ReadingState::Head,
read_bytes: 0,
length_buffer: [0; 4],
current_message_length: 0,
// Will be recreated with `with_capacity` in `push_byte()`
current_message: Vec::new(),
// This value should be more than enough for typical use
read_messages: VecDeque::with_capacity(100),
ue_received_bytes: 0,
}
}
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> {
if self.is_broken {
return Err(ReadingStreamError::BrokenStream);
}
match &self.reading_state {
ReadingState::Head => {
if input == HEAD_UE_RECEIVED {
self.change_state(ReadingState::ReceivedBytes);
} else if input == HEAD_UE_MESSAGE {
self.change_state(ReadingState::Length);
} else {
self.is_broken = true;
return Err(ReadingStreamError::InvalidHead { input });
}
}
ReadingState::ReceivedBytes => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE {
self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer));
self.change_state(ReadingState::Head);
}
}
ReadingState::Length => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize;
if self.current_message_length > MAX_UE_MESSAGE_LENGTH {
self.is_broken = true;
return Err(ReadingStreamError::MessageTooLong {
length: self.current_message_length,
});
}
self.current_message = Vec::with_capacity(self.current_message_length);
self.change_state(ReadingState::Payload);
}
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1;
if self.read_bytes >= self.current_message_length {
match str::from_utf8(&self.current_message) {
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
_ => {
self.is_broken = true;
return Err(ReadingStreamError::InvalidUnicode);
}
};
self.current_message.clear();
self.current_message_length = 0;
self.change_state(ReadingState::Head);
}
}
}
Ok(())
}
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> {
for &byte in input {
self.push_byte(byte)?;
}
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
self.read_messages.pop_back()
}
pub fn ue_received_bytes(&self) -> u64 {
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
self.is_broken
}
fn change_state(&mut self, next_state: ReadingState) {
self.read_bytes = 0;
self.reading_state = next_state;
}
}
fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 {
(u16::from(bytes[0]) << 8) + u16::from(bytes[1])
}
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
#[test]
fn message_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(b'H').unwrap();
reader.push_byte(b'e').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b',').unwrap();
reader.push_byte(b' ').unwrap();
reader.push_byte(b'w').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'r').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'd').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
assert_eq!(reader.pop().unwrap(), "Hello, world!");
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn received_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xf3);
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(231).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
}
#[test]
fn mixed_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn pushing_many_bytes_at_once() {
let mut reader = MessageReader::new();
reader
.push(&[
HEAD_UE_RECEIVED,
0,
0xf3,
HEAD_UE_MESSAGE,
0,
0,
0,
3,
b'Y',
b'o',
b'!',
HEAD_UE_RECEIVED,
0xb2,
0x04,
])
.unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn generates_error_invalid_head() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(25)
.expect_err("Testing failing on incorrect HEAD");
assert!(reader.is_broken());
}
#[test]
fn generates_error_message_too_long() {
let mut reader = MessageReader::new();
let huge_length = MAX_UE_MESSAGE_LENGTH + 1;
let bytes = (huge_length as u32).to_be_bytes();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(bytes[0]).unwrap();
reader.push_byte(bytes[1]).unwrap();
reader.push_byte(bytes[2]).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(bytes[3])
.expect_err("Testing failing on exceeding allowed message length");
assert!(reader.is_broken());
}
#[test]
fn generates_error_invalid_unicode() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(2).unwrap();
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
assert!(!reader.is_broken());
// Bytes inside multi-byte code point have to have `1` for their high bit
reader
.push_byte(0b01010011)
.expect_err("Testing failing on incorrect unicode");
assert!(reader.is_broken());
}

169
src/link/writer.rs Normal file
View File

@ -0,0 +1,169 @@
//! Implements writer that sends data to UE2 game server.
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::iter::Extend;
// Maximum amount of bytes ue-server is able to receive at once
const UE_INPUT_BUFFER: usize = 4095;
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
///
/// Every string message is converted into a length-prefixed array of utf8 bytes:
///
/// | Data | Length |
/// |---------|---------|
/// | Message `LENGTH` | 4 bytes: u32 BE |
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
/// that ue-server's buffer has enough space to accept it.
///
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
///
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
/// to ue-server.
///
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
/// (possibly after ``update_ue_received_bytes()`).
pub struct MessageWriter {
sent_bytes: u64,
ue_received_bytes: u64,
pending_data: VecDeque<u8>,
}
impl MessageWriter {
pub fn new() -> MessageWriter {
MessageWriter {
sent_bytes: 0,
ue_received_bytes: 0,
// This value should be more than enough for typical use:
// it will take at least one second to send this much data to a 30 tick rate ue-server.
pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER),
}
}
pub fn push(&mut self, message: &str) {
let message_as_utf8 = message.as_bytes();
let message_as_utf8 = [
&(message_as_utf8.len() as u32).to_be_bytes(),
message_as_utf8,
]
.concat();
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
if self.is_empty() {
return None;
}
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
if chunk_size == 0 {
return None;
}
let mut bytes_to_send = Vec::with_capacity(chunk_size);
for next_byte in self.pending_data.drain(..chunk_size) {
bytes_to_send.push(next_byte);
}
self.sent_bytes += bytes_to_send.len() as u64;
Some(bytes_to_send)
}
pub fn is_empty(&self) -> bool {
self.pending_data.is_empty()
}
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
/// the last `update_ue_received_bytes()` call.
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes),
_ => 0,
}
}
}
#[test]
fn writer_contents_after_creation() {
let writer = MessageWriter::new();
assert_eq!(writer.is_empty(), true);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writer_content_after_push() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
assert_eq!(writer.is_empty(), false);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writing_single_short_message() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
let resulting_bytes = writer.try_pop().unwrap();
let expected_bytes = [
0, 0, 0, 13, // Bytes in the message
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
];
assert_eq!(writer.is_empty(), true);
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - "Hello, world!".len() - 4
);
assert_eq!(resulting_bytes, expected_bytes);
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
}
#[test]
fn writing_first_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass message length, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(writer.is_empty(), false);
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
assert_eq!(writer.available_ue_buffer_capacity(), 0);
// Bytes in message = 4095 = 0x0fff
assert_eq!(resulting_bytes[0], 0);
assert_eq!(resulting_bytes[1], 0);
assert_eq!(resulting_bytes[2], 0x0f);
assert_eq!(resulting_bytes[3], 0xff);
for &byte in resulting_bytes[4..].iter() {
assert_eq!(byte, b'Q');
}
assert_eq!(writer.try_pop(), None);
assert_eq!(writer.is_empty(), false);
}
#[test]
fn writing_second_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass lengths, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
// This pops all but 4 bytes of `long_message`, that were required to encode message length
let first_bytes = writer.try_pop().unwrap();
writer.update_ue_received_bytes(first_bytes.len() as u64);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - resulting_bytes.len()
);
assert_eq!(writer.is_empty(), true);
// Bytes left for the next chunk = 4
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
}

View File

@ -1,13 +1,14 @@
use std::env;
use std::path::Path;
mod unreal_config;
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
mod link;
use link::start_avarice;
fn main() {
let args: Vec<String> = env::args().collect();
let filename = &args[1];
let config = unreal_config::load_file(Path::new(filename));
match config {
Ok(config) => print!("{}", config),
_ => (),
let mut server = start_avarice(1234).unwrap();
while let Some((link, message)) = server.next() {
println!("{}: {}", link.socket_address(), message.to_string());
if message.message_type != "end" {
link.send(message);
}
}
println!("Avarice has shut down!");
}