mido

thinking so ur dog doesnt have to

I like making little games with friends :] Mostly shit posting and vibing.


mido
@mido

already very pleased with the specialized approach to my streams, I can now write very concern-specific implementations of simpler interfaces.

the bytes crate is truly goated with the sauce when doing byte stuff is the vibe. it has a lot of little things you can do with it to avoid allocs or drops until you're truly done with a chunk of memory. in the last few weeks i've gotten way more comfortable with it and it's paying off.

here, for example, i'm usin the slice(..) function to chop up a larger-than-MTU-size payload and enqueue its constituent parts. the original "Bytes" is dropped, but its memory lives on in all the little slices we made, and we can avoid freeing that memory until our reliable send is complete.

(this is a WIP)

/// Some example math of the chunking:
///
/// ```plaintext
/// if payload is 5000 bytes
/// and MTU is 1430:
///
/// i=0 chunk_size=1430     0..1430
/// i=1 chunk_size=1430  1430..2860
/// i=2 chunk_size=1430  2860..4290
/// i=3 chunk_size=710   4290..5000
/// ```
fn enqueue_send(&mut self, payload: Bytes) -> Result<(), EnqueueError> {
    if payload.len() <= MTU_MAX {
        // simple path, payload is LTE MTU!
        self.pending_parts.push_back(Parts::SimpleSingle(payload));
    } else {
        // complicated path, payload is GT MTU!
        let last_chunk_size = payload.len() % MTU_MAX;
        let max_chunks = (payload.len() / MTU_MAX) + usize::from(last_chunk_size != 0);

        if max_chunks > u16::MAX as usize {
            panic!(
                "We shouldn't be trying to send things that are bigger than {} bytes! Yowza!",
                MTU_MAX * u16::MAX as usize
            )
        }

        for i in 0..max_chunks {
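            let slice_start = i * MTU_MAX;
            // clamp the end to the payload length so the last chunk is whatever
            // is left over. (using the remainder as the last chunk size gives an
            // empty last chunk when the payload is an exact multiple of MTU_MAX)
            let slice_end = (slice_start + MTU_MAX).min(payload.len());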

            // we do this whole for loop instead of a more functional-style map because
            // the bytes.slice(..) creates a new bytes WITHOUT allocs, which is cool
            // for us, we can avoid memory shuffling until it's fully sent out and
            // is dropped!
            let chunk = payload.slice(slice_start..slice_end);
            self.pending_parts
                .push_back(Parts::Chunk(i as u16, max_chunks as u16, chunk))
        }
    }
    Ok(())
}
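
The chunk math in that doc comment is easy to check on its own. A minimal std-only sketch, assuming a hypothetical helper `chunk_ranges` (not part of the code above) that returns the half-open byte range each chunk covers:

```rust
use std::ops::Range;

/// Hypothetical helper, for illustration only: the half-open byte range
/// covered by each chunk of a `len`-byte payload split at `mtu` bytes.
fn chunk_ranges(len: usize, mtu: usize) -> Vec<Range<usize>> {
    // step_by panics on a zero step, so fail with a clearer message first
    assert!(mtu > 0, "mtu must be non-zero");
    (0..len)
        .step_by(mtu)
        // clamp the end so the last chunk is only as big as what's left
        .map(|start| start..(start + mtu).min(len))
        .collect()
}
```

For a 5000 byte payload and an MTU of 1430 this gives four ranges, the last one 710 bytes long. A payload that is an exact multiple of the MTU (say 2860 bytes) gives two full chunks and no empty tail.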

avoiding memory thrashing will be important, because in busier games you can cause some pretty nasty contention with memory i/o alone. this is why some very ambitious games like the battlefield games, with their huge arenas of chaos that affect tons of nearby entities, can have way bigger cpu requirements than most games. it's not just all the physics and all the crazy animations (though it is those, too, to be fair), it's all the damn I/O across all system components. so saving allocs on hot paths like this is super important.


mido
@mido

Something I missed when I did this implementation is that if you have a slice reference that points into an existing Bytes' memory, you can create a new Bytes from the original one. This comes to us via the slice_ref function. It will panic if the slice is not part of its owned memory region, but otherwise lets you continue to avoid unwanted allocs. That method in the previous post now looks like this:

/// Some example math of the chunking:
///
/// ```plaintext
/// if payload is 5000 bytes
/// and MTU is 1430:
///
/// i=0 chunk_size=1430     0..1430
/// i=1 chunk_size=1430  1430..2860
/// i=2 chunk_size=1430  2860..4290
/// i=3 chunk_size=710   4290..5000
/// ```
fn enqueue_send(&mut self, payload: Bytes) -> Result<(), EnqueueError> {
    if payload.len() <= MTU_MAX {
        // simple path, payload is LTE MTU!
        self.pending_parts.push_back(Parts::SimpleSingle(payload));
    } else {
        // complicated path, payload is GT MTU!
        let max_chunks = (payload.len() / MTU_MAX) + usize::from(payload.len() % MTU_MAX != 0);

        if max_chunks > u16::MAX as usize {
            panic!(
                "We shouldn't be trying to send things that are bigger than {} bytes! Yowza!",
                MTU_MAX * u16::MAX as usize
            )
        }

        payload
            .chunks(MTU_MAX)
            .enumerate()
            .for_each(|(i, subslice)| {
                let chunk = payload.slice_ref(subslice);
                self.pending_parts
                    .push_back(Parts::Chunk(i as u16, max_chunks as u16, chunk));
            });
    }
    Ok(())
}

I completely missed this somehow and now I realize what a life saver it is.
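
To make the "no allocs" part concrete, here is a toy std-only stand-in for the idea. This is not the bytes crate and not how its internals are laid out; `SharedBuf` and everything in it is a hypothetical sketch that assumes a refcounted buffer plus a window into it, with `slice` and `slice_ref` only bumping the refcount:

```rust
use std::ops::Range;
use std::sync::Arc;

/// Toy stand-in for the idea behind `Bytes` (hypothetical, illustration only):
/// a refcounted buffer plus the window of it that this handle can see.
#[derive(Clone)]
struct SharedBuf {
    data: Arc<[u8]>,
    window: Range<usize>,
}

impl SharedBuf {
    /// The one and only copy happens here, when the Vec moves into the Arc.
    fn new(data: Vec<u8>) -> Self {
        let len = data.len();
        SharedBuf { data: Arc::from(data), window: 0..len }
    }

    fn as_slice(&self) -> &[u8] {
        &self.data[self.window.clone()]
    }

    /// Same idea as `slice(..)`: a new handle onto the same memory.
    fn slice(&self, range: Range<usize>) -> Self {
        assert!(range.start <= range.end && range.end <= self.window.len());
        let start = self.window.start + range.start;
        SharedBuf { data: Arc::clone(&self.data), window: start..start + range.len() }
    }

    /// Same idea as `slice_ref`: turn a borrowed sub-slice back into an owned
    /// handle by working out its offset. Panics if `subset` is not ours.
    fn slice_ref(&self, subset: &[u8]) -> Self {
        if subset.is_empty() {
            return self.slice(0..0);
        }
        let base = self.as_slice().as_ptr() as usize;
        let sub = subset.as_ptr() as usize;
        assert!(
            sub >= base && sub + subset.len() <= base + self.window.len(),
            "subset is not part of this buffer"
        );
        let offset = sub - base;
        self.slice(offset..offset + subset.len())
    }
}
```

Chunking a buffer with `as_slice().chunks(n)` and passing each sub-slice to `slice_ref` leaves every chunk pointing into the original allocation. The memory is only freed once the last handle is dropped, which is the property the reliable send relies on.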


mido
@mido

almost caught up to where i left off, e2e tests work again after tons of reorganizing:

UDP Socket listening on 0.0.0.0:27015
Starting ingress...
Built new pending connection!
Query incoming
Promoting connection Ready for Some(127.0.0.1:55947)
New player added! id: 1 from: 127.0.0.1:55947
Adopted active connection for 127.0.0.1:55947
Received system event from 1(127.0.0.1:55947): "hello"
Received system event from 1(127.0.0.1:55947): "hello"
Received system event from 1(127.0.0.1:55947): "hello"

I have a new and exciting data structure challenge where I must come up with a simple structure responsible for binning partial parts/chunks of a fragmented message (larger than MTU).

Some things we know reliably, as it's data included in every chunk: order via send_sequence, the chunk's offset within the fragmented object, the total # of chunks this chunk is a part of.

Rust graciously allows us to use Range<u16> as a hashmap key so I'm looking at doing something like this:

#[derive(Default)]
struct StreamPartBundler {
    bundles: HashMap<Range<u16>, Vec<StreamPart>>,
}

impl StreamPartBundler {
    /// Consume part of a chunked message and if it happens
    /// to complete as a result of this final piece of the puzzle then
    /// please assemble it and output the final Bytes payload.
    fn process_part(&mut self, part: StreamPart) -> Option<Bytes> {
        todo!() // WIP: not written yet
    }
}
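
One way process_part could go, as a rough std-only sketch and not the finished design. The field names on `StreamPart` are hypothetical, `Vec<u8>` stands in for `Bytes`, and the key is assumed to be the range of send sequences the whole message spans (this part's sequence minus its chunk index, plus the total):

```rust
use std::collections::HashMap;
use std::ops::Range;

// Hypothetical shape of a part; the real StreamPart may look different.
struct StreamPart {
    send_sequence: u16,
    chunk_index: u16,
    chunk_total: u16,
    payload: Vec<u8>, // stand-in for Bytes
}

#[derive(Default)]
struct StreamPartBundler {
    bundles: HashMap<Range<u16>, Vec<StreamPart>>,
}

impl StreamPartBundler {
    /// Bin a part; if it completes its message, assemble and return it.
    fn process_part(&mut self, part: StreamPart) -> Option<Vec<u8>> {
        // reject parts whose header makes no sense
        if part.chunk_total == 0 || part.chunk_index >= part.chunk_total {
            return None;
        }
        // every part of one message maps to the same key (assumed scheme);
        // wrapping math because u16 sequence numbers can roll over
        let first = part.send_sequence.wrapping_sub(part.chunk_index);
        let key = first..first.wrapping_add(part.chunk_total);
        let total = usize::from(part.chunk_total);

        let parts = self.bundles.entry(key.clone()).or_default();
        // a resend of a chunk we already hold changes nothing
        if parts.iter().any(|p| p.chunk_index == part.chunk_index) {
            return None;
        }
        parts.push(part);
        if parts.len() < total {
            return None;
        }

        // last piece of the puzzle: pull the bin out and stitch it in order
        let mut parts = self.bundles.remove(&key)?;
        parts.sort_by_key(|p| p.chunk_index);
        Some(parts.into_iter().flat_map(|p| p.payload).collect())
    }
}
```

Parts can arrive in any order and duplicates are ignored. This sketch never expires a bin that stays incomplete, so a real version would still need some kind of timeout or eviction.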
