Add network link to ue-server implementation #13

Open
dkanus wants to merge 23 commits from feature_link into master
  1. 41
      Cargo.lock
  2. 3
      Cargo.toml
  3. 38
      src/link/message.rs
  4. 111
      src/link/mod.rs
  5. 106
      src/link/network.rs
  6. 321
      src/link/reader.rs
  7. 169
      src/link/writer.rs
  8. 19
      src/main.rs

41
Cargo.lock generated

@ -3,4 +3,45 @@
[[package]] [[package]]
name = "avarice" name = "avarice"
version = "0.1.0" version = "0.1.0"
dependencies = [
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "custom_error"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_json"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"

3
Cargo.toml

@ -7,3 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
Review

No newline at end of file

No newline at end of file

38
src/link/message.rs

@ -0,0 +1,38 @@
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
Review

warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely

warning: this import is redundant --> src\link\message.rs:2:1 | 2 | use serde_json; | ^^^^^^^^^^^^^^^ help: remove it entirely
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Review

Any explanation/doc on wtf is service/message_type/parameters?

Any explanation/doc on wtf is service/message_type/parameters?
pub service: String,
pub message_type: String,
pub parameters: serde_json::Value,
}
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Review

Suggestion:

--- a/src/link/message.rs
+++ b/src/link/message.rs
@@ -9,19 +9,22 @@ pub struct AvariceMessage {
     pub parameters: serde_json::Value,
 }

-impl AvariceMessage {
-    /// Parses JSON form of a message into `AvariceMessage` struct
-    pub fn from(message_str: &str) -> Option<AvariceMessage> {
-        let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
-        let message_json = message_json.as_object_mut()?;
-        Some(AvariceMessage {
-            service: message_json.remove("s")?.as_str()?.to_owned(),
-            message_type: message_json.remove("t")?.as_str()?.to_owned(),
-            parameters: message_json.remove("p")?,
+use std::convert::TryFrom;
+use std::error::Error;
+impl TryFrom<&str> for AvariceMessage {
+    type Error = Box<dyn Error>;
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        let mut message_json: serde_json::Value = serde_json::from_str(value)?;
+        let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?;
+        Ok(AvariceMessage {
+            service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow
ned(),
+            message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?.
to_owned(),
+            parameters: message_json.remove("p").ok_or("Missing field 'p'.")?,
         })

--- a/src/link/network.rs
+++ b/src/link/network.rs
@@ -7,6 +7,7 @@
 //! by the same `std::sync::mpsc::Sender` object.
 use super::message::AvariceMessage;
 pub use super::reader::MessageReader;
+use std::convert::TryFrom;
 use std::error::Error;
 use std::io::Read;
 use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) {
         }
         //  Decoding cycle
         while let Some(text_message) = connection.reader.pop() {
-            if let Some(avarice_message) = AvariceMessage::from(&text_message) {
+            if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) {

Free bonus: more detailed errors.

Suggestion: ``` --- a/src/link/message.rs +++ b/src/link/message.rs @@ -9,19 +9,22 @@ pub struct AvariceMessage { pub parameters: serde_json::Value, } -impl AvariceMessage { - /// Parses JSON form of a message into `AvariceMessage` struct - pub fn from(message_str: &str) -> Option<AvariceMessage> { - let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); - let message_json = message_json.as_object_mut()?; - Some(AvariceMessage { - service: message_json.remove("s")?.as_str()?.to_owned(), - message_type: message_json.remove("t")?.as_str()?.to_owned(), - parameters: message_json.remove("p")?, +use std::convert::TryFrom; +use std::error::Error; +impl TryFrom<&str> for AvariceMessage { + type Error = Box<dyn Error>; + fn try_from(value: &str) -> Result<Self, Self::Error> { + let mut message_json: serde_json::Value = serde_json::from_str(value)?; + let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?; + Ok(AvariceMessage { + service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow ned(), + message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?. to_owned(), + parameters: message_json.remove("p").ok_or("Missing field 'p'.")?, }) --- a/src/link/network.rs +++ b/src/link/network.rs @@ -7,6 +7,7 @@ //! by the same `std::sync::mpsc::Sender` object. use super::message::AvariceMessage; pub use super::reader::MessageReader; +use std::convert::TryFrom; use std::error::Error; use std::io::Read; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) { } // Decoding cycle while let Some(text_message) = connection.reader.pop() { - if let Some(avarice_message) = AvariceMessage::from(&text_message) { + if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) { ``` Free bonus: more detailed errors.
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Review

Why "remove"?

Why "remove"?
message_type: message_json.remove("t")?.as_str()?.to_owned(),
parameters: message_json.remove("p")?,
})
}
}
impl fmt::Display for AvariceMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
json!({
"s": self.service,
Review

Maybe expand s/t/p into words?

Maybe expand s/t/p into words?
"t": self.message_type,
"p": self.parameters,
})
.to_string()
)
}
}

111
src/link/mod.rs

@ -0,0 +1,111 @@
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
//! ue-servers' connections using `Receiver`.
use std::collections::HashMap;
use std::error::Error;
use std::io::Write;
use std::net::{SocketAddr, TcpStream};
use std::sync::mpsc::{channel, Receiver};
pub use writer::MessageWriter;
mod message;
mod network;
mod reader;
mod writer;
pub use message::AvariceMessage;
pub use network::{run_server, NetworkMessage};
/// For collecting messages from all connected ue-servers and providing a way to reply back.
pub struct AvariceServer {
connected_links: HashMap<SocketAddr, Link>,
receiver: Receiver<NetworkMessage>,
}
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
/// To receive messages use `AvariceServer`'s `next()` method instead.
pub struct Link {
Ggg_123 marked this conversation as resolved
Review

Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();

Probably needs a timeout, to avoid waiting forever: stream .set_read_timeout(Some(Duration::from_secs(5))) .unwrap();
ue_server_address: SocketAddr,
writer: MessageWriter,
writing_stream: TcpStream,
}
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
Review

sent next message => sent this message?

sent next message => sent this message?
/// representing that message.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
loop {
match self.receiver.recv() {
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
// If `ue_server_address` was already present in `self.connected_links`
// hash map, then it means we have failed to clean it up after it
// has disconnected. We can just throw away the old value here.
self.connected_links.insert(
ue_server_address,
Link {
ue_server_address,
writing_stream,
writer: MessageWriter::new(),
},
);
continue;
Review

Maybe remove continue?

Maybe remove continue?
}
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Review

Missing else? Condition seems important.

Missing else? Condition seems important.
link.update_ue_received_bytes(ue_received_bytes)
}
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Review

Why?

Why?
return self
.connected_links
.get_mut(&ue_server_address)
.and_then(|x| Some((x, message)));
}
_ => return None,
}
}
}
}
impl Link {
pub fn send(&mut self, message: AvariceMessage) {
self.writer.push(&message.to_string());
self.flush();
}
pub fn socket_address(&self) -> SocketAddr {
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Review

Why is it a separate function, and not inside send/flush?
When should I call it?

Why is it a separate function, and not inside send/flush? When should I call it?
self.writer.update_ue_received_bytes(ue_received_bytes);
self.flush();
}
fn flush(&mut self) {
if let Some(bytes) = self.writer.try_pop() {
self.writing_stream.write_all(&bytes).unwrap();
}
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
Review

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
let (sender, receiver) = channel();
run_server(port, sender)?;
Ok(AvariceServer {
connected_links: HashMap::new(),
receiver,
})
}

106
src/link/network.rs

@ -0,0 +1,106 @@
//! Implements a network model where messages from all the ue-servers are collected in a single
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Review

Writing TcpStream is sent - rewrite? No idea what that means.

Writing `TcpStream` is sent - rewrite? No idea what that means.
//! by the same `std::sync::mpsc::Sender` object.
use super::message::AvariceMessage;
pub use super::reader::MessageReader;
use std::error::Error;
use std::io::Read;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::Sender;
use std::thread;
pub struct UEConnection {
pub address: SocketAddr,
pub reader: MessageReader,
pub reading_stream: TcpStream,
pub message_sender: Sender<NetworkMessage>,
}
/// Possible messages to the main thread
pub enum NetworkMessage {
ConnectionEstablished(SocketAddr, TcpStream),
InvalidDataReceived(SocketAddr),
ConnectionLost(SocketAddr),
MessageReceived(SocketAddr, AvariceMessage),
UEReceivedUpdate(SocketAddr, u64),
}
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
let address = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(address)?;
thread::spawn(move || loop {
// Listen to new (multiple) connection
let (reading_stream, address) = listener.accept().unwrap();
let writing_stream = reading_stream.try_clone().unwrap();
message_sender
.send(NetworkMessage::ConnectionEstablished(
address,
writing_stream,
))
.unwrap();
// On connection - spawn a new thread
let sender_clone = message_sender.clone();
thread::spawn(move || {
manage_connection(UEConnection {
reader: MessageReader::new(),
message_sender: sender_clone,
reading_stream,
address,
})
});
});
Ok(())
}
fn manage_connection(mut connection: UEConnection) {
let mut buffer = [0; 1024];
loop {
// Reading cycle
match connection.reading_stream.read(&mut buffer) {
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
_ => {
connection
.message_sender
.send(NetworkMessage::ConnectionLost(connection.address))
.unwrap();
return;
}
};
if connection.reader.is_broken() {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
// Decoding cycle
while let Some(text_message) = connection.reader.pop() {
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
connection
.message_sender
.send(NetworkMessage::MessageReceived(
connection.address,
avarice_message,
))
.unwrap();
} else {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
}
connection
.message_sender
.send(NetworkMessage::UEReceivedUpdate(
connection.address,
connection.reader.ue_received_bytes(),
))
.unwrap();
}
}

321
src/link/reader.rs

@ -0,0 +1,321 @@
//! Implements reader that receives data from UE2 game server.
use std::collections::VecDeque;
use std::str;
extern crate custom_error;
use custom_error::custom_error;
// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about
// amount of bytes it received since the last update
const UE_RECEIVED_FIELD_SIZE: usize = 2;
// Defines how many bytes is used to encode "LENGTH" field, describing length of
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes
// received by that server so far.
const HEAD_UE_RECEIVED: u8 = 85;
// Arbitrary value indicating that next byte sequence from ue-server contains JSON message.
const HEAD_UE_MESSAGE: u8 = 42;
// Maximum allowed size of JSON message sent from ue-server.
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
custom_error! { pub ReadingStreamError
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}",
MessageTooLong{length: usize} = "Message to receive is too long: {length}",
InvalidUnicode = "Invalid utf-8 was received",
BrokenStream = "Used stream is broken"
}
enum ReadingState {
Head,
ReceivedBytes,
Length,
Payload,
}
/// For converting byte stream that is expected from the ue-server into actual messages.
///
/// Expected format is a sequence of either:
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte |
/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE|
///
/// or
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte|
/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE|
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it.
/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to
/// retrieve resulting messages.
pub struct MessageReader {
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
length_buffer: [u8; 4],
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
ue_received_bytes: u64,
}
impl MessageReader {
pub fn new() -> MessageReader {
MessageReader {
is_broken: false,
reading_state: ReadingState::Head,
read_bytes: 0,
length_buffer: [0; 4],
current_message_length: 0,
// Will be recreated with `with_capacity` in `push_byte()`
current_message: Vec::new(),
// This value should be more than enough for typical use
read_messages: VecDeque::with_capacity(100),
ue_received_bytes: 0,
}
}
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> {
if self.is_broken {
return Err(ReadingStreamError::BrokenStream);
}
match &self.reading_state {
ReadingState::Head => {
if input == HEAD_UE_RECEIVED {
self.change_state(ReadingState::ReceivedBytes);
} else if input == HEAD_UE_MESSAGE {
self.change_state(ReadingState::Length);
} else {
self.is_broken = true;
return Err(ReadingStreamError::InvalidHead { input });
}
}
ReadingState::ReceivedBytes => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE {
self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer));
self.change_state(ReadingState::Head);
}
}
ReadingState::Length => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize;
if self.current_message_length > MAX_UE_MESSAGE_LENGTH {
self.is_broken = true;
return Err(ReadingStreamError::MessageTooLong {
length: self.current_message_length,
});
}
self.current_message = Vec::with_capacity(self.current_message_length);
self.change_state(ReadingState::Payload);
}
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1;
if self.read_bytes >= self.current_message_length {
match str::from_utf8(&self.current_message) {
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
_ => {
self.is_broken = true;
return Err(ReadingStreamError::InvalidUnicode);
}
};
self.current_message.clear();
self.current_message_length = 0;
self.change_state(ReadingState::Head);
}
}
}
Ok(())
}
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> {
for &byte in input {
self.push_byte(byte)?;
}
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
Review

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 byte message).

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 ~~byte~~ message).
self.read_messages.pop_back()
}
pub fn ue_received_bytes(&self) -> u64 {
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
Review

Why a method?

Why a method?
self.is_broken
}
fn change_state(&mut self, next_state: ReadingState) {
self.read_bytes = 0;
self.reading_state = next_state;
}
}
fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 {
(u16::from(bytes[0]) << 8) + u16::from(bytes[1])
}
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
#[test]
fn message_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(b'H').unwrap();
reader.push_byte(b'e').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b',').unwrap();
reader.push_byte(b' ').unwrap();
reader.push_byte(b'w').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'r').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'd').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
assert_eq!(reader.pop().unwrap(), "Hello, world!");
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn received_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xf3);
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(231).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
}
#[test]
fn mixed_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn pushing_many_bytes_at_once() {
let mut reader = MessageReader::new();
reader
.push(&[
HEAD_UE_RECEIVED,
0,
0xf3,
HEAD_UE_MESSAGE,
0,
0,
0,
3,
b'Y',
b'o',
b'!',
HEAD_UE_RECEIVED,
0xb2,
0x04,
])
.unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn generates_error_invalid_head() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(25)
.expect_err("Testing failing on incorrect HEAD");
assert!(reader.is_broken());
}
#[test]
fn generates_error_message_too_long() {
let mut reader = MessageReader::new();
let huge_length = MAX_UE_MESSAGE_LENGTH + 1;
let bytes = (huge_length as u32).to_be_bytes();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(bytes[0]).unwrap();
reader.push_byte(bytes[1]).unwrap();
reader.push_byte(bytes[2]).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(bytes[3])
.expect_err("Testing failing on exceeding allowed message length");
assert!(reader.is_broken());
}
#[test]
fn generates_error_invalid_unicode() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(2).unwrap();
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
assert!(!reader.is_broken());
// Bytes inside multi-byte code point have to have `1` for their high bit
reader
.push_byte(0b01010011)
.expect_err("Testing failing on incorrect unicode");
assert!(reader.is_broken());
}

169
src/link/writer.rs

@ -0,0 +1,169 @@
//! Implements writer that sends data to UE2 game server.
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::iter::Extend;
// Maximum amount of bytes ue-server is able to receive at once
const UE_INPUT_BUFFER: usize = 4095;
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
///
/// Every string message is converted into a length-prefixed array of utf8 bytes:
///
/// | Data | Length |
/// |---------|---------|
/// | Message `LENGTH` | 4 bytes: u32 BE |
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
/// that ue-server's buffer has enough space to accept it.
///
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
///
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
/// to ue-server.
///
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
/// (possibly after ``update_ue_received_bytes()`).
pub struct MessageWriter {
sent_bytes: u64,
ue_received_bytes: u64,
pending_data: VecDeque<u8>,
}
impl MessageWriter {
pub fn new() -> MessageWriter {
MessageWriter {
sent_bytes: 0,
ue_received_bytes: 0,
// This value should be more than enough for typical use:
// it will take at least one second to send this much data to a 30 tick rate ue-server.
pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER),
}
}
pub fn push(&mut self, message: &str) {
let message_as_utf8 = message.as_bytes();
let message_as_utf8 = [
&(message_as_utf8.len() as u32).to_be_bytes(),
message_as_utf8,
]
.concat();
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
Review

Confusing name.
Reader has pop(&mut self) -> Option<String>
Writer has try_pop(&mut self) -> Option<Vec<u8>>

Confusing name. Reader has `pop(&mut self) -> Option<String>` Writer has `try_pop(&mut self) -> Option<Vec<u8>>`
if self.is_empty() {
return None;
}
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
if chunk_size == 0 {
return None;
}
let mut bytes_to_send = Vec::with_capacity(chunk_size);
for next_byte in self.pending_data.drain(..chunk_size) {
bytes_to_send.push(next_byte);
}
self.sent_bytes += bytes_to_send.len() as u64;
Some(bytes_to_send)
}
pub fn is_empty(&self) -> bool {
self.pending_data.is_empty()
}
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
/// the last `update_ue_received_bytes()` call.
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes),
Review

Seems complicated - why?

Seems complicated - why?
Review

self.sent_bytes and self.ue_received_bytes are u64 and usize can be u32. In case we have fucked up - difference between them can get huge, this accounts for that.

I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.

`self.sent_bytes` and `self.ue_received_bytes` are u64 and `usize` can be `u32`. In case we have fucked up - difference between them can get huge, this accounts for that. I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
_ => 0,
}
}
}
#[test]
fn writer_contents_after_creation() {
let writer = MessageWriter::new();
assert_eq!(writer.is_empty(), true);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writer_content_after_push() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
assert_eq!(writer.is_empty(), false);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writing_single_short_message() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
let resulting_bytes = writer.try_pop().unwrap();
let expected_bytes = [
0, 0, 0, 13, // Bytes in the message
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
];
assert_eq!(writer.is_empty(), true);
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - "Hello, world!".len() - 4
);
assert_eq!(resulting_bytes, expected_bytes);
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
}
#[test]
fn writing_first_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass message length, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(writer.is_empty(), false);
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
assert_eq!(writer.available_ue_buffer_capacity(), 0);
// Bytes in message = 4095 = 0x0fff
assert_eq!(resulting_bytes[0], 0);
assert_eq!(resulting_bytes[1], 0);
assert_eq!(resulting_bytes[2], 0x0f);
assert_eq!(resulting_bytes[3], 0xff);
for &byte in resulting_bytes[4..].iter() {
assert_eq!(byte, b'Q');
}
assert_eq!(writer.try_pop(), None);
assert_eq!(writer.is_empty(), false);
}
#[test]
fn writing_second_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass lengths, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
// This pops all but 4 bytes of `long_message`, that were required to encode message length
let first_bytes = writer.try_pop().unwrap();
writer.update_ue_received_bytes(first_bytes.len() as u64);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - resulting_bytes.len()
);
assert_eq!(writer.is_empty(), true);
// Bytes left for the next chunk = 4
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
}

19
src/main.rs

@ -1,13 +1,14 @@
use std::env; // ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
use std::path::Path; mod link;
mod unreal_config; use link::start_avarice;
fn main() { fn main() {
let args: Vec<String> = env::args().collect(); let mut server = start_avarice(1234).unwrap();
let filename = &args[1]; while let Some((link, message)) = server.next() {
let config = unreal_config::load_file(Path::new(filename)); println!("{}: {}", link.socket_address(), message.to_string());
match config { if message.message_type != "end" {
Ok(config) => print!("{}", config), link.send(message);
_ => (),
} }
} }
println!("Avarice has shut down!");
}

Loading…
Cancel
Save