Not correct, but it's very easy to think timestamps will solve this. Timestamps don't work because system clocks aren't precisely synchronized across machines. Say Producer A creates the first event and Producer B creates a second event 50ms later (imagine a single row being updated twice in quick succession), but Producer A's clock is 100ms ahead of Producer B's, and thanks to variable network latency Producer B's event reaches the consumers first. Producer A's event will then look like the latest one from a timestamp perspective and overwrite Producer B's event.
One way to solve it is to not use timestamps at all, but a monotonically increasing version number associated with the row, incremented on every event/update and sent along with the event message payload. The book Designing Data-Intensive Applications goes into this problem in depth. I recommend it to anyone discussing architecture; issues like this will seem obvious after reading it.
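A minimal sketch of the version-number approach (all names here are illustrative, not from any particular library): the producer's database assigns each row update a monotonically increasing version, and the consumer discards anything older than what it has already applied.

```python
# Sketch: last-write-wins using a per-row version number instead of timestamps.
from dataclasses import dataclass

@dataclass
class Event:
    row_id: str
    version: int      # monotonically increasing, assigned at the producer side
    payload: dict

latest_versions: dict[str, int] = {}   # row_id -> highest version applied
state: dict[str, dict] = {}            # row_id -> current payload

def apply_event(event: Event) -> bool:
    """Apply the event only if it is newer than what we've already seen."""
    seen = latest_versions.get(event.row_id, -1)
    if event.version <= seen:
        return False               # stale or duplicate: ignore it
    latest_versions[event.row_id] = event.version
    state[event.row_id] = event.payload
    return True

# Out-of-order delivery: version 2 arrives before version 1.
apply_event(Event("row-1", 2, {"x": "new"}))   # applied
apply_event(Event("row-1", 1, {"x": "old"}))   # ignored as stale
```

Because comparison happens against the version, not arrival time, clock skew between producers no longer matters.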
Sequence numbering is how FIX does it. And I think it's quite neat that it does this at a _protocol_ level. This means that a FIX client/engine will typically take care of sequence numbering, out-of-order detection (which can happen during re-send requests), buffering any ahead-of-time messages, requesting gap fills, etc. It will only present your application layer code with in-order messages.
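The gist of what such an engine does can be sketched in a few lines (greatly simplified; a real FIX engine also issues resend/gap-fill requests to the counterparty, persists sequence numbers across sessions, etc.):

```python
# Sketch of FIX-style sequence handling: buffer ahead-of-sequence messages and
# release them to the application layer only in order.
class SequencedInbox:
    def __init__(self):
        self.expected = 1
        self.buffer = {}          # seq -> message, held until the gap is filled

    def receive(self, seq: int, msg: str) -> list[str]:
        """Returns messages now deliverable to the application, in order."""
        if seq < self.expected:
            return []             # duplicate (e.g. from a resend), drop it
        self.buffer[seq] = msg
        ready = []
        while self.expected in self.buffer:
            ready.append(self.buffer.pop(self.expected))
            self.expected += 1
        return ready

inbox = SequencedInbox()
inbox.receive(2, "b")    # gap detected: held back, nothing delivered
inbox.receive(1, "a")    # gap filled: releases "a", "b" in order
```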
I'm not aware of any universal pattern for dealing with poison pill messages. Completeness detection and dealing with messages that crash your system are 2 separate concerns.
The poison pill case is a business logic thing on either side of the queue. You can either do validation up-front before sticking requests into the queue, or after the fact when processing in batches. Either way, you will ultimately need to be able to handle problems on both sides of the fence. Something that doesn't look like "poison" on the way into the queue could become pretty nasty with certain emergent state as events are processed on the other side.
[0]: https://lmax-exchange.github.io/disruptor/disruptor.html
They can be. That's what Google Spanner does, using GPS and atomic clocks. It's not hard or expensive. An atomic clock + GPS will set you back under $1000 for one in a nice box https://www.ebay.com/itm/174750548607, or $200 for a PCB https://www.ebay.com/itm/353611628534. Apparently it gets you to within 1e-11 s, which is about 10 picoseconds (I think).
But that doesn't solve the problem. Let's say you have two event producers, and they produce two events at times t and t+1. Once they both arrive, it's trivial to process them in the right order. Your problem is there is an unreliable network connecting you to these producers. Those atomic clocks can guarantee the events at t and t+1 are distinguishable even if they are just 10ps apart, but how long do you wait for t to arrive before you decide there was no event at t and process the event at t+1? I can absolutely guarantee you that whatever time you decide is reasonable, the universe will at some point screw you over and drop the event from t on your doorstep just after you've decided to process the event at t+1.
Your issue isn't that it's difficult; it's that you are living in a state of sin if you believe the problem is solvable given the premises.
PS: Google Spanner doesn't attempt to do the impossible. In Spanner's case there is a single event producer, and Spanner is "merely" trying to record the event consistently across multiple nodes. If there are multiple event producers then it will serialise them in some order, but if there were two disconnected, independent Spanners out there processing the same events from the same producers, Google is not claiming they would decide on the same order. That would need a God more powerful than Google.
This is a concept known as a vector clock. I suggest taking a look at vector and Lamport clocks (named after Leslie Lamport); they are very useful in distributed systems that communicate via messages.
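The Lamport clock is the simpler of the two, and the core idea fits in a few lines. A minimal sketch:

```python
# Minimal Lamport clock sketch: each process keeps a counter, increments it on
# local events/sends, and on receive takes max(local, received) + 1. This gives
# a logical "happened-before" order, without relying on wall-clock time.
class LamportClock:
    def __init__(self):
        self.time = 0

    def tick(self) -> int:
        """Local event or send: advance and stamp."""
        self.time += 1
        return self.time

    def receive(self, remote_time: int) -> int:
        """On receive: jump past the sender's timestamp."""
        self.time = max(self.time, remote_time) + 1
        return self.time

a, b = LamportClock(), LamportClock()
t_send = a.tick()            # A sends a message stamped with its clock
t_recv = b.receive(t_send)   # B's clock jumps ahead, so send < receive holds
```

A vector clock extends this to one counter per process, which additionally lets you detect that two events are concurrent rather than ordered.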
Disclaimer: I'm the tech lead of the temporal.io open source project and the CEO of the affiliated company.
If you keep your queueing system and business process as separate layers, with the queueing system serving only as a means of transporting business events, then you can make it all work correctly.
Think in terms of IP protocol (as in TCP/IP). It is unsuitable for transmitting financial transactions. Yet, financial transactions can be made to work on top of it if you separate the layers and treat IP only as a component mechanism of getting data from A to B.
The issue is that 99.9% of developers use queues directly in their business applications.
Using the TypeScript SDK, you can catch that exception here: https://github.com/temporalio/samples-typescript/blob/9d9108...
A queue should NEVER drop messages - otherwise it's a shit queue. Or you have a bug in your application code that needs to be fixed.
Poison messages are DEFINITELY A SMELL. This means you essentially have a broken interface contract. The code that is adding messages is expecting one thing -- code that is processing messages is expecting a different thing. It needs to be fixed by clearly documenting your message queue expectations and fixing your code. Most likely you need to add clear expectations for the lifetime of a message.
In a field where precision is absolutely necessary, it's unfortunate to use the term queue to describe something that is not a queue.
It's common to have windows in time where two or more sides may not have agreed what happened before they lose communication. Most of the problem can be solved by idempotency, so when the peer retries, the receiver understands it is looking at a duplicate transaction and can discard it indicating that it succeeded.
This is fundamentally equivalent to database ACID constraints, and the other modes described are great in the same way that if you're able to relax some of the ACID constraints in your code (say, by not being SERIALIZABLE), you get in return nice things (like reads never deadlocking).
If you can't know if message N will change the outcome of processing message N+M, then you have to resolve that before you can proceed - as surely as a serializable database will wait for the outcome of transaction N before being able to proceed with N+M.
There are certainly fields with lower correctness requirements where inconsistent data is not such a big deal though.
Yup. If you want to be sure, you need to persist which message you have already processed for each entity X and ignore older ones. You also have to handle race conditions where both messages are handled at almost the same time by two different consumers, e.g. by taking a lock in a DB. Both are annoying; ideally I could just process messages without any care.
Spent all day trying to architect this for a new queue where ordering matters but we need lots and lots of consumers working at the same time, prefetching messages. My case is actually a bit similar to the one in the article about a "stream of vehicle positions". I only really care about the latest one. Problem is it's hard to know which is the latest one without having a DB and checking whether I've already processed something more recent. Any other ideas on how to solve this efficiently? As in, strategies to handle messages arriving out of order, so I can avoid that as a requirement.
The state of the vehicle is kept in memory for some time.
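One way to sketch this, assuming the readings carry a producer-side sequence number or sample time (an assumption on my part, since the device may stamp differently): partition by vehicle id so per-vehicle events land on one consumer, Kafka-style, and then keep only the highest sequence seen per vehicle in that consumer's memory, with no DB lookup.

```python
import zlib

NUM_PARTITIONS = 8

def partition_for(vehicle_id: str) -> int:
    # crc32 is stable across processes, unlike Python's built-in hash()
    return zlib.crc32(vehicle_id.encode()) % NUM_PARTITIONS

latest: dict[str, int] = {}   # vehicle_id -> highest sequence seen, in memory

def accept(vehicle_id: str, seq: int) -> bool:
    """Keep only the newest position; late arrivals are simply dropped."""
    if seq <= latest.get(vehicle_id, -1):
        return False          # out of order or duplicate: a newer one is applied
    latest[vehicle_id] = seq
    return True
```

You get lots of parallel consumers (one or more per partition) while each vehicle's stream is still handled by exactly one of them, so "latest" is decidable locally.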
Sidekiq does not guarantee ordering within a queue; that’s a terrible, very expensive guarantee. Developers don’t want total ordering of jobs within a queue, they want to know that Job A will fully execute before Job B. There might be 1000 other jobs in the queue that are completely independent of that ordering but we’ve screwed ourselves by forcing total queue ordering. Instead Sidekiq Pro provides a workflow API, Sidekiq::Batch, which allows the developer to author higher-level workflows for Job A -> Job B which provides the ordering guarantee.
For poison pills, we detect jobs which were running when a Sidekiq process died. If this happens multiple times, the job will be sent to the dead letter queue so the developer can deal with them manually. If they were part of a Batch, the workflow will stall until the developer fixes the issue and executes the job manually to resume the workflow.
I built a system where
1. Sensor events were picked up by a ZWave device connected to Samsung SmartThings
2. SmartThings would call an AWS Lambda function I wrote (SmartThings lives in AWS so this is efficient)
3. My lambda function posts an event to an SQS queue
4. My home server takes the event off SQS and posts it to RabbitMQ
5. A queue listener takes events from RabbitMQ and takes an action
So long as I was using ordered SQS queues I would sometimes get a 5 second delay to turn on a light. When I turned off ordering the latency was perceptible but didn't make me want to jam the button multiple times.

If you hit a poison message, block just that smaller stream, not the aggregated larger stream. Once you fix the problem, reprocess the entire small stream starting from the poison event, or the next event after that. The "entire" stream here might be just a handful of events.
Greg Young's Event Store (https://www.eventstore.com/) works this way (there's a $by_category projection that produces the aggregated streams).
Caveat: I haven't actually implemented this mechanism because I've been able to get away without it, because we have some legacy event streams that aren't split up in this way, and because nobody else has yet added support for it to the tools I'm using.
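A sketch of the per-stream blocking idea (purely illustrative; this is not Event Store's API, and real implementations would persist the blocked state):

```python
# Block only the per-entity stream that hit a poison message; all other
# streams keep flowing. After a fix, replay the small stream from the
# poison event onward.
blocked: dict[str, list] = {}    # stream_id -> events parked behind the poison one

def process(stream_id: str, event, handle) -> bool:
    if stream_id in blocked:
        blocked[stream_id].append(event)      # park behind the poison event
        return False
    try:
        handle(event)
        return True
    except Exception:
        blocked[stream_id] = [event]          # block this one stream only
        return False

def retry_stream(stream_id: str, handle) -> None:
    """After fixing the bug, reprocess the small stream from the poison event."""
    for event in blocked.pop(stream_id, []):
        handle(event)
```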
[0] https://www.rabbitmq.com/quorum-queues.html#poison-message-h...
That’s a toy example; your system corruption could get much worse due to a poison message depending on the application.
It's kind of a shitty solution to the problem, but there you have it; maybe it's the best that can be done. Roll out code changes gradually in individual regions and make sure a bug doesn't bring everything down.
For more common use cases it is possible to provide the minimum guarantees required to reliably reconstruct a domain object throughout a distributed system whilst still providing a fuck-ton of scope for concurrency, batching and high throughput through better partition key choice, informed by:
A: The maximum ordering guarantees that can be provided by the data source
B: The minimum ordering guarantees required to reconstruct a domain object
If you have inter-dependence between messages, you need a message id scheme that expresses the interdependence. For example, a hierarchical message order can use a dot-delimited scheme. If a poison message is a parent, subsequent child messages can go to the fault queue as well.
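A sketch of the dot-delimited hierarchy check (the id format here is just an example): a message is routed to the fault queue if any of its ancestor ids was marked poisoned.

```python
# If a parent message was poisoned, its descendants are fault-queued too.
poisoned: set[str] = set()

def is_descendant_poisoned(msg_id: str) -> bool:
    parts = msg_id.split(".")
    # every prefix "a", "a.b", ..., up to the full id is an ancestor-or-self
    return any(".".join(parts[:i]) in poisoned for i in range(1, len(parts) + 1))

poisoned.add("order-17.payment")
is_descendant_poisoned("order-17.payment.refund")   # True: parent is poisoned
is_descendant_poisoned("order-17.shipping")         # False: independent branch
```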
The consumer, of course, must be aware of & ready to handle multiple deliveries. (E.g., you keep a "log position" which represents where you've processed the incoming messages up to.) But if you need ordering+not losing messages, it's the mode you need. (Since "at most once" implies "sometimes losing messages".)
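A minimal sketch of that log-position idea (illustrative names; in practice the position is stored durably, ideally in the same transaction as the consumer's state change):

```python
# At-least-once delivery with a persisted log position: redeliveries at or
# below the position are skipped, so replays are harmless.
log_position = 0          # durably stored alongside the consumer's state
applied = []

def consume(offset: int, msg: str) -> bool:
    global log_position
    if offset <= log_position:
        return False      # already processed in an earlier delivery
    applied.append(msg)
    log_position = offset
    return True

consume(1, "a")
consume(2, "b")
consume(2, "b")   # redelivery of the same message: skipped
```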