

it's time to fold up our kerchief into a little bindle and start bundling things. special thanks to @colinwd who paired with me this evening and helped me move the needle <3.

I did some big refactors I wanted to write a blog post about, but maybe I'll go back and do that for pt.9. I wanted to write about something more fun and algorithmically interesting.

general problem space

[diagram of the ordering and re-assembly objectives (network traffic is rarely this incredibly out of order, statistically)]

right now we're working on reliability and fragment reassembly. for those not in the know, the general frame-size byte limit for a packet moving across networking gear and the internet at large is about 1500 bytes, the "MTU". Some things can eat into that frame size, such as VLAN tagging (there go 4 bytes!), so most folks will wisely just give themselves some nice headroom. I chose 1430 as my arbitrary MTU.

A lot of messages don't need to be very big. Most game packets can travel as small deltas: quantized Vector2s and Vector3s, floats, u32s and the like. But it's very common that a game scene has to perform some sort of "bigger" event: periodic snapshots, the end of a round, or a special event where MANY entities need to be shuffled in one or a few frames, causing many deltas to get emitted and bundled into one big update for that frame.
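To put rough numbers on that, here's a tiny sketch of the math (the constant and function name are made up for illustration, not the real code):

// purely illustrative: how many fragments a payload needs at a 1430-byte MTU.
const MTU: usize = 1430;

fn parts_needed(payload_len: usize) -> u16 {
    // ceiling division; an empty payload still travels as one part
    ((payload_len + MTU - 1) / MTU).max(1) as u16
}

// e.g. a 10 KB end-of-round snapshot: parts_needed(10_000) == 7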

🦀🦀 code time 🦀🦀

Last night and tonight I got around to getting started on this, and the general solution is to throw together a simple struct I'm calling StreamPartAssembler. We'll get to the StreamPartAssembler in a moment, but first let's focus on one of its simpler components, the Pile. It's a joke name I'll probably regret, but it's a (potentially temporarily disparate) pile of incoming payloads that are all fragments of the whole.

Let's take a look:

#[derive(Default, Debug)]
struct Pile {
    parts: Vec<Option<StreamPart>>,
    gaps_remaining: u16,
}

Oh and StreamPart for reference:

#[derive(Debug)]
pub(super) struct StreamPart {
    pub part_offset: u16,
    pub part_total: u16,
    pub payload: Bytes,
}

It pre-allocates a vec of Option<T>s that all start as None, giving us fixed, constant-time offsets to reach over to and fill in with StreamParts as they arrive.

We'll only decrement gaps_remaining when we fill in a slot that was previously None, and we'll currently panic if we somehow try to double-apply a StreamPart. (Stuff happening around this construct shouldn't allow that to happen! The stream construct that owns the StreamPartAssembler has checks to ignore the body of any payload we've already received according to our ack mask, so duplicates shouldn't reach the Assembler.)

That pre-allocation is pretty straightforward:

impl Pile {
    fn new(total_parts: u16) -> Self {
        Pile {
            parts: std::iter::repeat_with(|| None)
                .take(total_parts as usize)
                .collect(),
            gaps_remaining: total_parts,
        }
    }
// ...
}

Note the kinda silly pattern with the None generator -> take() -> collect(). Rust's type system struggles in this specific situation, and this is the least-bad way I've found to do it so far.

Then we have our simple insertion algorithm:

    fn insert(&mut self, part: StreamPart) {
        if self.parts[part.part_offset as usize].is_none() {
            // part gets moved during the insert so we need a copy of the offset
            let part_offset = part.part_offset as usize;

            self.gaps_remaining -= 1;
            self.parts[part_offset] = Some(part);
        } else {
            panic!(
                "Unexpected repeat stream part... {}/{}",
                part.part_offset, part.part_total
            )
        }
    }

And last but not least, the check for when we're complete:

    fn is_complete(&self) -> bool {
        self.gaps_remaining == 0
    }

Now that we kind of know the shape of our little piles/bins, we can talk about the StreamPartAssembler. This struct is dead simple at a glance:

#[derive(Default, Debug)]
pub(super) struct StreamPartAssembler {
    bundles: HashMap<u16, Pile>,
}

It just maps a common ID to each pile, which for now is the originating send sequence id that this fragmented packet was initially sent on (the sending side will emit these contiguously, so the send sequence counter will be associated with each fragment monotonically).
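The sending side isn't part of this post, but here's roughly the relationship the assembler is leaning on, sketched with made-up names:

use bytes::Bytes;

// illustrative only: each fragment rides its own contiguous send sequence id,
// so a 3-part message starting at sequence 40 goes out as
// (40, offset 0), (41, offset 1), (42, offset 2).
fn fragment(first_seq: u16, payload: &[u8], mtu: usize) -> Vec<(u16, StreamPart)> {
    let part_total = payload.chunks(mtu).count() as u16;
    payload
        .chunks(mtu)
        .enumerate()
        .map(|(i, chunk)| {
            (
                first_seq.wrapping_add(i as u16),
                StreamPart {
                    part_offset: i as u16,
                    part_total,
                    payload: Bytes::copy_from_slice(chunk),
                },
            )
        })
        .collect()
}
// on the receive side, sequence_number.wrapping_sub(part_offset) recovers first_seq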

We only need one little function interface on this struct:

pub fn process_part(&mut self, sequence_number: &u16, part: StreamPart) -> Option<Bytes>

We'll take the stream part we received along with the remote peer's send sequence number it was sent with. If the stream part is the final piece needed to complete a message, we'll emit the assembled payload back to the caller; otherwise we'll return None because it just went into a pile (or was ignored).
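From the caller's side it ends up looking roughly like this (the surrounding names are hypothetical, not the real stream code):

// illustrative caller: hand every accepted reliable payload to the
// assembler, and only pass fully reassembled messages up to the game.
if let Some(message) = assembler.process_part(&remote_send_seq, stream_part) {
    deliver_to_game(message);
}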

Our first order of business in this function body is one special case: stream parts that present themselves as the one and only part of a payload can be emitted right away and skip the piling algorithm:

        if part.part_offset == 0 && part.part_total == 1 {
            // this is a special case: a simple <=MTU send! we can
            // emit it right away.
            return if !part.payload.is_empty() {
                Some(part.payload)
            } else {
                // If the payload is empty for some reason
                // let's reduce confusion and not emit it. This might
                // be the case for "just ACKs" for example.
                None
            };
        }

The rest of the function can just use context on the StreamPart to resolve which sequence id it belongs to (our pile map key) and either start a new pile on that ID, or add to an existing one:

        // each part should have a consistent relationship to the
        // sequence id that they were formed around, which we'll use
        // as a key here.
        let sequence_key = sequence_number.wrapping_sub(part.part_offset as u16);

        if let Some(pile) = self.bundles.get_mut(&sequence_key) {
            pile.insert(part);
            if pile.is_complete() {
                // TODO: emit!
            }
        } else {
            let mut new_pile = Pile::new(part.part_total);
            new_pile.insert(part);
            self.bundles.insert(sequence_key, new_pile);

            // <= MTU payloads don't need any bundling and emit as-is immediately,
            // so having more than half-our-ack-mask's worth of piles in flight
            // shouldn't be possible. If we're seeing this assert fail then maybe
            // somehow malformed payloads are arriving, our windowing algorithm is
            // failing, or some very exciting combination of both.
            assert!(
                self.bundles.len() <= 16,
                "Having more than 16 shouldn't be possible. Leak?"
            );
        }

        None
    }
}

Note the wrapping_sub() method, which lets this keep working even when the subtraction underflows the u16 sequence counter that's used throughout the protocol.
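A quick worked example with made-up numbers:

// a part with part_offset 3 arriving on send sequence 1:
let sequence_key = 1u16.wrapping_sub(3); // == 65534, where that pile began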

Note also the assert! near the end. It's possible, if not probable, that we'll have some bugs in the future where we leak stream contents.

Note, finally, that we're missing a few pieces in this system right now. An important one is that we have no way of knowing if there are other piles earlier in time, still in progress, that we should wait on before emitting a currently completed bin. I think that's going to be for another night, but the plan is to more or less pass in the ReceiverState struct, which contains all our awareness about what messages we've received/not-received from our remote peer. The StreamPartAssembler could take a reference to this and build a complete picture of how ready we really are to emit which completed piles in which order.
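Just to sketch the shape that plan might take (ReceiverState's contents aren't shown here, and this signature is pure speculation):

// speculative sketch only: the assembler could consult ReceiverState to decide
// which completed piles are actually safe to emit, and in what order.
pub fn process_part(
    &mut self,
    receiver_state: &ReceiverState,
    sequence_number: &u16,
    part: StreamPart,
) -> Vec<Bytes> // maybe several piles become emittable at once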

We also need to actually assemble the fragments into the payload when is_complete() is true. That'll be next.
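It'll probably look something like this rough sketch (assuming we drain the pile in offset order; the function name is made up):

use bytes::{Bytes, BytesMut};

// rough sketch of the missing emit step, not the final code: once a pile
// is complete, concatenate its parts in offset order into one payload.
fn assemble(parts: Vec<Option<StreamPart>>) -> Bytes {
    let mut buf = BytesMut::new();
    for part in parts.into_iter().flatten() {
        buf.extend_from_slice(&part.payload);
    }
    buf.freeze()
}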

I have additional concerns with the current implementation, such as needing to detect potential memory-expansion attacks if a client abuses the protocol. Right now a peer could try to make us allocate at least 16 × 65535 StreamParts, times N, where N is the number of reliable streams. We may want to come up with a better cap for this, or detect clearly out-of-order fragments. Right now perfect is the enemy of good-enough and I'll revisit polish like that later.
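For what it's worth, one cheap guard could look something like this sketch (the constant and function are purely illustrative, definitely not a tuned design):

use std::collections::HashMap;

// illustrative: an approximate byte budget across all in-flight piles.
const MAX_INFLIGHT_BYTES: usize = 64 * 1024; // made-up number, not tuned

fn over_budget(bundles: &HashMap<u16, Pile>) -> bool {
    let in_flight: usize = bundles
        .values()
        .flat_map(|pile| pile.parts.iter().flatten())
        .map(|part| part.payload.len())
        .sum();
    in_flight > MAX_INFLIGHT_BYTES
}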

UPDATE:

This works totally fine once I derive Clone on the StreamPart struct (Bytes is Clone but not Copy, and vec![None; n] only needs Clone), and I have no clear memory of what I was confused about to think it wasn't possible:

Pile {
    parts: vec![None; total_parts as usize],
    gaps_remaining: total_parts,
}
