Compare commits
No commits in common. "cfe680d7702c7fad3801726024d46128fa0175ba" and "4f67a7b72ccfcb12668ca5d669f39d6b80557786" have entirely different histories.
cfe680d770
...
4f67a7b72c
31
Cargo.lock
generated
31
Cargo.lock
generated
@ -5,8 +5,6 @@ name = "avarice"
|
|||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@ -14,34 +12,5 @@ name = "custom_error"
|
|||||||
version = "1.9.2"
|
version = "1.9.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "itoa"
|
|
||||||
version = "0.4.7"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "ryu"
|
|
||||||
version = "1.0.5"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "serde"
|
|
||||||
version = "1.0.126"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "serde_json"
|
|
||||||
version = "1.0.66"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
dependencies = [
|
|
||||||
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
|
|
||||||
]
|
|
||||||
|
|
||||||
[metadata]
|
[metadata]
|
||||||
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
|
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
|
||||||
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
|
|
||||||
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
|
||||||
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
|
|
||||||
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"
|
|
||||||
|
@ -7,6 +7,4 @@ edition = "2018"
|
|||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
custom_error="1.*"
|
custom_error="1.9.2"
|
||||||
serde ="1.*"
|
|
||||||
serde_json="1.*"
|
|
@ -1,38 +0,0 @@
|
|||||||
//! Implements Avarice message: target service, message type, json parameters
|
|
||||||
use serde_json;
|
|
||||||
use serde_json::json;
|
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
pub struct AvariceMessage {
|
|
||||||
pub service: String,
|
|
||||||
pub message_type: String,
|
|
||||||
pub parameters: serde_json::Value,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AvariceMessage {
|
|
||||||
/// Parses JSON form of a message into `AvariceMessage` struct
|
|
||||||
pub fn from(message_str: &str) -> Option<AvariceMessage> {
|
|
||||||
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
|
|
||||||
let message_json = message_json.as_object_mut()?;
|
|
||||||
Some(AvariceMessage {
|
|
||||||
service: message_json.remove("s")?.as_str()?.to_owned(),
|
|
||||||
message_type: message_json.remove("t")?.as_str()?.to_owned(),
|
|
||||||
parameters: message_json.remove("p")?,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for AvariceMessage {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
|
||||||
write!(
|
|
||||||
f,
|
|
||||||
"{}",
|
|
||||||
json!({
|
|
||||||
"s": self.service,
|
|
||||||
"t": self.message_type,
|
|
||||||
"p": self.parameters,
|
|
||||||
})
|
|
||||||
.to_string()
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
147
src/link/mod.rs
147
src/link/mod.rs
@ -1,111 +1,62 @@
|
|||||||
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
|
//! Implements reader and writer to use when talking to UE2 game server.
|
||||||
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
|
|
||||||
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
|
use std::error::Error;
|
||||||
//! ue-servers' connections using `Receiver`.
|
use std::io::{Read, Write};
|
||||||
use std::collections::HashMap;
|
use std::net::{SocketAddr, TcpListener};
|
||||||
use std::error::Error;
|
use std::sync::Arc;
|
||||||
use std::io::Write;
|
use std::{str, thread};
|
||||||
use std::net::{SocketAddr, TcpStream};
|
|
||||||
use std::sync::mpsc::{channel, Receiver};
|
|
||||||
pub use writer::MessageWriter;
|
|
||||||
|
|
||||||
mod message;
|
|
||||||
mod network;
|
|
||||||
mod reader;
|
mod reader;
|
||||||
mod writer;
|
mod writer;
|
||||||
pub use message::AvariceMessage;
|
pub use reader::MessageReader;
|
||||||
pub use network::{run_server, NetworkMessage};
|
pub use writer::MessageWriter;
|
||||||
|
|
||||||
/// For collecting messages from all connected ue-servers and providing a way to reply back.
|
|
||||||
pub struct AvariceServer {
|
|
||||||
connected_links: HashMap<SocketAddr, Link>,
|
|
||||||
receiver: Receiver<NetworkMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
|
|
||||||
/// To receive messages use `AvariceServer`'s `next()` method instead.
|
|
||||||
pub struct Link {
|
pub struct Link {
|
||||||
ue_server_address: SocketAddr,
|
reader: MessageReader,
|
||||||
writer: MessageWriter,
|
writer: MessageWriter,
|
||||||
writing_stream: TcpStream,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AvariceServer {
|
|
||||||
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
|
|
||||||
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
|
|
||||||
/// representing that message.
|
|
||||||
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
|
|
||||||
loop {
|
|
||||||
match self.receiver.recv() {
|
|
||||||
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
|
|
||||||
// If `ue_server_address` was already present in `self.connected_links`
|
|
||||||
// hash map, then it means we have failed to clean it up after it
|
|
||||||
// has disconnected. We can just throw away the old value here.
|
|
||||||
self.connected_links.insert(
|
|
||||||
ue_server_address,
|
|
||||||
Link {
|
|
||||||
ue_server_address,
|
|
||||||
writing_stream,
|
|
||||||
writer: MessageWriter::new(),
|
|
||||||
},
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
|
|
||||||
self.connected_links.remove(&ue_server_address);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
|
|
||||||
self.connected_links.remove(&ue_server_address);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
|
|
||||||
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
|
|
||||||
link.update_ue_received_bytes(ue_received_bytes)
|
|
||||||
}
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
|
|
||||||
// Not having a link with key `ue_server_address` should be impossible here
|
|
||||||
return self
|
|
||||||
.connected_links
|
|
||||||
.get_mut(&ue_server_address)
|
|
||||||
.and_then(|x| Some((x, message)));
|
|
||||||
}
|
|
||||||
_ => return None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Link {
|
impl Link {
|
||||||
pub fn send(&mut self, message: AvariceMessage) {
|
pub fn run<F>(port: u16, handler: F) -> Result<(), Box<dyn Error>>
|
||||||
self.writer.push(&message.to_string());
|
where
|
||||||
self.flush();
|
F: Fn(&mut Link, &str) -> () + Send + Sync + 'static,
|
||||||
}
|
{
|
||||||
|
let address = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
pub fn socket_address(&self) -> SocketAddr {
|
let listener = TcpListener::bind(address)?;
|
||||||
self.ue_server_address
|
let handler = Arc::new(handler);
|
||||||
}
|
loop {
|
||||||
|
// Listen to new (multiple) connection
|
||||||
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
let mut reading_stream = listener.accept()?.0;
|
||||||
self.writer.update_ue_received_bytes(ue_received_bytes);
|
let mut writing_stream = reading_stream.try_clone()?;
|
||||||
self.flush();
|
let mut avarice_link = Link {
|
||||||
}
|
reader: MessageReader::new(),
|
||||||
|
writer: MessageWriter::new(),
|
||||||
fn flush(&mut self) {
|
};
|
||||||
if let Some(bytes) = self.writer.try_pop() {
|
let handler_clone = handler.clone();
|
||||||
self.writing_stream.write_all(&bytes).unwrap();
|
// On connection - spawn a new thread
|
||||||
|
thread::spawn(move || loop {
|
||||||
|
let mut buffer = [0; 1024];
|
||||||
|
// Reading cycle
|
||||||
|
match reading_stream.read(&mut buffer) {
|
||||||
|
Ok(n) => avarice_link.reader.push(&buffer[..n]).unwrap(),
|
||||||
|
_ => panic!("Connection issue!"),
|
||||||
|
};
|
||||||
|
// Handling cycle
|
||||||
|
while let Some(message) = avarice_link.reader.pop() {
|
||||||
|
handler_clone(&mut avarice_link, &message);
|
||||||
|
}
|
||||||
|
// Writing
|
||||||
|
avarice_link
|
||||||
|
.writer
|
||||||
|
.update_ue_received_bytes(avarice_link.reader.ue_received_bytes());
|
||||||
|
if let Some(bytes) = avarice_link.writer.try_pop() {
|
||||||
|
writing_stream.write_all(&bytes).unwrap();
|
||||||
|
}
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
|
pub fn write(&mut self, message: &str) {
|
||||||
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
|
self.writer.push(message);
|
||||||
let (sender, receiver) = channel();
|
}
|
||||||
run_server(port, sender)?;
|
|
||||||
Ok(AvariceServer {
|
|
||||||
connected_links: HashMap::new(),
|
|
||||||
receiver,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
@ -1,106 +0,0 @@
|
|||||||
//! Implements a network model where messages from all the ue-servers are collected in a single
|
|
||||||
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
|
|
||||||
//! spawns a new thread for every connected ue-server to handle reading data from it.
|
|
||||||
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
|
|
||||||
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
|
|
||||||
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
|
|
||||||
//! by the same `std::sync::mpsc::Sender` object.
|
|
||||||
use super::message::AvariceMessage;
|
|
||||||
pub use super::reader::MessageReader;
|
|
||||||
use std::error::Error;
|
|
||||||
use std::io::Read;
|
|
||||||
use std::net::{SocketAddr, TcpListener, TcpStream};
|
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
use std::thread;
|
|
||||||
|
|
||||||
pub struct UEConnection {
|
|
||||||
pub address: SocketAddr,
|
|
||||||
pub reader: MessageReader,
|
|
||||||
pub reading_stream: TcpStream,
|
|
||||||
pub message_sender: Sender<NetworkMessage>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Possible messages to the main thread
|
|
||||||
pub enum NetworkMessage {
|
|
||||||
ConnectionEstablished(SocketAddr, TcpStream),
|
|
||||||
InvalidDataReceived(SocketAddr),
|
|
||||||
ConnectionLost(SocketAddr),
|
|
||||||
MessageReceived(SocketAddr, AvariceMessage),
|
|
||||||
UEReceivedUpdate(SocketAddr, u64),
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
|
|
||||||
let address = SocketAddr::from(([0, 0, 0, 0], port));
|
|
||||||
let listener = TcpListener::bind(address)?;
|
|
||||||
thread::spawn(move || loop {
|
|
||||||
// Listen to new (multiple) connection
|
|
||||||
let (reading_stream, address) = listener.accept().unwrap();
|
|
||||||
let writing_stream = reading_stream.try_clone().unwrap();
|
|
||||||
message_sender
|
|
||||||
.send(NetworkMessage::ConnectionEstablished(
|
|
||||||
address,
|
|
||||||
writing_stream,
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
// On connection - spawn a new thread
|
|
||||||
let sender_clone = message_sender.clone();
|
|
||||||
thread::spawn(move || {
|
|
||||||
manage_connection(UEConnection {
|
|
||||||
reader: MessageReader::new(),
|
|
||||||
message_sender: sender_clone,
|
|
||||||
reading_stream,
|
|
||||||
address,
|
|
||||||
})
|
|
||||||
});
|
|
||||||
});
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn manage_connection(mut connection: UEConnection) {
|
|
||||||
let mut buffer = [0; 1024];
|
|
||||||
loop {
|
|
||||||
// Reading cycle
|
|
||||||
match connection.reading_stream.read(&mut buffer) {
|
|
||||||
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
|
|
||||||
_ => {
|
|
||||||
connection
|
|
||||||
.message_sender
|
|
||||||
.send(NetworkMessage::ConnectionLost(connection.address))
|
|
||||||
.unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
if connection.reader.is_broken() {
|
|
||||||
connection
|
|
||||||
.message_sender
|
|
||||||
.send(NetworkMessage::InvalidDataReceived(connection.address))
|
|
||||||
.unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
// Decoding cycle
|
|
||||||
while let Some(text_message) = connection.reader.pop() {
|
|
||||||
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
|
|
||||||
connection
|
|
||||||
.message_sender
|
|
||||||
.send(NetworkMessage::MessageReceived(
|
|
||||||
connection.address,
|
|
||||||
avarice_message,
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
} else {
|
|
||||||
connection
|
|
||||||
.message_sender
|
|
||||||
.send(NetworkMessage::InvalidDataReceived(connection.address))
|
|
||||||
.unwrap();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
connection
|
|
||||||
.message_sender
|
|
||||||
.send(NetworkMessage::UEReceivedUpdate(
|
|
||||||
connection.address,
|
|
||||||
connection.reader.ue_received_bytes(),
|
|
||||||
))
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
}
|
|
@ -5,34 +5,43 @@ use std::collections::VecDeque;
|
|||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::iter::Extend;
|
use std::iter::Extend;
|
||||||
|
|
||||||
|
// Defines how many bytes is used to encode "LENGTH" field in the chunk sent to ue-server
|
||||||
|
const CHUNK_LENGTH_FIELD: usize = 2;
|
||||||
// Maximum amount of bytes ue-server is able to receive at once
|
// Maximum amount of bytes ue-server is able to receive at once
|
||||||
const UE_INPUT_BUFFER: usize = 4095;
|
const UE_INPUT_BUFFER: usize = 4095;
|
||||||
|
// Minimal payload size (in bytes) to send, unless there is not enough data left
|
||||||
|
const MIN_PAYLOAD_SIZE: usize = 50;
|
||||||
|
|
||||||
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
|
/// For converting byte stream that is expected from the ue-server into actual messages.
|
||||||
///
|
/// Conversion process has two steps:
|
||||||
/// Every string message is converted into a length-prefixed array of utf8 bytes:
|
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
|
||||||
|
/// it's own length in format:
|
||||||
///
|
///
|
||||||
/// | Data | Length |
|
/// | Data | Length |
|
||||||
/// |---------|---------|
|
/// |---------|---------|
|
||||||
/// | Message `LENGTH` | 4 bytes: u32 BE |
|
/// | Message `LENGTH` | 4 bytes: u32 BE |
|
||||||
/// | UTF8-encoded string | `LENGTH` bytes|
|
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||||
///
|
///
|
||||||
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
|
/// Resulting byte sequences from all the messages then concatenated, in order, into
|
||||||
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
|
/// a single data stream.
|
||||||
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
|
|
||||||
/// that ue-server's buffer has enough space to accept it.
|
|
||||||
///
|
///
|
||||||
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
|
/// 2. Resulting data stream is then separated into "chunks" that can be accepted by
|
||||||
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
|
/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format:
|
||||||
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
|
///
|
||||||
|
/// | Data | Length |
|
||||||
|
/// |---------|---------|
|
||||||
|
/// | Chunk `LENGTH` | 2 bytes: u16 BE |
|
||||||
|
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||||
|
///
|
||||||
|
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server
|
||||||
|
/// can accept it.
|
||||||
|
/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred,
|
||||||
|
/// in case ue-server's buffer does not have enough space.
|
||||||
///
|
///
|
||||||
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
|
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
|
||||||
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
|
/// how many bytes ue-server has received so far.
|
||||||
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
|
|
||||||
/// to ue-server.
|
|
||||||
///
|
///
|
||||||
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
|
/// Use `is_empty()` call to check for whether `MessageWriter` has depleted all it's data.
|
||||||
/// (possibly after ``update_ue_received_bytes()`).
|
|
||||||
pub struct MessageWriter {
|
pub struct MessageWriter {
|
||||||
sent_bytes: u64,
|
sent_bytes: u64,
|
||||||
ue_received_bytes: u64,
|
ue_received_bytes: u64,
|
||||||
@ -60,16 +69,24 @@ impl MessageWriter {
|
|||||||
self.pending_data.extend(message_as_utf8.into_iter());
|
self.pending_data.extend(message_as_utf8.into_iter());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// This method will always return chunk if all remaining data will fit inside it, otherwise it
|
||||||
|
/// will wait until ue-server's buffer has enough space for at least `MIN_PAYLOAD_SIZE` bytes.
|
||||||
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
|
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
|
||||||
if self.is_empty() {
|
if self.is_empty() {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
|
let required_payload_size = min(self.pending_data.len(), MIN_PAYLOAD_SIZE);
|
||||||
if chunk_size == 0 {
|
let available_payload_space = self
|
||||||
|
.available_ue_buffer_capacity()
|
||||||
|
.checked_sub(CHUNK_LENGTH_FIELD)
|
||||||
|
.unwrap_or_default();
|
||||||
|
if required_payload_size > available_payload_space {
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
let mut bytes_to_send = Vec::with_capacity(chunk_size);
|
let payload_size = min(available_payload_space, self.pending_data.len());
|
||||||
for next_byte in self.pending_data.drain(..chunk_size) {
|
let mut bytes_to_send = Vec::with_capacity(CHUNK_LENGTH_FIELD + payload_size);
|
||||||
|
bytes_to_send.extend((payload_size as u16).to_be_bytes().iter());
|
||||||
|
for next_byte in self.pending_data.drain(..payload_size) {
|
||||||
bytes_to_send.push(next_byte);
|
bytes_to_send.push(next_byte);
|
||||||
}
|
}
|
||||||
self.sent_bytes += bytes_to_send.len() as u64;
|
self.sent_bytes += bytes_to_send.len() as u64;
|
||||||
@ -80,8 +97,6 @@ impl MessageWriter {
|
|||||||
self.pending_data.is_empty()
|
self.pending_data.is_empty()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
|
|
||||||
/// the last `update_ue_received_bytes()` call.
|
|
||||||
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
||||||
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
|
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
|
||||||
}
|
}
|
||||||
@ -115,13 +130,14 @@ fn writing_single_short_message() {
|
|||||||
writer.push("Hello, world!");
|
writer.push("Hello, world!");
|
||||||
let resulting_bytes = writer.try_pop().unwrap();
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
let expected_bytes = [
|
let expected_bytes = [
|
||||||
|
0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes)
|
||||||
0, 0, 0, 13, // Bytes in the message
|
0, 0, 0, 13, // Bytes in the message
|
||||||
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
||||||
];
|
];
|
||||||
assert_eq!(writer.is_empty(), true);
|
assert_eq!(writer.is_empty(), true);
|
||||||
assert_eq!(
|
assert_eq!(
|
||||||
writer.available_ue_buffer_capacity(),
|
writer.available_ue_buffer_capacity(),
|
||||||
UE_INPUT_BUFFER - "Hello, world!".len() - 4
|
UE_INPUT_BUFFER - "Hello, world!".len() - 2 - 4
|
||||||
);
|
);
|
||||||
assert_eq!(resulting_bytes, expected_bytes);
|
assert_eq!(resulting_bytes, expected_bytes);
|
||||||
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
|
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
|
||||||
@ -130,19 +146,22 @@ fn writing_single_short_message() {
|
|||||||
#[test]
|
#[test]
|
||||||
fn writing_first_chunk_of_single_long_message() {
|
fn writing_first_chunk_of_single_long_message() {
|
||||||
let mut writer = MessageWriter::new();
|
let mut writer = MessageWriter::new();
|
||||||
// Because we also have to pass message length, this will go over the sending limit
|
// Because we also have to pass lengths, this will go over the sending limit
|
||||||
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||||
writer.push(&long_message);
|
writer.push(&long_message);
|
||||||
let resulting_bytes = writer.try_pop().unwrap();
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
assert_eq!(writer.is_empty(), false);
|
assert_eq!(writer.is_empty(), false);
|
||||||
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
|
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
|
||||||
assert_eq!(writer.available_ue_buffer_capacity(), 0);
|
assert_eq!(writer.available_ue_buffer_capacity(), 0);
|
||||||
|
// Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd
|
||||||
|
assert_eq!(resulting_bytes[0], 0x0f);
|
||||||
|
assert_eq!(resulting_bytes[1], 0xfd);
|
||||||
// Bytes in message = 4095 = 0x0fff
|
// Bytes in message = 4095 = 0x0fff
|
||||||
assert_eq!(resulting_bytes[0], 0);
|
assert_eq!(resulting_bytes[2], 0);
|
||||||
assert_eq!(resulting_bytes[1], 0);
|
assert_eq!(resulting_bytes[3], 0);
|
||||||
assert_eq!(resulting_bytes[2], 0x0f);
|
assert_eq!(resulting_bytes[4], 0x0f);
|
||||||
assert_eq!(resulting_bytes[3], 0xff);
|
assert_eq!(resulting_bytes[5], 0xff);
|
||||||
for &byte in resulting_bytes[4..].iter() {
|
for &byte in resulting_bytes[6..].iter() {
|
||||||
assert_eq!(byte, b'Q');
|
assert_eq!(byte, b'Q');
|
||||||
}
|
}
|
||||||
assert_eq!(writer.try_pop(), None);
|
assert_eq!(writer.try_pop(), None);
|
||||||
@ -155,7 +174,8 @@ fn writing_second_chunk_of_single_long_message() {
|
|||||||
// Because we also have to pass lengths, this will go over the sending limit
|
// Because we also have to pass lengths, this will go over the sending limit
|
||||||
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||||
writer.push(&long_message);
|
writer.push(&long_message);
|
||||||
// This pops all but 4 bytes of `long_message`, that were required to encode message length
|
// This pops all but 6 bytes of `long_message`, that were required to encode lengths of
|
||||||
|
// message and first chunk
|
||||||
let first_bytes = writer.try_pop().unwrap();
|
let first_bytes = writer.try_pop().unwrap();
|
||||||
writer.update_ue_received_bytes(first_bytes.len() as u64);
|
writer.update_ue_received_bytes(first_bytes.len() as u64);
|
||||||
let resulting_bytes = writer.try_pop().unwrap();
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
@ -164,6 +184,46 @@ fn writing_second_chunk_of_single_long_message() {
|
|||||||
UE_INPUT_BUFFER - resulting_bytes.len()
|
UE_INPUT_BUFFER - resulting_bytes.len()
|
||||||
);
|
);
|
||||||
assert_eq!(writer.is_empty(), true);
|
assert_eq!(writer.is_empty(), true);
|
||||||
// Bytes left for the next chunk = 4
|
// Bytes in the chunk = 6
|
||||||
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
|
assert_eq!(resulting_bytes[0], 0);
|
||||||
|
assert_eq!(resulting_bytes[1], 6);
|
||||||
|
assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q'])
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn will_write_small_chunks_if_no_more_data() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
// Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`),
|
||||||
|
// sending this will leave us with exactly 10 free bytes in the buffer
|
||||||
|
let long_message = "Q".repeat(UE_INPUT_BUFFER / 2);
|
||||||
|
writer.push(&long_message);
|
||||||
|
writer.try_pop();
|
||||||
|
let short_message = "Hello, world!";
|
||||||
|
writer.push(&short_message);
|
||||||
|
let expected_bytes = [
|
||||||
|
0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes)
|
||||||
|
0, 0, 0, 13, // Bytes in the message
|
||||||
|
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
||||||
|
];
|
||||||
|
// There should be enough space in the ue-server buffer to send `short_message`
|
||||||
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
|
assert_eq!(resulting_bytes, expected_bytes);
|
||||||
|
assert_eq!(writer.try_pop(), None);
|
||||||
|
assert_eq!(writer.is_empty(), true);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn will_not_write_small_chunks_if_more_data_remains() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
// Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`),
|
||||||
|
// sending this will leave us with exactly 10 free bytes in the buffer
|
||||||
|
let long_message = "Q".repeat(UE_INPUT_BUFFER - CHUNK_LENGTH_FIELD - 4 - 10);
|
||||||
|
writer.push(&long_message);
|
||||||
|
writer.try_pop();
|
||||||
|
let short_message = "Hello, world!";
|
||||||
|
writer.push(&short_message);
|
||||||
|
// `MessageWriter` can neither send full message, nor a chunk of size 10
|
||||||
|
// (because it is too short)
|
||||||
|
assert_eq!(writer.try_pop(), None);
|
||||||
|
assert_eq!(writer.is_empty(), false);
|
||||||
}
|
}
|
||||||
|
15
src/main.rs
15
src/main.rs
@ -1,14 +1,9 @@
|
|||||||
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
|
|
||||||
mod link;
|
mod link;
|
||||||
use link::start_avarice;
|
use link::Link;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let mut server = start_avarice(1234).unwrap();
|
match Link::run(1234, |link, message| { link.write(message);}) {
|
||||||
while let Some((link, message)) = server.next() {
|
Ok(_) => print!("Connect!"),
|
||||||
println!("{}: {}", link.socket_address(), message.to_string());
|
_ => print!("Connection error!"),
|
||||||
if message.message_type != "end" {
|
};
|
||||||
link.send(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
println!("Avarice has shut down!");
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user