1 year ago
#360744
Joel
MassTransit batch consumer stalling, not receiving messages
We've got an issue where, from time to time, the consumers for a specific message type stall and stop receiving/processing messages. It's not always the same consumer type, but I'm pretty sure it always happens to batch consumers for subscriptions on topics (Azure Service Bus). Restarting the Deployment in Kubernetes often solves the issue: the consumers are "unstuck" and start consuming messages again.
At the moment I've got one service that consumes messages for a short while after a restart, but stalls again after about 10-15 minutes. I'm not sure how to debug this issue, or what I can do to figure out what's going on. Does anyone know something about this, or what can be done to investigate it?
I've got memory dumps of a process it has happened to, but I don't know enough to tell from them what the issue is.
Setup:
- .NET 6
- MassTransit 7.3.1 (we've seen this on previous versions as well)
This is a compact repro of what our setup looks like:
public async Task ExampleConfiguration<TConsumer, TEvent>(
    IServiceCollection services,
    string connectionString,
    string subscriptionName)
    where TConsumer : class, IConsumer
    where TEvent : class, IEvent
{
    services.AddMassTransit(busConfigurator =>
    {
        busConfigurator.AddConsumer<TConsumer>();
        busConfigurator.UsingAzureServiceBus((context, serviceBusBusFactoryConfigurator) =>
        {
            serviceBusBusFactoryConfigurator.Host(connectionString);
            serviceBusBusFactoryConfigurator.SubscriptionEndpoint<TEvent>(
                subscriptionName,
                receiveEndpointConfigurator =>
                {
                    receiveEndpointConfigurator.PublishFaults = false;
                    receiveEndpointConfigurator.MaxAutoRenewDuration = TimeSpan.FromMinutes(30);
                    receiveEndpointConfigurator.UseMessageRetry(r => r.Intervals(500, 2000));
                    receiveEndpointConfigurator.PrefetchCount = 1100;
                    receiveEndpointConfigurator.ConfigureConsumer<TConsumer>(context, consumerConfigurator =>
                    {
                        consumerConfigurator.Options<BatchOptions>(batchOptions =>
                        {
                            batchOptions.MessageLimit = 100;
                            batchOptions.TimeLimit = TimeSpan.FromSeconds(5);
                            batchOptions.ConcurrencyLimit = 10;
                        });
                    });
                });
        });
    });
    // Add the hosted service for MassTransit that is responsible for starting and stopping the bus.
    services.AddMassTransitHostedService(true);
}
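A quick sanity check on the numbers in this configuration, under the assumption (mine, not confirmed against the MassTransit documentation) that the worst case is all ConcurrencyLimit batches being full at once: 10 batches of 100 messages is 1000 messages, so a PrefetchCount of 1100 leaves 100 messages of headroom. A minimal sketch of that arithmetic; the variable names are only illustrative and are not MassTransit APIs:

```csharp
using System;

// Values copied from the endpoint configuration above.
const int messageLimit = 100;     // BatchOptions.MessageLimit
const int concurrencyLimit = 10;  // BatchOptions.ConcurrencyLimit
const int prefetchCount = 1100;   // receive endpoint PrefetchCount

// Assumed worst case: every concurrent batch is full at the same time.
int maxInFlight = messageLimit * concurrencyLimit;

// Messages that can still be prefetched beyond the full batches.
int headroom = prefetchCount - maxInFlight;

Console.WriteLine($"max in-flight = {maxInFlight}, prefetch headroom = {headroom}");
```

If that assumption holds, the prefetch count on its own should not be what starves the batches.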
We're exposing Prometheus metrics, so we can see that mt_receive_total goes up after a restart, but drops to zero after a while. A couple of ConsumerCancelledExceptions are recorded as well.
sum(rate(mt_consume_fault_total{app="facebookservice"}[5m])) by (app, pod, exception_type)
sum(rate(mt_receive_total{app="facebookservice",endpoint_address="facebookservice-item-inserted"}[5m])) by (app, pod)
sum(rate(mt_receive_in_progress{app="facebookservice",endpoint_address="facebookservice-item-inserted"}[5m])) by (app, pod)
Interesting to note that only one of the pod instances seems to have a value for mt_receive_in_progress 🤔
azureservicebus
masstransit
azure-servicebus-topics
azure-servicebus-subscriptions