something O(n^2) this way comes
/// Collects the parts of chunked (multi-part) messages as they arrive so they can be
/// put back together by `process_part`.
#[derive(Default)]
struct StreamPartBundler {
/// Parts received so far, grouped per message. The key is the range
/// `start..start + part_total`, where `start` is the sequence number of a part minus
/// its `part_offset`, so every part of one message maps to the same key.
bundles: HashMap<Range<u16>, Vec<StreamPart>>,
}
impl StreamPartBundler {
/// Consume part of a chunked message and if it happens
/// to complete as a result of this final piece of the puzzle then
/// please assemble it and output the final Bytes payload.
fn process_part(&mut self, sequence_number: &u16, part: StreamPart) -> Option<Bytes> {
if part.part_offset == 0 && part.part_total == 1 {
// this is a special case: a simple <=MTU send! we can
// emit it right away.
return if part.payload.len() > 0 {
Some(part.payload)
} else {
// If the payload is empty for some reason
// let's reduce confusion and not emit it. This might
// be the case for "just ACKs" for example.
None
};
}
let range_start = sequence_number - part.part_offset;
let range_end = range_start + part.part_total;
let range_key = range_start..range_end;
if let Some(pile) = self.bundles.get_mut(&range_key) {
let parts_total = part.part_total as usize;
pile.push(part);
match pile.len() {
i if i == parts_total => {
// it looks like the parts are complete, it's time to sort and
// assemble!
// NOTE: this is probably gonna be pretty slow and shitty!!! we probably want to
// sort on insert since those will be smaller discrete moments
pile.sort_by(|a, b| a.part_offset.partial_cmp(&b.part_offset).unwrap())
//TODO: when i return, assemble da boi
}
i if i > parts_total => {
panic!("This should be impossible bro!!")
}
_ => {} // noop
}
} else {
self.bundles.insert(range_key.clone(), vec![part]);
// todo: spot check that we dont have more than 32 of these in progress at any
// point in time, that SHOULDNT BE POSSIBLE with how the ack masks work, unless
// we start interleaving fragmented messages in the stream for some god-forsaken
// reason in the future.
}
None
}
}