Add network link to ue-server implementation #13

Open
dkanus wants to merge 23 commits from feature_link into master
8 changed files with 799 additions and 9 deletions

41
Cargo.lock generated
View File

@ -3,4 +3,45 @@
[[package]]
name = "avarice"
version = "0.1.0"
dependencies = [
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "custom_error"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_json"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"

View File

@ -7,3 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
Review

No newline at end of file

No newline at end of file

38
src/link/message.rs Normal file
View File

@ -0,0 +1,38 @@
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
Review

warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely

warning: this import is redundant --> src\link\message.rs:2:1 | 2 | use serde_json; | ^^^^^^^^^^^^^^^ help: remove it entirely
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Review

Any explanation/doc on wtf is service/message_type/parameters?

Any explanation/doc on wtf is service/message_type/parameters?
pub service: String,
pub message_type: String,
pub parameters: serde_json::Value,
}
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Review

Suggestion:

--- a/src/link/message.rs
+++ b/src/link/message.rs
@@ -9,19 +9,22 @@ pub struct AvariceMessage {
     pub parameters: serde_json::Value,
 }

-impl AvariceMessage {
-    /// Parses JSON form of a message into `AvariceMessage` struct
-    pub fn from(message_str: &str) -> Option<AvariceMessage> {
-        let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
-        let message_json = message_json.as_object_mut()?;
-        Some(AvariceMessage {
-            service: message_json.remove("s")?.as_str()?.to_owned(),
-            message_type: message_json.remove("t")?.as_str()?.to_owned(),
-            parameters: message_json.remove("p")?,
+use std::convert::TryFrom;
+use std::error::Error;
+impl TryFrom<&str> for AvariceMessage {
+    type Error = Box<dyn Error>;
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        let mut message_json: serde_json::Value = serde_json::from_str(value)?;
+        let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?;
+        Ok(AvariceMessage {
+            service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow
ned(),
+            message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?.
to_owned(),
+            parameters: message_json.remove("p").ok_or("Missing field 'p'.")?,
         })

--- a/src/link/network.rs
+++ b/src/link/network.rs
@@ -7,6 +7,7 @@
 //! by the same `std::sync::mpsc::Sender` object.
 use super::message::AvariceMessage;
 pub use super::reader::MessageReader;
+use std::convert::TryFrom;
 use std::error::Error;
 use std::io::Read;
 use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) {
         }
         //  Decoding cycle
         while let Some(text_message) = connection.reader.pop() {
-            if let Some(avarice_message) = AvariceMessage::from(&text_message) {
+            if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) {

Free bonus: more detailed errors.

Suggestion: ``` --- a/src/link/message.rs +++ b/src/link/message.rs @@ -9,19 +9,22 @@ pub struct AvariceMessage { pub parameters: serde_json::Value, } -impl AvariceMessage { - /// Parses JSON form of a message into `AvariceMessage` struct - pub fn from(message_str: &str) -> Option<AvariceMessage> { - let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); - let message_json = message_json.as_object_mut()?; - Some(AvariceMessage { - service: message_json.remove("s")?.as_str()?.to_owned(), - message_type: message_json.remove("t")?.as_str()?.to_owned(), - parameters: message_json.remove("p")?, +use std::convert::TryFrom; +use std::error::Error; +impl TryFrom<&str> for AvariceMessage { + type Error = Box<dyn Error>; + fn try_from(value: &str) -> Result<Self, Self::Error> { + let mut message_json: serde_json::Value = serde_json::from_str(value)?; + let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?; + Ok(AvariceMessage { + service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow ned(), + message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?. to_owned(), + parameters: message_json.remove("p").ok_or("Missing field 'p'.")?, }) --- a/src/link/network.rs +++ b/src/link/network.rs @@ -7,6 +7,7 @@ //! by the same `std::sync::mpsc::Sender` object. use super::message::AvariceMessage; pub use super::reader::MessageReader; +use std::convert::TryFrom; use std::error::Error; use std::io::Read; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) { } // Decoding cycle while let Some(text_message) = connection.reader.pop() { - if let Some(avarice_message) = AvariceMessage::from(&text_message) { + if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) { ``` Free bonus: more detailed errors.
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Review

Why "remove"?

Why "remove"?
message_type: message_json.remove("t")?.as_str()?.to_owned(),
parameters: message_json.remove("p")?,
})
}
}
impl fmt::Display for AvariceMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
json!({
"s": self.service,
Review

Maybe expand s/t/p into words?

Maybe expand s/t/p into words?
"t": self.message_type,
"p": self.parameters,
})
.to_string()
)
}
}

111
src/link/mod.rs Normal file
View File

@ -0,0 +1,111 @@
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
//! ue-servers' connections using `Receiver`.
use std::collections::HashMap;
use std::error::Error;
use std::io::Write;
use std::net::{SocketAddr, TcpStream};
dkanus marked this conversation as resolved Outdated

Wtf

Wtf

replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}

replace with array[u8;4] and pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { (u32::from(bytes[0]) << 24) + (u32::from(bytes[1]) << 16) + (u32::from(bytes[2]) << 8) + (u32::from(bytes[3])) }
use std::sync::mpsc::{channel, Receiver};
pub use writer::MessageWriter;
dkanus marked this conversation as resolved Outdated

Wtf

Wtf
dkanus marked this conversation as resolved Outdated

Wtf

Wtf
mod message;
mod network;
mod reader;
dkanus marked this conversation as resolved Outdated

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
mod writer;
dkanus marked this conversation as resolved Outdated

Weird empty line

Weird empty line
pub use message::AvariceMessage;
pub use network::{run_server, NetworkMessage};
/// For collecting messages from all connected ue-servers and providing a way to reply back.
pub struct AvariceServer {
connected_links: HashMap<SocketAddr, Link>,
receiver: Receiver<NetworkMessage>,
}
dkanus marked this conversation as resolved Outdated

with_capacity != limit.
Should it really be a constant?
I'd just inline it.

with_capacity != limit. Should it really be a constant? I'd just inline it.

It is expected limit that we use as a capacity.

It is expected limit that we use as a capacity.

But upon further consideration I agree that there is no sense in defining this as a constant.

But upon further consideration I agree that there is no sense in defining this as a constant.
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
/// To receive messages use `AvariceServer`'s `next()` method instead.

Can be simplified => // Listen to new connections

Can be simplified => // Listen to new connections
pub struct Link {
Ggg_123 marked this conversation as resolved
Review

Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();

Probably needs a timeout, to avoid waiting forever: stream .set_read_timeout(Some(Duration::from_secs(5))) .unwrap();
ue_server_address: SocketAddr,
writer: MessageWriter,
writing_stream: TcpStream,
}
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
dkanus marked this conversation as resolved Outdated

Unclear meaning.

Unclear meaning.

rename to ue_received_bytes

rename to ue_received_bytes
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
Review

sent next message => sent this message?

sent next message => sent this message?
/// representing that message.

1024? Not 4096?

1024? Not 4096?

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add BufReader and read byte-by-byte.

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add `BufReader` and read byte-by-byte.

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
loop {
dkanus marked this conversation as resolved Outdated

"For converting byte stream expected to be generated"?

"For converting byte stream expected to be generated"?
match self.receiver.recv() {
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
dkanus marked this conversation as resolved Outdated

4 bytes
u32? i32? BEi32? LEi32?

> 4 bytes u32? i32? BEi32? LEi32?
// If `ue_server_address` was already present in `self.connected_links`
// hash map, then it means we have failed to clean it up after it
// has disconnected. We can just throw away the old value here.
self.connected_links.insert(
dkanus marked this conversation as resolved Outdated

buffer => length_buffer, since it's used only for storing length bytes

buffer => length_buffer, since it's used only for storing length bytes

Agreed

Agreed
ue_server_address,
Link {
ue_server_address,
writing_stream,
writer: MessageWriter::new(),
},
);
continue;
Review

Maybe remove continue?

Maybe remove continue?
}
dkanus marked this conversation as resolved Outdated

with_capactiy?

with_capactiy?
  • // will be recreated with with_capacity in push()
+ // will be recreated with with_capacity in push()
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
dkanus marked this conversation as resolved Outdated

with_capactiy?

with_capactiy?
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Review

Missing else? Condition seems important.

Missing else? Condition seems important.
link.update_ue_received_bytes(ue_received_bytes)
}
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Review

Why?

Why?
return self
.connected_links
.get_mut(&ue_server_address)
.and_then(|x| Some((x, message)));
}
_ => return None,
}
Ggg_123 marked this conversation as resolved Outdated

<< 8
wtf

> << 8 wtf
}
}
}
Ggg_123 marked this conversation as resolved Outdated

as u64;
wtf

> as u64; wtf
impl Link {
pub fn send(&mut self, message: AvariceMessage) {
self.writer.push(&message.to_string());
self.flush();
}
pub fn socket_address(&self) -> SocketAddr {
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Review

Why is it a separate function, and not inside send/flush?
When should I call it?

Why is it a separate function, and not inside send/flush? When should I call it?
self.writer.update_ue_received_bytes(ue_received_bytes);
self.flush();
}
fn flush(&mut self) {
if let Some(bytes) = self.writer.try_pop() {
self.writing_stream.write_all(&bytes).unwrap();
}
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
Review

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
dkanus marked this conversation as resolved Outdated

"1 as usize" => "1usize"
Or just remove.

"1 as usize" => "1usize" Or just remove.
let (sender, receiver) = channel();
dkanus marked this conversation as resolved Outdated

Move this line below error checking, since it contains early exit

Move this line below error checking, since it contains early exit

Agreed.

Agreed.
run_server(port, sender)?;
Ok(AvariceServer {
connected_links: HashMap::new(),
receiver,
})
}

106
src/link/network.rs Normal file
View File

@ -0,0 +1,106 @@
//! Implements a network model where messages from all the ue-servers are collected in a single
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Review

Writing TcpStream is sent - rewrite? No idea what that means.

Writing `TcpStream` is sent - rewrite? No idea what that means.
//! by the same `std::sync::mpsc::Sender` object.
use super::message::AvariceMessage;
pub use super::reader::MessageReader;
use std::error::Error;
use std::io::Read;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::Sender;
use std::thread;
pub struct UEConnection {
pub address: SocketAddr,
pub reader: MessageReader,
pub reading_stream: TcpStream,
pub message_sender: Sender<NetworkMessage>,
}
/// Possible messages to the main thread
pub enum NetworkMessage {
ConnectionEstablished(SocketAddr, TcpStream),
InvalidDataReceived(SocketAddr),
ConnectionLost(SocketAddr),
MessageReceived(SocketAddr, AvariceMessage),
UEReceivedUpdate(SocketAddr, u64),
}
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
let address = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(address)?;
thread::spawn(move || loop {
// Listen to new (multiple) connection
let (reading_stream, address) = listener.accept().unwrap();
let writing_stream = reading_stream.try_clone().unwrap();
message_sender
.send(NetworkMessage::ConnectionEstablished(
address,
writing_stream,
))
.unwrap();
// On connection - spawn a new thread
let sender_clone = message_sender.clone();
thread::spawn(move || {
manage_connection(UEConnection {
reader: MessageReader::new(),
message_sender: sender_clone,
reading_stream,
address,
})
});
});
Ok(())
}
fn manage_connection(mut connection: UEConnection) {
let mut buffer = [0; 1024];
loop {
// Reading cycle
match connection.reading_stream.read(&mut buffer) {
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
_ => {
connection
.message_sender
.send(NetworkMessage::ConnectionLost(connection.address))
.unwrap();
return;
}
};
if connection.reader.is_broken() {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
// Decoding cycle
while let Some(text_message) = connection.reader.pop() {
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
connection
.message_sender
.send(NetworkMessage::MessageReceived(
connection.address,
avarice_message,
))
.unwrap();
} else {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
}
connection
.message_sender
.send(NetworkMessage::UEReceivedUpdate(
connection.address,
connection.reader.ue_received_bytes(),
))
.unwrap();
}
}

321
src/link/reader.rs Normal file
View File

@ -0,0 +1,321 @@
//! Implements reader that receives data from UE2 game server.
use std::collections::VecDeque;
use std::str;
extern crate custom_error;
use custom_error::custom_error;
// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about
// amount of bytes it received since the last update
const UE_RECEIVED_FIELD_SIZE: usize = 2;
// Defines how many bytes is used to encode "LENGTH" field, describing length of
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes
// received by that server so far.
const HEAD_UE_RECEIVED: u8 = 85;
// Arbitrary value indicating that next byte sequence from ue-server contains JSON message.
const HEAD_UE_MESSAGE: u8 = 42;
// Maximum allowed size of JSON message sent from ue-server.
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
custom_error! { pub ReadingStreamError
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}",
MessageTooLong{length: usize} = "Message to receive is too long: {length}",
InvalidUnicode = "Invalid utf-8 was received",
BrokenStream = "Used stream is broken"
}
enum ReadingState {
Head,
ReceivedBytes,
Length,
Payload,
}
/// For converting byte stream that is expected from the ue-server into actual messages.
///
/// Expected format is a sequence of either:
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte |
/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE|
///
/// or
///
/// | Data | Length |
/// |---------|---------|
/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte|
/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE|
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it.
/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to
/// retrieve resulting messages.
pub struct MessageReader {
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
length_buffer: [u8; 4],
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
ue_received_bytes: u64,
}
impl MessageReader {
pub fn new() -> MessageReader {
MessageReader {
is_broken: false,
reading_state: ReadingState::Head,
read_bytes: 0,
length_buffer: [0; 4],
current_message_length: 0,
// Will be recreated with `with_capacity` in `push_byte()`
current_message: Vec::new(),
// This value should be more than enough for typical use
read_messages: VecDeque::with_capacity(100),
ue_received_bytes: 0,
}
}
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> {
if self.is_broken {
return Err(ReadingStreamError::BrokenStream);
}
match &self.reading_state {
ReadingState::Head => {
if input == HEAD_UE_RECEIVED {
self.change_state(ReadingState::ReceivedBytes);
} else if input == HEAD_UE_MESSAGE {
self.change_state(ReadingState::Length);
} else {
self.is_broken = true;
return Err(ReadingStreamError::InvalidHead { input });
}
}
ReadingState::ReceivedBytes => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE {
self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer));
self.change_state(ReadingState::Head);
}
}
ReadingState::Length => {
self.length_buffer[self.read_bytes] = input;
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize;
if self.current_message_length > MAX_UE_MESSAGE_LENGTH {
self.is_broken = true;
return Err(ReadingStreamError::MessageTooLong {
length: self.current_message_length,
});
}
self.current_message = Vec::with_capacity(self.current_message_length);
self.change_state(ReadingState::Payload);
}
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1;
if self.read_bytes >= self.current_message_length {
match str::from_utf8(&self.current_message) {
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
_ => {
self.is_broken = true;
return Err(ReadingStreamError::InvalidUnicode);
}
};
self.current_message.clear();
self.current_message_length = 0;
self.change_state(ReadingState::Head);
}
}
}
Ok(())
}
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> {
for &byte in input {
self.push_byte(byte)?;
}
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
Review

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 byte message).

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 ~~byte~~ message).
self.read_messages.pop_back()
}
pub fn ue_received_bytes(&self) -> u64 {
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
Review

Why a method?

Why a method?
self.is_broken
}
fn change_state(&mut self, next_state: ReadingState) {
self.read_bytes = 0;
self.reading_state = next_state;
}
}
fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 {
(u16::from(bytes[0]) << 8) + u16::from(bytes[1])
}
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
#[test]
fn message_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(b'H').unwrap();
reader.push_byte(b'e').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b',').unwrap();
reader.push_byte(b' ').unwrap();
reader.push_byte(b'w').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'r').unwrap();
reader.push_byte(b'l').unwrap();
reader.push_byte(b'd').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
assert_eq!(reader.pop().unwrap(), "Hello, world!");
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn received_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xf3);
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(231).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
}
#[test]
fn mixed_push_byte() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(3).unwrap();
reader.push_byte(b'Y').unwrap();
reader.push_byte(b'o').unwrap();
reader.push_byte(b'!').unwrap();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn pushing_many_bytes_at_once() {
let mut reader = MessageReader::new();
reader
.push(&[
HEAD_UE_RECEIVED,
0,
0xf3,
HEAD_UE_MESSAGE,
0,
0,
0,
3,
b'Y',
b'o',
b'!',
HEAD_UE_RECEIVED,
0xb2,
0x04,
])
.unwrap();
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
assert_eq!(reader.pop().unwrap(), "Yo!");
assert_eq!(reader.pop(), None);
}
#[test]
fn generates_error_invalid_head() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0xf3).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(25)
.expect_err("Testing failing on incorrect HEAD");
assert!(reader.is_broken());
}
#[test]
fn generates_error_message_too_long() {
let mut reader = MessageReader::new();
let huge_length = MAX_UE_MESSAGE_LENGTH + 1;
let bytes = (huge_length as u32).to_be_bytes();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(bytes[0]).unwrap();
reader.push_byte(bytes[1]).unwrap();
reader.push_byte(bytes[2]).unwrap();
assert!(!reader.is_broken());
reader
.push_byte(bytes[3])
.expect_err("Testing failing on exceeding allowed message length");
assert!(reader.is_broken());
}
#[test]
fn generates_error_invalid_unicode() {
let mut reader = MessageReader::new();
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(2).unwrap();
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
assert!(!reader.is_broken());
// Bytes inside multi-byte code point have to have `1` for their high bit
reader
.push_byte(0b01010011)
.expect_err("Testing failing on incorrect unicode");
assert!(reader.is_broken());
}

169
src/link/writer.rs Normal file
View File

@ -0,0 +1,169 @@
//! Implements writer that sends data to UE2 game server.
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::convert::TryFrom;
use std::iter::Extend;
// Maximum amount of bytes ue-server is able to receive at once
const UE_INPUT_BUFFER: usize = 4095;
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
///
/// Every string message is converted into a length-prefixed array of utf8 bytes:
///
/// | Data | Length |
/// |---------|---------|
/// | Message `LENGTH` | 4 bytes: u32 BE |
/// | UTF8-encoded string | `LENGTH` bytes|
///
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
dkanus marked this conversation as resolved Outdated

Elaborate => "Conversion happens in two steps"?

Elaborate => "Conversion happens in two steps"?

Changed in upcoming patch.

Changed in upcoming patch.
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
dkanus marked this conversation as resolved Outdated

it's => its
pre-pended => prepended
"utf8 representation"?

Can be simplified => Every message is a length-prefixed array of utf8 bytes.

it's => its pre-pended => prepended "utf8 representation"? Can be simplified => Every message is a length-prefixed array of utf8 bytes.

Fixed in upcoming patch.

Fixed in upcoming patch.
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
/// that ue-server's buffer has enough space to accept it.
dkanus marked this conversation as resolved Outdated

Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.

Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.
///
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
dkanus marked this conversation as resolved Outdated

the messages => messages
then => is then
"in order" - in what order?

the messages => messages then => is then "in order" - in what order?

Changed in upcoming patch.

Changed in upcoming patch.
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
///
dkanus marked this conversation as resolved Outdated

But Writer doesn't send anything?

But Writer doesn't send anything?

Fixed in upcoming patch.

Fixed in upcoming patch.
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
/// to ue-server.
///
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
/// (possibly after ``update_ue_received_bytes()`).
pub struct MessageWriter {
dkanus marked this conversation as resolved Outdated

"has depleted all it's data." => "has successfully sent all data."?

"has depleted all it's data." => "has successfully sent all data."?

Changed in upcoming patch.

Changed in upcoming patch.
sent_bytes: u64,
ue_received_bytes: u64,
pending_data: VecDeque<u8>,
}
impl MessageWriter {
pub fn new() -> MessageWriter {
MessageWriter {
sent_bytes: 0,
ue_received_bytes: 0,
// This value should be more than enough for typical use:
// it will take at least one second to send this much data to a 30 tick rate ue-server.
pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER),
}
}
pub fn push(&mut self, message: &str) {
let message_as_utf8 = message.as_bytes();
let message_as_utf8 = [
&(message_as_utf8.len() as u32).to_be_bytes(),
message_as_utf8,
]
.concat();
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
Review

Confusing name.
Reader has pop(&mut self) -> Option<String>
Writer has try_pop(&mut self) -> Option<Vec<u8>>

Confusing name. Reader has `pop(&mut self) -> Option<String>` Writer has `try_pop(&mut self) -> Option<Vec<u8>>`
if self.is_empty() {
return None;
}
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
if chunk_size == 0 {
return None;
}
let mut bytes_to_send = Vec::with_capacity(chunk_size);
for next_byte in self.pending_data.drain(..chunk_size) {
bytes_to_send.push(next_byte);
}
self.sent_bytes += bytes_to_send.len() as u64;
Some(bytes_to_send)
}
pub fn is_empty(&self) -> bool {
self.pending_data.is_empty()
}
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
/// the last `update_ue_received_bytes()` call.
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
}

Missing +=?

Missing +=?

It passes to MessageWriter the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.

It passes to `MessageWriter` the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes),
Review

Seems complicated - why?

Seems complicated - why?
Review

self.sent_bytes and self.ue_received_bytes are u64 and usize can be u32. In case we have fucked up - difference between them can get huge, this accounts for that.

I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.

`self.sent_bytes` and `self.ue_received_bytes` are u64 and `usize` can be `u32`. In case we have fucked up - difference between them can get huge, this accounts for that. I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
_ => 0,
}
}
}
#[test]
fn writer_contents_after_creation() {
let writer = MessageWriter::new();
assert_eq!(writer.is_empty(), true);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writer_content_after_push() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
assert_eq!(writer.is_empty(), false);
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
}
#[test]
fn writing_single_short_message() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
let resulting_bytes = writer.try_pop().unwrap();
let expected_bytes = [
0, 0, 0, 13, // Bytes in the message
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
];
assert_eq!(writer.is_empty(), true);
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - "Hello, world!".len() - 4
);
assert_eq!(resulting_bytes, expected_bytes);
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
}
#[test]
fn writing_first_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass message length, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(writer.is_empty(), false);
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
assert_eq!(writer.available_ue_buffer_capacity(), 0);
// Bytes in message = 4095 = 0x0fff
assert_eq!(resulting_bytes[0], 0);
assert_eq!(resulting_bytes[1], 0);
assert_eq!(resulting_bytes[2], 0x0f);
assert_eq!(resulting_bytes[3], 0xff);
for &byte in resulting_bytes[4..].iter() {
assert_eq!(byte, b'Q');
}
assert_eq!(writer.try_pop(), None);
assert_eq!(writer.is_empty(), false);
}
#[test]
fn writing_second_chunk_of_single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass lengths, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
// This pops all but 4 bytes of `long_message`, that were required to encode message length
let first_bytes = writer.try_pop().unwrap();
writer.update_ue_received_bytes(first_bytes.len() as u64);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(
writer.available_ue_buffer_capacity(),
UE_INPUT_BUFFER - resulting_bytes.len()
);
assert_eq!(writer.is_empty(), true);
// Bytes left for the next chunk = 4
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
}

View File

@ -1,13 +1,14 @@
use std::env;
use std::path::Path;
mod unreal_config;
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
mod link;
use link::start_avarice;
fn main() {
let args: Vec<String> = env::args().collect();
let filename = &args[1];
let config = unreal_config::load_file(Path::new(filename));
match config {
Ok(config) => print!("{}", config),
_ => (),
let mut server = start_avarice(1234).unwrap();
while let Some((link, message)) = server.next() {
println!("{}: {}", link.socket_address(), message.to_string());
if message.message_type != "end" {
link.send(message);
}
}
println!("Avarice has shut down!");
}