1 year ago

#325019

test-img

Wollie

@Incoming not running multithreaded on Quarkus application connecting to RabbitMQ

Some background

We are running a fairly simple application that handles subscriptions and are running into the limits of the external service. The solution is that we are introducing a queue and throttle the consumers of this queue to optimize the throughput.

For this we are using a Quarkus (2.7.5.Final) implementation and using quarkus-smallrye-reactive-messaging-rabbitmq connector provided by quarkus.io

Simplified implementation

rabbitmq-host=localhost
rabbitmq-port=5672
rabbitmq-username=guest
rabbitmq-password=guest

mp.messaging.incoming.subscriptions-in.connector=smallrye-rabbitmq
mp.messaging.incoming.subscriptions-in.queue.name=subscriptions
@Incoming("subscriptions-in")
public CompletionStage<Void> consume(Message<JsonObject> message) {
    try {
        Thread.sleep(1000);
        return message.ack();
    } catch (Exception e) {
        return message.nack(e);
    } 
}

The problem

This only uses one worker thread and therefore the jobs are handles 1 by 1, ideally this application picks up as many jobs as there are worker threads available (in parallel), how can I make this work?

I tried

@Incoming("subscriptions-in")
@Blocking

Didn't change anything

@Incoming("subscriptions-in")
@NonBlocking

Didn't change anything

@Incoming("subscriptions-in")
@Blocking(ordered = false)

This made it split of into different worker threads, but ?detached? the job from the queue, so none of the messages got ack'd or nack'd

@Incoming("subscriptions-in-1")
..
@Incoming("subscriptions-in-2")
..
@Incoming("subscriptions-in-3")

These different channels seem to all work on the same worker thread (which is picked on startup)

The only way I currently see is to slim down the application and run one consumer thread each and just run 50 in parallel in kubernetes. This feels wrong and I can't believe there is no way to multithread at least some of the consuming.

Question

I am hopeful that I am missing a simple solution or am missing the concept of this RabbitMQ connector.

  • Is there anyway to get the @Incoming consumption to run in parallel?
  • Or is there a way in this Java implementation to increase the prefetch count? If so I can multithread them myself

java

rabbitmq

quarkus

smallrye-reactive-messaging

0 Answers

Your Answer

Accepted video resources