Add network link to ue-server implementation #13
No reviewers
Labels
No Label
bug
duplicate
enhancement
help wanted
invalid
question
wontfix
No Milestone
No Assignees
2 Participants
Notifications
Due Date
No due date set.
Dependencies
No dependencies set.
Reference: dkanus/Avarice#13
Loading…
Reference in New Issue
Block a user
No description provided.
Delete Branch "feature_link"
Deleting a branch is permanent. Although the deleted branch may continue to exist for a short time before it actually gets removed, it CANNOT be undone in most cases. Continue?
I've decided that it would be a good idea to separete
compoments that transform text messages into byte chunks to send to
Unreal Engine and back. This means that each of them will have only one
responsibility and using them will be easier in case we decide to read
and write via tcp/ip in separate threads. The only interesting to them
both parameter is the amount of bytes that were confirmed to be received
from the Unreal Engine side, which can be easily shared "by hand".
This patch implements "reader" that accepts byte stream expected from
the game server and converts it into:
MessageReader
implementation 0a86852197@ -0,0 +5,4 @@
use custom_error::custom_error;
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
Wtf
replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
@ -0,0 +7,4 @@
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
Wtf
@ -0,0 +8,4 @@
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
const MAX_MESSAGE_LENGTH: usize = 0xA00000;
Wtf
@ -0,0 +35,4 @@
received_bytes: u64,
}
/// For converting byte stream expected to be generated by Acedia mod from the game server into
"For converting byte stream expected to be generated"?
@ -0,0 +37,4 @@
/// For converting byte stream expected to be generated by Acedia mod from the game server into
/// actual messages. Expected format is a sequence of either:
/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes]
@ -0,0 +50,4 @@
reading_state: ReadingState::Head,
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
with_capactiy?
@ -0,0 +51,4 @@
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
read_messages: VecDeque::new(),
with_capactiy?
@ -0,0 +73,4 @@
}
}
ReadingState::ReceivedBytes => {
self.next_received_bytes = self.next_received_bytes << 8;
@ -0,0 +77,4 @@
self.next_received_bytes += input as u32;
self.read_bytes += 1;
if self.read_bytes >= RECEIVED_FIELD_SIZE {
self.received_bytes += self.next_received_bytes as u64;
@ -0,0 +101,4 @@
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1 as usize;
"1 as usize" => "1usize"
Or just remove.
@ -0,0 +31,4 @@
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
next_received_bytes: u32,
Unclear meaning.
rename to ue_received_bytes
@ -0,0 +148,4 @@
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
0x48 => b'H'
@ -0,0 +149,4 @@
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
reader.push_byte(0x65).unwrap(); // e
0x65 => b'e'
etc
@ -0,0 +188,4 @@
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
1092203255 bytes received?! wtf
MessageReader
ba3ac088ddMessageReader
documentation b846fcf55bMessageReader
to usewith_capacity
0dedd1d1f1MessageReader
for clarity af3341c1e7@ -0,0 +21,4 @@
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
// We do not expect to receive more that this much messages at once from ue-server
const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100;
with_capacity != limit.
Should it really be a constant?
I'd just inline it.
It is expected limit that we use as a capacity.
But upon further consideration I agree that there is no sense in defining this as a constant.
@ -0,0 +11,4 @@
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
@ -0,0 +41,4 @@
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
buffer: [u8; 4],
buffer => length_buffer, since it's used only for storing length bytes
Agreed
@ -0,0 +212,4 @@
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.ue_received_bytes(), 1092203255);
Replace with:
reader.push_byte(0xf3).unwrap();
...
reader.push_byte(0x41).unwrap();
reader.push_byte(0x19).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3
Because otherwise this is "magic numbers wtf".
Woa, that's neat. Agreed.
Suggestion: remove UE_RECEIVED_FIELD_SIZE & UE_LENGTH_FIELD_SIZE.
They're used only in 1 place, they cannot be changed and they should not be changed and their usage is obvious from array_of_u8_to_u32.
Suggestion: run
cargo clippy
& fix warnings.@ -0,0 +102,4 @@
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.buffer) as usize;
self.change_state(ReadingState::Payload);
Move this line below error checking, since it contains early exit
Agreed.
@ -0,0 +12,4 @@
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Weird empty line
Using clippy is a great suggestion, but the only warnings it produces regarding files in
src/link/*
are about dead-code. "unreal_config" code might be deleted (moved out into another project), so I don't want to bother with it for now. Applied it's suggestion formain.rs
.In general I am against replacing numeric constants with magic numbers unless it is something super-obvious like
1
or just something used in tests. It's not just about conveniently refactoring code later, but also it's readability.I am not sure about
EXPECTED_LIMIT_TO_UE_MESSAGES
, however, since this value is somewhat arbitrary.MessageReader
contants 202fa39a42buffer
intolength_buffer
f118767e3fpush_byte
's code e3f554218aUE_RECEIVED_FIELD_SIZE
to 2 978a5c4182main.rs
to useif let
construction 5ce511c5a7EXPECTED_LIMIT_TO_UE_MESSAGES
constant 3f660f54d5MessageReader
b187041d9eMessageReader
into a separate file 816454f4cfmod.rs
file with forlink
module f310febe62MessageWriter
implementation 5bfdd61248MessageWriter
034b7860d6[WIP] Add `MessageReader` implementationto [WIP] Add network linkto ue-server implementation[WIP] Add network linkto ue-server implementationto [WIP] Add network link to ue-server implementationMessageWriter
is now a proper implementation.Also have added
Link
implementation that:Currently it exists as a prototype, to have something sijmple working that can actually connect with ue-server, so it doesn't have to be perfect. But any input about it is still welcome.
Question: how to automate connection testing? It should probably be unrelated from database, and, since UnrealScript's testing capabilities are limited, should be handled on the rust side. I can prepare ue-server that connects to us and responds in some fixed manner.
EDIT: used response at this link to help me get started.
@ -0,0 +23,4 @@
let listener = TcpListener::bind(address)?;
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
Can be simplified => // Listen to new connections
@ -0,0 +33,4 @@
let handler_clone = handler.clone();
// On connection - spawn a new thread
thread::spawn(move || loop {
let mut buffer = [0; 1024];
1024? Not 4096?
4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add
BufReader
and read byte-by-byte.Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
@ -0,0 +24,4 @@
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
let mut reading_stream = listener.accept()?.0;
Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
@ -0,0 +18,4 @@
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
it's => its
pre-pended => prepended
"utf8 representation"?
Can be simplified => Every message is a length-prefixed array of utf8 bytes.
Fixed in upcoming patch.
@ -0,0 +20,4 @@
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.
@ -0,0 +22,4 @@
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
/// [MESSAGE: utf8-encoded string | `MESSAGE_LENGTH` bytes]
/// Resulting byte sequences from all the messages then concatenated, in order, into
the messages => messages
then => is then
"in order" - in what order?
Changed in upcoming patch.
@ -0,0 +33,4 @@
/// how many bytes ue-server has received so far.
/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred,
/// in case ue-server's buffer does not have enough space. Use `is_empty()` call to check for
/// whether `MessageWriter` has depleted all it's data.
"has depleted all it's data." => "has successfully sent all data."?
Changed in upcoming patch.
@ -0,0 +84,4 @@
}
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
Missing +=?
It passes to
MessageWriter
the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.@ -0,0 +88,4 @@
}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Seems complicated - why?
self.sent_bytes
andself.ue_received_bytes
are u64 andusize
can beu32
. In case we have fucked up - difference between them can get huge, this accounts for that.I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
@ -0,0 +25,4 @@
/// Resulting byte sequences from all the messages then concatenated, in order, into
/// a single data stream.
/// 2. Resulting data stream is then separated into "chunks" that can be accepted by
/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format:
But Writer doesn't send anything?
Fixed in upcoming patch.
@ -0,0 +17,4 @@
}
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
Elaborate => "Conversion happens in two steps"?
Changed in upcoming patch.
MessageWriter
's protocol a6ac0b7a17link
module for use in one thread cfe680d770[WIP] Add network link to ue-server implementationto Add network link to ue-server implementationI think enough is implemented on this branch to merge it once it passes peer review.
I've left a bunch of
unwrap()
s in the code for now - I think it's best to get rid of them later, when we decide on how to log errors.@ -9,1 +9,4 @@
[dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
No newline at end of file
@ -0,0 +28,4 @@
f,
"{}",
json!({
"s": self.service,
Maybe expand s/t/p into words?
@ -0,0 +11,4 @@
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Suggestion:
Free bonus: more detailed errors.
@ -0,0 +3,4 @@
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Any explanation/doc on wtf is service/message_type/parameters?
@ -0,0 +15,4 @@
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Why "remove"?
@ -0,0 +1,38 @@
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely
@ -0,0 +32,4 @@
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
sent next message => sent this message?
@ -0,0 +34,4 @@
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
/// representing that message.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
@ -0,0 +49,4 @@
writer: MessageWriter::new(),
},
);
continue;
Maybe remove continue?
@ -0,0 +60,4 @@
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Missing else? Condition seems important.
@ -0,0 +66,4 @@
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Why?
@ -0,0 +88,4 @@
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Why is it a separate function, and not inside send/flush?
When should I call it?
@ -0,0 +100,4 @@
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
@ -0,0 +3,4 @@
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Writing
TcpStream
is sent - rewrite? No idea what that means.@ -0,0 +146,4 @@
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1
bytemessage).@ -0,0 +154,4 @@
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
Why a method?
@ -0,0 +60,4 @@
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
Confusing name.
Reader has
pop(&mut self) -> Option<String>
Writer has
try_pop(&mut self) -> Option<Vec<u8>>
Checkout
From your project repository, check out a new branch and test the changes.