-
-
Notifications
You must be signed in to change notification settings - Fork 9.8k
[Messenger][WIP] Sending messages in batches #62600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 8.1
Are you sure you want to change the base?
Conversation
59d8706 to
90f61db
Compare
90f61db to
f77ca1b
Compare
Some transports will have a max batch limit (eg. SQS) Either the transport has to make sub batches, or the messageBus should fail when trying to send bigger batches The issue with the transport making sub batches, is that you have the chance that half the batch fails and half succeeds. eg. dispatchBatch(20 messages) - transport sends 10 - transport tries to send 10 again, but fails now half the batch got sent, but we have no clue which ones. IMO, best to block this.
| private string $batchId, | ||
| private int $batchIndex, | ||
| private int $batchSize, | ||
| private ?BatchCollector $collector = null, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
non-nullable?
| public function addPendingSend(string $alias, SenderInterface $sender, Envelope $envelope, int $index): void | ||
| { | ||
| $this->pending[$alias] ??= ['sender' => $sender, 'items' => []]; | ||
| $this->pending[$alias]['items'][] = ['envelope' => $envelope, 'index' => $index]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| $this->pending[$alias]['items'][] = ['envelope' => $envelope, 'index' => $index]; | |
| $this->pending[$alias]['items'][$index] = $envelope; |
?
| * | ||
| * @author Joppe De Cuyper <[email protected]> | ||
| * | ||
| * @internal |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
im fine with making it publicly available, for other maybe-implementions
otherwise we can argue whole BatchStamp is internal
|
it might be interesting to update DoctrineTransport as such |
Yeah that was the next thing i was looking at. This interface sounds nice and easy on paper, but already ran into trouble when implementing. Not all transports that support batching, also support returning the ids of each item in the badge. So TransportMessageIdStamp can't be promised depending on the transport used |
it sounds generally reasonable, there's no current contract enforcing it either as well it SHOULD be implemented if possible |
Note: This PR is a proof of concept. It is in no way, shape or form supposed to be a final ready for production fix. This PR is supposed to get the conversation going. Please, provide feedback where needed, as this feature turned out to be more complex than i initially imagined 😀 This is also the reason the code is littered with comments
Batch Dispatch for Symfony Messenger
Allows dispatching multiple messages in a single batch, enabling transports to optimize delivery (e.g., bulk inserts to a queue). This allows you to optimize message sending (Eg. Single SQS request for 10 messages, where you pay per-request)
How it works
$bus->dispatchBatch([new OrderCreated(1), new OrderCreated(2), new OrderCreated(3)]);Step by step:
Notes