Add network link to ue-server implementation #13

Open
dkanus wants to merge 23 commits from feature_link into master
6 changed files with 287 additions and 56 deletions
Showing only changes of commit cfe680d770 - Show all commits

31
Cargo.lock generated
View File

@ -5,6 +5,8 @@ name = "avarice"
version = "0.1.0"
dependencies = [
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -12,5 +14,34 @@ name = "custom_error"
version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "itoa"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "ryu"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde"
version = "1.0.126"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "serde_json"
version = "1.0.66"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
]
[metadata]
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"

View File

@ -7,4 +7,6 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
custom_error="1.9.2"
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
Review

No newline at end of file

No newline at end of file

38
src/link/message.rs Normal file
View File

@ -0,0 +1,38 @@
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
Review

warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely

warning: this import is redundant --> src\link\message.rs:2:1 | 2 | use serde_json; | ^^^^^^^^^^^^^^^ help: remove it entirely
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Review

Any explanation/doc on wtf is service/message_type/parameters?

Any explanation/doc on wtf is service/message_type/parameters?
pub service: String,
pub message_type: String,
pub parameters: serde_json::Value,
}
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Review

Suggestion:

--- a/src/link/message.rs
+++ b/src/link/message.rs
@@ -9,19 +9,22 @@ pub struct AvariceMessage {
     pub parameters: serde_json::Value,
 }

-impl AvariceMessage {
-    /// Parses JSON form of a message into `AvariceMessage` struct
-    pub fn from(message_str: &str) -> Option<AvariceMessage> {
-        let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
-        let message_json = message_json.as_object_mut()?;
-        Some(AvariceMessage {
-            service: message_json.remove("s")?.as_str()?.to_owned(),
-            message_type: message_json.remove("t")?.as_str()?.to_owned(),
-            parameters: message_json.remove("p")?,
+use std::convert::TryFrom;
+use std::error::Error;
+impl TryFrom<&str> for AvariceMessage {
+    type Error = Box<dyn Error>;
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        let mut message_json: serde_json::Value = serde_json::from_str(value)?;
+        let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?;
+        Ok(AvariceMessage {
+            service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow
ned(),
+            message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?.
to_owned(),
+            parameters: message_json.remove("p").ok_or("Missing field 'p'.")?,
         })

--- a/src/link/network.rs
+++ b/src/link/network.rs
@@ -7,6 +7,7 @@
 //! by the same `std::sync::mpsc::Sender` object.
 use super::message::AvariceMessage;
 pub use super::reader::MessageReader;
+use std::convert::TryFrom;
 use std::error::Error;
 use std::io::Read;
 use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) {
         }
         //  Decoding cycle
         while let Some(text_message) = connection.reader.pop() {
-            if let Some(avarice_message) = AvariceMessage::from(&text_message) {
+            if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) {

Free bonus: more detailed errors.

Suggestion: ``` --- a/src/link/message.rs +++ b/src/link/message.rs @@ -9,19 +9,22 @@ pub struct AvariceMessage { pub parameters: serde_json::Value, } -impl AvariceMessage { - /// Parses JSON form of a message into `AvariceMessage` struct - pub fn from(message_str: &str) -> Option<AvariceMessage> { - let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); - let message_json = message_json.as_object_mut()?; - Some(AvariceMessage { - service: message_json.remove("s")?.as_str()?.to_owned(), - message_type: message_json.remove("t")?.as_str()?.to_owned(), - parameters: message_json.remove("p")?, +use std::convert::TryFrom; +use std::error::Error; +impl TryFrom<&str> for AvariceMessage { + type Error = Box<dyn Error>; + fn try_from(value: &str) -> Result<Self, Self::Error> { + let mut message_json: serde_json::Value = serde_json::from_str(value)?; + let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?; + Ok(AvariceMessage { + service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow ned(), + message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?. to_owned(), + parameters: message_json.remove("p").ok_or("Missing field 'p'.")?, }) --- a/src/link/network.rs +++ b/src/link/network.rs @@ -7,6 +7,7 @@ //! by the same `std::sync::mpsc::Sender` object. use super::message::AvariceMessage; pub use super::reader::MessageReader; +use std::convert::TryFrom; use std::error::Error; use std::io::Read; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) { } // Decoding cycle while let Some(text_message) = connection.reader.pop() { - if let Some(avarice_message) = AvariceMessage::from(&text_message) { + if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) { ``` Free bonus: more detailed errors.
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Review

Why "remove"?

Why "remove"?
message_type: message_json.remove("t")?.as_str()?.to_owned(),
parameters: message_json.remove("p")?,
})
}
}
impl fmt::Display for AvariceMessage {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"{}",
json!({
"s": self.service,
Review

Maybe expand s/t/p into words?

Maybe expand s/t/p into words?
"t": self.message_type,
"p": self.parameters,
})
.to_string()
)
}
}

View File

@ -1,62 +1,111 @@
//! Implements reader and writer to use when talking to UE2 game server.
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
//! ue-servers' connections using `Receiver`.
use std::collections::HashMap;
use std::error::Error;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;
use std::{str, thread};
mod reader;
mod writer;
pub use reader::MessageReader;
use std::io::Write;
use std::net::{SocketAddr, TcpStream};
dkanus marked this conversation as resolved Outdated

Wtf

Wtf

replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}

replace with array[u8;4] and pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { (u32::from(bytes[0]) << 24) + (u32::from(bytes[1]) << 16) + (u32::from(bytes[2]) << 8) + (u32::from(bytes[3])) }
use std::sync::mpsc::{channel, Receiver};
pub use writer::MessageWriter;
dkanus marked this conversation as resolved Outdated

Wtf

Wtf
dkanus marked this conversation as resolved Outdated

Wtf

Wtf
mod message;
mod network;
mod reader;
dkanus marked this conversation as resolved Outdated

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
mod writer;
dkanus marked this conversation as resolved Outdated

Weird empty line

Weird empty line
pub use message::AvariceMessage;
pub use network::{run_server, NetworkMessage};
/// For collecting messages from all connected ue-servers and providing a way to reply back.
pub struct AvariceServer {
connected_links: HashMap<SocketAddr, Link>,
receiver: Receiver<NetworkMessage>,
}
dkanus marked this conversation as resolved Outdated

with_capacity != limit.
Should it really be a constant?
I'd just inline it.

with_capacity != limit. Should it really be a constant? I'd just inline it.

It is expected limit that we use as a capacity.

It is expected limit that we use as a capacity.

But upon further consideration I agree that there is no sense in defining this as a constant.

But upon further consideration I agree that there is no sense in defining this as a constant.
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
/// To receive messages use `AvariceServer`'s `next()` method instead.

Can be simplified => // Listen to new connections

Can be simplified => // Listen to new connections
pub struct Link {
Ggg_123 marked this conversation as resolved
Review

Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();

Probably needs a timeout, to avoid waiting forever: stream .set_read_timeout(Some(Duration::from_secs(5))) .unwrap();
reader: MessageReader,
ue_server_address: SocketAddr,
writer: MessageWriter,
writing_stream: TcpStream,
}
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
dkanus marked this conversation as resolved Outdated

Unclear meaning.

Unclear meaning.

rename to ue_received_bytes

rename to ue_received_bytes
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
Review

sent next message => sent this message?

sent next message => sent this message?
/// representing that message.

1024? Not 4096?

1024? Not 4096?

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add BufReader and read byte-by-byte.

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add `BufReader` and read byte-by-byte.

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Review

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
loop {
dkanus marked this conversation as resolved Outdated

"For converting byte stream expected to be generated"?

"For converting byte stream expected to be generated"?
match self.receiver.recv() {
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
dkanus marked this conversation as resolved Outdated

4 bytes
u32? i32? BEi32? LEi32?

> 4 bytes u32? i32? BEi32? LEi32?
// If `ue_server_address` was already present in `self.connected_links`
// hash map, then it means we have failed to clean it up after it
// has disconnected. We can just throw away the old value here.
self.connected_links.insert(
dkanus marked this conversation as resolved Outdated

buffer => length_buffer, since it's used only for storing length bytes

buffer => length_buffer, since it's used only for storing length bytes

Agreed

Agreed
ue_server_address,
Link {
ue_server_address,
writing_stream,
writer: MessageWriter::new(),
},
);
continue;
Review

Maybe remove continue?

Maybe remove continue?
}
dkanus marked this conversation as resolved Outdated

with_capactiy?

with_capactiy?
  • // will be recreated with with_capacity in push()
+ // will be recreated with with_capacity in push()
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
dkanus marked this conversation as resolved Outdated

with_capactiy?

with_capactiy?
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
self.connected_links.remove(&ue_server_address);
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Review

Missing else? Condition seems important.

Missing else? Condition seems important.
link.update_ue_received_bytes(ue_received_bytes)
}
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Review

Why?

Why?
return self
.connected_links
.get_mut(&ue_server_address)
.and_then(|x| Some((x, message)));
}
_ => return None,
}
Ggg_123 marked this conversation as resolved Outdated

<< 8
wtf

> << 8 wtf
}
}
}
Ggg_123 marked this conversation as resolved Outdated

as u64;
wtf

> as u64; wtf
impl Link {
pub fn run<F>(port: u16, handler: F) -> Result<(), Box<dyn Error>>
where
F: Fn(&mut Link, &str) -> () + Send + Sync + 'static,
{
let address = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(address)?;
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
let mut reading_stream = listener.accept()?.0;
let mut writing_stream = reading_stream.try_clone()?;
let mut avarice_link = Link {
reader: MessageReader::new(),
writer: MessageWriter::new(),
};
let handler_clone = handler.clone();
// On connection - spawn a new thread
thread::spawn(move || loop {
let mut buffer = [0; 1024];
// Reading cycle
match reading_stream.read(&mut buffer) {
Ok(n) => avarice_link.reader.push(&buffer[..n]).unwrap(),
_ => panic!("Connection issue!"),
};
// Handling cycle
while let Some(message) = avarice_link.reader.pop() {
handler_clone(&mut avarice_link, &message);
}
// Writing
avarice_link
.writer
.update_ue_received_bytes(avarice_link.reader.ue_received_bytes());
if let Some(bytes) = avarice_link.writer.try_pop() {
writing_stream.write_all(&bytes).unwrap();
}
});
}
pub fn send(&mut self, message: AvariceMessage) {
self.writer.push(&message.to_string());
self.flush();
}
pub fn write(&mut self, message: &str) {
self.writer.push(message);
pub fn socket_address(&self) -> SocketAddr {
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Review

Why is it a separate function, and not inside send/flush?
When should I call it?

Why is it a separate function, and not inside send/flush? When should I call it?
self.writer.update_ue_received_bytes(ue_received_bytes);
self.flush();
}
fn flush(&mut self) {
if let Some(bytes) = self.writer.try_pop() {
self.writing_stream.write_all(&bytes).unwrap();
}
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
Review

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
dkanus marked this conversation as resolved Outdated

"1 as usize" => "1usize"
Or just remove.

"1 as usize" => "1usize" Or just remove.
let (sender, receiver) = channel();
dkanus marked this conversation as resolved Outdated

Move this line below error checking, since it contains early exit

Move this line below error checking, since it contains early exit

Agreed.

Agreed.
run_server(port, sender)?;
Ok(AvariceServer {
connected_links: HashMap::new(),
receiver,
})
}

106
src/link/network.rs Normal file
View File

@ -0,0 +1,106 @@
//! Implements a network model where messages from all the ue-servers are collected in a single
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Review

Writing TcpStream is sent - rewrite? No idea what that means.

Writing `TcpStream` is sent - rewrite? No idea what that means.
//! by the same `std::sync::mpsc::Sender` object.
use super::message::AvariceMessage;
pub use super::reader::MessageReader;
use std::error::Error;
use std::io::Read;
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::mpsc::Sender;
use std::thread;
pub struct UEConnection {
pub address: SocketAddr,
pub reader: MessageReader,
pub reading_stream: TcpStream,
pub message_sender: Sender<NetworkMessage>,
}
/// Possible messages to the main thread
pub enum NetworkMessage {
ConnectionEstablished(SocketAddr, TcpStream),
InvalidDataReceived(SocketAddr),
ConnectionLost(SocketAddr),
MessageReceived(SocketAddr, AvariceMessage),
UEReceivedUpdate(SocketAddr, u64),
}
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
let address = SocketAddr::from(([0, 0, 0, 0], port));
let listener = TcpListener::bind(address)?;
thread::spawn(move || loop {
// Listen to new (multiple) connection
let (reading_stream, address) = listener.accept().unwrap();
let writing_stream = reading_stream.try_clone().unwrap();
message_sender
.send(NetworkMessage::ConnectionEstablished(
address,
writing_stream,
))
.unwrap();
// On connection - spawn a new thread
let sender_clone = message_sender.clone();
thread::spawn(move || {
manage_connection(UEConnection {
reader: MessageReader::new(),
message_sender: sender_clone,
reading_stream,
address,
})
});
});
Ok(())
}
fn manage_connection(mut connection: UEConnection) {
let mut buffer = [0; 1024];
loop {
// Reading cycle
match connection.reading_stream.read(&mut buffer) {
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
_ => {
connection
.message_sender
.send(NetworkMessage::ConnectionLost(connection.address))
.unwrap();
return;
}
};
if connection.reader.is_broken() {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
// Decoding cycle
while let Some(text_message) = connection.reader.pop() {
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
connection
.message_sender
.send(NetworkMessage::MessageReceived(
connection.address,
avarice_message,
))
.unwrap();
} else {
connection
.message_sender
.send(NetworkMessage::InvalidDataReceived(connection.address))
.unwrap();
return;
}
}
connection
.message_sender
.send(NetworkMessage::UEReceivedUpdate(
connection.address,
connection.reader.ue_received_bytes(),
))
.unwrap();
}
}

View File

@ -1,9 +1,14 @@
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
mod link;
use link::Link;
use link::start_avarice;
fn main() {
match Link::run(1234, |link, message| { link.write(message);}) {
Ok(_) => print!("Connect!"),
_ => print!("Connection error!"),
};
let mut server = start_avarice(1234).unwrap();
while let Some((link, message)) = server.next() {
println!("{}: {}", link.socket_address(), message.to_string());
if message.message_type != "end" {
link.send(message);
}
}
println!("Avarice has shut down!");
}