Change MessageWriter
's protocol
There is no need to send length of each data chunk. This patch simplifies protocol by only sending length of each message.
This commit is contained in:
parent
4f67a7b72c
commit
a6ac0b7a17
@ -5,43 +5,34 @@ use std::collections::VecDeque;
|
||||
use std::convert::TryFrom;
|
||||
use std::iter::Extend;
|
||||
|
||||
// Defines how many bytes is used to encode "LENGTH" field in the chunk sent to ue-server
|
||||
const CHUNK_LENGTH_FIELD: usize = 2;
|
||||
// Maximum amount of bytes ue-server is able to receive at once
|
||||
const UE_INPUT_BUFFER: usize = 4095;
|
||||
// Minimal payload size (in bytes) to send, unless there is not enough data left
|
||||
const MIN_PAYLOAD_SIZE: usize = 50;
|
||||
|
||||
/// For converting byte stream that is expected from the ue-server into actual messages.
|
||||
/// Conversion process has two steps:
|
||||
/// 1. Every string message is converted into it's utf8 representation and is pre-pended with
|
||||
/// it's own length in format:
|
||||
/// For converting text messages into chunks of bytes that can be sent to the ue-server.
|
||||
///
|
||||
/// Every string message is converted into a length-prefixed array of utf8 bytes:
|
||||
///
|
||||
/// | Data | Length |
|
||||
/// |---------|---------|
|
||||
/// | Message `LENGTH` | 4 bytes: u32 BE |
|
||||
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||
///
|
||||
/// Resulting byte sequences from all the messages then concatenated, in order, into
|
||||
/// a single data stream.
|
||||
/// Resulting byte sequences from all messages are then concatenated (in the same order as they were
|
||||
/// "written") into a single data stream. Bytes from the data stream are returned in chunks of
|
||||
/// size no more than `UE_INPUT_BUFFER`. New chunk is returned only when `MessageWriter` knows
|
||||
/// that ue-server's buffer has enough space to accept it.
|
||||
///
|
||||
/// 2. Resulting data stream is then separated into "chunks" that can be accepted by
|
||||
/// the ue-server (each no longer than `UE_INPUT_BUFFER` in total) and are sent in a format:
|
||||
///
|
||||
/// | Data | Length |
|
||||
/// |---------|---------|
|
||||
/// | Chunk `LENGTH` | 2 bytes: u16 BE |
|
||||
/// | UTF8-encoded string | `LENGTH` bytes|
|
||||
///
|
||||
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk, if ue-server
|
||||
/// can accept it.
|
||||
/// NOTE: `try_pop()` can return `None` even if not all message data has been transferred,
|
||||
/// in case ue-server's buffer does not have enough space.
|
||||
/// Use `push()` to input string messages and `try_pop()` to retrieve next chunk.
|
||||
/// NOTE: `try_pop()` can return `None` even if not all message data has been returned,
|
||||
/// in case `MessageWriter` thinks that ue-server's buffer does not have enough space.
|
||||
///
|
||||
/// Call `update_ue_received_bytes()` to update `MessageWriter`'s information about
|
||||
/// how many bytes ue-server has received so far.
|
||||
/// how many bytes ue-server has received so far. This can signal that its buffer has enough
|
||||
/// free space. `MessageWriter` assumes that all data returned by its `try_pop()` method is sent
|
||||
/// to ue-server.
|
||||
///
|
||||
/// Use `is_empty()` call to check for whether `MessageWriter` has depleted all it's data.
|
||||
/// Use `is_empty()` method to check for whether `MessageWriter` still has some data to return
|
||||
/// (possibly after ``update_ue_received_bytes()`).
|
||||
pub struct MessageWriter {
|
||||
sent_bytes: u64,
|
||||
ue_received_bytes: u64,
|
||||
@ -69,24 +60,16 @@ impl MessageWriter {
|
||||
self.pending_data.extend(message_as_utf8.into_iter());
|
||||
}
|
||||
|
||||
/// This method will always return chunk if all remaining data will fit inside it, otherwise it
|
||||
/// will wait until ue-server's buffer has enough space for at least `MIN_PAYLOAD_SIZE` bytes.
|
||||
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
|
||||
if self.is_empty() {
|
||||
return None;
|
||||
}
|
||||
let required_payload_size = min(self.pending_data.len(), MIN_PAYLOAD_SIZE);
|
||||
let available_payload_space = self
|
||||
.available_ue_buffer_capacity()
|
||||
.checked_sub(CHUNK_LENGTH_FIELD)
|
||||
.unwrap_or_default();
|
||||
if required_payload_size > available_payload_space {
|
||||
let chunk_size = min(self.available_ue_buffer_capacity(), self.pending_data.len());
|
||||
if chunk_size == 0 {
|
||||
return None;
|
||||
}
|
||||
let payload_size = min(available_payload_space, self.pending_data.len());
|
||||
let mut bytes_to_send = Vec::with_capacity(CHUNK_LENGTH_FIELD + payload_size);
|
||||
bytes_to_send.extend((payload_size as u16).to_be_bytes().iter());
|
||||
for next_byte in self.pending_data.drain(..payload_size) {
|
||||
let mut bytes_to_send = Vec::with_capacity(chunk_size);
|
||||
for next_byte in self.pending_data.drain(..chunk_size) {
|
||||
bytes_to_send.push(next_byte);
|
||||
}
|
||||
self.sent_bytes += bytes_to_send.len() as u64;
|
||||
@ -97,6 +80,8 @@ impl MessageWriter {
|
||||
self.pending_data.is_empty()
|
||||
}
|
||||
|
||||
/// Takes total amount of bytes received so far by the ue-server, not just bytes received after
|
||||
/// the last `update_ue_received_bytes()` call.
|
||||
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
|
||||
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
|
||||
}
|
||||
@ -130,14 +115,13 @@ fn writing_single_short_message() {
|
||||
writer.push("Hello, world!");
|
||||
let resulting_bytes = writer.try_pop().unwrap();
|
||||
let expected_bytes = [
|
||||
0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes)
|
||||
0, 0, 0, 13, // Bytes in the message
|
||||
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
||||
];
|
||||
assert_eq!(writer.is_empty(), true);
|
||||
assert_eq!(
|
||||
writer.available_ue_buffer_capacity(),
|
||||
UE_INPUT_BUFFER - "Hello, world!".len() - 2 - 4
|
||||
UE_INPUT_BUFFER - "Hello, world!".len() - 4
|
||||
);
|
||||
assert_eq!(resulting_bytes, expected_bytes);
|
||||
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
|
||||
@ -146,22 +130,19 @@ fn writing_single_short_message() {
|
||||
#[test]
|
||||
fn writing_first_chunk_of_single_long_message() {
|
||||
let mut writer = MessageWriter::new();
|
||||
// Because we also have to pass lengths, this will go over the sending limit
|
||||
// Because we also have to pass message length, this will go over the sending limit
|
||||
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||
writer.push(&long_message);
|
||||
let resulting_bytes = writer.try_pop().unwrap();
|
||||
assert_eq!(writer.is_empty(), false);
|
||||
assert_eq!(resulting_bytes.len(), UE_INPUT_BUFFER);
|
||||
assert_eq!(writer.available_ue_buffer_capacity(), 0);
|
||||
// Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd
|
||||
assert_eq!(resulting_bytes[0], 0x0f);
|
||||
assert_eq!(resulting_bytes[1], 0xfd);
|
||||
// Bytes in message = 4095 = 0x0fff
|
||||
assert_eq!(resulting_bytes[2], 0);
|
||||
assert_eq!(resulting_bytes[3], 0);
|
||||
assert_eq!(resulting_bytes[4], 0x0f);
|
||||
assert_eq!(resulting_bytes[5], 0xff);
|
||||
for &byte in resulting_bytes[6..].iter() {
|
||||
assert_eq!(resulting_bytes[0], 0);
|
||||
assert_eq!(resulting_bytes[1], 0);
|
||||
assert_eq!(resulting_bytes[2], 0x0f);
|
||||
assert_eq!(resulting_bytes[3], 0xff);
|
||||
for &byte in resulting_bytes[4..].iter() {
|
||||
assert_eq!(byte, b'Q');
|
||||
}
|
||||
assert_eq!(writer.try_pop(), None);
|
||||
@ -174,8 +155,7 @@ fn writing_second_chunk_of_single_long_message() {
|
||||
// Because we also have to pass lengths, this will go over the sending limit
|
||||
let long_message = "Q".repeat(UE_INPUT_BUFFER);
|
||||
writer.push(&long_message);
|
||||
// This pops all but 6 bytes of `long_message`, that were required to encode lengths of
|
||||
// message and first chunk
|
||||
// This pops all but 4 bytes of `long_message`, that were required to encode message length
|
||||
let first_bytes = writer.try_pop().unwrap();
|
||||
writer.update_ue_received_bytes(first_bytes.len() as u64);
|
||||
let resulting_bytes = writer.try_pop().unwrap();
|
||||
@ -184,46 +164,6 @@ fn writing_second_chunk_of_single_long_message() {
|
||||
UE_INPUT_BUFFER - resulting_bytes.len()
|
||||
);
|
||||
assert_eq!(writer.is_empty(), true);
|
||||
// Bytes in the chunk = 6
|
||||
assert_eq!(resulting_bytes[0], 0);
|
||||
assert_eq!(resulting_bytes[1], 6);
|
||||
assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q'])
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn will_write_small_chunks_if_no_more_data() {
|
||||
let mut writer = MessageWriter::new();
|
||||
// Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`),
|
||||
// sending this will leave us with exactly 10 free bytes in the buffer
|
||||
let long_message = "Q".repeat(UE_INPUT_BUFFER / 2);
|
||||
writer.push(&long_message);
|
||||
writer.try_pop();
|
||||
let short_message = "Hello, world!";
|
||||
writer.push(&short_message);
|
||||
let expected_bytes = [
|
||||
0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes)
|
||||
0, 0, 0, 13, // Bytes in the message
|
||||
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
|
||||
];
|
||||
// There should be enough space in the ue-server buffer to send `short_message`
|
||||
let resulting_bytes = writer.try_pop().unwrap();
|
||||
assert_eq!(resulting_bytes, expected_bytes);
|
||||
assert_eq!(writer.try_pop(), None);
|
||||
assert_eq!(writer.is_empty(), true);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn will_not_write_small_chunks_if_more_data_remains() {
|
||||
let mut writer = MessageWriter::new();
|
||||
// Because we also have to pass lengths (of chunk `CHUNK_LENGTH_FIELD` amd of message `4`),
|
||||
// sending this will leave us with exactly 10 free bytes in the buffer
|
||||
let long_message = "Q".repeat(UE_INPUT_BUFFER - CHUNK_LENGTH_FIELD - 4 - 10);
|
||||
writer.push(&long_message);
|
||||
writer.try_pop();
|
||||
let short_message = "Hello, world!";
|
||||
writer.push(&short_message);
|
||||
// `MessageWriter` can neither send full message, nor a chunk of size 10
|
||||
// (because it is too short)
|
||||
assert_eq!(writer.try_pop(), None);
|
||||
assert_eq!(writer.is_empty(), false);
|
||||
// Bytes left for the next chunk = 4
|
||||
assert_eq!(resulting_bytes, [b'Q', b'Q', b'Q', b'Q'])
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user