1 year ago

#378091


grusome

Google Pub/Sub flow control with max_messages is not working as expected

I am using Google Pub/Sub to listen for published events. The publisher is not under my control, and I don't know the rate at which messages are published. I subscribe with the settings below, using the Python client library.

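import threading
import time
from concurrent.futures import ThreadPoolExecutor

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler


# The callback is defined before the subscribe loop so the name exists when it is used.
def onmessage(message):
    print(
        f"{threading.current_thread().name} received the message {message.message_id}")
    time.sleep(0.5)  # simulate 0.5 s of processing per message
    message.ack()
    print(f"{threading.current_thread().name} message {message.message_id} acked")


# Allow at most one outstanding (received but not yet acked) message.
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
# Run callbacks on a single worker thread.
scheduler = ThreadScheduler(
    ThreadPoolExecutor(
        max_workers=1, thread_name_prefix="ThreadPoolExecutor-customexecutor"
    )
)

# current_subscriber (a pubsub_v1.SubscriberClient) and subscription_path
# are created elsewhere and are not shown here.
while True:
    print("i am here ****")
    streaming_pull_request = current_subscriber.subscribe(
        subscription_path,
        callback=onmessage,
        flow_control=flow_control,
        scheduler=scheduler,
    )
    try:
        # Block until the streaming pull stops or raises.
        streaming_pull_request.result()
    except Exception as e:
        print("exception found")
        print(e)
        streaming_pull_request.cancel()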

But even with the above settings, the unacked message count keeps increasing. Can someone point out what is going wrong?

I expected the subscriber to receive messages at a constant rate, one message at a time.
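
For reference, here is a rough sketch of the arithmetic behind these settings. It assumes that max_messages only caps how many messages the client holds at once, so the unacked count on the subscription can still grow whenever the publisher is faster than the callback. The 0.5 s per message comes from the sleep in onmessage; the publish rate of 5 messages per second is purely hypothetical, since the real rate is unknown, and the helper names are made up for illustration. Network latency and redelivery are ignored.

```python
def max_throughput(max_messages: int, seconds_per_message: float) -> float:
    """Upper bound on messages acked per second with max_messages in flight."""
    return max_messages / seconds_per_message


def backlog_after(publish_rate: float, max_messages: int,
                  seconds_per_message: float, elapsed_seconds: float) -> float:
    """Estimated unacked backlog after elapsed_seconds, starting from empty."""
    drain_rate = max_throughput(max_messages, seconds_per_message)
    # The backlog grows only when messages arrive faster than they are acked.
    return max(0.0, (publish_rate - drain_rate) * elapsed_seconds)


# One message in flight, 0.5 s each: at most 2 messages acked per second.
print(max_throughput(1, 0.5))            # 2.0
# Hypothetical publisher at 5 msg/s, observed for 60 s.
print(backlog_after(5.0, 1, 0.5, 60.0))  # 180.0
```

Under these assumptions, any publish rate above 2 messages per second would make the unacked count climb even though the client never holds more than one message.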

google-cloud-platform

google-cloud-pubsub

0 Answers
