I got a few things done the last few days. One important thing I got working e2e is the re-assembly of fragments in reliable streams. All I really added since part 8 was this chunk of code and some scaffolding:

    fn is_complete(&self) -> bool {
        self.gaps_remaining == 0
    }

    /// Consumes self and outputs the re-assembled payload from
    /// all the fragments. Only call this once is_complete() is
    /// true; the unwrap assumes every slot has been filled.
    fn assemble(self) -> Bytes {
        let parts: Vec<_> = self.parts.into_iter().map(|f| f.unwrap().payload).collect();
        parts.concat().into()
    }
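
For context, the scaffolding those methods hang off of is roughly shaped like this. This is a minimal sketch: only gaps_remaining, parts, and payload appear in the real code above, while Fragment, PartialMessage, and insert are illustrative names of mine.

    use bytes::Bytes;

    struct Fragment {
        payload: Bytes,
    }

    struct PartialMessage {
        /// one slot per expected fragment, filled in as fragments arrive
        parts: Vec<Option<Fragment>>,
        /// how many slots are still None
        gaps_remaining: usize,
    }

    impl PartialMessage {
        fn new(total_fragments: usize) -> Self {
            Self {
                parts: (0..total_fragments).map(|_| None).collect(),
                gaps_remaining: total_fragments,
            }
        }

        /// record a fragment at its index; duplicates must not
        /// double-decrement the gap counter
        fn insert(&mut self, index: usize, fragment: Fragment) {
            if self.parts[index].is_none() {
                self.parts[index] = Some(fragment);
                self.gaps_remaining -= 1;
            }
        }
    }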

With this in place, I decided to set up a test harness out of my echo test_client and test_server, where test_client sends a 50 megabyte file to the server and the server sends it back. I stubbed my toe a few times getting it going and had to track down a few small bugs along the way.

The biggest issue was that I didn't have a clean mechanism for emitting "just an ACK" from an idle receiver. I revisited the recv() method in the ReliableOrdered stream type and restructured how it considers incoming packets. If the incoming packet is EXACTLY a header length, we'll process the header but ONLY for the "sender state", which is the state we use to track which of our messages the remote peer has received, and the most recent sequence counter of ours they've seen:

    fn recv(&mut self, payload: Bytes) {
        let mut payload = payload.to_owned();
        let payload_len = payload.len();
        // TODO: we will probably want ack-only payloads that
        // dont have parts but we'll punt on that for now
        if payload_len < ACK_HEADER_BYTE_SIZE {
            println!("Received a packet that was too small for a header!");
            // bail out; read_ack_header would read past the buffer
            return;
        }

        let ack_header = read_ack_header(&mut payload);

        // we need to write this down BEFORE we change state, but we still want to process
        // the headers
        let should_process = self.recv_state.should_process_body(&ack_header);

        self.send_state.process_header(&ack_header);

        if !should_process || payload_len < ACK_HEADER_BYTE_SIZE + PART_HEADER_BYTES {
            // we've already received this! or it's weird/too-old
            // OR it has no body to process
            //println!("should process body failed");
            return;
        }
        // everything past this point is "should process"

        // note: when should_process_body said no, we also skipped
        // updating receiver state above. An ACK-only header is only
        // trying to tell us about OUR sends, and we shouldn't treat
        // it as meaningful receiver state. This is what lets the
        // remote peer keep sending stuff even when we're out of
        // stuff to send.
        self.recv_state.process_header(&ack_header);

    // .. snip
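
The flip side is the idle receiver actually producing those header-only packets. Here's a minimal sketch of what that can look like; maybe_create_ack is the real method name (it shows up in the test further down), but the dirty flag and the write_ack_header helper are illustrative assumptions:

    // sketch; assumes bytes::{Bytes, BytesMut} and an illustrative
    // `dirty` flag on the receiver state
    fn maybe_create_ack(&mut self) -> Option<Bytes> {
        // nothing new to tell the remote peer about
        if !self.recv_state.dirty {
            return None;
        }
        // EXACTLY a header and nothing else: the remote recv() above
        // will process it for sender state only and then bail out
        let mut buf = BytesMut::with_capacity(ACK_HEADER_BYTE_SIZE);
        write_ack_header(&mut buf, &self.recv_state);
        self.recv_state.dirty = false;
        Some(buf.freeze())
    }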

With this and some other "I'll fix it later" hacks I don't think are worth sharing, our "send 50 megabytes of data" test was ready to roll. I fired up a handful of test_clients, asked them all to send the same 50 megabyte payload, and watched them whirr.

[image: network traffic in my test client and server, going swimmingly, same as before]

         Running `target\debug\test_server.exe`
    UDP Socket listening on 0.0.0.0:27015
    Starting ingress...
    Built new pending connection!
    Query incoming
    Promoting connection Ready for Some(127.0.0.1:49177)
    New player added! id: 1 from: 127.0.0.1:49177
    Adopted active connection for 127.0.0.1:49177
    Connection heartbeat started...
    Received system event from 1(127.0.0.1:49177): hello
    Received system event from 1(127.0.0.1:49177): 52428800 bytes! (huge)

Woohoo! It performed well overall for a first run, though the client was only able to transmit a whopping 0.3 MB/s (2.4 Mbit/s) with the current, pretty naive TCP-like sliding window mechanisms. But hey, most games, like say FFXIV, only need 20-30 Kbit/s, and more realtime-intensive games like CS:GO need approximately 1.0-2.0 Mbit/s, so I'd say we're in good shape! We'll come back and profile this later. I suspect a lot of this right now is due to the limitations of my current heartbeat watchdog, which is in charge of emitting pure ACKs and is very naive right now. We also wouldn't expect this sort of throughput demand on a reliable-ordered stream; we can probably get way more throughput on an unreliable stream, where there's no sliding ACK window that can make us pause until our peer's ACKs allow us to send more.
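
To make the stall concrete: the sender stops handing out emissions once a full window of messages is unacked, and only resumes once incoming ACKs advance the window. Here's a sketch of that gate; WINDOW_SIZE, next_seq, and oldest_unacked are all illustrative names, not the real fields:

    const WINDOW_SIZE: u64 = 32; // illustrative size

    fn can_emit(&self) -> bool {
        // once the oldest unacked message trails the next sequence
        // number we'd assign by a full window, we pause; the peer's
        // pure ACKs (from the heartbeat watchdog) are what un-stick us
        self.next_seq - self.send_state.oldest_unacked < WINDOW_SIZE
    }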

Immediately the server tries to send all this information back to the client, but it runs into an expected problem (not pictured anywhere in this post): the cipher we use for encryption becomes exhausted! Right now GameConnection is configured to panic! if we re-use the cipher more than a set number of times; currently that's just u16::MAX, but I'll probably increase it. A big TODO for the near future is configuring cipher key rotations.
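
Roughly, the guard is shaped like this sketch; cipher_uses is my shorthand for the counter, though the u16::MAX ceiling is the real current limit:

    struct CipherState {
        cipher_uses: u64, // illustrative bookkeeping field
        // .. key material elided
    }

    impl CipherState {
        /// called before every encryption; AEAD nonce re-use is
        /// catastrophic, so for now we just bail out loudly
        fn check_exhaustion(&mut self) {
            // TODO: rotate to a fresh key instead of panicking
            if self.cipher_uses >= u64::from(u16::MAX) {
                panic!("cipher exhausted: key rotation not implemented yet");
            }
            self.cipher_uses += 1;
        }
    }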

Tonight

Tonight, post-work, my little bug-brain is too tired for anything too fancy, so I've been working on some unit tests. One of the next things we need to tackle is re-transmission for our coveted ReliableOrdered streams. I don't have the brain to achieve the implementation tonight, but I can maybe treat my future self to some Test Driven Development juice:

    #[tokio::test]
    async fn retransmission_required() {
        let mut sender: ReliableOrdered = Default::default();
        let mut receiver: ReliableOrdered = Default::default();

        // enqueue a bunch of messages
        for _i in 0..60 {
            assert!(sender.enqueue_send(b"hello"[..].into()).is_ok());
        }

        // we should only get OLDEST_UNACKED_MESSAGE results, because
        // that's how many the stream should deduce it can emit before
        // it needs to await acks
        let mut emissions = sender.get_emissions(50, usize::MAX).unwrap();

        // let's drop two messages and then pass the rest into the receiver
        emissions.remove(1); // drop!
        emissions.remove(3); // drop!

        for emission in emissions {
            receiver.recv(emission);
        }

        // receiver has 2 gaps in its recv mask! oh no!!
        assert_eq!(2, receiver.recv_state.recv_ack_mask.count_zeros());

        // generate an ack from the receiver which will both let the sender
        // send a few more messages and, more importantly, will show the
        // sender the fact that we've lost two payloads!
        let ack = receiver.maybe_create_ack().unwrap();
        sender.recv(ack);
        assert_eq!(2, sender.send_state.send_ack_mask.count_zeros());

        // can't keep writing this test because I need to think on
        // these next parts.
        // NEXT BIG TODO FOR ME: decide how we'll detect, schedule and emit re-transmissions.
        // There is a hacky heartbeat tokio task we spawn with each
        // GameConnection that is probably going to need to be the thing which
        // notices the NEED for retransmission, and then ask the stream for
        // payloads to re-transmit, and perform the send itself.
    }
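
If the heartbeat-task idea from that TODO pans out, the driving side might be shaped something like this. Pure speculation at this point; collect_retransmissions and send_raw are hypothetical names:

    // hypothetical sketch of the per-connection heartbeat task
    async fn heartbeat(connection: &mut GameConnection, stream: &mut ReliableOrdered) {
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(50));
        loop {
            interval.tick().await;
            // the stream decides which unacked payloads have been
            // outstanding long enough to be worth re-sending; the
            // task just performs the sends
            for payload in stream.collect_retransmissions() {
                connection.send_raw(payload).await;
            }
        }
    }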

Something nice about the current architecture and layering is that GameStream implementations don't have much scheduling smarts (i.e. no little companion tokio task or strict timing contract, mostly); they are largely driven by whatever owns them, in this case GameConnection or unit tests. This means that unit testing the complex behavior of things like ReliableOrdered has been really, really nice so far. Tests like the one above make me feel really confident about how everything is behaving.
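
To put a shape on "driven by whatever owns them": the surface the owner drives is roughly the following. The trait framing is my sketch and the exact signatures are guesses; the four method names are the ones used throughout this post:

    use bytes::Bytes;

    #[derive(Debug)]
    struct StreamError; // placeholder error type for the sketch

    trait GameStream {
        /// feed one incoming packet into the stream
        fn recv(&mut self, payload: Bytes);
        /// queue an application message for eventual sending
        fn enqueue_send(&mut self, payload: Bytes) -> Result<(), StreamError>;
        /// pull payloads that are ready to go on the wire right now
        fn get_emissions(&mut self, budget: usize, max_bytes: usize) -> Result<Vec<Bytes>, StreamError>;
        /// optionally produce a header-only "pure ACK" packet
        fn maybe_create_ack(&mut self) -> Option<Bytes>;
    }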

