Add network link to ue-server implementation #13
Open
dkanus
wants to merge 23 commits from feature_link
into master
Loading…
Reference in new issue
There is no content yet.
Delete Branch 'feature_link'
Deleting a branch is permanent. It CANNOT be undone. Continue?
I've decided that it would be a good idea to separete
compoments that transform text messages into byte chunks to send to
Unreal Engine and back. This means that each of them will have only one
responsibility and using them will be easier in case we decide to read
and write via tcp/ip in separate threads. The only interesting to them
both parameter is the amount of bytes that were confirmed to be received
from the Unreal Engine side, which can be easily shared "by hand".
This patch implements "reader" that accepts byte stream expected from
the game server and converts it into:
use custom_error::custom_error;
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
Wtf
replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
Wtf
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
const MAX_MESSAGE_LENGTH: usize = 0xA00000;
Wtf
received_bytes: u64,
}
/// For converting byte stream expected to be generated by Acedia mod from the game server into
"For converting byte stream expected to be generated"?
/// For converting byte stream expected to be generated by Acedia mod from the game server into
/// actual messages. Expected format is a sequence of either:
/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes]
reading_state: ReadingState::Head,
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
with_capactiy?
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
read_messages: VecDeque::new(),
with_capactiy?
}
}
ReadingState::ReceivedBytes => {
self.next_received_bytes = self.next_received_bytes << 8;
self.next_received_bytes += input as u32;
self.read_bytes += 1;
if self.read_bytes >= RECEIVED_FIELD_SIZE {
self.received_bytes += self.next_received_bytes as u64;
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1 as usize;
"1 as usize" => "1usize"
Or just remove.
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
next_received_bytes: u32,
Unclear meaning.
rename to ue_received_bytes
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
0x48 => b'H'
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
reader.push_byte(0x65).unwrap(); // e
0x65 => b'e'
etc
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
1092203255 bytes received?! wtf
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
// We do not expect to receive more that this much messages at once from ue-server
const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100;
with_capacity != limit.
Should it really be a constant?
I'd just inline it.
It is expected limit that we use as a capacity.
But upon further consideration I agree that there is no sense in defining this as a constant.
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
buffer: [u8; 4],
buffer => length_buffer, since it's used only for storing length bytes
Agreed
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.ue_received_bytes(), 1092203255);
Replace with:
reader.push_byte(0xf3).unwrap();
...
reader.push_byte(0x41).unwrap();
reader.push_byte(0x19).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3
Because otherwise this is "magic numbers wtf".
Woa, that's neat. Agreed.
Suggestion: remove UE_RECEIVED_FIELD_SIZE & UE_LENGTH_FIELD_SIZE.
They're used only in 1 place, they cannot be changed and they should not be changed and their usage is obvious from array_of_u8_to_u32.
Suggestion: run
cargo clippy
& fix warnings.self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.buffer) as usize;
self.change_state(ReadingState::Payload);
Move this line below error checking, since it contains early exit
Agreed.
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Weird empty line
Using clippy is a great suggestion, but the only warnings it produces regarding files in
src/link/*
are about dead-code. "unreal_config" code might be deleted (moved out into another project), so I don't want to bother with it for now. Applied it's suggestion formain.rs
.In general I am against replacing numeric constants with magic numbers unless it is something super-obvious like
1
or just something used in tests. It's not just about conveniently refactoring code later, but also it's readability.I am not sure about
EXPECTED_LIMIT_TO_UE_MESSAGES
, however, since this value is somewhat arbitrary.[WIP] Add `MessageReader` implementationto [WIP] Add network linkto ue-server implementation 3 years ago[WIP] Add network linkto ue-server implementationto [WIP] Add network link to ue-server implementation 3 years agoMessageWriter
is now a proper implementation.Also have added
Link
implementation that:Currently it exists as a prototype, to have something sijmple working that can actually connect with ue-server, so it doesn't have to be perfect. But any input about it is still welcome.
Question: how to automate connection testing? It should probably be unrelated from database, and, since UnrealScript's testing capabilities are limited, should be handled on the rust side. I can prepare ue-server that connects to us and responds in some fixed manner.
EDIT: used response at this link to help me get started.
let listener = TcpListener::bind(address)?;
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
Can be simplified => // Listen to new connections
let handler_clone = handler.clone();
// On connection - spawn a new thread
thread::spawn(move || loop {
let mut buffer = [0; 1024];
1024? Not 4096?
4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add
BufReader
and read byte-by-byte.Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
let mut reading_stream = listener.accept()?.0;
Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
it's => its
pre-pended => prepended
"utf8 representation"?
Can be simplified => Every message is a length-prefixed array of utf8 bytes.
Fixed in upcoming patch.
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
/// [MESSAGE: utf8-encoded string | `MESSAGE_LENGTH` bytes]
/// Resulting byte sequences from all the messages then concatenated, in order, into
the messages => messages
then => is then
"in order" - in what order?
Changed in upcoming patch.
/// how many bytes ue-server has received so far.
/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred,
/// in case ue-server's buffer does not have enough space. Use `is_empty()` call to check for
/// whether `MessageWriter` has depleted all it's data.
"has depleted all it's data." => "has successfully sent all data."?
Changed in upcoming patch.
}
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
Missing +=?
It passes to
MessageWriter
the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Seems complicated - why?
self.sent_bytes
andself.ue_received_bytes
are u64 andusize
can beu32
. In case we have fucked up - difference between them can get huge, this accounts for that.I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
/// Resulting byte sequences from all the messages then concatenated, in order, into
/// a single data stream.
/// 2. Resulting data stream is then separated into "chunks" that can be accepted by
/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format:
But Writer doesn't send anything?
Fixed in upcoming patch.
}
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
Elaborate => "Conversion happens in two steps"?
Changed in upcoming patch.
[WIP] Add network link to ue-server implementationto Add network link to ue-server implementation 3 years agoI think enough is implemented on this branch to merge it once it passes peer review.
I've left a bunch of
unwrap()
s in the code for now - I think it's best to get rid of them later, when we decide on how to log errors.[dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
No newline at end of file
f,
"{}",
json!({
"s": self.service,
Maybe expand s/t/p into words?
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Suggestion:
Free bonus: more detailed errors.
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Any explanation/doc on wtf is service/message_type/parameters?
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Why "remove"?
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
sent next message => sent this message?
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
/// representing that message.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
writer: MessageWriter::new(),
},
);
continue;
Maybe remove continue?
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Missing else? Condition seems important.
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Why?
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Why is it a separate function, and not inside send/flush?
When should I call it?
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Writing
TcpStream
is sent - rewrite? No idea what that means.Ok(())
}
pub fn pop(&mut self) -> Option<String> {
Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1
bytemessage).self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
Why a method?
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
Confusing name.
Reader has
pop(&mut self) -> Option<String>
Writer has
try_pop(&mut self) -> Option<Vec<u8>>