1 year ago
#325019
Wollie
@Incoming not running multithreaded on Quarkus application connecting to RabbitMQ
Some background
We are running a fairly simple application that handles subscriptions and are running into the limits of the external service. The solution is that we are introducing a queue and throttle the consumers of this queue to optimize the throughput.
For this we are using a Quarkus
(2.7.5.Final) implementation and using quarkus-smallrye-reactive-messaging-rabbitmq
connector provided by quarkus.io
Simplified implementation
rabbitmq-host=localhost
rabbitmq-port=5672
rabbitmq-username=guest
rabbitmq-password=guest
mp.messaging.incoming.subscriptions-in.connector=smallrye-rabbitmq
mp.messaging.incoming.subscriptions-in.queue.name=subscriptions
@Incoming("subscriptions-in")
public CompletionStage<Void> consume(Message<JsonObject> message) {
try {
Thread.sleep(1000);
return message.ack();
} catch (Exception e) {
return message.nack(e);
}
}
The problem
This only uses one worker thread and therefore the jobs are handles 1 by 1, ideally this application picks up as many jobs as there are worker threads available (in parallel), how can I make this work?
I tried
@Incoming("subscriptions-in")
@Blocking
Didn't change anything
@Incoming("subscriptions-in")
@NonBlocking
Didn't change anything
@Incoming("subscriptions-in")
@Blocking(ordered = false)
This made it split of into different worker threads, but ?detached? the job from the queue, so none of the messages got ack'd or nack'd
@Incoming("subscriptions-in-1")
..
@Incoming("subscriptions-in-2")
..
@Incoming("subscriptions-in-3")
These different channels seem to all work on the same worker thread (which is picked on startup)
The only way I currently see is to slim down the application and run one consumer thread each and just run 50 in parallel in kubernetes. This feels wrong and I can't believe there is no way to multithread at least some of the consuming.
Question
I am hopeful that I am missing a simple solution or am missing the concept of this RabbitMQ connector.
- Is there anyway to get the @Incoming consumption to run in parallel?
- Or is there a way in this Java implementation to increase the prefetch count? If so I can multithread them myself
java
rabbitmq
quarkus
smallrye-reactive-messaging
0 Answers
Your Answer