Add network link to ue-server implementation #13
41
Cargo.lock
generated
@ -3,4 +3,45 @@
|
|||||||
[[package]]
|
[[package]]
|
||||||
name = "avarice"
|
name = "avarice"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "custom_error"
|
||||||
|
version = "1.9.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itoa"
|
||||||
|
version = "0.4.7"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ryu"
|
||||||
|
version = "1.0.5"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde"
|
||||||
|
version = "1.0.126"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "serde_json"
|
||||||
|
version = "1.0.66"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
dependencies = [
|
||||||
|
"itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
"serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||||
|
]
|
||||||
|
|
||||||
|
[metadata]
|
||||||
|
"checksum custom_error 1.9.2 (registry+https://github.com/rust-lang/crates.io-index)" = "4f8a51dd197fa6ba5b4dc98a990a43cc13693c23eb0089ebb0fcc1f04152bca6"
|
||||||
|
"checksum itoa 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)" = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
|
||||||
|
"checksum ryu 1.0.5 (registry+https://github.com/rust-lang/crates.io-index)" = "71d301d4193d031abdd79ff7e3dd721168a9572ef3fe51a1517aba235bd8f86e"
|
||||||
|
"checksum serde 1.0.126 (registry+https://github.com/rust-lang/crates.io-index)" = "ec7505abeacaec74ae4778d9d9328fe5a5d04253220a85c4ee022239fc996d03"
|
||||||
|
"checksum serde_json 1.0.66 (registry+https://github.com/rust-lang/crates.io-index)" = "336b10da19a12ad094b59d870ebde26a45402e5b470add4b5fd03c5048a32127"
|
||||||
|
@ -7,3 +7,6 @@ edition = "2018"
|
|||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
custom_error="1.*"
|
||||||
|
serde ="1.*"
|
||||||
|
serde_json="1.*"
|
||||||
|
38
src/link/message.rs
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
//! Implements Avarice message: target service, message type, json parameters
|
||||||
|
use serde_json;
|
||||||
Ggg_123
commented
warning: this import is redundant warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely
|
|||||||
|
use serde_json::json;
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
pub struct AvariceMessage {
|
||||||
Ggg_123
commented
Any explanation/doc on wtf is service/message_type/parameters? Any explanation/doc on wtf is service/message_type/parameters?
|
|||||||
|
pub service: String,
|
||||||
|
pub message_type: String,
|
||||||
|
pub parameters: serde_json::Value,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AvariceMessage {
|
||||||
|
/// Parses JSON form of a message into `AvariceMessage` struct
|
||||||
|
pub fn from(message_str: &str) -> Option<AvariceMessage> {
|
||||||
Ggg_123
commented
Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common. Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Ggg_123
commented
Suggestion:
Free bonus: more detailed errors. Suggestion:
```
--- a/src/link/message.rs
+++ b/src/link/message.rs
@@ -9,19 +9,22 @@ pub struct AvariceMessage {
pub parameters: serde_json::Value,
}
-impl AvariceMessage {
- /// Parses JSON form of a message into `AvariceMessage` struct
- pub fn from(message_str: &str) -> Option<AvariceMessage> {
- let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
- let message_json = message_json.as_object_mut()?;
- Some(AvariceMessage {
- service: message_json.remove("s")?.as_str()?.to_owned(),
- message_type: message_json.remove("t")?.as_str()?.to_owned(),
- parameters: message_json.remove("p")?,
+use std::convert::TryFrom;
+use std::error::Error;
+impl TryFrom<&str> for AvariceMessage {
+ type Error = Box<dyn Error>;
+ fn try_from(value: &str) -> Result<Self, Self::Error> {
+ let mut message_json: serde_json::Value = serde_json::from_str(value)?;
+ let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?;
+ Ok(AvariceMessage {
+ service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow
ned(),
+ message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?.
to_owned(),
+ parameters: message_json.remove("p").ok_or("Missing field 'p'.")?,
})
--- a/src/link/network.rs
+++ b/src/link/network.rs
@@ -7,6 +7,7 @@
//! by the same `std::sync::mpsc::Sender` object.
use super::message::AvariceMessage;
pub use super::reader::MessageReader;
+use std::convert::TryFrom;
use std::error::Error;
use std::io::Read;
use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) {
}
// Decoding cycle
while let Some(text_message) = connection.reader.pop() {
- if let Some(avarice_message) = AvariceMessage::from(&text_message) {
+ if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) {
```
Free bonus: more detailed errors.
|
|||||||
|
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
|
||||||
|
let message_json = message_json.as_object_mut()?;
|
||||||
|
Some(AvariceMessage {
|
||||||
|
service: message_json.remove("s")?.as_str()?.to_owned(),
|
||||||
Ggg_123
commented
Why "remove"? Why "remove"?
|
|||||||
|
message_type: message_json.remove("t")?.as_str()?.to_owned(),
|
||||||
|
parameters: message_json.remove("p")?,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for AvariceMessage {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(
|
||||||
|
f,
|
||||||
|
"{}",
|
||||||
|
json!({
|
||||||
|
"s": self.service,
|
||||||
Ggg_123
commented
Maybe expand s/t/p into words? Maybe expand s/t/p into words?
|
|||||||
|
"t": self.message_type,
|
||||||
|
"p": self.parameters,
|
||||||
|
})
|
||||||
|
.to_string()
|
||||||
|
)
|
||||||
|
}
|
||||||
|
}
|
111
src/link/mod.rs
Normal file
@ -0,0 +1,111 @@
|
|||||||
|
//! This module provides a simple interface to exchange messages (`message::AvariceMessage`) between
|
||||||
|
//! Avarice and ue-server. The model is simple - we create an mpsc-channel of `Sender` and
|
||||||
|
//! `Receiver`, then pass `Sender` to the `network` sub-module and wait for its messages about
|
||||||
|
//! ue-servers' connections using `Receiver`.
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::net::{SocketAddr, TcpStream};
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Wtf Wtf
Ggg_123
commented
replace with array[u8;4] and replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
|
|||||||
|
use std::sync::mpsc::{channel, Receiver};
|
||||||
|
pub use writer::MessageWriter;
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Wtf Wtf
|
|||||||
|
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Wtf Wtf
|
|||||||
|
mod message;
|
||||||
|
mod network;
|
||||||
|
mod reader;
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."? Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
|
|||||||
|
mod writer;
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Weird empty line Weird empty line
|
|||||||
|
pub use message::AvariceMessage;
|
||||||
|
pub use network::{run_server, NetworkMessage};
|
||||||
|
|
||||||
|
/// For collecting messages from all connected ue-servers and providing a way to reply back.
|
||||||
|
pub struct AvariceServer {
|
||||||
|
connected_links: HashMap<SocketAddr, Link>,
|
||||||
|
receiver: Receiver<NetworkMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
with_capacity != limit. with_capacity != limit.
Should it really be a constant?
I'd just inline it.
dkanus
commented
It is expected limit that we use as a capacity. It is expected limit that we use as a capacity.
dkanus
commented
But upon further consideration I agree that there is no sense in defining this as a constant. But upon further consideration I agree that there is no sense in defining this as a constant.
|
|||||||
|
/// For representing a link to one of the connected ue-servers, can be used to send messages back.
|
||||||
|
/// To receive messages use `AvariceServer`'s `next()` method instead.
|
||||||
Ggg_123
commented
Can be simplified => // Listen to new connections Can be simplified => // Listen to new connections
|
|||||||
|
pub struct Link {
|
||||||
Ggg_123 marked this conversation as resolved
Ggg_123
commented
Probably needs a timeout, to avoid waiting forever: Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
|
|||||||
|
ue_server_address: SocketAddr,
|
||||||
|
writer: MessageWriter,
|
||||||
|
writing_stream: TcpStream,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AvariceServer {
|
||||||
|
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Unclear meaning. Unclear meaning.
Ggg_123
commented
rename to ue_received_bytes rename to ue_received_bytes
|
|||||||
|
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
|
||||||
Ggg_123
commented
sent next message => sent this message? sent next message => sent this message?
|
|||||||
|
/// representing that message.
|
||||||
Ggg_123
commented
1024? Not 4096? 1024? Not 4096?
dkanus
commented
4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add 4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add `BufReader` and read byte-by-byte.
dkanus
commented
Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice. Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
|
|||||||
|
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
|
||||||
Ggg_123
commented
Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common. Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
|
|||||||
|
loop {
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
"For converting byte stream expected to be generated"? "For converting byte stream expected to be generated"?
|
|||||||
|
match self.receiver.recv() {
|
||||||
|
Ok(NetworkMessage::ConnectionEstablished(ue_server_address, writing_stream)) => {
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
> 4 bytes
u32? i32? BEi32? LEi32?
|
|||||||
|
// If `ue_server_address` was already present in `self.connected_links`
|
||||||
|
// hash map, then it means we have failed to clean it up after it
|
||||||
|
// has disconnected. We can just throw away the old value here.
|
||||||
|
self.connected_links.insert(
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
buffer => length_buffer, since it's used only for storing length bytes buffer => length_buffer, since it's used only for storing length bytes
dkanus
commented
Agreed Agreed
|
|||||||
|
ue_server_address,
|
||||||
|
Link {
|
||||||
|
ue_server_address,
|
||||||
|
writing_stream,
|
||||||
|
writer: MessageWriter::new(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
continue;
|
||||||
Ggg_123
commented
Maybe remove continue? Maybe remove continue?
|
|||||||
|
}
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
with_capactiy? with_capactiy?
Ggg_123
commented
+ // will be recreated with with_capacity in push()
|
|||||||
|
Ok(NetworkMessage::ConnectionLost(ue_server_address)) => {
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
with_capactiy? with_capactiy?
|
|||||||
|
self.connected_links.remove(&ue_server_address);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(NetworkMessage::InvalidDataReceived(ue_server_address)) => {
|
||||||
|
self.connected_links.remove(&ue_server_address);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
|
||||||
|
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
|
||||||
Ggg_123
commented
Missing else? Condition seems important. Missing else? Condition seems important.
|
|||||||
|
link.update_ue_received_bytes(ue_received_bytes)
|
||||||
|
}
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
|
||||||
|
// Not having a link with key `ue_server_address` should be impossible here
|
||||||
Ggg_123
commented
Why? Why?
|
|||||||
|
return self
|
||||||
|
.connected_links
|
||||||
|
.get_mut(&ue_server_address)
|
||||||
|
.and_then(|x| Some((x, message)));
|
||||||
|
}
|
||||||
|
_ => return None,
|
||||||
|
}
|
||||||
Ggg_123 marked this conversation as resolved
Outdated
Ggg_123
commented
> << 8
wtf
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ggg_123 marked this conversation as resolved
Outdated
Ggg_123
commented
> as u64;
wtf
|
|||||||
|
impl Link {
|
||||||
|
pub fn send(&mut self, message: AvariceMessage) {
|
||||||
|
self.writer.push(&message.to_string());
|
||||||
|
self.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn socket_address(&self) -> SocketAddr {
|
||||||
|
self.ue_server_address
|
||||||
|
}
|
||||||
|
|
||||||
|
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
||||||
Ggg_123
commented
Why is it a separate function, and not inside send/flush? Why is it a separate function, and not inside send/flush?
When should I call it?
|
|||||||
|
self.writer.update_ue_received_bytes(ue_received_bytes);
|
||||||
|
self.flush();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) {
|
||||||
|
if let Some(bytes) = self.writer.try_pop() {
|
||||||
|
self.writing_stream.write_all(&bytes).unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
|
||||||
Ggg_123
commented
// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable) // todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
|
|||||||
|
pub fn start_avarice(port: u16) -> Result<AvariceServer, Box<dyn Error>> {
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
"1 as usize" => "1usize" "1 as usize" => "1usize"
Or just remove.
|
|||||||
|
let (sender, receiver) = channel();
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Move this line below error checking, since it contains early exit Move this line below error checking, since it contains early exit
dkanus
commented
Agreed. Agreed.
|
|||||||
|
run_server(port, sender)?;
|
||||||
|
Ok(AvariceServer {
|
||||||
|
connected_links: HashMap::new(),
|
||||||
|
receiver,
|
||||||
|
})
|
||||||
|
}
|
106
src/link/network.rs
Normal file
@ -0,0 +1,106 @@
|
|||||||
|
//! Implements a network model where messages from all the ue-servers are collected in a single
|
||||||
|
//! main thread. For that we spawn a new thread that listens for new connections, which in turn
|
||||||
|
//! spawns a new thread for every connected ue-server to handle reading data from it.
|
||||||
|
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
|
||||||
|
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
|
||||||
|
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
|
||||||
Ggg_123
commented
Writing Writing `TcpStream` is sent - rewrite? No idea what that means.
|
|||||||
|
//! by the same `std::sync::mpsc::Sender` object.
|
||||||
|
use super::message::AvariceMessage;
|
||||||
|
pub use super::reader::MessageReader;
|
||||||
|
use std::error::Error;
|
||||||
|
use std::io::Read;
|
||||||
|
use std::net::{SocketAddr, TcpListener, TcpStream};
|
||||||
|
use std::sync::mpsc::Sender;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
pub struct UEConnection {
|
||||||
|
pub address: SocketAddr,
|
||||||
|
pub reader: MessageReader,
|
||||||
|
pub reading_stream: TcpStream,
|
||||||
|
pub message_sender: Sender<NetworkMessage>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Possible messages to the main thread
|
||||||
|
pub enum NetworkMessage {
|
||||||
|
ConnectionEstablished(SocketAddr, TcpStream),
|
||||||
|
InvalidDataReceived(SocketAddr),
|
||||||
|
ConnectionLost(SocketAddr),
|
||||||
|
MessageReceived(SocketAddr, AvariceMessage),
|
||||||
|
UEReceivedUpdate(SocketAddr, u64),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_server(port: u16, message_sender: Sender<NetworkMessage>) -> Result<(), Box<dyn Error>> {
|
||||||
|
let address = SocketAddr::from(([0, 0, 0, 0], port));
|
||||||
|
let listener = TcpListener::bind(address)?;
|
||||||
|
thread::spawn(move || loop {
|
||||||
|
// Listen to new (multiple) connection
|
||||||
|
let (reading_stream, address) = listener.accept().unwrap();
|
||||||
|
let writing_stream = reading_stream.try_clone().unwrap();
|
||||||
|
message_sender
|
||||||
|
.send(NetworkMessage::ConnectionEstablished(
|
||||||
|
address,
|
||||||
|
writing_stream,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
// On connection - spawn a new thread
|
||||||
|
let sender_clone = message_sender.clone();
|
||||||
|
thread::spawn(move || {
|
||||||
|
manage_connection(UEConnection {
|
||||||
|
reader: MessageReader::new(),
|
||||||
|
message_sender: sender_clone,
|
||||||
|
reading_stream,
|
||||||
|
address,
|
||||||
|
})
|
||||||
|
});
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn manage_connection(mut connection: UEConnection) {
|
||||||
|
let mut buffer = [0; 1024];
|
||||||
|
loop {
|
||||||
|
// Reading cycle
|
||||||
|
match connection.reading_stream.read(&mut buffer) {
|
||||||
|
Ok(n) => connection.reader.push(&buffer[..n]).unwrap(),
|
||||||
|
_ => {
|
||||||
|
connection
|
||||||
|
.message_sender
|
||||||
|
.send(NetworkMessage::ConnectionLost(connection.address))
|
||||||
|
.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if connection.reader.is_broken() {
|
||||||
|
connection
|
||||||
|
.message_sender
|
||||||
|
.send(NetworkMessage::InvalidDataReceived(connection.address))
|
||||||
|
.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Decoding cycle
|
||||||
|
while let Some(text_message) = connection.reader.pop() {
|
||||||
|
if let Some(avarice_message) = AvariceMessage::from(&text_message) {
|
||||||
|
connection
|
||||||
|
.message_sender
|
||||||
|
.send(NetworkMessage::MessageReceived(
|
||||||
|
connection.address,
|
||||||
|
avarice_message,
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
} else {
|
||||||
|
connection
|
||||||
|
.message_sender
|
||||||
|
.send(NetworkMessage::InvalidDataReceived(connection.address))
|
||||||
|
.unwrap();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
connection
|
||||||
|
.message_sender
|
||||||
|
.send(NetworkMessage::UEReceivedUpdate(
|
||||||
|
connection.address,
|
||||||
|
connection.reader.ue_received_bytes(),
|
||||||
|
))
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
321
src/link/reader.rs
Normal file
@ -0,0 +1,321 @@
|
|||||||
|
//! Implements reader that receives data from UE2 game server.
|
||||||
|
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::str;
|
||||||
|
|
||||||
|
extern crate custom_error;
|
||||||
|
use custom_error::custom_error;
|
||||||
|
|
||||||
|
// Defines how many bytes is used to encode "AMOUNT" field in the response from ue-server about
|
||||||
|
// amount of bytes it received since the last update
|
||||||
|
const UE_RECEIVED_FIELD_SIZE: usize = 2;
|
||||||
|
// Defines how many bytes is used to encode "LENGTH" field, describing length of
|
||||||
|
// next JSON message from ue-server
|
||||||
|
const UE_LENGTH_FIELD_SIZE: usize = 4;
|
||||||
|
// Arbitrary value indicating that next byte sequence from ue-server reports amount of bytes
|
||||||
|
// received by that server so far.
|
||||||
|
const HEAD_UE_RECEIVED: u8 = 85;
|
||||||
|
// Arbitrary value indicating that next byte sequence from ue-server contains JSON message.
|
||||||
|
const HEAD_UE_MESSAGE: u8 = 42;
|
||||||
|
// Maximum allowed size of JSON message sent from ue-server.
|
||||||
|
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
|
||||||
|
|
||||||
|
custom_error! { pub ReadingStreamError
|
||||||
|
InvalidHead{input: u8} = "Invalid byte used as a HEAD: {input}",
|
||||||
|
MessageTooLong{length: usize} = "Message to receive is too long: {length}",
|
||||||
|
InvalidUnicode = "Invalid utf-8 was received",
|
||||||
|
BrokenStream = "Used stream is broken"
|
||||||
|
}
|
||||||
|
|
||||||
|
enum ReadingState {
|
||||||
|
Head,
|
||||||
|
ReceivedBytes,
|
||||||
|
Length,
|
||||||
|
Payload,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// For converting byte stream that is expected from the ue-server into actual messages.
|
||||||
|
///
|
||||||
|
/// Expected format is a sequence of either:
|
||||||
|
///
|
||||||
|
/// | Data | Length |
|
||||||
|
/// |---------|---------|
|
||||||
|
/// | [`HEAD_UE_RECEIVED`] (marker byte) | 1 byte |
|
||||||
|
/// | Amount of bytes received by ue-server since last update | 2 bytes: u16 BE|
|
||||||
|
///
|
||||||
|
/// or
|
||||||
|
///
|
||||||
|
/// | Data | Length |
|
||||||
|
/// |---------|---------|
|
||||||
|
/// | [`HEAD_UE_MESSAGE`] (marker byte) | 1 byte|
|
||||||
|
/// | `LENGTH` of the JSON message in utf8 encoding | 4 bytes: u32 BE|
|
||||||
|
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||||
|
///
|
||||||
|
/// On any invalid input enters into a failure state (can be checked by `is_broken()`) and never recovers from it.
|
||||||
|
/// Use either `push_byte()` or `push()` to input byte stream from ue-server and `pop()` to
|
||||||
|
/// retrieve resulting messages.
|
||||||
|
pub struct MessageReader {
|
||||||
|
is_broken: bool,
|
||||||
|
reading_state: ReadingState,
|
||||||
|
read_bytes: usize,
|
||||||
|
length_buffer: [u8; 4],
|
||||||
|
current_message_length: usize,
|
||||||
|
current_message: Vec<u8>,
|
||||||
|
read_messages: VecDeque<String>,
|
||||||
|
ue_received_bytes: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageReader {
|
||||||
|
pub fn new() -> MessageReader {
|
||||||
|
MessageReader {
|
||||||
|
is_broken: false,
|
||||||
|
reading_state: ReadingState::Head,
|
||||||
|
read_bytes: 0,
|
||||||
|
length_buffer: [0; 4],
|
||||||
|
current_message_length: 0,
|
||||||
|
// Will be recreated with `with_capacity` in `push_byte()`
|
||||||
|
current_message: Vec::new(),
|
||||||
|
// This value should be more than enough for typical use
|
||||||
|
read_messages: VecDeque::with_capacity(100),
|
||||||
|
ue_received_bytes: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push_byte(&mut self, input: u8) -> Result<(), ReadingStreamError> {
|
||||||
|
if self.is_broken {
|
||||||
|
return Err(ReadingStreamError::BrokenStream);
|
||||||
|
}
|
||||||
|
match &self.reading_state {
|
||||||
|
ReadingState::Head => {
|
||||||
|
if input == HEAD_UE_RECEIVED {
|
||||||
|
self.change_state(ReadingState::ReceivedBytes);
|
||||||
|
} else if input == HEAD_UE_MESSAGE {
|
||||||
|
self.change_state(ReadingState::Length);
|
||||||
|
} else {
|
||||||
|
self.is_broken = true;
|
||||||
|
return Err(ReadingStreamError::InvalidHead { input });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReadingState::ReceivedBytes => {
|
||||||
|
self.length_buffer[self.read_bytes] = input;
|
||||||
|
self.read_bytes += 1;
|
||||||
|
if self.read_bytes >= UE_RECEIVED_FIELD_SIZE {
|
||||||
|
self.ue_received_bytes += u64::from(array_of_u8_to_u16(self.length_buffer));
|
||||||
|
self.change_state(ReadingState::Head);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReadingState::Length => {
|
||||||
|
self.length_buffer[self.read_bytes] = input;
|
||||||
|
self.read_bytes += 1;
|
||||||
|
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
|
||||||
|
self.current_message_length = array_of_u8_to_u32(self.length_buffer) as usize;
|
||||||
|
if self.current_message_length > MAX_UE_MESSAGE_LENGTH {
|
||||||
|
self.is_broken = true;
|
||||||
|
return Err(ReadingStreamError::MessageTooLong {
|
||||||
|
length: self.current_message_length,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
self.current_message = Vec::with_capacity(self.current_message_length);
|
||||||
|
self.change_state(ReadingState::Payload);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ReadingState::Payload => {
|
||||||
|
self.current_message.push(input);
|
||||||
|
self.read_bytes += 1;
|
||||||
|
if self.read_bytes >= self.current_message_length {
|
||||||
|
match str::from_utf8(&self.current_message) {
|
||||||
|
Ok(next_message) => self.read_messages.push_front(next_message.to_owned()),
|
||||||
|
_ => {
|
||||||
|
self.is_broken = true;
|
||||||
|
return Err(ReadingStreamError::InvalidUnicode);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
self.current_message.clear();
|
||||||
|
self.current_message_length = 0;
|
||||||
|
self.change_state(ReadingState::Head);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push(&mut self, input: &[u8]) -> Result<(), ReadingStreamError> {
|
||||||
|
for &byte in input {
|
||||||
|
self.push_byte(byte)?;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn pop(&mut self) -> Option<String> {
|
||||||
Ggg_123
commented
Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 ~~byte~~ message).
|
|||||||
|
self.read_messages.pop_back()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn ue_received_bytes(&self) -> u64 {
|
||||||
|
self.ue_received_bytes
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_broken(&self) -> bool {
|
||||||
Ggg_123
commented
Why a method? Why a method?
|
|||||||
|
self.is_broken
|
||||||
|
}
|
||||||
|
|
||||||
|
fn change_state(&mut self, next_state: ReadingState) {
|
||||||
|
self.read_bytes = 0;
|
||||||
|
self.reading_state = next_state;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn array_of_u8_to_u16(bytes: [u8; 4]) -> u16 {
|
||||||
|
(u16::from(bytes[0]) << 8) + u16::from(bytes[1])
|
||||||
|
}
|
||||||
|
|
||||||
|
fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
|
||||||
|
(u32::from(bytes[0]) << 24)
|
||||||
|
+ (u32::from(bytes[1]) << 16)
|
||||||
|
+ (u32::from(bytes[2]) << 8)
|
||||||
|
+ (u32::from(bytes[3]))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn message_push_byte() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(13).unwrap();
|
||||||
|
reader.push_byte(b'H').unwrap();
|
||||||
|
reader.push_byte(b'e').unwrap();
|
||||||
|
reader.push_byte(b'l').unwrap();
|
||||||
|
reader.push_byte(b'l').unwrap();
|
||||||
|
reader.push_byte(b'o').unwrap();
|
||||||
|
reader.push_byte(b',').unwrap();
|
||||||
|
reader.push_byte(b' ').unwrap();
|
||||||
|
reader.push_byte(b'w').unwrap();
|
||||||
|
reader.push_byte(b'o').unwrap();
|
||||||
|
reader.push_byte(b'r').unwrap();
|
||||||
|
reader.push_byte(b'l').unwrap();
|
||||||
|
reader.push_byte(b'd').unwrap();
|
||||||
|
reader.push_byte(b'!').unwrap();
|
||||||
|
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(3).unwrap();
|
||||||
|
reader.push_byte(b'Y').unwrap();
|
||||||
|
reader.push_byte(b'o').unwrap();
|
||||||
|
reader.push_byte(b'!').unwrap();
|
||||||
|
assert_eq!(reader.pop().unwrap(), "Hello, world!");
|
||||||
|
assert_eq!(reader.pop().unwrap(), "Yo!");
|
||||||
|
assert_eq!(reader.pop(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn received_push_byte() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0xf3).unwrap();
|
||||||
|
assert_eq!(reader.ue_received_bytes(), 0xf3);
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(0xb2).unwrap();
|
||||||
|
reader.push_byte(0x04).unwrap();
|
||||||
|
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(231).unwrap();
|
||||||
|
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn mixed_push_byte() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0xf3).unwrap();
|
||||||
|
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(3).unwrap();
|
||||||
|
reader.push_byte(b'Y').unwrap();
|
||||||
|
reader.push_byte(b'o').unwrap();
|
||||||
|
reader.push_byte(b'!').unwrap();
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(0xb2).unwrap();
|
||||||
|
reader.push_byte(0x04).unwrap();
|
||||||
|
assert_eq!(reader.ue_received_bytes(), 0xb2_f7); // 0xf7 = 0x04 + 0xf3
|
||||||
|
assert_eq!(reader.pop().unwrap(), "Yo!");
|
||||||
|
assert_eq!(reader.pop(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn pushing_many_bytes_at_once() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader
|
||||||
|
.push(&[
|
||||||
|
HEAD_UE_RECEIVED,
|
||||||
|
0,
|
||||||
|
0xf3,
|
||||||
|
HEAD_UE_MESSAGE,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
3,
|
||||||
|
b'Y',
|
||||||
|
b'o',
|
||||||
|
b'!',
|
||||||
|
HEAD_UE_RECEIVED,
|
||||||
|
0xb2,
|
||||||
|
0x04,
|
||||||
|
])
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(reader.ue_received_bytes(), 0xb2_f7);
|
||||||
|
assert_eq!(reader.pop().unwrap(), "Yo!");
|
||||||
|
assert_eq!(reader.pop(), None);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn generates_error_invalid_head() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader.push_byte(HEAD_UE_RECEIVED).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0xf3).unwrap();
|
||||||
|
assert!(!reader.is_broken());
|
||||||
|
reader
|
||||||
|
.push_byte(25)
|
||||||
|
.expect_err("Testing failing on incorrect HEAD");
|
||||||
|
assert!(reader.is_broken());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn generates_error_message_too_long() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
let huge_length = MAX_UE_MESSAGE_LENGTH + 1;
|
||||||
|
let bytes = (huge_length as u32).to_be_bytes();
|
||||||
|
|
||||||
|
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
|
||||||
|
reader.push_byte(bytes[0]).unwrap();
|
||||||
|
reader.push_byte(bytes[1]).unwrap();
|
||||||
|
reader.push_byte(bytes[2]).unwrap();
|
||||||
|
assert!(!reader.is_broken());
|
||||||
|
reader
|
||||||
|
.push_byte(bytes[3])
|
||||||
|
.expect_err("Testing failing on exceeding allowed message length");
|
||||||
|
assert!(reader.is_broken());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn generates_error_invalid_unicode() {
|
||||||
|
let mut reader = MessageReader::new();
|
||||||
|
reader.push_byte(HEAD_UE_MESSAGE).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(0).unwrap();
|
||||||
|
reader.push_byte(2).unwrap();
|
||||||
|
reader.push_byte(0b11010011).unwrap(); // start of 2-byte sequence
|
||||||
|
assert!(!reader.is_broken());
|
||||||
|
// Bytes inside multi-byte code point have to have `1` for their high bit
|
||||||
|
reader
|
||||||
|
.push_byte(0b01010011)
|
||||||
|
.expect_err("Testing failing on incorrect unicode");
|
||||||
|
assert!(reader.is_broken());
|
||||||
|
}
|
169
src/link/writer.rs
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
//! Implements writer that sends data to UE2 game server.
|
||||||
|
|
||||||
|
use std::cmp::{max, min};
|
||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
use std::iter::Extend;
|
||||||
|
|
||||||
|
// Maximum amount of bytes ue-server is able to receive at once
|
||||||
|
const UE_INPUT_BUFFER: usize = 4095;
|
||||||
|
|
||||||
|
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
|
||||||
|
///
|
||||||
|
/// Every string message is converted into a length-prefixed array of utf8 bytes:
|
||||||
|
///
|
||||||
|
/// | Data | Length |
|
||||||
|
/// |---------|---------|
|
||||||
|
/// | Message `LENGTH` | 4 bytes: u32 BE |
|
||||||
|
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||||
|
///
|
||||||
|
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Elaborate => "Conversion happens in two steps"? Elaborate => "Conversion happens in two steps"?
dkanus
commented
Changed in upcoming patch. Changed in upcoming patch.
|
|||||||
|
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
it's => its Can be simplified => Every message is a length-prefixed array of utf8 bytes. it's => its
pre-pended => prepended
"utf8 representation"?
Can be simplified => Every message is a length-prefixed array of utf8 bytes.
dkanus
commented
Fixed in upcoming patch. Fixed in upcoming patch.
|
|||||||
|
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
|
||||||
|
/// that ue-server's buffer has enough space to accept it.
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string. Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.
|
|||||||
|
///
|
||||||
|
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
the messages => messages the messages => messages
then => is then
"in order" - in what order?
dkanus
commented
Changed in upcoming patch. Changed in upcoming patch.
|
|||||||
|
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
|
||||||
|
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
|
||||||
|
///
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
But Writer doesn't send anything? But Writer doesn't send anything?
dkanus
commented
Fixed in upcoming patch. Fixed in upcoming patch.
|
|||||||
|
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
|
||||||
|
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
|
||||||
|
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
|
||||||
|
/// to ue-server.
|
||||||
|
///
|
||||||
|
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
|
||||||
|
/// (possibly after ``update_ue_received_bytes()`).
|
||||||
|
pub struct MessageWriter {
|
||||||
dkanus marked this conversation as resolved
Outdated
Ggg_123
commented
"has depleted all it's data." => "has successfully sent all data."? "has depleted all it's data." => "has successfully sent all data."?
dkanus
commented
Changed in upcoming patch. Changed in upcoming patch.
|
|||||||
|
sent_bytes: u64,
|
||||||
|
ue_received_bytes: u64,
|
||||||
|
pending_data: VecDeque<u8>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageWriter {
|
||||||
|
pub fn new() -> MessageWriter {
|
||||||
|
MessageWriter {
|
||||||
|
sent_bytes: 0,
|
||||||
|
ue_received_bytes: 0,
|
||||||
|
// This value should be more than enough for typical use:
|
||||||
|
// it will take at least one second to send this much data to a 30 tick rate ue-server.
|
||||||
|
pending_data: VecDeque::with_capacity(30 * UE_INPUT_BUFFER),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn push(&mut self, message: &str) {
|
||||||
|
let message_as_utf8 = message.as_bytes();
|
||||||
|
let message_as_utf8 = [
|
||||||
|
&(message_as_utf8.len() as u32).to_be_bytes(),
|
||||||
|
message_as_utf8,
|
||||||
|
]
|
||||||
|
.concat();
|
||||||
|
self.pending_data.extend(message_as_utf8.into_iter());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
|
||||||
Ggg_123
commented
Confusing name. Confusing name.
Reader has `pop(&mut self) -> Option<String>`
Writer has `try_pop(&mut self) -> Option<Vec<u8>>`
|
|||||||
|
if self.is_empty() {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
|
||||||
|
if chunk_size == 0 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
let mut bytes_to_send = Vec::with_capacity(chunk_size);
|
||||||
|
for next_byte in self.pending_data.drain(..chunk_size) {
|
||||||
|
bytes_to_send.push(next_byte);
|
||||||
|
}
|
||||||
|
self.sent_bytes += bytes_to_send.len() as u64;
|
||||||
|
Some(bytes_to_send)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_empty(&self) -> bool {
|
||||||
|
self.pending_data.is_empty()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
|
||||||
|
/// the last `update_ue_received_bytes()` call.
|
||||||
|
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
||||||
|
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
|
||||||
|
}
|
||||||
Ggg_123
commented
Missing +=? Missing +=?
dkanus
commented
It passes to It passes to `MessageWriter` the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.
|
|||||||
|
|
||||||
|
fn available_ue_buffer_capacity(&self) -> usize {
|
||||||
|
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
|
||||||
|
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes),
|
||||||
Ggg_123
commented
Seems complicated - why? Seems complicated - why?
dkanus
commented
I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated. `self.sent_bytes` and `self.ue_received_bytes` are u64 and `usize` can be `u32`. In case we have fucked up - difference between them can get huge, this accounts for that.
I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
|
|||||||
|
_ => 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writer_contents_after_creation() {
|
||||||
|
let writer = MessageWriter::new();
|
||||||
|
assert_eq!(writer.is_empty(), true);
|
||||||
|
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writer_content_after_push() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
writer.push("Hello, world!");
|
||||||
|
assert_eq!(writer.is_empty(), false);
|
||||||
|
assert_eq!(writer.available_ue_buffer_capacity(), UE_INPUT_BUFFER);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writing_single_short_message() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
writer.push("Hello, world!");
|
||||||
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
|
let expected_bytes = [
|
||||||
|
0, 0, 0, 13, // Bytes in the message
|
||||||
|
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
||||||
|
];
|
||||||
|
assert_eq!(writer.is_empty(), true);
|
||||||
|
assert_eq!(
|
||||||
|
writer.available_ue_buffer_capacity(),
|
||||||
|
UE_INPUT_BUFFER - "Hello, world!".len() - 4
|
||||||
|
);
|
||||||
|
assert_eq!(resulting_bytes, expected_bytes);
|
||||||
|
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writing_first_chunk_of_single_long_message() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
// Because we also have to pass message length, this will go over the sending limit
|
||||||
|
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||||
|
writer.push(&long_message);
|
||||||
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
|
assert_eq!(writer.is_empty(), false);
|
||||||
|
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
|
||||||
|
assert_eq!(writer.available_ue_buffer_capacity(), 0);
|
||||||
|
// Bytes in message = 4095 = 0x0fff
|
||||||
|
assert_eq!(resulting_bytes[0], 0);
|
||||||
|
assert_eq!(resulting_bytes[1], 0);
|
||||||
|
assert_eq!(resulting_bytes[2], 0x0f);
|
||||||
|
assert_eq!(resulting_bytes[3], 0xff);
|
||||||
|
for &byte in resulting_bytes[4..].iter() {
|
||||||
|
assert_eq!(byte, b'Q');
|
||||||
|
}
|
||||||
|
assert_eq!(writer.try_pop(), None);
|
||||||
|
assert_eq!(writer.is_empty(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn writing_second_chunk_of_single_long_message() {
|
||||||
|
let mut writer = MessageWriter::new();
|
||||||
|
// Because we also have to pass lengths, this will go over the sending limit
|
||||||
|
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||||
|
writer.push(&long_message);
|
||||||
|
// This pops all but 4 bytes of `long_message`, that were required to encode message length
|
||||||
|
let first_bytes = writer.try_pop().unwrap();
|
||||||
|
writer.update_ue_received_bytes(first_bytes.len() as u64);
|
||||||
|
let resulting_bytes = writer.try_pop().unwrap();
|
||||||
|
assert_eq!(
|
||||||
|
writer.available_ue_buffer_capacity(),
|
||||||
|
UE_INPUT_BUFFER - resulting_bytes.len()
|
||||||
|
);
|
||||||
|
assert_eq!(writer.is_empty(), true);
|
||||||
|
// Bytes left for the next chunk = 4
|
||||||
|
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
|
||||||
|
}
|
19
src/main.rs
@ -1,13 +1,14 @@
|
|||||||
use std::env;
|
// ! This utility provides a way to communicate with Unreal Engine 2 servers running Acedia mutator.
|
||||||
use std::path::Path;
|
mod link;
|
||||||
mod unreal_config;
|
use link::start_avarice;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
let args: Vec<String> = env::args().collect();
|
let mut server = start_avarice(1234).unwrap();
|
||||||
let filename = &args[1];
|
while let Some((link, message)) = server.next() {
|
||||||
let config = unreal_config::load_file(Path::new(filename));
|
println!("{}: {}", link.socket_address(), message.to_string());
|
||||||
match config {
|
if message.message_type != "end" {
|
||||||
Ok(config) => print!("{}", config),
|
link.send(message);
|
||||||
_ => (),
|
}
|
||||||
}
|
}
|
||||||
|
println!("Avarice has shut down!");
|
||||||
}
|
}
|
||||||
|
No newline at end of file