Until you hit scale, the database you're already using is fine. If that's Postgres, look up SELECT FOR UPDATE SKIP LOCKED. The major convenience here - aside from operational simplicity - is transactional task enqueueing.
For hosted, SQS or Google Cloud Tasks. Google's approach is push-based (as opposed to pull-based) and is far and above easier to use than any other queueing system.
I'm probably biased, but in the number of cases where I had to work with Kafka, I'd really prefer to simply have an SQL database. In all of those cases I struggled to understand why developers wanted Kafka, what problem was it solving better than the database they already had, and for the life of me, there just wasn't one.
I'm not saying that configuring and deploying databases is easy, but it's probably going to happen anyway. Deploying and configuring Kafka is a huge headache: bad documentation, no testing tools, no way to really understand performance in the light of durability guarantees (which are also obscured by the poor quality documentation). It's just an honestly bad product (from the infra perspective): poor UX, poor design... and worst of all, it's kind of useless from the developer standpoint. Not 100% useless, but whatever it offers can be replaced by other existing tools with a tiny bit of work.
Cloud Tasks is one of the most undervalued tools in the GCP ecosystem, but mostly because PubSub gets all the attention. I've been using it since it was baked in the AppEngine and love it for 1-to-1 queues or delayed job handling.
Back when I was all-in on GCP, I had a queue.yaml file which the appengine deployer syncs to cloud tasks (creates/disabled queues, changes the rate limits, concurrency, etc).
Now that I'm mostly on AWS... I still use the same system. I have a thin little project that deploys to GAE and has a queue.yaml file. It sets up the cloud tasks queues. They hit my EB endpoints just like they used to hit my GAE endpoints.
As a bonus, my thin little GAE app also has a cron.yaml that it proxies to my AWS app. Appengine's cron is also better than Amazon's overcomplicated eventbridge system.
Terraform is definitely for the best. Any AI tool should be able to spit it out well enough, but if you do rawdog it in the console or gcloud you might be able to export the terraform with:
How could I solve the problem of in-order processing based on a key using skip locked? Basically all records having the key to be processed one after other.
Work jobs in the order they were submitted within a partition key. This selects the next partition key that isn't locked. You could make it smarter to select a subset of the jobs checking for partition keys where all of the rows are still unlocked.
SELECT
*
FROM jobs
WHERE partition_key = (
SELECT partition_key
FROM jobs
ORDER BY partition_key
LIMIT 1
SKIP LOCKED
)
ORDER BY submitted_at
FOR UPDATE SKIP LOCKED;
These weren't his last words, but Jim Gray had this to say
about this so-called "antipattern".
Queues Are Databases (1995)
Message-oriented-middleware (MOM) has become an small industry.
MOM offers queued transaction processing as an advance over pure
client-server transaction processing. This note makes four points:
Queued transaction processing is less general than direct transaction
processing. Queued systems are built on top of direct systems.
You cannot build a direct system atop a queued system.
It is difficult to build direct, conversational, or distributed
transactions atop a queued system. Queues are interesting databases
with interesting concurrency control. It is best to build these
mechanisms into a standard database system so other applications
can use these interesting features. Queue systems need DBMS functionality.
Queues need security, configuration, performance monitoring, recovery,
and reorganization utilities. Database systems already have these features.
A full-function MOM system duplicates these database features.
Queue managers are simple TP-monitors managing server pools driven by queues.
Database systems are encompassing many server pool features
as they evolve to TP-lite systems.
as with everything, it depends on how you're processing the queue.
eg we built a system at my last company to process 150 million objects / hour, and we modeled this using a postgres-backed queue with multiple processes pulling from the queue.
we observed that, whenever there were a lot of locked rows (ie lots of work being done), Postgres would correctly SKIP these rows, but having to iterate over and skip that many locked rows did have a noticeable impact on CPU utilization.
we worked around this by partitioning the queue, indexing on partition, and assigning each worker process a partition to pull from upon startup. this reduced the # of locked rows that postgres would have to skip over because our queries would contain a `WHERE partition=X` clause.
i had some great graphs on how long `SELECT FOR UPDATE ... SKIP LOCKED` takes as the number of locked rows in the queue increases, and how this partiton work around reduced the time to execute the SKIP LOCKED query, but unfortunately they are in the hands of my previous employer :(
I did sth similar. Designed and built for 10 million objects / hour. Picked up by workers in batches of 1k. Benchmark peaked above 200 million objects / hour with PG in a small VM. Fast forward two years, the curse of success strikes, and we have a much higher load than designed for.
Redesigned to create batches on the fly and then `SELECT FOR UPDATE batch SKIP LOCKED LIMIT 1` instead of `SELECT FOR UPDATE object SKIP LOCKED LIMIT 1000`. And just like that, 1000x reduction in load. Postgres is awesome.
----
The application is for processing updates to objects. Using a dedicated task queue for this is guaranteed to be worse. The objects are picked straight from their tables, based on the values of a few columns. Using a task queue would require reading these tables anyway, but then writing them out to the queue, and then invalidating / dropping the queue should any of the objects' properties update. FOR UPDATE SKIP LOCKED allows simply reading from the table ... and that's it.
smart. although, i guess that pushes the locking from selecting queue entries to making sure that objects are placed into exactly 1 batch. curious if you ran into any bottlenecks there?
> ... making sure that objects are placed into exactly 1 batch. curious if you ran into any bottlenecks there?
A single application-layer thread doing batches of batch creation (heh). Not instant, but fast enough. I did have to add 'batchmaker is done' onto the 'no batch left' condition for worker exit.
> ... that pushes the locking from selecting queue entries to ...
To selecting batches. A batch is immutable once created. If work has to be restarted to handle new/updated objects, all batches are wiped and the batchmaker (and workers, anyway) start over.
40,000 per second is waaaaay beyond where you should use a dedicated queuing solution. Even dedicated queues require tuning to handle that kind of throughput.
(or you can just use SQS or google cloud tasks, which work out of the box)
I hit 60k per second in 2020 on a 2-core, 100GB SSD installation of PG on GCP. And "tuning" PG is way easier than any dedicated queueing system I've seen. Does there exist a dedicated queueing system with an equivalent to EXPLAIN (ANALYZE)?
It's possible the person you're replying to wasn't using replication, so it's entirely different. Those folks also used "synchronous_commit is set to remote_write" which will have a performance impact
This is correct. My use-case was safe with eventual consistency, so I could've even used `synchronous_commit=off`, but I kept it to 'local' to get a baseline. Was happy with the 60k number I got, so there was no need for 'off'.
But I think the biggest reason I hit that number so easily was the ridiculous ease of batching. Starting with a query to select one task at a time, "converting" it select multiple tasks instead is ... a change of a single integer literal. FOR UPDATE SKIP LOCKED works the same regardless of whether your LIMIT is 1 or 1000.
I worked at a shop that had to process about 6M RPS for 5 seconds at a time, once a minute or so. That looked a lot like a boatload of Python background threads queueing work in memory then flushing them out into Cassandra. That was a fun little project.
The main complaint seems to be that it's not optimal...but then, the frame of the discussion was "Until you hit scale", so IMHO convenience and simpler infra trumps having the absolute most efficient tool at that stage.
SQS, Azure Service Bus, RabbitMQ, ActiveMQ, QPID, etc… any message broker that provides the competing consumer pattern. though I’ll say having managed many of these message brokers myself, it’s definitely better paying for a managed service. They’re a nightmare when you start running into problems.
If you're using .NET I have to plug
https://particular.net/
Nservicebus from particular.net. It's great at abstracting away the underlying message broker and provides an opinionated way to build a distributed system.
.Net SRE here, please no. Take 5 minutes to learn your messaging bus SDK and messaging system instead of yoloing some library that you don't understand. It's really not that hard.
Also, ServiceControl, ServiceInsight and ServicePulse are inventions of developers who are clearly WinAdmins who don't know what modern DevOps is. If you want to use that, you are bad and should feel bad.
NATS does much more than pub/sub (with any fan out), it also does queueing in Core NATS (no persistence) and streaming including queuing over streams (with ack/nack/term). Don't be fooled into thinking NATS doesn't do queueing because it doesn't have a "Q" in the name :).
Actually, I used RabbitMQ static routes to feed per-cpu-core single thread bound consumers that restart their process every k transactions, or watchdog process timeout after w seconds. This prevents cross contamination of memory spaces, and slow fragmentation when the parsers get hammered hard.
RabbitMQ/Erlang on OTP is probably one of the most solid solutions I've deployed over the years (low service cycle demands.) Highly recommended with the AMQP SSL credential certs, and GUID approach to application layer load-balancing. Cut our operational costs around 37 times lower than traditional load-balancer approaches. =3
Agree. RabbitMQ is a Swiss Army knife that has a lot of available patterns, scales fairly well, and is very robust. If you don’t know what to choose, start with Rabbit. It will let you figure out which patterns you need and you probably won’t scale out of it. Pay someone to host it.
On the other hand, if you know what you need to do and it’s supported by it, NATS is IME the way to go (particularly JetStream).
We use RabbitMQ, and workers simply pull whatever is next in the queue after they finish processing their previous jobs. I’ve never witnessed jobs piling up for a single consumer.
Pulsar vs Kafka was a significant lesson to me: The "best" technology isn't always the winner.
I put it in quotes because I'm a massive fan of Pulsar and addressing the shortcomings of Kafka. However, with regards to some choices at a former workplace: The broader existing support/integration ecosystem along with Confluent's commercial capabilities won out with regards to technology choices and I was forced to acquiesce.
A bit like Betamax vs VHS, albeit that one pre-dates me significantly.
and it then becomes a self-perpetuating cycle because the network effect and usage of one product makes it become exponentially more robust & mature than the other. I've heard that Pulsar is buggy basically, and I wouldn't be surprised because it doesn't have thousands of organizations using it in all sorts of ways. It's not its fault too much really
Even StreamNative is effectively abandoning Pulsar and going all-in on the Kafka protocol. I can see the theoretical benefits of Pulsar, but it just doesn’t seem to have the ecosystem momentum to compete with the Kafka juggernaut.
It sure looks like they’re going quite a ways beyond Kafka-on-Pulsar - the Ursa/Oxia work they’re focused on right now replaces BookKeeper and seems very firmly Kafka-oriented. Or does Ursa also work with the Pulsar protocol?
Kafka with a different partitioner would have worked fine. The problem was that the web workers loaded up the same partition. Randomising the chosen partition would have removed, or at least alleviated, the stated problem.
Has anyone used Redpanda? I stumbled upon it when researching streaming, it claims to be Kafka compatible but higher performance and easier to manage. Haven't tried it myself but interested if anyone else has experience.
Plenty of people choose Redpanda because it’s the easiest getting started experience for kafka. There is a single binary for the full broker, where I have never seen Apache Kafka as easy to setup. It’s got a great UI as well.
I'm not sure you understood the article. You can have a very low load but each task on your queue takes a while to process, in which case you want fair distribution of work.
The distribution is fair - everything is round-robin, so in the long run each worker receives the same rate of tasks. It's just "lumpy" - sometimes you can get a big batch sent to one worker, then a big batch sent to another worker - but it will all average out.
Especially for low levels of load, that doesn't require that the dispatcher and consumer are written in the same language.