let me boldly assert that there are two types of programmer (at least as far as this post is concerned):
- a programmer who builds something out of message queues and calls it a pipeline
- a programmer who has had to maintain and operate a pipeline made by a type-1 programmer
although both types of programmer are kept awake by thoughts of code at night, only one of them is on the pager rota. every programmer gets to fuck around, but only a handful are blessed/cursed with finding out.
this essay is for programmers of the second variety. i'm not sure what anyone else will get, but hopefully the people who have already suffered will receive validation, and maybe a few lucky people will pick up some useful framing and vocabulary to fight the good fight later.
it's also for people who won't whine about the lack of capitalization in a rough blog post
prelude: character selection
you are:
- ☐ A long time industry expert, with twelve acronyms on your business card.
- ☐ Someone with a kanban, can-do attitude. Party planning your way to success!
- ☑ A well rounded burnout who got hired through word of mouth.
you are
- ☐ in a small engineering team, building out features for a website
- ☑ employee number four and the ceo keeps writing code on weekends
- ☐ in a large enterprise company with an hour and a half long daily standup
you are
- ☐ in charge of building out backend systems
- ☐ all working in the same repo
- ☑ accidentally in ops, after fixing one too many builds
the fire is lit: a pipeline is coming
it's 3pm, it's friday, and your coworker drops a link in the company chat. it's a 2000 line change request, and they're desperate for a +1. despite the length, the code is relatively straightforward. a lot of it is just yaml.
there's a thumbnail generator that's starting to take too long. originally it just cropped and resized things down, but for some ungodly reason, there's video support and transcoding now. requests to the website are timing out because this process is taking too long to complete.
your coworker, bored as hell, desperate to get on with real work, volunteers as tribute to fix the bug. now the code calls "StartThumbNailer(user, file)", a message gets put in a queue, and another process elsewhere calls "user, file := queue.ThumbNextNail()". problem solved.
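the whole change boils down to something like this. a sketch, not their actual diff: a channel stands in for the broker, and makeThumbnail is a made-up name for the existing resize/transcode code.

```go
// the prototype, more or less: one process drops a message on a queue, a
// worker elsewhere picks it up. note the complete absence of error handling.
type thumbnailJob struct {
	UserID string
	File   string
}

var queue = make(chan thumbnailJob, 1024) // a channel standing in for the broker

func StartThumbNailer(userID, file string) {
	queue <- thumbnailJob{UserID: userID, File: file} // fire and forget
}

func workerLoop() {
	for job := range queue {
		makeThumbnail(job.UserID, job.File) // hypothetical: crop, resize, transcode
	}
}
```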
alas, it isn't icarus' fate to know his future.
step 1: cleaning up after dropped messages
you open a dm, it's best to avoid an audience. people get touchy about their code.
"This is great work, it's good to prototype these things out"
Remember: Don't be a dick about it. Don't squeal and wail, no matter how much you want to. People really don't like being told "You can't do it that way. You do not understand why." It's a bad look all round, even if it's true.
Establish common ground, reframe problem, work towards common goals. Then you can be a dick about it, later. Remember: It's only a little bit less of a dick to be Socratic about it, and ask questions you already know the answer to, so try and be nice where you can.
"I don't see a lot of error handling."
There's never any error handling. The message broker is always running, the queue always exists, and the workers never make a mistake, either. That's how prototypes look, sure, but that's how pipelines will look, years later.
The only thing that changes as pipelines age is the number of graphs on the ops dashboard.
"It will be good to work out precisely which things we can leave out of a first implementation. We don't want to lose future feature dev time to operations."
You're not trying to "shut it down", you're working out the minimum level of work needed to ship it. You're justifying why with a business case.
"What happens when message delivery fails? Maybe there's a network blip, maybe someone tripped over the cable at the data centre?"
It's important not to blame the broker, and focus more on "networks are bad and haunted." People will just tell you the broker saves messages to disk, so they're even harder to lose, ignoring the actual problem at hand.
"If the send fails, we can just log it instead of doing nothing, and I guess we can restart those thumbnail jobs by hand."
Again, it's time to move on. A log message is fine enough for a first prototype, so don't get hung up on it. A lack of error handling isn't the important point. The important point is "how much manual work is it going to be to fix things after they go wrong?"
"If something else went wrong, Is there a way to tell which users are missing thumbnails? Or would we have to add logging elsewhere?"
The answer's usually something like "a timestamp in the database", and another klaxon starts ringing. Resist the urge to explain why timestamps are a bad choice for now, you can revisit it later.
The important bit is that "there's some state, in a database", and for now it only has two states: "needs doing, and has been done". It'll end up with seven or eight states later on, and that's when you'll have "the timestamp talk".
"I think the important question is: Can write some .NeedsThumbnail() or .HasThumbnail() function, and then resend those messages? We can worry about the implementation details at the end."
That's right. "At the end". Now your coworker should realise it's time for a long haul discussion about the system. Don't miss your chance to recap things.
- you recognise that there are limits to error handling in a prototype
- you're worried about the manual work required to fix errors
- with a bit more of a plan, things can move forward
- working out manual steps required for recovery & seeing if there's low hanging fruit
you open a shared document. it's called "background work: operations guide", and drop a link in the change request.
lost message
- problem: the service might fail to send a message for a variety of reasons
- solution: we can discover this through logs and alarms
- solution: we can store some state in the database, so it's easy to tell the "needs thumbnails" from "has thumbnails"
- resolution: we can write scripts to restart lost jobs, or do it manually
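the restart script in that last resolution doesn't have to be much. a minimal sketch, assuming a thumbnails table with some kind of "has thumbnail" flag; the table and column names are made up:

```go
import (
	"context"
	"database/sql"
)

// find the users whose thumbnails never got made, and resend the messages.
func resendMissingThumbnails(ctx context.Context, db *sql.DB) error {
	rows, err := db.QueryContext(ctx,
		`SELECT user_id, file FROM thumbnails WHERE NOT has_thumbnail`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var userID, file string
		if err := rows.Scan(&userID, &file); err != nil {
			return err
		}
		StartThumbNailer(userID, file) // back onto the queue it goes
	}
	return rows.Err()
}
```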
with some aplomb, you write up a series of headings underneath.
- duplicate messages
- failed thumbnailer
- head of line
- statistics / monitoring / debugging
your coworker is not impressed, but hasn't lost hope yet. maybe they'll ship it by the end of the week. meanwhile, there's a bit of a shit-eating-grin on your face.
error handling sometimes means running the same thing twice. duplicate handling means avoiding just that. the statistics part is yet another "tricking them into using the database" section, broken down into smaller bite sized chunks.
it is time to make your coworker eat their vegetables.
step 2: handling duplicates
the original code didn't really have any deduplication. the worker assumes it's the only one running for that user's thumbnails.
duplication is always a problem because the recovery steps for "lost messages" involve resending potentially lost messages. for example:
- workers stop running overnight
- message queue builds up
- workers restart in morning
- user notices thumbnail is missing, job gets added back to the queue by hand
- now there's two messages in the queue.
it isn't the worst thing in the world if two processes get run at once, but no-one's really sure if it will cause problems, writing to the same files at the same time.
again, the point of raising this isn't "this has to be fixed" but "we need to understand how it can fail, and how much time will we waste fixing it."
sometimes the answer is "make the process idempotent", but usually the answer is "locks", shortly followed by "leases". you'll need to timeout locks eventually, if a worker crashes midway through thumbnailing.
"well, fuck it, we'll use redis. put a worker_id, timestamp in a hash, and if that timestamp is old, or missing, i can write my own one in."
it's not perfect by any means, but it will reduce the operations headache. you update the file and move on.
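a minimal sketch of that lease, using a plain SET NX with an expiry instead of the hash-of-timestamps version; same idea, less bookkeeping. the key name and timeout are invented, and it assumes the go-redis client:

```go
import (
	"context"
	"time"

	"github.com/redis/go-redis/v9"
)

// only one worker gets the lock for a given user's thumbnails, and the lock
// expires on its own if that worker crashes halfway through.
func tryAcquireLease(ctx context.Context, rdb *redis.Client, userID, workerID string) (bool, error) {
	return rdb.SetNX(ctx, "thumbnail-lock:"+userID, workerID, 5*time.Minute).Result()
}
```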
sometimes the queue has a magical transactional mode, where you can hold onto a message until you've finished processing it, and after a timeout it'll be available for other workers.
it's almost the same thing you're doing with redis, but there's still a little more work to do:
step 3: handling a failed thumbnailer
"if a worker takes a message, asks for a lease, and then crashes, how do we retry work?
Do we scan the logs by hand again? What if it crashes before being able to log anything?"
The usual answer to "How do we fix it" is "Something puts the message back in the queue." Your coworker points out that we can fix it in the same way as before: putting some timestamps in the database, and writing an automated script that restarts old jobs.
You nod once more, it's still not the right time to argue about timestamps.
"Let's call it a message pump. Since it pumps messages into the queue. Also, we should probably keep track of the last error message, and the number of attempts when we do it."
It's ok, it's only adding one JSONB column. One we'll make use of later. Heh heh heh.
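A sketch of what the message pump might look like at this stage, working off timestamps and an attempts counter. The column names are invented, and the query gets a lot friendlier once there's a state column:

```go
import (
	"context"
	"database/sql"
)

// find jobs that were sent a while ago but never completed, and resend them.
func pumpOnce(ctx context.Context, db *sql.DB) error {
	rows, err := db.QueryContext(ctx, `
		SELECT user_id, file FROM thumbnails
		WHERE sent_at < now() - interval '15 minutes'
		  AND (completed_at IS NULL OR completed_at < sent_at)`)
	if err != nil {
		return err
	}
	defer rows.Close()

	for rows.Next() {
		var userID, file string
		if err := rows.Scan(&userID, &file); err != nil {
			return err
		}
		StartThumbNailer(userID, file) // back on the queue it goes
		// bump the attempt counter; the last error lives in the jsonb column
		if _, err := db.ExecContext(ctx, `
			UPDATE thumbnails SET sent_at = now(), attempts = attempts + 1
			WHERE user_id = $1 AND file = $2`, userID, file); err != nil {
			return err
		}
	}
	return rows.Err()
}
```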
"There's still one problem left, but I'm not sure it applies."
step 4: handling head of line blocking (optional)
thankfully your coworker was sensible, and suggested using the same automated script as before. if they'd chosen the fancy transactional queue, there'd be a little surprise in store for them: head of line blocking
everyone who has worked with queues usually has a story like this, give or take some details:
- for error handling, we used a transactional queue: if a process failed, it returned to the queue.
- one job kept killing all the workers during a period of high load. the worker would crash, and the bad job went back to the top of the list. two people had a screaming match over zoom
- instead of limiting retries, or a time-to-live on the job message, retries are put in a new "error queue", with their own workers
- the error queue ends up full of duplicate messages, and all progress on retries is blocked.
you write "head of line blocking: not in scope" and move on. It's at this point you pray for a lightbulb above your coworkers head. Handling lost messages, handling broken thumbnailer runs, ensuring two jobs don't overlap, these are all interlinked. It's far too easy to write error handling that causes more errors.
with the error handling wrapped up, it's time to move on to the last section. unfortunately for your coworker, this is where the magic happens.
It's finally time to talk about timestamps.
step 5: debugging / introspection
now you can start asking questions like "How do we tell if a job has been enqueued or not? How do we tell if a job is being worked on?" before moving on to "Actually, how many states can a process be in?"
with the queue, it's not possible to find out what things are running, or if anything has crashed, and instead all you have is a graph that has three settings:
- Queue's empty. Fine. Or maybe it's broken and no work is happening.
- Burst of work, but line is going down. Actually fine.
- Line is going up. Something is not working, and whatever it is, the problem is only getting worse.
Meanwhile, the message broker queue thing as it stands only tracks two things:
- If it's in the queue, this job will be assigned to a worker next.
- If it's not in the queue, this job will not be assigned to a worker.
In other words:
- We can't really tell if things are working reliably, outside of a few cases
- The only thing we know for sure is "this work is about to be done"
Your coworker points out that the timestamp thing from earlier would help, and it does. Now you can track two more things:
- When did I last send a message
- When did a job last complete.
Notice how neither of those two timestamps really tells you anything about what's going on for a particular thumbnail job. You can kinda infer "this should be run" if the last message was sent after the job last completed. You can kinda infer "this ran, but i don't know if it succeeded or not" if the last message was sent before the job last completed.
That's not a lot to infer. The underlying problem here is that a process can be in way more than two or three states, especially now error handling has come into the mix, and a handful of timestamps won't fix it.
To be clear: This is the time to yell about timestamps.
Yell about how having one field called state is way easier than started_at, last_active_at, completed_at, last_error_at, and a nasty series of comparisons. A field with three booleans and a truth table isn't any better, either.
After sketching things out with your coworker, you decide that a job can be in several different states:
- initialised: an id or record exists but there's no data inside
- created: everything needed to start the job is available.
- enqueued: in the process of being assigned to a worker
- active: a worker is doing the job
- failed: a worker encountered a known error, and can be restarted
- panic: a worker encountered an unknown error
- timeout: a worker failed to complete the job
- skipped: no work actually needed
"We'll probably combine some of them together, but it's good practice to split things up as much as possible."
step 6: build a revised plan
Now you've committed to tracking state in the database, it's time to clean up the proposal.
- we'll have a table of thumbnail jobs in the database
- each one has a state field, a free text field, and a jsonb column for error handling and other details
"We could have a table of thumbnail changes in the database, that gets a new record every time the state changes. It might be more work now, but it's probably easier to have one DB migration than two."
- workers read from a queue to find new work, and use a redis lock to exclude others
- there's a message pump that reads from the database and writes to the queue
- we don't need to persist any messages in the queue
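One possible shape for a row in that jobs table, written out as a struct; the field names here are illustrative rather than agreed:

```go
import (
	"encoding/json"
	"time"
)

// one row per thumbnail job. the jsonb column picks up error details,
// attempt counts, and whatever else turns up later.
type ThumbnailJob struct {
	ID        int64
	State     JobState        // see the state list above
	Note      string          // free text: last error, operator breadcrumbs
	Details   json.RawMessage // the jsonb column
	UpdatedAt time.Time
}
```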
It all feels a little overkill, but it really does make a big difference. Unlike before, there's handling in place for all sorts of errors. It's easy to recover from a crashed queue, you just empty it and restart the workers. Plus one quite lovely feature: the message pump can check the queue length, and alert if things stop going down. There's no need to configure an alerting system.
It's not a lot of code to write. The redis lock code (in lua) is somewhere on stack overflow. The database migration is a pain in the ass to deploy, but it's not difficult to write. Your coworker even adds in the second table, and starts collecting statistical data in the jsonb field to show off.
intermission: you have not built a pipeline, yet
it's a good point to sigh and take a break. you've successfully got a coworker to build a reliable, robust system. one that has automated ways to handle failures, one that doesn't require human intervention on the regular, and one that doesn't remind you of all the awful systems you've suffered so far.
it's a good moment to take a step back and ask "how come it worked out this time"
- your coworker actually believes you when you share your experience
- you aren't forcing people to reinvent your exact solution
- not every issue is fixed, despite being identified
- it wasn't about someone being right, or someone being wrong, it was about lowering operational costs
sure enough, sometimes it doesn't work out
- your coworker doesn't care about the operational cost
- no-one's getting a bonus for taking time to ship things carefully
- the right thing will only matter at scale, and by that time, who knows what we'll be needing
sometimes it's a little bit like solving a race condition. no-one believes it can be fixed, and when people ask for help, they just want to move the problem elsewhere. turns out "have you tried explicitly ordering the operations on the shared mutable state" is not a popular answer, despite being correct. people hate eating their vegetables.
even so, you do win the argument eventually.
the queue keeps exploding, duplicate messages keep breaking everything. the error handling caused a denial of service. things tend to steer towards "tracking the state of a job".
the broker grows from "a new central point of failure that always needs more disks" into "a lightweight service discovery and work assignment service", and instead of "just fire and regret a message" you have "the queue buffers the results of a more expensive database query."
in fact, we could get rid of the queue entirely, and just use a load balancer
- workers connect to a load balancer instead of a broker
- scheduler sends http request for each item of work until told to slow down
- jobs complete in the background and update the database
we could even get rid of the load balancer, too:
- workers connect to the scheduler, and ask for the next job to perform
- scheduler hands out jobs one by one as workers scale up or finish tasks
- like before, scheduler amortises the expensive database query
- but now? you don't need a lock server
in some ways, you've just moved the queue and locking data structures inside the scheduler, they haven't really gone away. it does, however, let us do a variety of nice things:
- like before, the scheduler can be killed and restarted, as all state is kept in the database
- scheduler can even spin up more workers as and when required by the load
- scheduler can offer api for starting jobs, listing jobs, getting status, or watching for changes
- workers can even report back to the scheduler instead of to the database directly, which lets the scheduler check that the worker is updating a job that's still active.
the best part? the system doesn't really look much like a pipeline.
now that the message pump, the lock, the queue, worker registration, worker assignment, and the recovery processes are all happening in the same place, it feels quite different from slapping messages into a queue and hoping for the best.
it might feel like more work than wrapping shit together in redis, but in some ways there's less complexity going around. as far as the worker is concerned, there's no lock manager, there's no queue, there's no database, just a scheduler api for getting a job and another for progress updates.
it's almost important enough to recap
- keep all the process state in a database table
- use a text field for state, there's a lot of them
- use another table if you want to track changes
- scheduler amortises expensive database queries
- scheduler keeps track of which worker is active on what
- workers poll scheduler to be assigned work
- workers update scheduler, not database, with progress updates / heartbeat messages
- scheduler orders work to be done by most overdue
[that last one is how you handle restarting errored jobs or missing jobs]
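from the worker's side, most of that recap collapses into a very small loop. a sketch, with the scheduler api (NextJob, Report) and the runJob helper invented for illustration:

```go
import (
	"context"
	"time"
)

// what the scheduler hands out.
type Job struct {
	ID    int64
	State JobState
}

type SchedulerClient interface {
	NextJob(ctx context.Context) (Job, bool)         // most overdue work first
	Report(ctx context.Context, id int64, err error) // progress / heartbeat messages
}

// no queue, no lock server, no database client: just two api calls.
func runWorker(ctx context.Context, s SchedulerClient) {
	for {
		job, ok := s.NextJob(ctx)
		if !ok {
			time.Sleep(5 * time.Second) // nothing overdue, poll again later
			continue
		}
		err := runJob(job) // hypothetical: the actual thumbnailing work
		s.Report(ctx, job.ID, err)
	}
}
```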
intermission almost over, and we're nearly halfway to the (non) goal.
round 2: oh no, more pipeline!
even if you don't go full tilt and implement the all-in-one scheduler, it might feel like the battle is won. with just a message pump, you're handling all sorts of errors automatically, and it didn't take two sprints to write.
there's just one problem: someone has suggested splitting up the thumbnailer into separate processes. that same someone suggests tying the parts back together with a message broker. the pipeline is back, and the same problems are back too.
not just head of line blocking, but the whole "have you tried reading the logs" school of state management.
it's at this point that you start to ask "why does everyone keep doing it this way?" and it doesn't take too long to realise that the answer is unix. when people think about batch processing, they think about being at the command line, and chaining steps together in a unix pipeline.
and much like a pipeline, you end up writing something different to make things robust.
at some point, the unix pipeline fails, and you're faced with re-running the entire thing again, or repeating that time honoured unix tradition of breaking up the pipeline into a series of Makefile steps, using temporary files for each program's input and output.
pipelines aren't really designed for reliability, and in some ways, they're not really designed for task level parallelism either: it's a lot easier to run 1000 calls to process(item) than it is to run one worker pool for each step in process and wire them up together with queues.
anyway, back to the problem at hand: someone has had the bright idea of connecting things up with a message broker. this time, it's a new coworker, and the argument's a bit different. sure enough, there's the same old story about service discovery, or the "free error handling for a problem created by using a broker" stuff you heard the last time, but now there's "can scale up and down each worker queue to adjust for load" too
i've already elaborated at length about why "don't join things together with queues" elsewhere, and this post is already quite lengthy in the "don't use a queue to store background work" department, so let's cut to the meat of it.
"There's no real end-to-end management of the task state, unless you have a database. "
The state tracking needs to be back, or messages will fall into the void. The message pump needs to be back, or work won't get restarted after failure. We can add new pipeline-like steps to our improved system, without having to implement the all-in-one scheduler.
"We can add new logic to the message pump, and new states to the database table."
Fine. Keep your queues and your worker pool, but you tell the database when you're done with your step, and the message pump will send off a new task. It's still a little janky, each worker has to know the name of the worker after it to glue everything together, but it'll work fine enough.
The important part is to not be woken up at night because the queue's exploded. Using a message pump keeps that queue length bounded.
That's the real problem with queues: There's never any flow control, or a way for a queue to tell clients to slow down. There's always persistence, so you can't drop messages on the floor. There's never bounds on the size of a queue, either. A queue is a machine for turning rpc calls into full disks.
Look, to keep a distributed system running, you need to build a system that aims towards an equilibrium state, slowing things down when other parts can't keep up. A message broker does the exact opposite, allowing parts to get wildly out of sync until you run out of disk space to keep up appearances.
Once you have backpressure or load shedding on top of a queue, you can't just fire a message away and hope for the best. You need to implement flow control, or error handling. You have to stop pretending the queue is a magic wand.
It's fine to glue things together with queues. You've just got to avoid persistence, and demand backpressure or load shedding, and you won't end up in the same mess over and over and over again. That's the important bit. That and "if you run things, you need to keep track of their states"
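A sketch of what that flow control looks like when the message pump does it, assuming the broker can at least report its queue depth. The Queue interface, the threshold, and the alert and enqueueOverdueJobs helpers are all invented here:

```go
import (
	"context"
	"database/sql"
)

// the broker, reduced to the two things the pump needs from it.
type Queue interface {
	Length(ctx context.Context) (int, error)
	Send(ctx context.Context, job ThumbnailJob) error
}

// top up the queue only when there's room, and shout when it stops draining.
func pumpWithBackpressure(ctx context.Context, q Queue, db *sql.DB) error {
	depth, err := q.Length(ctx)
	if err != nil {
		return err
	}
	if depth > 1000 {
		// the queue isn't draining: don't make it worse, page someone instead
		alert("thumbnail queue is backed up: %d messages", depth)
		return nil
	}
	// the pump, not the producers, decides how much work is in flight
	return enqueueOverdueJobs(ctx, db, q, 1000-depth)
}
```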
Anyway, back to the coworker.
"That's not the only route we can go. Your way is to have six worker pools, one for each part of the thumbnail task, shared across every task. Could we have one worker pool, for the whole thumbnail task, and just run the different steps inside the same worker?"
It's not much of a change from before. Now the worker updates the database, then runs the next step, then updates the database, and runs the next step, until it runs out of steps. Like before, it's way, way easier to run task() 1,000 times than it is to run six different pools with queues between them, but not every pipeline can benefit from task-level parallelism.
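A sketch of that version: the worker walks the steps for a single thumbnail task in order, checkpointing in the database between each one. The step names, their functions, and the setState helper are all invented:

```go
import (
	"context"
	"database/sql"
)

// run every step of one task inside the same worker, recording progress as we go.
func runTask(ctx context.Context, db *sql.DB, job ThumbnailJob) error {
	steps := []struct {
		name string
		run  func(context.Context, ThumbnailJob) error
	}{
		{"downloading", download},
		{"transcoding", transcode},
		{"resizing", resize},
		{"uploading", upload},
	}
	for _, step := range steps {
		if err := setState(ctx, db, job.ID, step.name); err != nil {
			return err
		}
		if err := step.run(ctx, job); err != nil {
			// record the failure and stop; the message pump will retry it later
			return setState(ctx, db, job.ID, "failed")
		}
	}
	return setState(ctx, db, job.ID, "done")
}
```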
"... but if we have to do stuff like aggregation, we might have to do something different"
let's stop thinking about the janky-ass setup we have so far. remember the all-in-one scheduler earlier? for task level parallelism, it's pretty straightforward, we just run all the thumbnail components in one worker, and use the same setup from before:
- we have one big worker pool
- each worker asks for a task, and which state it's in
- the worker runs that one part of the code, and reports the output
- the scheduler decides if that worker runs the next step
- in other words: the scheduler can decide how to partition the work at runtime
that's a kinda nice thing to have, but it doesn't clear up running the parts of a single task in parallel. in this case, the thumbnailer can run one step in parallel for each of the output files it generates.
your coworker, now fully baptised in the church of state tracking, decides to step in. they're saying stuff like "it's not a background job system, it's a series of persistent state machines executed in parallel across a shared worker pool", so you know they've got this.
"We just run another task, right? There's one table for thumbnail tasks, right, one per top level thumbnail job. Let's add a second table for subtasks, and a given task can have lots of
active subtasks running."
In the all-in-one scheduler, it looks something like this:
- The worker tells the scheduler to start a number of subtasks
- As the worker asks the scheduler to create a subtask, the scheduler can choose to assign it immediately
- The original worker polls the scheduler to find out when that subtask completes
- The original worker moves on to other work.
- If there's an error, the worker restarts the subtask
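As code, the parent task's side of it might look something like this, assuming the scheduler client grows a few extra calls; the SubtaskClient interface here is invented for illustration:

```go
import "context"

// the extra calls the scheduler client would need for subtasks (invented here).
type SubtaskClient interface {
	StartSubtask(ctx context.Context, parentID int64, output string) (int64, error)
	SubtaskState(ctx context.Context, id int64) (JobState, error)
	RestartSubtask(ctx context.Context, id int64) error
}

// fan out one subtask per output file. the scheduler may assign each one to
// a worker immediately.
func startSubtasks(ctx context.Context, s SubtaskClient, parent Job, outputs []string) ([]int64, error) {
	ids := make([]int64, 0, len(outputs))
	for _, out := range outputs {
		id, err := s.StartSubtask(ctx, parent.ID, out)
		if err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	return ids, nil
}

// called whenever the parent task polls: restart anything that failed, and
// report whether everything has finished yet.
func checkSubtasks(ctx context.Context, s SubtaskClient, ids []int64) (bool, error) {
	done := true
	for _, id := range ids {
		state, err := s.SubtaskState(ctx, id)
		if err != nil {
			return false, err
		}
		switch state {
		case StateFailed, StateTimeout:
			if err := s.RestartSubtask(ctx, id); err != nil {
				return false, err
			}
			done = false
		case StateCreated, StateEnqueued, StateActive:
			done = false
		}
	}
	return done, nil
}
```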
Applying it to the redis-and-message-pump prototype, we get:
- There's a redis server providing a lock and a queue
- The database has a table for the thumbnail task and another table for subtasks
- There's a message pump, which scans both tables and tops up the message queue with jobs
- Workers pull out a task or a subtask from the queue, and run it
- The top level tasks end up just starting other subtasks and waiting for most of the time.
Congratulations: You are the proud author of a not-pipeline. Sure enough, there's task level parallelism, and there's even inter-task level parallelism too. There's duplicate handling, end-to-end error handling with state tracking, and it's easy to ask questions like "What process is running?" and "What error did this thing return with." Throw on a web interface, and you can probably move a lot of the support burden to another team.
It might still smell like a pipeline, but there's those all important differences.
- The queue isn't persistent. You can restart the broker at any time, or drop all messages to go back to a known state.
- There's backpressure. The message pump can hold off on writing messages if the queue gets too big.
- There's end-to-end error handling, it doesn't matter if an individual part fails.
- There's even process supervision. The top level task watches over the smaller ones. Errors in subprocesses can be handled and managed, too.
- There's task level parallelism. One broken process in one thumbnail doesn't affect other thumbnail tasks.
- There's inter-task level parallelism. One broken substep doesn't impede other substeps from running to completion.
- The system is always in recovery. Instead of assuming things work, it constantly looks out for overdue tasks, trying to push things back into equilibrium.
- It's even pretty easy to adapt. Adding new states doesn't require migrations, and the JSONB column gets used to smuggle variables into subtasks. Nice.
Most importantly? This system is the winner of the "least likely to wake you up at night" award, handed out by a select set of judges. Not bad for "a lock and a queue in redis to amortise an expensive postgres query".
Not that anyone else notices. The problem with building robust systems is that it's a thankless task that everyone eventually takes for granted. Give it a few months, and the new hires will be exclaiming "Couldn't we just use a message broker? It's so much easier than this mess of queue and redis and database."
I blame unix, personally.
finale: the product
your coworker has been nerd sniped. they went off and finished the all-in-one scheduler for fun. it turned out to be a little less code than the redis duct tape sandwich in production. they've made some changes.
there's a process table and a 'procedure' table, and some of the other names have changed too. they renamed the scheduler to "operator", after the kubernetes term for a daemon that watches state and runs set actions to reconcile it, which is more or less what this thing does. in some ways, your coworker has written a toolkit for writing schedulers in.
they've also started going on about "microsoft orleans" and "entity component systems" and "the blackboard pattern." you even heard "tuplespace" once. they're all "here's a database that tracks state, persists objects, and here's a series of workers that collaboratively update this database to achieve tasks in a distributed or parallel or concurrent style" looking things, and your coworker really, really wants to keep talking about the rather fancy state machines they can now build.
well, at least it's not a pipeline. you hate pipelines, remember?
postscript
you might wonder and ask me "is this software something you want to write", and the answer is "i have already written this software ten times over, in slightly different forms, all in different jobs"
in one job, we had a web api for all the little tasks associated with a background process, and we'd even coded a few buttons in. i suggested to a coworker that it might be easier to just return a list of actions each thingy could do, over hardcoding each and every one. we even started doing things like returning human-facing values in the api: a "state_en" along with a "state", with things like "Process is running for N hours" inside.
in another job, the big database of state got copied around from place to place. we ended up using version numbers instead of timestamps. yep, that's right, there's still more timestamp content. the problem with timestamps is that you can't tell if you've missed a version, there's no easy way to handle clock skew, and you don't really know how many times a thing has changed, either. a version number handles all these things, but you'll still need a timestamp to know when a version changed, and you won't care so much about clock skew, then.
i even got to write my own rpc toolkit one of the times. as much as i lament having to rebuild the same internal product in each and every job, i'm not sure i'd have learned so much about distributed systems without it. that, and i'd probably be out of a job if people could actually build batch processing systems as easily as they build websites.
thankfully, this post will change nothing
