Compare commits
No commits in common. 'feature_link' and 'master' have entirely different histories.
feature_li
...
master
8 changed files with 9 additions and 799 deletions
@ -1,38 +0,0 @@ |
|||||||
//! Implements Avarice message: target service, message type, json parameters
|
|
||||||
use serde_json; |
|
||||||
use serde_json::json; |
|
||||||
use std::fmt; |
|
||||||
|
|
||||||
pub struct AvariceMessage { |
|
||||||
pub service: String, |
|
||||||
pub message_type: String, |
|
||||||
pub parameters: serde_json::Value, |
|
||||||
} |
|
||||||
|
|
||||||
impl AvariceMessage { |
|
||||||
/// Parses JSON form of a message into `AvariceMessage` struct
|
|
||||||
pub fn from(message_str: &str) -> Option<AvariceMessage> { |
|
||||||
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); |
|
||||||
let message_json = message_json.as_object_mut()?; |
|
||||||
Some(AvariceMessage { |
|
||||||
service: message_json.remove("s")?.as_str()?.to_owned(), |
|
||||||
message_type: message_json.remove("t")?.as_str()?.to_owned(), |
|
||||||
parameters: message_json.remove("p")?, |
|
||||||
}) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl fmt::Display for AvariceMessage { |
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
|
||||||
write!( |
|
||||||
f, |
|
||||||
"{}", |
|
||||||
json!({ |
|
||||||
"s": self.service, |
|
||||||
"t": self.message_type, |
|
||||||
"p": self.parameters, |
|
||||||
}) |
|
||||||
.to_string() |
|
||||||
) |
|
||||||
} |
|
||||||
} |
|
@ -1,111 +0,0 @@ |
|||||||
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
|
|
||||||
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
|
|
||||||
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
|
|
||||||
//! ue-servers' connections using `Receiver`.
|
|
||||||
use std::collections::HashMap; |
|
||||||
use std::error::Error; |
|
||||||
use std::io::Write; |
|
||||||
use std::net::{SocketAddr, TcpStream}; |
|
||||||
use std::sync::mpsc::{channel, Receiver}; |
|
||||||
pub use writer::MessageWriter; |
|
||||||
|
|
||||||
mod message; |
|
||||||
mod network; |
|
||||||
mod reader; |
|
||||||
mod writer; |
|
||||||
pub use message::AvariceMessage; |
|
||||||
pub use network::{run_server, NetworkMessage}; |
|
||||||
|
|
||||||
/// For collecting messages from all connected ue-servers and providing a way to reply back.
|
|
||||||
pub struct AvariceServer { |
|
||||||
connected_links: HashMap<SocketAddr, Link>, |
|
||||||
receiver: Receiver<NetworkMessage>, |
|
||||||
} |
|
||||||
|
|
||||||
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
|
|
||||||
/// To receive messages use `AvariceServer`'s `next()` method instead.
|
|
||||||
pub struct Link { |
|
||||||
ue_server_address: SocketAddr, |
|
||||||
writer: MessageWriter, |
|
||||||
writing_stream: TcpStream, |
|
||||||
} |
|
||||||
|
|
||||||
impl AvariceServer { |
|
||||||
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
|
|
||||||
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
|
|
||||||
/// representing that message.
|
|
||||||
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> { |
|
||||||
loop { |
|
||||||
match self.receiver.recv() { |
|
||||||
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => { |
|
||||||
// If `ue_server_address` was already present in `self.connected_links`
|
|
||||||
// hash map, then it means we have failed to clean it up after it
|
|
||||||
// has disconnected. We can just throw away the old value here.
|
|
||||||
self.connected_links.insert( |
|
||||||
ue_server_address, |
|
||||||
Link { |
|
||||||
ue_server_address, |
|
||||||
writing_stream, |
|
||||||
writer: MessageWriter::new(), |
|
||||||
}, |
|
||||||
); |
|
||||||
continue; |
|
||||||
} |
|
||||||
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => { |
|
||||||
self.connected_links.remove(&ue_server_address); |
|
||||||
continue; |
|
||||||
} |
|
||||||
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => { |
|
||||||
self.connected_links.remove(&ue_server_address); |
|
||||||
continue; |
|
||||||
} |
|
||||||
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => { |
|
||||||
if let Some(link) = self.connected_links.get_mut(&ue_server_address) { |
|
||||||
link.update_ue_received_bytes(ue_received_bytes) |
|
||||||
} |
|
||||||
continue; |
|
||||||
} |
|
||||||
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => { |
|
||||||
// Not having a link with key `ue_server_address` should be impossible here
|
|
||||||
return self |
|
||||||
.connected_links |
|
||||||
.get_mut(&ue_server_address) |
|
||||||
.and_then(|x| Some((x, message))); |
|
||||||
} |
|
||||||
_ => return None, |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl Link { |
|
||||||
pub fn send(&mut self, message: AvariceMessage) { |
|
||||||
self.writer.push(&message.to_string()); |
|
||||||
self.flush(); |
|
||||||
} |
|
||||||
|
|
||||||
pub fn socket_address(&self) -> SocketAddr { |
|
||||||
self.ue_server_address |
|
||||||
} |
|
||||||
|
|
||||||
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { |
|
||||||
self.writer.update_ue_received_bytes(ue_received_bytes); |
|
||||||
self.flush(); |
|
||||||
} |
|
||||||
|
|
||||||
fn flush(&mut self) { |
|
||||||
if let Some(bytes) = self.writer.try_pop() { |
|
||||||
self.writing_stream.write_all(&bytes).unwrap(); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
|
|
||||||
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> { |
|
||||||
let (sender, receiver) = channel(); |
|
||||||
run_server(port, sender)?; |
|
||||||
Ok(AvariceServer { |
|
||||||
connected_links: HashMap::new(), |
|
||||||
receiver, |
|
||||||
}) |
|
||||||
} |
|
@ -1,106 +0,0 @@ |
|||||||
//! Implements a network model where messages from all the ue-servers are collected in a single
|
|
||||||
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
|
|
||||||
//! spawns a new thread for every connected ue-server to handle reading data from it.
|
|
||||||
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
|
|
||||||
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
|
|
||||||
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
|
|
||||||
//! by the same `std::sync::mpsc::Sender` object.
|
|
||||||
use super::message::AvariceMessage; |
|
||||||
pub use super::reader::MessageReader; |
|
||||||
use std::error::Error; |
|
||||||
use std::io::Read; |
|
||||||
use std::net::{SocketAddr, TcpListener, TcpStream}; |
|
||||||
use std::sync::mpsc::Sender; |
|
||||||
use std::thread; |
|
||||||
|
|
||||||
pub struct UEConnection { |
|
||||||
pub address: SocketAddr, |
|
||||||
pub reader: MessageReader, |
|
||||||
pub reading_stream: TcpStream, |
|
||||||
pub message_sender: Sender<NetworkMessage>, |
|
||||||
} |
|
||||||
|
|
||||||
/// Possible messages to the main thread
|
|
||||||
pub enum NetworkMessage { |
|
||||||
ConnectionEstablished(SocketAddr, TcpStream), |
|
||||||
InvalidDataReceived(SocketAddr), |
|
||||||
ConnectionLost(SocketAddr), |
|
||||||
MessageReceived(SocketAddr, AvariceMessage), |
|
||||||
UEReceivedUpdate(SocketAddr, u64), |
|
||||||
} |
|
||||||
|
|
||||||
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> { |
|
||||||
let address = SocketAddr::from(([0, 0, 0, 0], port)); |
|
||||||
let listener = TcpListener::bind(address)?; |
|
||||||
thread::spawn(move || loop { |
|
||||||
// Listen to new (multiple) connection
|
|
||||||
let (reading_stream, address) = listener.accept().unwrap(); |
|
||||||
let writing_stream = reading_stream.try_clone().unwrap(); |
|
||||||
message_sender |
|
||||||
.send(NetworkMessage::ConnectionEstablished( |
|
||||||
address, |
|
||||||
writing_stream, |
|
||||||
)) |
|
||||||
.unwrap(); |
|
||||||
// On connection - spawn a new thread
|
|
||||||
let sender_clone = message_sender.clone(); |
|
||||||
thread::spawn(move || { |
|
||||||
manage_connection(UEConnection { |
|
||||||
reader: MessageReader::new(), |
|
||||||
message_sender: sender_clone, |
|
||||||
reading_stream, |
|
||||||
address, |
|
||||||
}) |
|
||||||
}); |
|
||||||
}); |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
fn manage_connection(mut connection: UEConnection) { |
|
||||||
let mut buffer = [0; 1024]; |
|
||||||
loop { |
|
||||||
// Reading cycle
|
|
||||||
match connection.reading_stream.read(&mut buffer) { |
|
||||||
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(), |
|
||||||
_ => { |
|
||||||
connection |
|
||||||
.message_sender |
|
||||||
.send(NetworkMessage::ConnectionLost(connection.address)) |
|
||||||
.unwrap(); |
|
||||||
return; |
|
||||||
} |
|
||||||
}; |
|
||||||
if connection.reader.is_broken() { |
|
||||||
connection |
|
||||||
.message_sender |
|
||||||
.send(NetworkMessage::InvalidDataReceived(connection.address)) |
|
||||||
.unwrap(); |
|
||||||
return; |
|
||||||
} |
|
||||||
// Decoding cycle
|
|
||||||
while let Some(text_message) = connection.reader.pop() { |
|
||||||
if let Some(avarice_message) = AvariceMessage::from(&text_message) { |
|
||||||
connection |
|
||||||
.message_sender |
|
||||||
.send(NetworkMessage::MessageReceived( |
|
||||||
connection.address, |
|
||||||
avarice_message, |
|
||||||
)) |
|
||||||
.unwrap(); |
|
||||||
} else { |
|
||||||
connection |
|
||||||
.message_sender |
|
||||||
.send(NetworkMessage::InvalidDataReceived(connection.address)) |
|
||||||
.unwrap(); |
|
||||||
return; |
|
||||||
} |
|
||||||
} |
|
||||||
connection |
|
||||||
.message_sender |
|
||||||
.send(NetworkMessage::UEReceivedUpdate( |
|
||||||
connection.address, |
|
||||||
connection.reader.ue_received_bytes(), |
|
||||||
)) |
|
||||||
.unwrap(); |
|
||||||
} |
|
||||||
} |
|
@ -1,321 +0,0 @@ |
|||||||
//! Implements reader that receives data from UE2 game server.
|
|
||||||
|
|
||||||
use std::collections::VecDeque; |
|
||||||
use std::str; |
|
||||||
|
|
||||||
extern crate custom_error; |
|
||||||
use custom_error::custom_error; |
|
||||||
|
|
||||||
// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about
|
|
||||||
// amount of bytes it received since the last update
|
|
||||||
const UE_RECEIVED_FIELD_SIZE: usize = 2; |
|
||||||
// Defines how many bytes is used to encode "LENGTH" field, describing length of
|
|
||||||
// next JSON message from ue-server
|
|
||||||
const UE_LENGTH_FIELD_SIZE: usize = 4; |
|
||||||
// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes
|
|
||||||
// received by that server so far.
|
|
||||||
const HEAD_UE_RECEIVED: u8 = 85; |
|
||||||
// Arbitrary value indicating that next byte sequence from ue-server contains JSON message.
|
|
||||||
const HEAD_UE_MESSAGE: u8 = 42; |
|
||||||
// Maximum allowed size of JSON message sent from ue-server.
|
|
||||||
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024; |
|
||||||
|
|
||||||
custom_error! { pub ReadingStreamError |
|
||||||
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}", |
|
||||||
MessageTooLong{length: usize} = "Message to receive is too long: {length}", |
|
||||||
InvalidUnicode = "Invalid utf-8 was received", |
|
||||||
BrokenStream = "Used stream is broken" |
|
||||||
} |
|
||||||
|
|
||||||
enum ReadingState { |
|
||||||
Head, |
|
||||||
ReceivedBytes, |
|
||||||
Length, |
|
||||||
Payload, |
|
||||||
} |
|
||||||
|
|
||||||
/// For converting byte stream that is expected from the ue-server into actual messages.
|
|
||||||
///
|
|
||||||
/// Expected format is a sequence of either:
|
|
||||||
///
|
|
||||||
/// | Data | Length |
|
|
||||||
/// |---------|---------|
|
|
||||||
/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte |
|
|
||||||
/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE|
|
|
||||||
///
|
|
||||||
/// or
|
|
||||||
///
|
|
||||||
/// | Data | Length |
|
|
||||||
/// |---------|---------|
|
|
||||||
/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte|
|
|
||||||
/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE|
|
|
||||||
/// | UTF8-encoded string | `LENGTH` bytes|
|
|
||||||
///
|
|
||||||
/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it.
|
|
||||||
/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to
|
|
||||||
/// retrieve resulting messages.
|
|
||||||
pub struct MessageReader { |
|
||||||
is_broken: bool, |
|
||||||
reading_state: ReadingState, |
|
||||||
read_bytes: usize, |
|
||||||
length_buffer: [u8; 4], |
|
||||||
current_message_length: usize, |
|
||||||
current_message: Vec<u8>, |
|
||||||
read_messages: VecDeque<String>, |
|
||||||
ue_received_bytes: u64, |
|
||||||
} |
|
||||||
|
|
||||||
impl MessageReader { |
|
||||||
pub fn new() -> MessageReader { |
|
||||||
MessageReader { |
|
||||||
is_broken: false, |
|
||||||
reading_state: ReadingState::Head, |
|
||||||
read_bytes: 0, |
|
||||||
length_buffer: [0; 4], |
|
||||||
current_message_length: 0, |
|
||||||
// Will be recreated with `with_capacity` in `push_byte()`
|
|
||||||
current_message: Vec::new(), |
|
||||||
// This value should be more than enough for typical use
|
|
||||||
read_messages: VecDeque::with_capacity(100), |
|
||||||
ue_received_bytes: 0, |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> { |
|
||||||
if self.is_broken { |
|
||||||
return Err(ReadingStreamError::BrokenStream); |
|
||||||
} |
|
||||||
match &self.reading_state { |
|
||||||
ReadingState::Head => { |
|
||||||
if input == HEAD_UE_RECEIVED { |
|
||||||
self.change_state(ReadingState::ReceivedBytes); |
|
||||||
} else if input == HEAD_UE_MESSAGE { |
|
||||||
self.change_state(ReadingState::Length); |
|
||||||
} else { |
|
||||||
self.is_broken = true; |
|
||||||
return Err(ReadingStreamError::InvalidHead { input }); |
|
||||||
} |
|
||||||
} |
|
||||||
ReadingState::ReceivedBytes => { |
|
||||||
self.length_buffer[self.read_bytes] = input; |
|
||||||
self.read_bytes += 1; |
|
||||||
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE { |
|
||||||
self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer)); |
|
||||||
self.change_state(ReadingState::Head); |
|
||||||
} |
|
||||||
} |
|
||||||
ReadingState::Length => { |
|
||||||
self.length_buffer[self.read_bytes] = input; |
|
||||||
self.read_bytes += 1; |
|
||||||
if self.read_bytes >= UE_LENGTH_FIELD_SIZE { |
|
||||||
self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize; |
|
||||||
if self.current_message_length > MAX_UE_MESSAGE_LENGTH { |
|
||||||
self.is_broken = true; |
|
||||||
return Err(ReadingStreamError::MessageTooLong { |
|
||||||
length: self.current_message_length, |
|
||||||
}); |
|
||||||
} |
|
||||||
self.current_message = Vec::with_capacity(self.current_message_length); |
|
||||||
self.change_state(ReadingState::Payload); |
|
||||||
} |
|
||||||
} |
|
||||||
ReadingState::Payload => { |
|
||||||
self.current_message.push(input); |
|
||||||
self.read_bytes += 1; |
|
||||||
if self.read_bytes >= self.current_message_length { |
|
||||||
match str::from_utf8(&self.current_message) { |
|
||||||
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()), |
|
||||||
_ => { |
|
||||||
self.is_broken = true; |
|
||||||
return Err(ReadingStreamError::InvalidUnicode); |
|
||||||
} |
|
||||||
}; |
|
||||||
self.current_message.clear(); |
|
||||||
self.current_message_length = 0; |
|
||||||
self.change_state(ReadingState::Head); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> { |
|
||||||
for &byte in input { |
|
||||||
self.push_byte(byte)?; |
|
||||||
} |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
pub fn pop(&mut self) -> Option<String> { |
|
||||||
self.read_messages.pop_back() |
|
||||||
} |
|
||||||
|
|
||||||
pub fn ue_received_bytes(&self) -> u64 { |
|
||||||
self.ue_received_bytes |
|
||||||
} |
|
||||||
|
|
||||||
pub fn is_broken(&self) -> bool { |
|
||||||
self.is_broken |
|
||||||
} |
|
||||||
|
|
||||||
fn change_state(&mut self, next_state: ReadingState) { |
|
||||||
self.read_bytes = 0; |
|
||||||
self.reading_state = next_state; |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 { |
|
||||||
(u16::from(bytes[0]) << 8) + u16::from(bytes[1]) |
|
||||||
} |
|
||||||
|
|
||||||
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { |
|
||||||
(u32::from(bytes[0]) << 24) |
|
||||||
+ (u32::from(bytes[1]) << 16) |
|
||||||
+ (u32::from(bytes[2]) << 8) |
|
||||||
+ (u32::from(bytes[3])) |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn message_push_byte() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader.push_byte(HEAD_UE_MESSAGE).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(13).unwrap(); |
|
||||||
reader.push_byte(b'H').unwrap(); |
|
||||||
reader.push_byte(b'e').unwrap(); |
|
||||||
reader.push_byte(b'l').unwrap(); |
|
||||||
reader.push_byte(b'l').unwrap(); |
|
||||||
reader.push_byte(b'o').unwrap(); |
|
||||||
reader.push_byte(b',').unwrap(); |
|
||||||
reader.push_byte(b' ').unwrap(); |
|
||||||
reader.push_byte(b'w').unwrap(); |
|
||||||
reader.push_byte(b'o').unwrap(); |
|
||||||
reader.push_byte(b'r').unwrap(); |
|
||||||
reader.push_byte(b'l').unwrap(); |
|
||||||
reader.push_byte(b'd').unwrap(); |
|
||||||
reader.push_byte(b'!').unwrap(); |
|
||||||
reader.push_byte(HEAD_UE_MESSAGE).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(3).unwrap(); |
|
||||||
reader.push_byte(b'Y').unwrap(); |
|
||||||
reader.push_byte(b'o').unwrap(); |
|
||||||
reader.push_byte(b'!').unwrap(); |
|
||||||
assert_eq!(reader.pop().unwrap(), "Hello, world!"); |
|
||||||
assert_eq!(reader.pop().unwrap(), "Yo!"); |
|
||||||
assert_eq!(reader.pop(), None); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn received_push_byte() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0xf3).unwrap(); |
|
||||||
assert_eq!(reader.ue_received_bytes(), 0xf3); |
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(0xb2).unwrap(); |
|
||||||
reader.push_byte(0x04).unwrap(); |
|
||||||
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
|
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(231).unwrap(); |
|
||||||
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn mixed_push_byte() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0xf3).unwrap(); |
|
||||||
reader.push_byte(HEAD_UE_MESSAGE).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(3).unwrap(); |
|
||||||
reader.push_byte(b'Y').unwrap(); |
|
||||||
reader.push_byte(b'o').unwrap(); |
|
||||||
reader.push_byte(b'!').unwrap(); |
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(0xb2).unwrap(); |
|
||||||
reader.push_byte(0x04).unwrap(); |
|
||||||
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
|
|
||||||
assert_eq!(reader.pop().unwrap(), "Yo!"); |
|
||||||
assert_eq!(reader.pop(), None); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn pushing_many_bytes_at_once() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader |
|
||||||
.push(&[ |
|
||||||
HEAD_UE_RECEIVED, |
|
||||||
0, |
|
||||||
0xf3, |
|
||||||
HEAD_UE_MESSAGE, |
|
||||||
0, |
|
||||||
0, |
|
||||||
0, |
|
||||||
3, |
|
||||||
b'Y', |
|
||||||
b'o', |
|
||||||
b'!', |
|
||||||
HEAD_UE_RECEIVED, |
|
||||||
0xb2, |
|
||||||
0x04, |
|
||||||
]) |
|
||||||
.unwrap(); |
|
||||||
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); |
|
||||||
assert_eq!(reader.pop().unwrap(), "Yo!"); |
|
||||||
assert_eq!(reader.pop(), None); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn generates_error_invalid_head() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader.push_byte(HEAD_UE_RECEIVED).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0xf3).unwrap(); |
|
||||||
assert!(!reader.is_broken()); |
|
||||||
reader |
|
||||||
.push_byte(25) |
|
||||||
.expect_err("Testing failing on incorrect HEAD"); |
|
||||||
assert!(reader.is_broken()); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn generates_error_message_too_long() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
let huge_length = MAX_UE_MESSAGE_LENGTH + 1; |
|
||||||
let bytes = (huge_length as u32).to_be_bytes(); |
|
||||||
|
|
||||||
reader.push_byte(HEAD_UE_MESSAGE).unwrap(); |
|
||||||
reader.push_byte(bytes[0]).unwrap(); |
|
||||||
reader.push_byte(bytes[1]).unwrap(); |
|
||||||
reader.push_byte(bytes[2]).unwrap(); |
|
||||||
assert!(!reader.is_broken()); |
|
||||||
reader |
|
||||||
.push_byte(bytes[3]) |
|
||||||
.expect_err("Testing failing on exceeding allowed message length"); |
|
||||||
assert!(reader.is_broken()); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn generates_error_invalid_unicode() { |
|
||||||
let mut reader = MessageReader::new(); |
|
||||||
reader.push_byte(HEAD_UE_MESSAGE).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(0).unwrap(); |
|
||||||
reader.push_byte(2).unwrap(); |
|
||||||
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
|
|
||||||
assert!(!reader.is_broken()); |
|
||||||
// Bytes inside multi-byte code point have to have `1` for their high bit
|
|
||||||
reader |
|
||||||
.push_byte(0b01010011) |
|
||||||
.expect_err("Testing failing on incorrect unicode"); |
|
||||||
assert!(reader.is_broken()); |
|
||||||
} |
|
@ -1,169 +0,0 @@ |
|||||||
//! Implements writer that sends data to UE2 game server.
|
|
||||||
|
|
||||||
use std::cmp::{max, min}; |
|
||||||
use std::collections::VecDeque; |
|
||||||
use std::convert::TryFrom; |
|
||||||
use std::iter::Extend; |
|
||||||
|
|
||||||
// Maximum amount of bytes ue-server is able to receive at once
|
|
||||||
const UE_INPUT_BUFFER: usize = 4095; |
|
||||||
|
|
||||||
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
|
|
||||||
///
|
|
||||||
/// Every string message is converted into a length-prefixed array of utf8 bytes:
|
|
||||||
///
|
|
||||||
/// | Data | Length |
|
|
||||||
/// |---------|---------|
|
|
||||||
/// | Message `LENGTH` | 4 bytes: u32 BE |
|
|
||||||
/// | UTF8-encoded string | `LENGTH` bytes|
|
|
||||||
///
|
|
||||||
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
|
|
||||||
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
|
|
||||||
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
|
|
||||||
/// that ue-server's buffer has enough space to accept it.
|
|
||||||
///
|
|
||||||
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
|
|
||||||
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
|
|
||||||
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
|
|
||||||
///
|
|
||||||
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
|
|
||||||
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
|
|
||||||
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
|
|
||||||
/// to ue-server.
|
|
||||||
///
|
|
||||||
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
|
|
||||||
/// (possibly after ``update_ue_received_bytes()`).
|
|
||||||
pub struct MessageWriter { |
|
||||||
sent_bytes: u64, |
|
||||||
ue_received_bytes: u64, |
|
||||||
pending_data: VecDeque<u8>, |
|
||||||
} |
|
||||||
|
|
||||||
impl MessageWriter { |
|
||||||
pub fn new() -> MessageWriter { |
|
||||||
MessageWriter { |
|
||||||
sent_bytes: 0, |
|
||||||
ue_received_bytes: 0, |
|
||||||
// This value should be more than enough for typical use:
|
|
||||||
// it will take at least one second to send this much data to a 30 tick rate ue-server.
|
|
||||||
pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER), |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub fn push(&mut self, message: &str) { |
|
||||||
let message_as_utf8 = message.as_bytes(); |
|
||||||
let message_as_utf8 = [ |
|
||||||
&(message_as_utf8.len() as u32).to_be_bytes(), |
|
||||||
message_as_utf8, |
|
||||||
] |
|
||||||
.concat(); |
|
||||||
self.pending_data.extend(message_as_utf8.into_iter()); |
|
||||||
} |
|
||||||
|
|
||||||
pub fn try_pop(&mut self) -> Option<Vec<u8>> { |
|
||||||
if self.is_empty() { |
|
||||||
return None; |
|
||||||
} |
|
||||||
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len()); |
|
||||||
if chunk_size == 0 { |
|
||||||
return None; |
|
||||||
} |
|
||||||
let mut bytes_to_send = Vec::with_capacity(chunk_size); |
|
||||||
for next_byte in self.pending_data.drain(..chunk_size) { |
|
||||||
bytes_to_send.push(next_byte); |
|
||||||
} |
|
||||||
self.sent_bytes += bytes_to_send.len() as u64; |
|
||||||
Some(bytes_to_send) |
|
||||||
} |
|
||||||
|
|
||||||
pub fn is_empty(&self) -> bool { |
|
||||||
self.pending_data.is_empty() |
|
||||||
} |
|
||||||
|
|
||||||
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
|
|
||||||
/// the last `update_ue_received_bytes()` call.
|
|
||||||
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) { |
|
||||||
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes); |
|
||||||
} |
|
||||||
|
|
||||||
fn available_ue_buffer_capacity(&self) -> usize { |
|
||||||
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() { |
|
||||||
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes), |
|
||||||
_ => 0, |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn writer_contents_after_creation() { |
|
||||||
let writer = MessageWriter::new(); |
|
||||||
assert_eq!(writer.is_empty(), true); |
|
||||||
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn writer_content_after_push() { |
|
||||||
let mut writer = MessageWriter::new(); |
|
||||||
writer.push("Hello, world!"); |
|
||||||
assert_eq!(writer.is_empty(), false); |
|
||||||
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn writing_single_short_message() { |
|
||||||
let mut writer = MessageWriter::new(); |
|
||||||
writer.push("Hello, world!"); |
|
||||||
let resulting_bytes = writer.try_pop().unwrap(); |
|
||||||
let expected_bytes = [ |
|
||||||
0, 0, 0, 13, // Bytes in the message
|
|
||||||
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!', |
|
||||||
]; |
|
||||||
assert_eq!(writer.is_empty(), true); |
|
||||||
assert_eq!( |
|
||||||
writer.available_ue_buffer_capacity(), |
|
||||||
UE_INPUT_BUFFER - "Hello, world!".len() - 4 |
|
||||||
); |
|
||||||
assert_eq!(resulting_bytes, expected_bytes); |
|
||||||
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn writing_first_chunk_of_single_long_message() { |
|
||||||
let mut writer = MessageWriter::new(); |
|
||||||
// Because we also have to pass message length, this will go over the sending limit
|
|
||||||
let long_message = "Q".repeat(UE_INPUT_BUFFER); |
|
||||||
writer.push(&long_message); |
|
||||||
let resulting_bytes = writer.try_pop().unwrap(); |
|
||||||
assert_eq!(writer.is_empty(), false); |
|
||||||
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER); |
|
||||||
assert_eq!(writer.available_ue_buffer_capacity(), 0); |
|
||||||
// Bytes in message = 4095 = 0x0fff
|
|
||||||
assert_eq!(resulting_bytes[0], 0); |
|
||||||
assert_eq!(resulting_bytes[1], 0); |
|
||||||
assert_eq!(resulting_bytes[2], 0x0f); |
|
||||||
assert_eq!(resulting_bytes[3], 0xff); |
|
||||||
for &byte in resulting_bytes[4..].iter() { |
|
||||||
assert_eq!(byte, b'Q'); |
|
||||||
} |
|
||||||
assert_eq!(writer.try_pop(), None); |
|
||||||
assert_eq!(writer.is_empty(), false); |
|
||||||
} |
|
||||||
|
|
||||||
#[test] |
|
||||||
fn writing_second_chunk_of_single_long_message() { |
|
||||||
let mut writer = MessageWriter::new(); |
|
||||||
// Because we also have to pass lengths, this will go over the sending limit
|
|
||||||
let long_message = "Q".repeat(UE_INPUT_BUFFER); |
|
||||||
writer.push(&long_message); |
|
||||||
// This pops all but 4 bytes of `long_message`, that were required to encode message length
|
|
||||||
let first_bytes = writer.try_pop().unwrap(); |
|
||||||
writer.update_ue_received_bytes(first_bytes.len() as u64); |
|
||||||
let resulting_bytes = writer.try_pop().unwrap(); |
|
||||||
assert_eq!( |
|
||||||
writer.available_ue_buffer_capacity(), |
|
||||||
UE_INPUT_BUFFER - resulting_bytes.len() |
|
||||||
); |
|
||||||
assert_eq!(writer.is_empty(), true); |
|
||||||
// Bytes left for the next chunk = 4
|
|
||||||
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q']) |
|
||||||
} |
|
@ -1,14 +1,13 @@ |
|||||||
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
|
use std::env; |
||||||
mod link; |
use std::path::Path; |
||||||
use link::start_avarice; |
mod unreal_config; |
||||||
|
|
||||||
fn main() { |
fn main() { |
||||||
let mut server = start_avarice(1234).unwrap(); |
let args: Vec<String> = env::args().collect(); |
||||||
while let Some((link, message)) = server.next() { |
let filename = &args[1]; |
||||||
println!("{}: {}", link.socket_address(), message.to_string()); |
let config = unreal_config::load_file(Path::new(filename)); |
||||||
if message.message_type != "end" { |
match config { |
||||||
link.send(message); |
Ok(config) => print!("{}", config), |
||||||
|
_ => (), |
||||||
} |
} |
||||||
} |
|
||||||
println!("Avarice has shut down!"); |
|
||||||
} |
} |
||||||
|
Loading…
Reference in new issue