From cfe680d7702c7fad3801726024d46128fa0175ba Mon Sep 17 00:00:00 2001 From: Anton Tarasenko Date: Wed, 4 Aug 2021 04:29:20 +0700 Subject: [PATCH] Refactor `link` module for use in one thread While `link` module is still spawning new threads to handle new connections and reading data from connected clients - it now collects all the received messages in one thread, simplifying their handling. --- Cargo.lock | 31 ++++++++++ Cargo.toml | 4 +- src/link/message.rs | 38 ++++++++++++ src/link/mod.rs | 139 ++++++++++++++++++++++++++++++-------------- src/link/network.rs | 106 +++++++++++++++++++++++++++++++++ src/main.rs | 15 +++-- 6 files changed, 282 insertions(+), 51 deletions(-) create mode 100644 src/link/message.rs create mode 100644 src/link/network.rs diff --git a/Cargo.lock b/Cargo.lock index 11949f5..2f72667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5,6 +5,8 @@ name = "avarice" version = "0.1.0" dependencies = [ "custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -12,5 +14,34 @@ name = "custom_error" version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde_json" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", +] + [metadata] "checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6" +"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" diff --git a/Cargo.toml b/Cargo.toml index 58d91ba..af73fa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,4 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -custom_error="1.9.2" \ No newline at end of file +custom_error="1.*" +serde ="1.*" +serde_json="1.*" \ No newline at end of file diff --git a/src/link/message.rs b/src/link/message.rs new file mode 100644 index 0000000..03049ac --- /dev/null +++ b/src/link/message.rs @@ -0,0 +1,38 @@ +//! Implements Avarice message: target service, message type, json parameters +use serde_json; +use serde_json::json; +use std::fmt; + +pub struct AvariceMessage { + pub service: String, + pub message_type: String, + pub parameters: serde_json::Value, +} + +impl AvariceMessage { + /// Parses JSON form of a message into `AvariceMessage` struct + pub fn from(message_str: &str) -> Option { + let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); + let message_json = message_json.as_object_mut()?; + Some(AvariceMessage { + service: message_json.remove("s")?.as_str()?.to_owned(), + message_type: message_json.remove("t")?.as_str()?.to_owned(), + parameters: message_json.remove("p")?, + }) + } +} + +impl fmt::Display for AvariceMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + json!({ + "s": self.service, + "t": self.message_type, + "p": self.parameters, + }) + .to_string() + ) + } +} diff --git a/src/link/mod.rs b/src/link/mod.rs index 6c4d781..72ba9b4 100644 --- a/src/link/mod.rs +++ b/src/link/mod.rs @@ -1,62 +1,111 @@ -//! Implements reader and writer to use when talking to UE2 game server. - +//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between +//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and +//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about +//! ue-servers' connections using `Receiver`. +use std::collections::HashMap; use std::error::Error; -use std::io::{Read, Write}; -use std::net::{SocketAddr, TcpListener}; -use std::sync::Arc; -use std::{str, thread}; +use std::io::Write; +use std::net::{SocketAddr, TcpStream}; +use std::sync::mpsc::{channel, Receiver}; +pub use writer::MessageWriter; +mod message; +mod network; mod reader; mod writer; -pub use reader::MessageReader; -pub use writer::MessageWriter; +pub use message::AvariceMessage; +pub use network::{run_server, NetworkMessage}; + +/// For collecting messages from all connected ue-servers and providing a way to reply back. +pub struct AvariceServer { + connected_links: HashMap, + receiver: Receiver, +} +/// For representing a link to one of the connected ue-servers, can be used to send messages back. +/// To receive messages use `AvariceServer`'s `next()` method instead. pub struct Link { - reader: MessageReader, + ue_server_address: SocketAddr, writer: MessageWriter, + writing_stream: TcpStream, } -impl Link { - pub fn run(port: u16, handler: F) -> Result<(), Box> - where - F: Fn(&mut Link, &str) -> () + Send + Sync + 'static, - { - let address = SocketAddr::from(([0, 0, 0, 0], port)); - let listener = TcpListener::bind(address)?; - let handler = Arc::new(handler); +impl AvariceServer { + /// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`, + /// corresponding to the ue-server that sent next message and `AvariceMessage`, + /// representing that message. + pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> { loop { - // Listen to new (multiple) connection - let mut reading_stream = listener.accept()?.0; - let mut writing_stream = reading_stream.try_clone()?; - let mut avarice_link = Link { - reader: MessageReader::new(), - writer: MessageWriter::new(), - }; - let handler_clone = handler.clone(); - // On connection - spawn a new thread - thread::spawn(move || loop { - let mut buffer = [0; 1024]; - // Reading cycle - match reading_stream.read(&mut buffer) { - Ok(n) => avarice_link.reader.push(&buffer[..n]).unwrap(), - _ => panic!("Connection issue!"), - }; - // Handling cycle - while let Some(message) = avarice_link.reader.pop() { - handler_clone(&mut avarice_link, &message); + match self.receiver.recv() { + Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => { + // If `ue_server_address` was already present in `self.connected_links` + // hash map, then it means we have failed to clean it up after it + // has disconnected. We can just throw away the old value here. + self.connected_links.insert( + ue_server_address, + Link { + ue_server_address, + writing_stream, + writer: MessageWriter::new(), + }, + ); + continue; + } + Ok(NetworkMessage::ConnectionLost(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; + } + Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; } - // Writing - avarice_link - .writer - .update_ue_received_bytes(avarice_link.reader.ue_received_bytes()); - if let Some(bytes) = avarice_link.writer.try_pop() { - writing_stream.write_all(&bytes).unwrap(); + Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => { + if let Some(link) = self.connected_links.get_mut(&ue_server_address) { + link.update_ue_received_bytes(ue_received_bytes) + } + continue; } - }); + Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => { + // Not having a link with key `ue_server_address` should be impossible here + return self + .connected_links + .get_mut(&ue_server_address) + .and_then(|x| Some((x, message))); + } + _ => return None, + } } } +} + +impl Link { + pub fn send(&mut self, message: AvariceMessage) { + self.writer.push(&message.to_string()); + self.flush(); + } + + pub fn socket_address(&self) -> SocketAddr { + self.ue_server_address + } - pub fn write(&mut self, message: &str) { - self.writer.push(message); + fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { + self.writer.update_ue_received_bytes(ue_received_bytes); + self.flush(); } + + fn flush(&mut self) { + if let Some(bytes) = self.writer.try_pop() { + self.writing_stream.write_all(&bytes).unwrap(); + } + } +} + +/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port. +pub fn start_avarice(port: u16) -> Result> { + let (sender, receiver) = channel(); + run_server(port, sender)?; + Ok(AvariceServer { + connected_links: HashMap::new(), + receiver, + }) } diff --git a/src/link/network.rs b/src/link/network.rs new file mode 100644 index 0000000..9424240 --- /dev/null +++ b/src/link/network.rs @@ -0,0 +1,106 @@ +//! Implements a network model where messages from all the ue-servers are collected in a single +//! main thread. For that we spawn a new thread that listens for new connections, which in turn +//! spawns a new thread for every connected ue-server to handle reading data from it. +//! Since all reading is handled in ue-servers' own threads, to collect messages they have +//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to +//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread +//! by the same `std::sync::mpsc::Sender` object. +use super::message::AvariceMessage; +pub use super::reader::MessageReader; +use std::error::Error; +use std::io::Read; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::sync::mpsc::Sender; +use std::thread; + +pub struct UEConnection { + pub address: SocketAddr, + pub reader: MessageReader, + pub reading_stream: TcpStream, + pub message_sender: Sender, +} + +/// Possible messages to the main thread +pub enum NetworkMessage { + ConnectionEstablished(SocketAddr, TcpStream), + InvalidDataReceived(SocketAddr), + ConnectionLost(SocketAddr), + MessageReceived(SocketAddr, AvariceMessage), + UEReceivedUpdate(SocketAddr, u64), +} + +pub fn run_server(port: u16, message_sender: Sender) -> Result<(), Box> { + let address = SocketAddr::from(([0, 0, 0, 0], port)); + let listener = TcpListener::bind(address)?; + thread::spawn(move || loop { + // Listen to new (multiple) connection + let (reading_stream, address) = listener.accept().unwrap(); + let writing_stream = reading_stream.try_clone().unwrap(); + message_sender + .send(NetworkMessage::ConnectionEstablished( + address, + writing_stream, + )) + .unwrap(); + // On connection - spawn a new thread + let sender_clone = message_sender.clone(); + thread::spawn(move || { + manage_connection(UEConnection { + reader: MessageReader::new(), + message_sender: sender_clone, + reading_stream, + address, + }) + }); + }); + Ok(()) +} + +fn manage_connection(mut connection: UEConnection) { + let mut buffer = [0; 1024]; + loop { + // Reading cycle + match connection.reading_stream.read(&mut buffer) { + Ok(n) => connection.reader.push(&buffer[..n]).unwrap(), + _ => { + connection + .message_sender + .send(NetworkMessage::ConnectionLost(connection.address)) + .unwrap(); + return; + } + }; + if connection.reader.is_broken() { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + // Decoding cycle + while let Some(text_message) = connection.reader.pop() { + if let Some(avarice_message) = AvariceMessage::from(&text_message) { + connection + .message_sender + .send(NetworkMessage::MessageReceived( + connection.address, + avarice_message, + )) + .unwrap(); + } else { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + } + connection + .message_sender + .send(NetworkMessage::UEReceivedUpdate( + connection.address, + connection.reader.ue_received_bytes(), + )) + .unwrap(); + } +} diff --git a/src/main.rs b/src/main.rs index 9378c87..59c341b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,9 +1,14 @@ +// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator. mod link; -use link::Link; +use link::start_avarice; fn main() { - match Link::run(1234, |link, message| { link.write(message);}) { - Ok(_) => print!("Connect!"), - _ => print!("Connection error!"), - }; + let mut server = start_avarice(1234).unwrap(); + while let Some((link, message)) = server.next() { + println!("{}: {}", link.socket_address(), message.to_string()); + if message.message_type != "end" { + link.send(message); + } + } + println!("Avarice has shut down!"); }