Browse Source

Add `MessageWriter` implementation

This is WIP implementation, buggy, untested and not properly commented
feature_link
Anton Tarasenko 3 years ago
parent
commit
5bfdd61248
  1. 2
      src/link/mod.rs
  2. 169
      src/link/writer.rs

2
src/link/mod.rs

@ -1,2 +1,4 @@
mod reader;
mod writer;
pub use reader::MessageReader;
pub use writer::MessageWriter;

169
src/link/writer.rs

@ -0,0 +1,169 @@
use std::cmp::{max, min};
use std::collections::VecDeque;
use std::convert::TryFrom;
const LENGTH_FIELD_SIZE: usize = 4;
const UE_INPUT_BUFFER: usize = 4095;
struct PendingMessage {
contents: Vec<u8>,
bytes_sent: usize,
}
pub struct MessageWriter {
sent_bytes: u64,
ue_received_bytes: u64,
pending_messages: VecDeque<PendingMessage>,
}
impl PendingMessage {
fn bytes_left(&self) -> usize {
max(0, self.contents.len() - self.bytes_sent)
}
}
impl MessageWriter {
pub fn new() -> MessageWriter {
MessageWriter {
sent_bytes: 0,
ue_received_bytes: 0,
pending_messages: VecDeque::with_capacity(100),
}
}
pub fn push(&mut self, message: &str) {
let message_as_utf8 = message.as_bytes();
self.pending_messages.push_back(PendingMessage {
contents: [
&(message_as_utf8.len() as u32).to_be_bytes(),
message_as_utf8,
]
.concat(),
bytes_sent: 0,
});
}
pub fn try_pop(&mut self) -> Option<Vec<u8>> {
if !self.should_send_now() {
return None;
}
let max_output_bytes = self.available_ue_buffer_capacity();
let next_payload_size = max_output_bytes - 2;
let next_message = match self.pending_messages.get_mut(0) {
Some(message) => message,
_ => return None,
};
let first_index = next_message.bytes_sent;
let last_index = min(
next_message.bytes_sent + next_payload_size - 1,
next_message.contents.len() - 1,
);
next_message.bytes_sent = last_index + 1;
let chunk_length = last_index - first_index + 1;
let bytes_to_send = [
&(chunk_length as u16).to_be_bytes(),
&next_message.contents[first_index..last_index + 1],
]
.concat();
self.sent_bytes += bytes_to_send.len() as u64;
if next_message.bytes_sent >= next_message.contents.len() {
self.pending_messages.pop_front();
}
Some(bytes_to_send)
}
pub fn is_empty(&self) -> bool {
self.pending_messages.is_empty()
}
pub fn update_ue_received_bytes(&mut self, ue_received_bytes: u64) {
self.ue_received_bytes = max(ue_received_bytes, self.ue_received_bytes);
}
fn available_ue_buffer_capacity(&self) -> usize {
match usize::try_from(self.sent_bytes - self.ue_received_bytes).ok() {
Some(ue_buffered_bytes) => max(0, UE_INPUT_BUFFER - ue_buffered_bytes),
_ => 0,
}
}
fn should_send_now(&self) -> bool {
if self.pending_messages.is_empty() {
return false;
}
let bytes_left_in_message = match self.pending_messages.get(0) {
Some(message) => message.bytes_left(),
_ => return false,
};
let max_output_bytes = self.available_ue_buffer_capacity();
if bytes_left_in_message <= max_output_bytes {
return true;
};
let max_bytes_in_payload = UE_INPUT_BUFFER - LENGTH_FIELD_SIZE;
let next_payload_size = max_output_bytes.checked_sub(4).unwrap_or_default();
let chunks_in_message = (bytes_left_in_message / max_bytes_in_payload) + 1;
let chunks_in_message_after_sending =
((bytes_left_in_message - next_payload_size) / max_bytes_in_payload) + 1;
chunks_in_message_after_sending < chunks_in_message
}
}
#[test]
fn new_writer_is_empty() {
let mut writer = MessageWriter::new();
assert_eq!(writer.is_empty(), true);
}
#[test]
fn single_short_message() {
let mut writer = MessageWriter::new();
writer.push("Hello, world!");
assert_eq!(writer.is_empty(), false);
let resulting_bytes = writer.try_pop().unwrap();
let expected_bytes = [
0, 17, // Bytes in the chunk = message length (4 bytes) + message (13 bytes)
0, 0, 0, 13, // Bytes in the message
b'H', b'e', b'l', b'l', b'o', b',', b' ', b'w', b'o', b'r', b'l', b'd', b'!',
];
assert_eq!(writer.is_empty(), true);
assert_eq!(resulting_bytes, expected_bytes);
assert_eq!(writer.sent_bytes, expected_bytes.len() as u64);
}
#[test]
fn single_long_message() {
let mut writer = MessageWriter::new();
// Because we also have to pass lengths, this will go over the sending limit
let long_message = "Q".repeat(UE_INPUT_BUFFER);
writer.push(&long_message);
assert_eq!(writer.is_empty(), false);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(writer.is_empty(), false);
assert_eq!(resulting_bytes.len(), 4095);
// Bytes in the chunk = 4095 - 2 = 4093 = 0x0ffd
assert_eq!(resulting_bytes[0], 0x0f);
assert_eq!(resulting_bytes[1], 0x0fd);
// Bytes in message = 4095 = 0x0fff
assert_eq!(resulting_bytes[2], 0);
assert_eq!(resulting_bytes[3], 0);
assert_eq!(resulting_bytes[4], 0x0f);
assert_eq!(resulting_bytes[5], 0x0ff);
for &byte in resulting_bytes[6..].iter() {
assert_eq!(byte, b'Q');
}
assert_eq!(writer.try_pop(), None);
assert_eq!(writer.is_empty(), false);
writer.update_ue_received_bytes(UE_INPUT_BUFFER as u64);
let resulting_bytes = writer.try_pop().unwrap();
assert_eq!(writer.is_empty(), true);
// Last time we have popped all but 6 bytes from `long_message`
// Bytes in the chunk = 6 = 0x06
assert_eq!(resulting_bytes[0], 0);
assert_eq!(resulting_bytes[1], 0x06);
assert_eq!(resulting_bytes[2..], [b'Q', b'Q', b'Q', b'Q', b'Q', b'Q'])
}
// TODO: test that `update_ue_received_bytes()` cannot reduce that value
// TODO: test several messages, message only partially fitting into remaining buffer
// TODO: test giving just barely enough free buffer space
Loading…
Cancel
Save