mido

thinking so ur dog doesnt have to

I like making little games with friends :] Mostly shit posting and vibing.

posts from @mido tagged #rust-lang


this little bug has not had the hobby juices but i'm forcing it to happen again today.

before I ran out of steam, the big thing I was in the middle of was taking a first crack at a user-friendly API wrapper around all the components & concepts i've jammed out so far.

no one wants to get a PhD in some library and all the ways to use it unless they really have to. the goal of a networking library is to help application writers wire up connections and send/receive data on those connections.

The most I want people to really think about is a small set of features my protocol has tried to achieve:

  • 31 arbitrarily configurable 'streams'
  • out of the box "should just work" ChaCha20 encryption without having to futz with X.509 certs manually, etc.
  • some basic fragmentation & reassembly on some stream types
  • fast enough

I took what I think/hope is a good approach in keeping a lot of these core ideas and components relatively loosely coupled and composed by messier middle-tier code that brings the feature promises together.

Currently a user would have to know how all these little things fit together as written, and it's still a far cry from just networking_library.start_connection("remotehost.game:27015").

As I was writing my final outer layer to simplify all these ideas, I've come to realize there are still a few missing pieces.

One big piece is that my core networking behavior does not prescribe a protocol or behavior for important ideas like encryption key rotations, or stream assembling/deconstructing. I made a principled choice to treat these outcomes as application concerns. Right now the one big thing the library prescribes/forces is a "system stream", which is reliable-ordered and required to be constructed for a game connection.

Right now I'm putting together a "wrapper" crate for both clients and servers that will likely be what my initial game-engine integrations ultimately bridge to via whatever FFI is appropriate. This wrapper provides a tokio::task that injects itself as a filter on the OOTB system_stream and can automatically do things like key rotations, detecting a dead connection, etc.
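To make that filter idea a little more concrete, here's a very hand-wavy sketch of the shape I have in mind. None of these names exist in the crate yet; SystemMessage and the channel plumbing are invented purely for illustration:

    // hypothetical types for illustration only; the real system stream
    // doesn't have a SystemMessage enum (yet?)
    enum SystemMessage {
        RotateKey([u8; 32]),
        Heartbeat,
        Application(bytes::Bytes),
    }

    fn spawn_system_filter(
        mut from_stream: tokio::sync::mpsc::Receiver<SystemMessage>,
        to_app: tokio::sync::mpsc::Sender<bytes::Bytes>,
    ) -> tokio::task::JoinHandle<()> {
        tokio::spawn(async move {
            while let Some(message) = from_stream.recv().await {
                match message {
                    // protocol chores get eaten here so the application never sees them
                    SystemMessage::RotateKey(_new_key) => { /* swap the cipher in place */ }
                    SystemMessage::Heartbeat => { /* refresh the liveness timer */ }
                    // everything else passes straight through to the game
                    SystemMessage::Application(payload) => {
                        if to_app.send(payload).await.is_err() {
                            // the application side hung up; treat it like a dead connection
                            break;
                        }
                    }
                }
            }
        })
    }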

If all goes well I should have a really nice "dummy-proof" wrapper around this library and can start really using it in an engine like Unity or maybe I'll even play around with UE5. I don't have any interesting code samples or technical problems to solve to add to this post but it helps me sort my thoughts out. Thanks!



I got a few things done the last few days. One important thing I got working e2e is the re-assembly of fragments in reliable streams. All I really added since part 8 was this chunk of code and some scaffolding:

    fn is_complete(&self) -> bool {
        self.gaps_remaining == 0
    }

    /// will consume this part and output the re-assembled payload from
    /// all the fragments.
    fn assemble(self) -> Bytes {
        let parts: Vec<_> = self.parts.into_iter().map(|f| f.unwrap().payload).collect();
        parts.concat().into()
    }

With this in place, I decided to set up a test harness with my echo test_client and echo test_server where test_client would send a 50 megabyte file to the server, which the server would send back. I stubbed my toe a few times getting it going and tracked down a few small bugs/issues to get there.

The biggest issue was that I didn't have a clean mechanism for emitting "just an ACK" from an idle receiver. I revisited the recv() method in the ReliableOrdered stream type and restructured how it considers incoming packets. If the incoming packet is EXACTLY a header length, we'll process the header but ONLY for the "sender state", which is the state we use to track which of our messages our remote peer has received, and the most recent sequence counter of ours they've seen:

    fn recv(&mut self, payload: Bytes) {
        let mut payload = payload.to_owned();
        let payload_len = payload.len();
        // TODO: we will probably want ack-only payloads that
        // don't have parts, but we'll punt on that for now
        if payload_len < ACK_HEADER_BYTE_SIZE {
            println!("Received a packet that was too small for a header!");
            return;
        }

        let ack_header = read_ack_header(&mut payload);

        // we need to write this down BEFORE we change state, but we still want to process
        // the headers
        let should_process = self.recv_state.should_process_body(&ack_header);

        self.send_state.process_header(&ack_header);

        if !should_process || payload_len < ACK_HEADER_BYTE_SIZE + PART_HEADER_BYTES {
            // we've already received this! or it's weird/too-old
            // OR it has no body to process
            //println!("should process body failed");
            return;
        }
        // everything past this point is "should process"

        // if it's an ACK header where we shouldn't process
        // the body, we'll also not process receiver state because
        // the ack header is only trying to tell us about OUR sends,
        // and we shouldn't treat it as meaningful receiver
        // state. This is helpful to ensure that the remote peer
        // can keep sending stuff even when we're out of stuff
        // to send.
        self.recv_state.process_header(&ack_header);

// .. snip

With this and some other "i'll fix it later" hacks I don't think are worth sharing, our "send 50 megabytes of data" test was ready to roll. I fired up a handful of test_clients, asked them all to send the same 50 megabyte payload, and watched them whirr.

image of some network traffic in my test client and server going swimmingly, same as before

     Running `target\debug\test_server.exe`
UDP Socket listening on 0.0.0.0:27015
Starting ingress...
Built new pending connection!
Query incoming
Promoting connection Ready for Some(127.0.0.1:49177)
New player added! id: 1 from: 127.0.0.1:49177
Adopted active connection for 127.0.0.1:49177
Connection heartbeat started...
Received system event from 1(127.0.0.1:49177): hello
Received system event from 1(127.0.0.1:49177): 52428800 bytes! (huge)

Woohoo! It performed well overall for a first run: the client was only able to transmit a whopping 0.3 MBytes/s (2.4 Mbit/s) with the currently pretty naive TCP-like sliding window mechanisms. But hey, most games, like say FFXIV, only need 20-30 KBit/s, and more realtime-intensive games like CS:GO need approximately 1.0-2.0 Mbit/s, so I'd say we're in good shape! We'll come back and profile this later. I suspect a lot of this right now is due to the limitations of my current heartbeat watchdog, which is in charge of emitting pure ACKs and is very naive right now. We also wouldn't expect this sort of throughput demand on a reliable-ordered stream; we can probably get way more throughput on an unreliable stream where we don't have a sliding ACK window to stall on while we wait for our peer to ACK and let us send more.

Immediately the server tries to send all this data back to the client, but it does run into an expected problem (not pictured anywhere in this post): the cipher we use for encryption becomes exhausted! Right now GameConnection is configured to panic! if we re-use the cipher more than a certain number of times; right now that's just u16::MAX, but I'll probably increase it. A big TODO for the near future is configuring cipher key rotations.
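For the curious, the guard is nothing fancier than a use counter checked on every encrypt. Something in this spirit (a simplified stand-in, not the actual GameConnection code; the names here are made up):

    /// simplified stand-in for the real cipher bookkeeping inside GameConnection
    struct CipherState {
        uses: u32,
        /// currently u16::MAX in the real thing; will probably grow, and should
        /// eventually be replaced by an actual key-rotation exchange on the system stream
        max_uses: u32,
    }

    impl CipherState {
        fn note_use(&mut self) {
            self.uses += 1;
            if self.uses > self.max_uses {
                panic!("cipher exhausted after {} uses; key rotation needed!", self.uses);
            }
        }
    }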

Tonight

Tonight, post-work, my little bug-brain is too tired for anything too fancy, so I've been working on some unit tests. One of the next things we need to tackle is re-transmission for our coveted ReliableOrdered streams. I don't have the brain to achieve the implementation tonight, but I can maybe treat my future self to some Test Driven Development juice:

#[tokio::test]
async fn retransmission_required() {
    let mut sender: ReliableOrdered = Default::default();
    let mut receiver: ReliableOrdered = Default::default();

    // enqueue a bunch of messages
    for _i in 0..60 {
        assert!(sender.enqueue_send(b"hello"[..].into()).is_ok());
    }

    // we should only get OLDEST_UNACKED_MESSAGE results, because
    // that's how many the stream should deduce it can emit before
    // it needs to await acks
    let mut emissions = sender.get_emissions(50, usize::MAX).unwrap();

    // let's drop two messages and then pass the rest into the receiver
    emissions.remove(1); // drop!
    emissions.remove(3); // drop!

    for emission in emissions {
        receiver.recv(emission);
    }

    // receiver has 2 gaps in its recv mask! oh no!!
    assert_eq!(2, receiver.recv_state.recv_ack_mask.count_zeros());

    // generate an ack from the receiver which will both let the sender
    // send a few more messages and, more importantly, will show the
    // sender the fact that we've lost two payloads!
    let ack = receiver.maybe_create_ack().unwrap();
    sender.recv(ack);
    assert_eq!(2, sender.send_state.send_ack_mask.count_zeros());

    // can't keep writing this test because I need to think on
    // these next parts.
    // NEXT BIG TODO FOR ME: decide how we'll detect, schedule and emit re-transmissions.
    // There is a hacky heartbeat tokio task we spawn with each
    // GameConnection that is probably going to need to be the thing which
    // notices the NEED for retransmission, and then ask the stream for
    // payloads to re-transmit, and perform the send itself.
}

Something nice about the current architecture and layering is that GameStream implementations don't have too much scheduling smarts (i.e. no little companion tokio task or strict timing contract, mostly); they are largely driven by whatever owns them, in this case GameConnection or unit tests. This means that unit testing the complex behavior of things like ReliableOrdered has been really, really nice so far. Tests like the above make me feel really confident about how everything is behaving.



it's time to fold up our kerchief into a little bindle and start bundling things. special thanks to @colinwd who paired with me this evening and helped me move the needle <3.

I did some big refactors I wanted to write a blog post about, but maybe I'll go back and do that for pt.9. I wanted to write about something more fun and algorithmically interesting.

general problem space

diagram of the ordering and re-assembly objectives (network traffic rarely is this incredibly out of order, statistically)

right now we're working on reliability and fragmentation assembly. for those not in the know, the general frame-size byte limit for a packet moving across networking gear and across the internet etc. is about 1500 bytes, or the "MTU". Some things can eat into this frame size, such as VLAN tagging (removes 4 bytes!). So most folks will wisely just give themselves some nice overhead. I chose 1430 as my arbitrary MTU.

A lot of messages don't need to be very big. Most game packets can travel in small deltas, and can be quantized Vector2s and Vector3s, floats, u32s and the like. But it's very common for a game scene to have some sort of "bigger" event: periodic snapshots, the end of a round, or a special moment where MANY entities need to be shuffled in one or a few frames, causing many deltas to get emitted and bundled into one big update for that frame.
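The sender side of fragmentation isn't shown anywhere in this post, but to ground the discussion below, the rough idea is just chunking a big payload into MTU-sized pieces. A sketch, assuming the StreamPart struct shown a bit further down and an arbitrary per-part payload budget:

    use bytes::Bytes;

    // arbitrary illustration value: our 1430-byte MTU minus whatever header
    // bytes the stream needs for the ack header + part header
    const MAX_PART_PAYLOAD: usize = 1400;

    fn split_into_parts(payload: Bytes) -> Vec<StreamPart> {
        // ceiling division: how many <= MAX_PART_PAYLOAD chunks do we need?
        // (an empty payload still becomes one empty part, the "just an ACK" case)
        let part_total = ((payload.len() + MAX_PART_PAYLOAD - 1) / MAX_PART_PAYLOAD).max(1) as u16;

        (0..part_total)
            .map(|part_offset| {
                let start = part_offset as usize * MAX_PART_PAYLOAD;
                let end = (start + MAX_PART_PAYLOAD).min(payload.len());
                StreamPart {
                    part_offset,
                    part_total,
                    // Bytes::slice is a cheap refcounted view, no copying here
                    payload: payload.slice(start..end),
                }
            })
            .collect()
    }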

🦀🦀 code time 🦀🦀

Last night and tonight I got around to getting started on this and the general solution is to throw together a simple struct I'm calling StreamPartAssembler. We'll get to the StreamPartAssembler in a moment, but first let's focus on one of its simpler components, the Pile. It's a joke name I'll probably regret, but it's a (potentially temporarily disparate) pile of incoming payloads that are all fragments of the whole.

Let's take a look:

#[derive(Default, Debug)]
struct Pile {
    parts: Vec<Option<StreamPart>>,
    gaps_remaining: u16,
}

Oh and StreamPart for reference:

#[derive(Debug)]
pub(super) struct StreamPart {
    pub part_offset: u16,
    pub part_total: u16,
    pub payload: Bytes,
}

It will pre-allocate a vec with Option<T>s which will start as None, and give us fixed constant-time offsets to reach over to and fill with StreamParts as they arrive.

We'll only decrement gaps_remaining when we fill in a slot that was previously None, and will currently panic if we somehow try to double-apply a StreamPart (stuff happening around this construct shouldn't allow this to happen! The stream construct that owns the StreamPartAssembler has checks to ignore the body of any payloads we've already received according to our ack mask, so they shouldn't reach the Assembler!)

That pre-allocation is pretty straightforward:

impl Pile {
    fn new(total_parts: u16) -> Self {
        Pile {
            parts: std::iter::repeat_with(|| None)
                .take(total_parts as usize)
                .collect(),
            gaps_remaining: total_parts,
        }
    }
// ...
}

Note the kinda silly pattern with the None generator -> take() -> collect(); Rust's type system struggles in this specific situation and this is the least-bad way I've found to do it so far.

Then we have our simple insertion algorithm:

    fn insert(&mut self, part: StreamPart) {
        if let None = self.parts[part.part_offset as usize] {
            // part gets moved during the insert so we need a copy
            let part_offset = part.part_offset as usize;

            self.gaps_remaining -= 1;
            self.parts[part_offset] = Some(part);
        } else {
            panic!(
                "Unexpected repeat stream part... {}/{}",
                part.part_offset, part.part_total
            )
        }
    }

And last but not least, the check for when we're complete:

    fn is_complete(&self) -> bool {
        self.gaps_remaining == 0
    }

Now that we kind of know the shape of our little piles/bins, we can talk about the StreamPartAssembler. This struct is dead simple at a glance:

#[derive(Default, Debug)]
pub(super) struct StreamPartAssembler {
    bundles: HashMap<u16, Pile>,
}

It just contains a common ID for each pile, which for now is the originating send sequence id that this fragmented packet was initially sent on (the sending side will emit these contiguously, so the sending sequence counter will be monotonically associated with each fragment).

We only need one little function interface on this struct:

pub fn process_part(&mut self, sequence_number: &u16, part: StreamPart) -> Option<Bytes>

We'll take the stream part we received and the remote peer's send sequence number they sent the stream part with. If the stream part is the final piece of a fragment, we'll emit the assembled payload back to the caller, otherwise we'll return None as it was sent into a pile (or ignored).

Our first order of business in this function body is one special case: stream parts that present themselves as the one and only part of a fragment can be emitted right away and skip the piling algorithm:

        if part.part_offset == 0 && part.part_total == 1 {
            // this is a special case: a simple <=MTU send! we can
            // emit it right away.
            return if part.payload.len() > 0 {
                Some(part.payload)
            } else {
                // If the payload is empty for some reason
                // let's reduce confusion and not emit it. This might
                // be the case for "just ACKs" for example.
                None
            };
        }

The rest of the function can just use context on the StreamPart to resolve which sequence id it belongs to (our pile map key) and either start a new pile on that ID, or add to an existing one:

        // each part should have a consistent relationship to the
        // sequence id that they were formed around, which we'll use
        // as a key here.
        let sequence_key = sequence_number.wrapping_sub(part.part_offset as u16);

        if let Some(pile) = self.bundles.get_mut(&sequence_key) {
            pile.insert(part);
            if pile.is_complete() {
                // TODO: emit!
            }
        } else {
            let mut new_pile = Pile::new(part.part_total);
            new_pile.insert(part);
            self.bundles.insert(sequence_key.clone(), new_pile);

            // <= MTU payloads don't need any bundling and can emit as-is immediately, so having
            // more than half-our-ack-mask (16) piles in flight shouldn't be possible. If we're
            // seeing this assert fail then maybe somehow malformed payloads are arriving, our
            // windowing algorithm is failing, or some very exciting combination of both.
            assert!(
                self.bundles.len() <= 16,
                "Having more than 16 shouldn't be possible. Leak?"
            );
        }

        None
    }
}

Note the wrapping_sub() method, which lets this work just fine even when we underflow the u16 sequence counter that is utilized universally.
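To make the keying concrete: fragments of one message ride consecutive send sequence ids, so subtracting the part offset lands every fragment of that message on the same pile key, even across the wrap. A tiny illustrative test:

    #[test]
    fn pile_key_survives_wraparound() {
        // a message fragmented into parts 0..=2, sent on sequence ids 10, 11, 12:
        assert_eq!(10u16.wrapping_sub(0), 10);
        assert_eq!(11u16.wrapping_sub(1), 10);
        assert_eq!(12u16.wrapping_sub(2), 10);

        // the same thing when the u16 counter wraps mid-message (seqs 65534, 65535, 0):
        assert_eq!(65534u16.wrapping_sub(0), 65534);
        assert_eq!(65535u16.wrapping_sub(1), 65534);
        assert_eq!(0u16.wrapping_sub(2), 65534);
    }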

Note also the assert! near the end. It's possible if not probable we'll have some bugs in the future where we leak stream contents.

Note, finally, that we're missing a few pieces in this system right now. An important one is that we have no way of knowing if there are other piles earlier in time, still in progress, that should make us wait before emitting a currently-completed bin. I think that's going to be for another night, but the plan is to more or less pass in the ReceiverState struct, which contains all our awareness about what messages we've received/not-received from our remote peer. The StreamPartAssembler could take a reference to this and build a complete picture of how ready we really are to emit which completed piles in which order.

We also need to actually assemble the fragments into the payload when is_complete() is true. That'll be next.

I have additional concerns with the current implementation, such as needing to detect potential expansion attacks if a client is abusing the protocol. Right now you could try to make it allocate at least 16 x 65535 x N slots, where N is the number of reliable streams. We may want to come up with a better cap for this, or detect clearly out-of-order fragments. Right now perfect is the enemy of good-enough and I'll revisit polish like that later.

UPDATE:

This works totally fine once I derive Clone on the StreamPart struct, and I have no clear memory of what I was confused about to think it wasn't possible:

Pile {
    parts: vec![None; total_parts as usize],
    gaps_remaining: total_parts,
}


Time To Refactor

I've been spending a bunch of time doing refactors and writing extra little bitmask utilities & unit tests etc. to get things set up for proper reliability behavior, but I finally hit a wall today where I need to step back and do a bigger refactor.

My MVP of MVPs here had a very simple approach for unreliable sends, and I was kicking some cans down the road in a "eh I can figure it out later, I think this interface will work" approach.

In true pilot-programming fashion I've realized how much undesirable coupling and how many side effects I was beginning to create as I shuffled stuff around to support our reliability concerns. Namely, there are three "layers" that have to coordinate to route all the inputs/outputs and state management:

┌──────────────────────────┐
│  GameConnection<Active>  │
├──────────────────────────┤
│       GameStream         │
├──────────────────────────┤
│  dyn ReliabilityStrategy │
└──────────────────────────┘

In the current architecture, the plan was to have a lot of common behavior live in the non-generic, concrete GameStream type, which includes a lot of little behaviors for processing stream headers and their ACK masks, sequence counters, etc. The hope was to let all GameStreams do this work by default, and to then have a family of dyn ReliabilityStrategy elements that GameStream can send raw stream-part data into and get raw stream-part data out of.

In my journey to get the reliability stuff going, I've realized that there are just too many stateful concerns between the reliable approach and unreliable approaches to share a common base without just a ton of special case spaghetti and plumbing across all 3 layers.

The New Plan

I strayed away from this originally but I'm happy to come back to it: the new plan is basically just to turn GameStream into a trait, have a concrete implementation per stream type, and forgo the 3rd ReliabilityStrategy layer.

This will mean having a Box<dyn GameStream> in GameConnection, but the vtable lookup is not an actual concern of mine.

This also means I need to spend a few evenings just breaking things apart and shuffling code around. GameStream was getting a little big, and even bigger with reliability concerns/specializations, so with this lesson learned I've realized a decent new shape for things.

Previously the GameStream struct looked like this:

pub struct GameStream {
    reliability_strategy: Box<dyn ReliabilityStrategy>,

    /// Current sequence counter for outgoing sends.
    send_sequence: u16,
    /// The ack mask my peer has observed
    send_ack_mask: u32,
    /// The most recent sequence id our remote peer has seen from us
    send_ack_seq: u16,

    /// The ack mask i've calculated based on messages
    /// received from my peer.
    recv_ack_mask: u32,
    latest_recv_seq: u16,
    last_recv: Instant,

    // outgoing packet buffer
    // TODO: these may be worth getting rid of as we have the "reliability strategy"
    //       traits as a component of streams now
    packet_buf_seq: [u16; PACKET_BUF_SIZE],
    packet_buf: [Option<Bytes>; PACKET_BUF_SIZE],

    // the sending side of the channel that will ultimately want raw
    // bytes off of this channel.
    recv_handler: Box<dyn Fn(Bytes) -> Result<(), ()> + Sync + Send>,
}

Despite some renaming, I continually found it a little headache-inducing to divide up which state I was supposed to be messing with when jumping around in my work. More than once I found myself touching the wrong mask, for example.

My current high level sketch for the refactor here looks like this:

pub trait NewGameStream {
    fn enqueue_send(&mut self, payload: Bytes) -> Result<(), EnqueueError>;
    fn get_emissions(&mut self, num: usize) -> Option<Vec<Bytes>>;
    fn recv(&mut self, payload: Bytes);
}

#[derive(Debug, Error)]
pub enum EnqueueError {
    #[error("This stream doesn't support fragmentation.")]
    FragmentationUnsupported,
}

/// As a message sender we need to continually monitor some ideas,
/// such as our send sequence counter, a bit mask of which of our
/// traffic our remote peer has successfully received (by processing
/// the ack header)
struct SenderState {
    /// Current sequence counter for outgoing sends.
    send_sequence: u16,
    /// The ack mask my peer has observed
    send_ack_mask: u32,
    /// The most recent sequence id our remote peer has seen from us
    send_ack_seq: u16,
}

/// Observations of our remote peer, namely tracking their most
/// recent send_id (for THEIR sends), and realizing a bit mask that
/// best represents their recent sends to us, which is all state
/// we'll reflect to them so they can constantly be informed of
/// our perspective of their behavior. This "reflection" of their
/// state is vital for them to make choices, or at least help them
/// be aware of the health of the connection.
struct ReceiverState {
    /// The ack mask i've calculated based on messages
    /// received from the remote peer.
    recv_ack_mask: u32,
    /// the latest sequence id i've seen from the remote peer
    latest_recv_seq: u16,
    /// The last point in time we've received any traffic from them
    last_recv: Instant,
}

pub struct UnreliableBestEffort {
    send_state: SenderState,
    recv_state: ReceiverState,
}

pub struct ReliableOrdered {
    send_state: SenderState,
    recv_state: ReceiverState,
}

Another mistake I made is trying to be too clever with the API for when GameConnection wants to ask the GameStream to take an arbitrarily sized (possibly larger than MTU) Bytes payload for sending. GameStream would do what was most appropriate and return a collection of Bytes to send out immediately to the UDP Socket:

pub fn get_sends(&mut self, payload: Bytes) -> Vec<Bytes> {..}

This was fine for unreliable streams in my MVP work, since I was always sending less than our MTU ceiling (1430 bytes) and we could always send immediately. With reliability we may want to enqueue a ~15 kilobyte or 15 megabyte payload (god forbid). We may already be SENDING such a huge payload from a previous request, and the current API didn't lend itself well to that sort of enqueuing, for a ton of ugly reasons internal to GameStream's coupled behavior with GameConnection and the ever-growing ReliabilityStrategy trait I just trashed.
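With the trait sketched above, the calling pattern I'm picturing looks roughly like this (the function name and the per-tick budget are placeholders; the real GameConnection plumbing will differ):

    use bytes::Bytes;

    // arbitrary placeholder for however many emissions we let a stream produce per tick
    const MAX_SENDS_PER_TICK: usize = 8;

    fn tick(stream: &mut dyn NewGameStream, outgoing: Option<Bytes>) -> Vec<Bytes> {
        if let Some(payload) = outgoing {
            // accept the payload now, even if it's way bigger than the MTU;
            // nothing hits the UDP socket yet
            if let Err(err) = stream.enqueue_send(payload) {
                eprintln!("couldn't enqueue payload: {err}");
            }
        }

        // separately, drain whatever the stream's window currently allows us to emit
        stream.get_emissions(MAX_SENDS_PER_TICK).unwrap_or_default()
    }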

The Good News

The good news is that overall I did a good job separating concerns, and a rewrite of my approach to GameStream does not dramatically impact GameConnection, and will actually simplify some little "ah fuck it"s I sprinkled in there.

I think I ended up here because I'm still growing as a Rust programmer. I write it in small fits once in a blue moon and haven't had a project get complicated enough like this to force me to tackle some Rust-understanding problems. I had the wrong intuition about the tools available to me when I first sketched out these APIs and relationships, and also needed to get deeper into the weeds with the reliability algorithms.

I don't regret the path I took so far though; this is a "trust the process" stepping stone. Getting the MVP working e2e, for example, such that I could write a simple echo server, has been an incredible boon to my motivation and also a valuable "checkpoint" test for when I want something more than unit tests to make sure nothing unexpected is happening.

Getting an MVP working and learning some lessons from that and revisiting some components is a healthy and normal part of just figuring stuff out.