Add network link to ue-server implementation #13

Open
dkanus wants to merge 23 commits from feature_link into master
dkanus commented 3 years ago
Owner

I've decided that it would be a good idea to separete
compoments that transform text messages into byte chunks to send to
Unreal Engine and back. This means that each of them will have only one
responsibility and using them will be easier in case we decide to read
and write via tcp/ip in separate threads. The only interesting to them
both parameter is the amount of bytes that were confirmed to be received
from the Unreal Engine side, which can be easily shared "by hand".

This patch implements "reader" that accepts byte stream expected from
the game server and converts it into:

  1. separate string messages;
  2. amount of bytes game server reported to already have received.
I've decided that it would be a good idea to separete compoments that transform text messages into byte chunks to send to Unreal Engine and back. This means that each of them will have only one responsibility and using them will be easier in case we decide to read and write via tcp/ip in separate threads. The only interesting to them both parameter is the amount of bytes that were confirmed to be received from the Unreal Engine side, which can be easily shared "by hand". This patch implements "reader" that accepts byte stream expected from the game server and converts it into: 1. separate string messages; 2. amount of bytes game server reported to already have received.
dkanus added 1 commit 3 years ago
0a86852197 Add `MessageReader` implementation
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
use custom_error::custom_error;
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
Collaborator

Wtf

Wtf
Collaborator

replace with array[u8;4] and
pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 {
(u32::from(bytes[0]) << 24)
+ (u32::from(bytes[1]) << 16)
+ (u32::from(bytes[2]) << 8)
+ (u32::from(bytes[3]))
}

replace with array[u8;4] and pub fn array_of_u8_to_u32(bytes: [u8; 4]) -> u32 { (u32::from(bytes[0]) << 24) + (u32::from(bytes[1]) << 16) + (u32::from(bytes[2]) << 8) + (u32::from(bytes[3])) }
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
const RECEIVED_FIELD_SIZE: usize = 4;
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
Collaborator

Wtf

Wtf
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
const LENGTH_FIELD_SIZE: usize = 4;
const HEAD_RECEIVED: u8 = 85;
const HEAD_MESSAGE: u8 = 42;
const MAX_MESSAGE_LENGTH: usize = 0xA00000;
Collaborator

Wtf

Wtf
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
Ggg_123 reviewed 3 years ago
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
received_bytes: u64,
}
/// For converting byte stream expected to be generated by Acedia mod from the game server into
Collaborator

"For converting byte stream expected to be generated"?

"For converting byte stream expected to be generated"?
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
/// For converting byte stream expected to be generated by Acedia mod from the game server into
/// actual messages. Expected format is a sequence of either:
/// [HEAD_RECEIVED: 1 byte] [amount of bytes received by game server since last update: 4 bytes]
Collaborator

4 bytes
u32? i32? BEi32? LEi32?

> 4 bytes u32? i32? BEi32? LEi32?
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
reading_state: ReadingState::Head,
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
Collaborator

with_capactiy?

with_capactiy?
Collaborator
  • // will be recreated with with_capacity in push()
+ // will be recreated with with_capacity in push()
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
read_bytes: 0,
current_message_length: 0,
current_message: Vec::new(),
read_messages: VecDeque::new(),
Collaborator

with_capactiy?

with_capactiy?
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
}
}
ReadingState::ReceivedBytes => {
self.next_received_bytes = self.next_received_bytes << 8;
Collaborator

<< 8
wtf

> << 8 wtf
Ggg_123 marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
self.next_received_bytes += input as u32;
self.read_bytes += 1;
if self.read_bytes >= RECEIVED_FIELD_SIZE {
self.received_bytes += self.next_received_bytes as u64;
Collaborator

as u64;
wtf

> as u64; wtf
Ggg_123 marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
}
ReadingState::Payload => {
self.current_message.push(input);
self.read_bytes += 1 as usize;
Collaborator

"1 as usize" => "1usize"
Or just remove.

"1 as usize" => "1usize" Or just remove.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
current_message_length: usize,
current_message: Vec<u8>,
read_messages: VecDeque<String>,
next_received_bytes: u32,
Collaborator

Unclear meaning.

Unclear meaning.
Collaborator

rename to ue_received_bytes

rename to ue_received_bytes
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
reader.push_byte(0).unwrap();
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
Collaborator

0x48 => b'H'

0x48 => b'H'
Ggg_123 marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
reader.push_byte(0).unwrap();
reader.push_byte(13).unwrap();
reader.push_byte(0x48).unwrap(); // H
reader.push_byte(0x65).unwrap(); // e
Collaborator

0x65 => b'e'
etc

0x65 => b'e' etc
Ggg_123 marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.received_bytes(), 1092203255);
Collaborator

1092203255 bytes received?! wtf

1092203255 bytes received?! wtf
Ggg_123 marked this conversation as resolved
dkanus added 5 commits 3 years ago
0dedd1d1f1 Change `MessageReader` to use `with_capacity`
af3341c1e7 Refactor `MessageReader` for clarity
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
const MAX_UE_MESSAGE_LENGTH: usize = 25 * 1024 * 1024;
// We do not expect to receive more that this much messages at once from ue-server
const EXPECTED_LIMIT_TO_UE_MESSAGES: usize = 100;
Collaborator

with_capacity != limit.
Should it really be a constant?
I'd just inline it.

with_capacity != limit. Should it really be a constant? I'd just inline it.
dkanus commented 3 years ago
Poster
Owner

It is expected limit that we use as a capacity.

It is expected limit that we use as a capacity.
dkanus commented 3 years ago
Poster
Owner

But upon further consideration I agree that there is no sense in defining this as a constant.

But upon further consideration I agree that there is no sense in defining this as a constant.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
// next JSON message from ue-server
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Collaborator

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?

Remove " Value itself is arbitrary", change to "Arbitrary value indicating that ..."?
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
is_broken: bool,
reading_state: ReadingState,
read_bytes: usize,
buffer: [u8; 4],
Collaborator

buffer => length_buffer, since it's used only for storing length bytes

buffer => length_buffer, since it's used only for storing length bytes
dkanus commented 3 years ago
Poster
Owner

Agreed

Agreed
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
reader.push_byte(25).unwrap();
reader.push_byte(178).unwrap();
reader.push_byte(4).unwrap();
assert_eq!(reader.ue_received_bytes(), 1092203255);
Collaborator

Replace with:
reader.push_byte(0xf3).unwrap();
...
reader.push_byte(0x41).unwrap();
reader.push_byte(0x19).unwrap();
reader.push_byte(0xb2).unwrap();
reader.push_byte(0x04).unwrap();
assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3
Because otherwise this is "magic numbers wtf".

Replace with: reader.push_byte(0xf3).unwrap(); ... reader.push_byte(0x41).unwrap(); reader.push_byte(0x19).unwrap(); reader.push_byte(0xb2).unwrap(); reader.push_byte(0x04).unwrap(); assert_eq!(reader.ue_received_bytes(), 0x41_19_b2_f7); // 0xf7 = 0x04 + 0xf3 Because otherwise this is "magic numbers wtf".
dkanus commented 3 years ago
Poster
Owner

Woa, that's neat. Agreed.

Woa, that's neat. Agreed.
dkanus marked this conversation as resolved
Collaborator

Suggestion: remove UE_RECEIVED_FIELD_SIZE & UE_LENGTH_FIELD_SIZE.
They're used only in 1 place, they cannot be changed and they should not be changed and their usage is obvious from array_of_u8_to_u32.

Suggestion: remove UE_RECEIVED_FIELD_SIZE & UE_LENGTH_FIELD_SIZE. They're used only in 1 place, they cannot be changed and they should not be changed and their usage is obvious from array_of_u8_to_u32.
Collaborator

Suggestion: run cargo clippy & fix warnings.

Suggestion: run `cargo clippy` & fix warnings.
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
self.read_bytes += 1;
if self.read_bytes >= UE_LENGTH_FIELD_SIZE {
self.current_message_length = array_of_u8_to_u32(self.buffer) as usize;
self.change_state(ReadingState::Payload);
Collaborator

Move this line below error checking, since it contains early exit

Move this line below error checking, since it contains early exit
dkanus commented 3 years ago
Poster
Owner

Agreed.

Agreed.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
const UE_LENGTH_FIELD_SIZE: usize = 4;
// Value indicating that next byte sequence from ue-server reports amount of bytes received by
// that server so far. Value itself is arbitrary.
Collaborator

Weird empty line

Weird empty line
dkanus marked this conversation as resolved
Poster
Owner

Using clippy is a great suggestion, but the only warnings it produces regarding files in src/link/* are about dead-code. "unreal_config" code might be deleted (moved out into another project), so I don't want to bother with it for now. Applied it's suggestion for main.rs.

Using clippy is a great suggestion, but the only warnings it produces regarding files in `src/link/*` are about dead-code. "unreal_config" code might be deleted (moved out into another project), so I don't want to bother with it for now. Applied it's suggestion for `main.rs`.
Poster
Owner

In general I am against replacing numeric constants with magic numbers unless it is something super-obvious like 1 or just something used in tests. It's not just about conveniently refactoring code later, but also it's readability.

I am not sure about EXPECTED_LIMIT_TO_UE_MESSAGES, however, since this value is somewhat arbitrary.

In general I am against replacing numeric constants with magic numbers unless it is something super-obvious like `1` or just something used in tests. It's not just about conveniently refactoring code later, but also it's readability. I am not sure about `EXPECTED_LIMIT_TO_UE_MESSAGES`, however, since this value is somewhat arbitrary.
dkanus added 8 commits 3 years ago
dkanus added 3 commits 3 years ago
dkanus added 3 commits 3 years ago
26b7d1dd91 Add network prototype
dkanus changed title from [WIP] Add `MessageReader` implementation to [WIP] Add network linkto ue-server implementation 3 years ago
dkanus changed title from [WIP] Add network linkto ue-server implementation to [WIP] Add network link to ue-server implementation 3 years ago
Poster
Owner

MessageWriter is now a proper implementation.

Also have added Link implementation that:

  1. Trying to listen to a port and spawns a new handler thread for every connection;
  2. Currently uses 1 thread to both read and write for every connection (most definetly will have to change this later).

Currently it exists as a prototype, to have something sijmple working that can actually connect with ue-server, so it doesn't have to be perfect. But any input about it is still welcome.

Question: how to automate connection testing? It should probably be unrelated from database, and, since UnrealScript's testing capabilities are limited, should be handled on the rust side. I can prepare ue-server that connects to us and responds in some fixed manner.

EDIT: used response at this link to help me get started.

`MessageWriter` is now a proper implementation. Also have added `Link` implementation that: 1. Trying to listen to a port and spawns a new handler thread for every connection; 2. Currently uses 1 thread to both read and write for every connection (most definetly will have to change this later). Currently it exists as a prototype, to have something sijmple working that can actually connect with ue-server, so it doesn't have to be perfect. But any input about it is still welcome. Question: how to automate connection testing? It should probably be unrelated from database, and, since UnrealScript's testing capabilities are limited, should be handled on the rust side. I can prepare ue-server that connects to us and responds in some fixed manner. EDIT: used response at [this link](https://stackoverflow.com/questions/36211389/can-a-rust-closure-be-used-by-multiple-threads) to help me get started.
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
let listener = TcpListener::bind(address)?;
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
Collaborator

Can be simplified => // Listen to new connections

Can be simplified => // Listen to new connections
Ggg_123 reviewed 3 years ago
src/link/mod.rs Outdated
let handler_clone = handler.clone();
// On connection - spawn a new thread
thread::spawn(move || loop {
let mut buffer = [0; 1024];
Collaborator

1024? Not 4096?

1024? Not 4096?
dkanus commented 3 years ago
Poster
Owner

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add BufReader and read byte-by-byte.

4096 is limitation on how much we can write, it's unrelated to reading and this is an arbitrary constant. Honestly I don't know what to use, but one option is to add `BufReader` and read byte-by-byte.
dkanus commented 3 years ago
Poster
Owner

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.

Did some tests with ue-server. As of now, buffer size does not make a difference in speed, since bottleneck is ue-server's side by far. And it's unlikely that situation will change even with multiple ue-servers connecting to Avarice.
Ggg_123 reviewed 3 years ago
let handler = Arc::new(handler);
loop {
// Listen to new (multiple) connection
let mut reading_stream = listener.accept()?.0;
Collaborator

Probably needs a timeout, to avoid waiting forever:
stream
.set_read_timeout(Some(Duration::from_secs(5)))
.unwrap();

Probably needs a timeout, to avoid waiting forever: stream .set_read_timeout(Some(Duration::from_secs(5))) .unwrap();
Ggg_123 marked this conversation as resolved
Ggg_123 reviewed 3 years ago
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
Collaborator

it's => its
pre-pended => prepended
"utf8 representation"?

Can be simplified => Every message is a length-prefixed array of utf8 bytes.

it's => its pre-pended => prepended "utf8 representation"? Can be simplified => Every message is a length-prefixed array of utf8 bytes.
dkanus commented 3 years ago
Poster
Owner

Fixed in upcoming patch.

Fixed in upcoming patch.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
/// Conversion process has two steps:
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
Collaborator

Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.

Remove "in utf8 encoding", because it's wrong and misleading - length is sent as u32, not as string.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
/// it's own length in format:
/// [MESSAGE_LENGTH: length of the message in utf8 encoding | 4 bytes: u32 BE]
/// [MESSAGE: utf8-encoded string | `MESSAGE_LENGTH` bytes]
/// Resulting byte sequences from all the messages then concatenated, in order, into
Collaborator

the messages => messages
then => is then
"in order" - in what order?

the messages => messages then => is then "in order" - in what order?
dkanus commented 3 years ago
Poster
Owner

Changed in upcoming patch.

Changed in upcoming patch.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
/// how many bytes ue-server has received so far.
/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred,
/// in case ue-server's buffer does not have enough space. Use `is_empty()` call to check for
/// whether `MessageWriter` has depleted all it's data.
Collaborator

"has depleted all it's data." => "has successfully sent all data."?

"has depleted all it's data." => "has successfully sent all data."?
dkanus commented 3 years ago
Poster
Owner

Changed in upcoming patch.

Changed in upcoming patch.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
}
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
Collaborator

Missing +=?

Missing +=?
dkanus commented 3 years ago
Poster
Owner

It passes to MessageWriter the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.

It passes to `MessageWriter` the whole amount of bytes passed so far, not just after the last update. This way it's harder to mess up.
Ggg_123 reviewed 3 years ago
}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Collaborator

Seems complicated - why?

Seems complicated - why?
dkanus commented 3 years ago
Poster
Owner

self.sent_bytes and self.ue_received_bytes are u64 and usize can be u32. In case we have fucked up - difference between them can get huge, this accounts for that.

I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.

`self.sent_bytes` and `self.ue_received_bytes` are u64 and `usize` can be `u32`. In case we have fucked up - difference between them can get huge, this accounts for that. I don't actually think this is something that can happen, but this seemed like a most straitforward way to do it. Whole function is, like, 3-4 loc, so I didn't saw it as comlplicated.
Ggg_123 reviewed 3 years ago
/// Resulting byte sequences from all the messages then concatenated, in order, into
/// a single data stream.
/// 2. Resulting data stream is then separated into "chunks" that can be accepted by
/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format:
Collaborator

But Writer doesn't send anything?

But Writer doesn't send anything?
dkanus commented 3 years ago
Poster
Owner

Fixed in upcoming patch.

Fixed in upcoming patch.
dkanus marked this conversation as resolved
Ggg_123 reviewed 3 years ago
}
/// For converting byte stream that is expected from the ue-server into actual messages.
/// Conversion process has two steps:
Collaborator

Elaborate => "Conversion happens in two steps"?

Elaborate => "Conversion happens in two steps"?
dkanus commented 3 years ago
Poster
Owner

Changed in upcoming patch.

Changed in upcoming patch.
dkanus marked this conversation as resolved
Ggg_123 added 1 commit 3 years ago
dkanus added 2 commits 3 years ago
a6ac0b7a17 Change `MessageWriter`'s protocol
cfe680d770 Refactor `link` module for use in one thread
dkanus changed title from [WIP] Add network link to ue-server implementation to Add network link to ue-server implementation 3 years ago
Poster
Owner

I think enough is implemented on this branch to merge it once it passes peer review.

I've left a bunch of unwrap()s in the code for now - I think it's best to get rid of them later, when we decide on how to log errors.

I think enough is implemented on this branch to merge it once it passes peer review. I've left a bunch of `unwrap()`s in the code for now - I think it's best to get rid of them later, when we decide on how to log errors.
Ggg_123 reviewed 3 years ago
[dependencies]
custom_error="1.*"
serde ="1.*"
serde_json="1.*"
Collaborator

No newline at end of file

No newline at end of file
Ggg_123 reviewed 3 years ago
f,
"{}",
json!({
"s": self.service,
Collaborator

Maybe expand s/t/p into words?

Maybe expand s/t/p into words?
Ggg_123 reviewed 3 years ago
impl AvariceMessage {
/// Parses JSON form of a message into `AvariceMessage` struct
pub fn from(message_str: &str) -> Option<AvariceMessage> {
Collaborator

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/convert/trait.From.html#tymethod.from, which is very common.
Collaborator

Suggestion:

--- a/src/link/message.rs
+++ b/src/link/message.rs
@@ -9,19 +9,22 @@ pub struct AvariceMessage {
     pub parameters: serde_json::Value,
 }

-impl AvariceMessage {
-    /// Parses JSON form of a message into `AvariceMessage` struct
-    pub fn from(message_str: &str) -> Option<AvariceMessage> {
-        let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
-        let message_json = message_json.as_object_mut()?;
-        Some(AvariceMessage {
-            service: message_json.remove("s")?.as_str()?.to_owned(),
-            message_type: message_json.remove("t")?.as_str()?.to_owned(),
-            parameters: message_json.remove("p")?,
+use std::convert::TryFrom;
+use std::error::Error;
+impl TryFrom<&str> for AvariceMessage {
+    type Error = Box<dyn Error>;
+    fn try_from(value: &str) -> Result<Self, Self::Error> {
+        let mut message_json: serde_json::Value = serde_json::from_str(value)?;
+        let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?;
+        Ok(AvariceMessage {
+            service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow
ned(),
+            message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?.
to_owned(),
+            parameters: message_json.remove("p").ok_or("Missing field 'p'.")?,
         })

--- a/src/link/network.rs
+++ b/src/link/network.rs
@@ -7,6 +7,7 @@
 //! by the same `std::sync::mpsc::Sender` object.
 use super::message::AvariceMessage;
 pub use super::reader::MessageReader;
+use std::convert::TryFrom;
 use std::error::Error;
 use std::io::Read;
 use std::net::{SocketAddr, TcpListener, TcpStream};
@@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) {
         }
         //  Decoding cycle
         while let Some(text_message) = connection.reader.pop() {
-            if let Some(avarice_message) = AvariceMessage::from(&text_message) {
+            if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) {

Free bonus: more detailed errors.

Suggestion: ``` --- a/src/link/message.rs +++ b/src/link/message.rs @@ -9,19 +9,22 @@ pub struct AvariceMessage { pub parameters: serde_json::Value, } -impl AvariceMessage { - /// Parses JSON form of a message into `AvariceMessage` struct - pub fn from(message_str: &str) -> Option<AvariceMessage> { - let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap(); - let message_json = message_json.as_object_mut()?; - Some(AvariceMessage { - service: message_json.remove("s")?.as_str()?.to_owned(), - message_type: message_json.remove("t")?.as_str()?.to_owned(), - parameters: message_json.remove("p")?, +use std::convert::TryFrom; +use std::error::Error; +impl TryFrom<&str> for AvariceMessage { + type Error = Box<dyn Error>; + fn try_from(value: &str) -> Result<Self, Self::Error> { + let mut message_json: serde_json::Value = serde_json::from_str(value)?; + let message_json = message_json.as_object_mut().ok_or("Value was not an Object.")?; + Ok(AvariceMessage { + service: message_json.remove("s").ok_or("Missing field 's'.")?.as_str().ok_or("s was not a String.")?.to_ow ned(), + message_type: message_json.remove("t").ok_or("Missing field 't'.")?.as_str().ok_or("t was not a String.")?. to_owned(), + parameters: message_json.remove("p").ok_or("Missing field 'p'.")?, }) --- a/src/link/network.rs +++ b/src/link/network.rs @@ -7,6 +7,7 @@ //! by the same `std::sync::mpsc::Sender` object. use super::message::AvariceMessage; pub use super::reader::MessageReader; +use std::convert::TryFrom; use std::error::Error; use std::io::Read; use std::net::{SocketAddr, TcpListener, TcpStream}; @@ -79,7 +80,7 @@ fn manage_connection(mut connection: UEConnection) { } // Decoding cycle while let Some(text_message) = connection.reader.pop() { - if let Some(avarice_message) = AvariceMessage::from(&text_message) { + if let Ok(avarice_message) = AvariceMessage::try_from(text_message.as_ref()) { ``` Free bonus: more detailed errors.
Ggg_123 reviewed 3 years ago
use serde_json::json;
use std::fmt;
pub struct AvariceMessage {
Collaborator

Any explanation/doc on wtf is service/message_type/parameters?

Any explanation/doc on wtf is service/message_type/parameters?
Ggg_123 reviewed 3 years ago
let mut message_json: serde_json::Value = serde_json::from_str(message_str).unwrap();
let message_json = message_json.as_object_mut()?;
Some(AvariceMessage {
service: message_json.remove("s")?.as_str()?.to_owned(),
Collaborator

Why "remove"?

Why "remove"?
Ggg_123 reviewed 3 years ago
//! Implements Avarice message: target service, message type, json parameters
use serde_json;
Collaborator

warning: this import is redundant
--> src\link\message.rs:2:1
|
2 | use serde_json;
| ^^^^^^^^^^^^^^^ help: remove it entirely

warning: this import is redundant --> src\link\message.rs:2:1 | 2 | use serde_json; | ^^^^^^^^^^^^^^^ help: remove it entirely
Ggg_123 reviewed 3 years ago
impl AvariceServer {
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
Collaborator

sent next message => sent this message?

sent next message => sent this message?
Ggg_123 reviewed 3 years ago
/// Blocks until a new message arrives from one of the ue-servers. Returns a pair of `Link`,
/// corresponding to the ue-server that sent next message and `AvariceMessage`,
/// representing that message.
pub fn next(&mut self) -> Option<(&mut Link, AvariceMessage)> {
Collaborator

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.

Confusing name, since it's not related to https://doc.rust-lang.org/std/iter/trait.Iterator.html#tymethod.next, which is very common.
Ggg_123 reviewed 3 years ago
writer: MessageWriter::new(),
},
);
continue;
Collaborator

Maybe remove continue?

Maybe remove continue?
Ggg_123 reviewed 3 years ago
continue;
}
Ok(NetworkMessage::UEReceivedUpdate(ue_server_address, ue_received_bytes)) => {
if let Some(link) = self.connected_links.get_mut(&ue_server_address) {
Collaborator

Missing else? Condition seems important.

Missing else? Condition seems important.
Ggg_123 reviewed 3 years ago
continue;
}
Ok(NetworkMessage::MessageReceived(ue_server_address, message)) => {
// Not having a link with key `ue_server_address` should be impossible here
Collaborator

Why?

Why?
Ggg_123 reviewed 3 years ago
self.ue_server_address
}
fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
Collaborator

Why is it a separate function, and not inside send/flush?
When should I call it?

Why is it a separate function, and not inside send/flush? When should I call it?
Ggg_123 reviewed 3 years ago
}
}
/// Creates a new `AvariceServer` that will listen for ue-server connections on the specified port.
Collaborator

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)

// todo - add IP support, since 0.0.0.0 is subjective (make it default, but changeable)
Ggg_123 reviewed 3 years ago
//! spawns a new thread for every connected ue-server to handle reading data from it.
//! Since all reading is handled in ue-servers' own threads, to collect messages they have
//! received in the main thread we use `std::sync::mpsc::Sender`. Conversely, all the writing to
//! ue-server is handled in the main thread itself. Writing `TcpStream` is sent to the main thread
Collaborator

Writing TcpStream is sent - rewrite? No idea what that means.

Writing `TcpStream` is sent - rewrite? No idea what that means.
Ggg_123 reviewed 3 years ago
Ok(())
}
pub fn pop(&mut self) -> Option<String> {
Collaborator

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 byte message).

Confusing - you have push(add N bytes), push_byte(add 1 byte) and pop(get 1 ~~byte~~ message).
Ggg_123 reviewed 3 years ago
self.ue_received_bytes
}
pub fn is_broken(&self) -> bool {
Collaborator

Why a method?

Why a method?
Ggg_123 reviewed 3 years ago
self.pending_data.extend(message_as_utf8.into_iter());
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
Collaborator

Confusing name.
Reader has pop(&mut self) -> Option<String>
Writer has try_pop(&mut self) -> Option<Vec<u8>>

Confusing name. Reader has `pop(&mut self) -> Option<String>` Writer has `try_pop(&mut self) -> Option<Vec<u8>>`
This pull request can be merged automatically.
You are not authorized to merge this pull request.
Sign in to join this conversation.
No reviewers
No Milestone
No Assignees
2 Participants
Notifications
Due Date

No due date set.

Dependencies

This pull request currently doesn't have any dependencies.

Loading…
There is no content yet.