diff --git a/Cargo.lock b/Cargo.lock index b0eb9bf..2f72667 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3,4 +3,45 @@ [[package]] name = "avarice" version = "0.1.0" +dependencies = [ + "custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)", +] +[[package]] +name = "custom_error" +version = "1.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "itoa" +version = "0.4.7" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "ryu" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde" +version = "1.0.126" +source = "registry+https://github.com/rust-lang/crates.io-index" + +[[package]] +name = "serde_json" +version = "1.0.66" +source = "registry+https://github.com/rust-lang/crates.io-index" +dependencies = [ + "itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", + "ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)", + "serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[metadata] +"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6" +"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e" +"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03" +"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127" diff --git a/Cargo.toml b/Cargo.toml index e8facc0..af73fa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,3 +7,6 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +custom_error="1.*" +serde ="1.*" +serde_json="1.*" \ No newline at end of file diff --git a/src/link/message.rs b/src/link/message.rs new file mode 100644 index 0000000..03049ac --- /dev/null +++ b/src/link/message.rs @@ -0,0 +1,38 @@ +//! Implements Avarice message: target service, message type, json parameters +use serde_json; +use serde_json::json; +use std::fmt; + +pub struct AvariceMessage { + pub service: String, + pub message_type: String, + pub parameters: serde_json::Value, +} + +impl AvariceMessage { + /// Parses JSON form of a message into `AvariceMessage` struct + pub fn from(message_str: &str) -> Option { + let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); + let message_json = message_json.as_object_mut()?; + Some(AvariceMessage { + service: message_json.remove("s")?.as_str()?.to_owned(), + message_type: message_json.remove("t")?.as_str()?.to_owned(), + parameters: message_json.remove("p")?, + }) + } +} + +impl fmt::Display for AvariceMessage { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}", + json!({ + "s": self.service, + "t": self.message_type, + "p": self.parameters, + }) + .to_string() + ) + } +} diff --git a/src/link/mod.rs b/src/link/mod.rs new file mode 100644 index 0000000..72ba9b4 --- /dev/null +++ b/src/link/mod.rs @@ -0,0 +1,111 @@ +//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between +//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and +//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about +//! ue-servers' connections using `Receiver`. +use std::collections::HashMap; +use std::error::Error; +use std::io::Write; +use std::net::{SocketAddr, TcpStream}; +use std::sync::mpsc::{channel, Receiver}; +pub use writer::MessageWriter; + +mod message; +mod network; +mod reader; +mod writer; +pub use message::AvariceMessage; +pub use network::{run_server, NetworkMessage}; + +/// For collecting messages from all connected ue-servers and providing a way to reply back. +pub struct AvariceServer { + connected_links: HashMap, + receiver: Receiver, +} + +/// For representing a link to one of the connected ue-servers, can be used to send messages back. +/// To receive messages use `AvariceServer`'s `next()` method instead. +pub struct Link { + ue_server_address: SocketAddr, + writer: MessageWriter, + writing_stream: TcpStream, +} + +impl AvariceServer { + /// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`, + /// corresponding to the ue-server that sent next message and `AvariceMessage`, + /// representing that message. + pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> { + loop { + match self.receiver.recv() { + Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => { + // If `ue_server_address` was already present in `self.connected_links` + // hash map, then it means we have failed to clean it up after it + // has disconnected. We can just throw away the old value here. + self.connected_links.insert( + ue_server_address, + Link { + ue_server_address, + writing_stream, + writer: MessageWriter::new(), + }, + ); + continue; + } + Ok(NetworkMessage::ConnectionLost(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; + } + Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => { + self.connected_links.remove(&ue_server_address); + continue; + } + Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => { + if let Some(link) = self.connected_links.get_mut(&ue_server_address) { + link.update_ue_received_bytes(ue_received_bytes) + } + continue; + } + Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => { + // Not having a link with key `ue_server_address` should be impossible here + return self + .connected_links + .get_mut(&ue_server_address) + .and_then(|x| Some((x, message))); + } + _ => return None, + } + } + } +} + +impl Link { + pub fn send(&mut self, message: AvariceMessage) { + self.writer.push(&message.to_string()); + self.flush(); + } + + pub fn socket_address(&self) -> SocketAddr { + self.ue_server_address + } + + fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { + self.writer.update_ue_received_bytes(ue_received_bytes); + self.flush(); + } + + fn flush(&mut self) { + if let Some(bytes) = self.writer.try_pop() { + self.writing_stream.write_all(&bytes).unwrap(); + } + } +} + +/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port. +pub fn start_avarice(port: u16) -> Result> { + let (sender, receiver) = channel(); + run_server(port, sender)?; + Ok(AvariceServer { + connected_links: HashMap::new(), + receiver, + }) +} diff --git a/src/link/network.rs b/src/link/network.rs new file mode 100644 index 0000000..9424240 --- /dev/null +++ b/src/link/network.rs @@ -0,0 +1,106 @@ +//! Implements a network model where messages from all the ue-servers are collected in a single +//! main thread. For that we spawn a new thread that listens for new connections, which in turn +//! spawns a new thread for every connected ue-server to handle reading data from it. +//! Since all reading is handled in ue-servers' own threads, to collect messages they have +//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to +//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread +//! by the same `std::sync::mpsc::Sender` object. +use super::message::AvariceMessage; +pub use super::reader::MessageReader; +use std::error::Error; +use std::io::Read; +use std::net::{SocketAddr, TcpListener, TcpStream}; +use std::sync::mpsc::Sender; +use std::thread; + +pub struct UEConnection { + pub address: SocketAddr, + pub reader: MessageReader, + pub reading_stream: TcpStream, + pub message_sender: Sender, +} + +/// Possible messages to the main thread +pub enum NetworkMessage { + ConnectionEstablished(SocketAddr, TcpStream), + InvalidDataReceived(SocketAddr), + ConnectionLost(SocketAddr), + MessageReceived(SocketAddr, AvariceMessage), + UEReceivedUpdate(SocketAddr, u64), +} + +pub fn run_server(port: u16, message_sender: Sender) -> Result<(), Box> { + let address = SocketAddr::from(([0, 0, 0, 0], port)); + let listener = TcpListener::bind(address)?; + thread::spawn(move || loop { + // Listen to new (multiple) connection + let (reading_stream, address) = listener.accept().unwrap(); + let writing_stream = reading_stream.try_clone().unwrap(); + message_sender + .send(NetworkMessage::ConnectionEstablished( + address, + writing_stream, + )) + .unwrap(); + // On connection - spawn a new thread + let sender_clone = message_sender.clone(); + thread::spawn(move || { + manage_connection(UEConnection { + reader: MessageReader::new(), + message_sender: sender_clone, + reading_stream, + address, + }) + }); + }); + Ok(()) +} + +fn manage_connection(mut connection: UEConnection) { + let mut buffer = [0; 1024]; + loop { + // Reading cycle + match connection.reading_stream.read(&mut buffer) { + Ok(n) => connection.reader.push(&buffer[..n]).unwrap(), + _ => { + connection + .message_sender + .send(NetworkMessage::ConnectionLost(connection.address)) + .unwrap(); + return; + } + }; + if connection.reader.is_broken() { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + // Decoding cycle + while let Some(text_message) = connection.reader.pop() { + if let Some(avarice_message) = AvariceMessage::from(&text_message) { + connection + .message_sender + .send(NetworkMessage::MessageReceived( + connection.address, + avarice_message, + )) + .unwrap(); + } else { + connection + .message_sender + .send(NetworkMessage::InvalidDataReceived(connection.address)) + .unwrap(); + return; + } + } + connection + .message_sender + .send(NetworkMessage::UEReceivedUpdate( + connection.address, + connection.reader.ue_received_bytes(), + )) + .unwrap(); + } +} diff --git a/src/link/reader.rs b/src/link/reader.rs new file mode 100644 index 0000000..f2f49b5 --- /dev/null +++ b/src/link/reader.rs @@ -0,0 +1,321 @@ +//! Implements reader that receives data from UE2 game server. + +use std::collections::VecDeque; +use std::str; + +extern crate custom_error; +use custom_error::custom_error; + +// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about +// amount of bytes it received since the last update +const UE_RECEIVED_FIELD_SIZE: usize = 2; +// Defines how many bytes is used to encode "LENGTH" field, describing length of +// next JSON message from ue-server +const UE_LENGTH_FIELD_SIZE: usize = 4; +// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes +// received by that server so far. +const HEAD_UE_RECEIVED: u8 = 85; +// Arbitrary value indicating that next byte sequence from ue-server contains JSON message. +const HEAD_UE_MESSAGE: u8 = 42; +// Maximum allowed size of JSON message sent from ue-server. +const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; + +custom_error! { pub ReadingStreamError + InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", + MessageTooLong{length: usize} = "Message to receive is too long: {length}", + InvalidUnicode = "Invalid utf-8 was received", + BrokenStream = "Used stream is broken" +} + +enum ReadingState { + Head, + ReceivedBytes, + Length, + Payload, +} + +/// For converting byte stream that is expected from the ue-server into actual messages. +/// +/// Expected format is a sequence of either: +/// +/// | Data | Length | +/// |---------|---------| +/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte | +/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE| +/// +/// or +/// +/// | Data | Length | +/// |---------|---------| +/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte| +/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE| +/// | UTF8-encoded string | `LENGTH` bytes| +/// +/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it. +/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to +/// retrieve resulting messages. +pub struct MessageReader { + is_broken: bool, + reading_state: ReadingState, + read_bytes: usize, + length_buffer: [u8; 4], + current_message_length: usize, + current_message: Vec, + read_messages: VecDeque, + ue_received_bytes: u64, +} + +impl MessageReader { + pub fn new() -> MessageReader { + MessageReader { + is_broken: false, + reading_state: ReadingState::Head, + read_bytes: 0, + length_buffer: [0; 4], + current_message_length: 0, + // Will be recreated with `with_capacity` in `push_byte()` + current_message: Vec::new(), + // This value should be more than enough for typical use + read_messages: VecDeque::with_capacity(100), + ue_received_bytes: 0, + } + } + + pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { + if self.is_broken { + return Err(ReadingStreamError::BrokenStream); + } + match &self.reading_state { + ReadingState::Head => { + if input == HEAD_UE_RECEIVED { + self.change_state(ReadingState::ReceivedBytes); + } else if input == HEAD_UE_MESSAGE { + self.change_state(ReadingState::Length); + } else { + self.is_broken = true; + return Err(ReadingStreamError::InvalidHead { input }); + } + } + ReadingState::ReceivedBytes => { + self.length_buffer[self.read_bytes] = input; + self.read_bytes += 1; + if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { + self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer)); + self.change_state(ReadingState::Head); + } + } + ReadingState::Length => { + self.length_buffer[self.read_bytes] = input; + self.read_bytes += 1; + if self.read_bytes >= UE_LENGTH_FIELD_SIZE { + self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize; + if self.current_message_length > MAX_UE_MESSAGE_LENGTH { + self.is_broken = true; + return Err(ReadingStreamError::MessageTooLong { + length: self.current_message_length, + }); + } + self.current_message = Vec::with_capacity(self.current_message_length); + self.change_state(ReadingState::Payload); + } + } + ReadingState::Payload => { + self.current_message.push(input); + self.read_bytes += 1; + if self.read_bytes >= self.current_message_length { + match str::from_utf8(&self.current_message) { + Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), + _ => { + self.is_broken = true; + return Err(ReadingStreamError::InvalidUnicode); + } + }; + self.current_message.clear(); + self.current_message_length = 0; + self.change_state(ReadingState::Head); + } + } + } + Ok(()) + } + + pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { + for &byte in input { + self.push_byte(byte)?; + } + Ok(()) + } + + pub fn pop(&mut self) -> Option { + self.read_messages.pop_back() + } + + pub fn ue_received_bytes(&self) -> u64 { + self.ue_received_bytes + } + + pub fn is_broken(&self) -> bool { + self.is_broken + } + + fn change_state(&mut self, next_state: ReadingState) { + self.read_bytes = 0; + self.reading_state = next_state; + } +} + +fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 { + (u16::from(bytes[0]) << 8) + u16::from(bytes[1]) +} + +fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { + (u32::from(bytes[0]) << 24) + + (u32::from(bytes[1]) << 16) + + (u32::from(bytes[2]) << 8) + + (u32::from(bytes[3])) +} + +#[test] +fn message_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(13).unwrap(); + reader.push_byte(b'H').unwrap(); + reader.push_byte(b'e').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b',').unwrap(); + reader.push_byte(b' ').unwrap(); + reader.push_byte(b'w').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'r').unwrap(); + reader.push_byte(b'l').unwrap(); + reader.push_byte(b'd').unwrap(); + reader.push_byte(b'!').unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(b'Y').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'!').unwrap(); + assert_eq!(reader.pop().unwrap(), "Hello, world!"); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn received_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0xf3).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xf3); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(0xb2).unwrap(); + reader.push_byte(0x04).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3 + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(231).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); +} + +#[test] +fn mixed_push_byte() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0xf3).unwrap(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(3).unwrap(); + reader.push_byte(b'Y').unwrap(); + reader.push_byte(b'o').unwrap(); + reader.push_byte(b'!').unwrap(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(0xb2).unwrap(); + reader.push_byte(0x04).unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3 + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn pushing_many_bytes_at_once() { + let mut reader = MessageReader::new(); + reader + .push(&[ + HEAD_UE_RECEIVED, + 0, + 0xf3, + HEAD_UE_MESSAGE, + 0, + 0, + 0, + 3, + b'Y', + b'o', + b'!', + HEAD_UE_RECEIVED, + 0xb2, + 0x04, + ]) + .unwrap(); + assert_eq!(reader.ue_received_bytes(), 0xb2_f7); + assert_eq!(reader.pop().unwrap(), "Yo!"); + assert_eq!(reader.pop(), None); +} + +#[test] +fn generates_error_invalid_head() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_UE_RECEIVED).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0xf3).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(25) + .expect_err("Testing failing on incorrect HEAD"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_message_too_long() { + let mut reader = MessageReader::new(); + let huge_length = MAX_UE_MESSAGE_LENGTH + 1; + let bytes = (huge_length as u32).to_be_bytes(); + + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(bytes[0]).unwrap(); + reader.push_byte(bytes[1]).unwrap(); + reader.push_byte(bytes[2]).unwrap(); + assert!(!reader.is_broken()); + reader + .push_byte(bytes[3]) + .expect_err("Testing failing on exceeding allowed message length"); + assert!(reader.is_broken()); +} + +#[test] +fn generates_error_invalid_unicode() { + let mut reader = MessageReader::new(); + reader.push_byte(HEAD_UE_MESSAGE).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(0).unwrap(); + reader.push_byte(2).unwrap(); + reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence + assert!(!reader.is_broken()); + // Bytes inside multi-byte code point have to have `1` for their high bit + reader + .push_byte(0b01010011) + .expect_err("Testing failing on incorrect unicode"); + assert!(reader.is_broken()); +} diff --git a/src/link/writer.rs b/src/link/writer.rs new file mode 100644 index 0000000..e769cf1 --- /dev/null +++ b/src/link/writer.rs @@ -0,0 +1,169 @@ +//! Implements writer that sends data to UE2 game server. + +use std::cmp::{max, min}; +use std::collections::VecDeque; +use std::convert::TryFrom; +use std::iter::Extend; + +// Maximum amount of bytes ue-server is able to receive at once +const UE_INPUT_BUFFER: usize = 4095; + +/// For converting text messages into chunks of bytes that can be sent to the ue-server. +/// +/// Every string message is converted into a length-prefixed array of utf8 bytes: +/// +/// | Data | Length | +/// |---------|---------| +/// | Message `LENGTH` | 4 bytes: u32 BE | +/// | UTF8-encoded string | `LENGTH` bytes| +/// +/// Resulting byte sequences from all messages are then concatenated (in the same order as they were +/// "written") into a single data stream. Bytes from the data stream are returned in chunks of +/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows +/// that ue-server's buffer has enough space to accept it. +/// +/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk. +/// NOTE: `try_pop()` can return `None` even if not all message data has been returned, +/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space. +/// +/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about +/// how many bytes ue-server has received so far. This can signal that its buffer has enough +/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent +/// to ue-server. +/// +/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return +/// (possibly after ``update_ue_received_bytes()`). +pub struct MessageWriter { + sent_bytes: u64, + ue_received_bytes: u64, + pending_data: VecDeque, +} + +impl MessageWriter { + pub fn new() -> MessageWriter { + MessageWriter { + sent_bytes: 0, + ue_received_bytes: 0, + // This value should be more than enough for typical use: + // it will take at least one second to send this much data to a 30 tick rate ue-server. + pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER), + } + } + + pub fn push(&mut self, message: &str) { + let message_as_utf8 = message.as_bytes(); + let message_as_utf8 = [ + &(message_as_utf8.len() as u32).to_be_bytes(), + message_as_utf8, + ] + .concat(); + self.pending_data.extend(message_as_utf8.into_iter()); + } + + pub fn try_pop(&mut self) -> Option> { + if self.is_empty() { + return None; + } + let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len()); + if chunk_size == 0 { + return None; + } + let mut bytes_to_send = Vec::with_capacity(chunk_size); + for next_byte in self.pending_data.drain(..chunk_size) { + bytes_to_send.push(next_byte); + } + self.sent_bytes += bytes_to_send.len() as u64; + Some(bytes_to_send) + } + + pub fn is_empty(&self) -> bool { + self.pending_data.is_empty() + } + + /// Takes total amount of bytes received so far by the ue-server, not just bytes received after + /// the last `update_ue_received_bytes()` call. + pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { + self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes); + } + + fn available_ue_buffer_capacity(&self) -> usize { + match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() { + Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes), + _ => 0, + } + } +} + +#[test] +fn writer_contents_after_creation() { + let writer = MessageWriter::new(); + assert_eq!(writer.is_empty(), true); + assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); +} + +#[test] +fn writer_content_after_push() { + let mut writer = MessageWriter::new(); + writer.push("Hello, world!"); + assert_eq!(writer.is_empty(), false); + assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); +} + +#[test] +fn writing_single_short_message() { + let mut writer = MessageWriter::new(); + writer.push("Hello, world!"); + let resulting_bytes = writer.try_pop().unwrap(); + let expected_bytes = [ + 0, 0, 0, 13, // Bytes in the message + b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', + ]; + assert_eq!(writer.is_empty(), true); + assert_eq!( + writer.available_ue_buffer_capacity(), + UE_INPUT_BUFFER - "Hello, world!".len() - 4 + ); + assert_eq!(resulting_bytes, expected_bytes); + assert_eq!(writer.sent_bytes, expected_bytes.len() as u64); +} + +#[test] +fn writing_first_chunk_of_single_long_message() { + let mut writer = MessageWriter::new(); + // Because we also have to pass message length, this will go over the sending limit + let long_message = "Q".repeat(UE_INPUT_BUFFER); + writer.push(&long_message); + let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!(writer.is_empty(), false); + assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER); + assert_eq!(writer.available_ue_buffer_capacity(), 0); + // Bytes in message = 4095 = 0x0fff + assert_eq!(resulting_bytes[0], 0); + assert_eq!(resulting_bytes[1], 0); + assert_eq!(resulting_bytes[2], 0x0f); + assert_eq!(resulting_bytes[3], 0xff); + for &byte in resulting_bytes[4..].iter() { + assert_eq!(byte, b'Q'); + } + assert_eq!(writer.try_pop(), None); + assert_eq!(writer.is_empty(), false); +} + +#[test] +fn writing_second_chunk_of_single_long_message() { + let mut writer = MessageWriter::new(); + // Because we also have to pass lengths, this will go over the sending limit + let long_message = "Q".repeat(UE_INPUT_BUFFER); + writer.push(&long_message); + // This pops all but 4 bytes of `long_message`, that were required to encode message length + let first_bytes = writer.try_pop().unwrap(); + writer.update_ue_received_bytes(first_bytes.len() as u64); + let resulting_bytes = writer.try_pop().unwrap(); + assert_eq!( + writer.available_ue_buffer_capacity(), + UE_INPUT_BUFFER - resulting_bytes.len() + ); + assert_eq!(writer.is_empty(), true); + // Bytes left for the next chunk = 4 + assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q']) +} diff --git a/src/main.rs b/src/main.rs index 8f664c9..59c341b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,13 +1,14 @@ -use std::env; -use std::path::Path; -mod unreal_config; +// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator. +mod link; +use link::start_avarice; fn main() { - let args: Vec = env::args().collect(); - let filename = &args[1]; - let config = unreal_config::load_file(Path::new(filename)); - match config { - Ok(config) => print!("{}", config), - _ => (), + let mut server = start_avarice(1234).unwrap(); + while let Some((link, message)) = server.next() { + println!("{}: {}", link.socket_address(), message.to_string()); + if message.message_type != "end" { + link.send(message); + } } + println!("Avarice has shut down!"); }