1 year ago

#308638


justaq

Flink Statefun netty DisconnectedException

I am trying to run a Flink StateFun (version 3.2.0) application on my local machine using Docker, with a single task manager and a single job manager. The application is a pipeline of several services that communicate with each other by sending messages through Kafka to HTTP function endpoints, which are served by aiohttp with gunicorn. At the beginning of the pipeline is a service that pulls results from Amazon S3 and sends them to the rest of the pipeline at a rate of about 8,000-10,000 requests per minute.

The application runs successfully at first, but in the Docker logs for the Flink worker (task manager) container I repeatedly see these warnings:

2022-03-18 17:35:43,315 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(analytics-transformer, dispatch, 77ce0dcb-347c-4c03-bc32-f7ebb734b930), batchSize=1, totalSizeInBytes=1323, numberOfStates=0)

org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException: Disconnected
18:25:27,594 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(web, statefun, 82936819-b3d9-4a24-b4eb-81a189d6306c), batchSize=1, totalSizeInBytes=1434, numberOfStates=0)

org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

Eventually I see this warning as well:

2022-03-18 18:06:44,848 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(web, statefun, f004409f-77be-433c-8ab1-ae5f9dad605c), batchSize=1, totalSizeInBytes=1172, numberOfStates=0)
java.lang.IllegalStateException: FixedChannelPool was closed
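
To see which of these exceptions dominates, the worker log can be tallied by exception class. The following is a minimal sketch, not part of the original setup: the function name `tally_exceptions` and the regular expression are assumptions made for illustration, and the sample lines are shortened copies of the log lines quoted above.

```python
import re
from collections import Counter

# Matches a fully qualified Java exception class at the start of a line,
# optionally preceded by "Caused by: ". Timestamped WARN lines and
# "at ..." stack frames do not match. This pattern is an assumption that
# fits the log excerpts in this post, not a general Java log parser.
EXC_RE = re.compile(r"^(?:Caused by: )?((?:[\w$]+\.)+[\w$]*(?:Exception|Error))\b")


def tally_exceptions(lines):
    """Count exception lines by simple class name (package prefix dropped)."""
    counts = Counter()
    for line in lines:
        match = EXC_RE.match(line.strip())
        if match:
            simple_name = match.group(1).rsplit(".", 1)[-1]
            counts[simple_name] += 1
    return counts


# Shortened sample lines taken from the warnings quoted in this post.
sample_lines = [
    "2022-03-18 17:35:43,315 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)",
    "org.apache.flink.statefun.flink.core.nettyclient.exceptions.DisconnectedException: Disconnected",
    "org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer",
    "java.lang.IllegalStateException: FixedChannelPool was closed",
]

for name, count in tally_exceptions(sample_lines).most_common():
    print(count, name)
```

In practice the input would be the lines of the task manager container's log (for example, saved to a file and read with `open(...)`) rather than a hard-coded list.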

After some time, the Flink master fails due to a request timeout and has to restart:

org.apache.flink.statefun.flink.core.functions.StatefulFunctionInvocationException: An error occurred when attempting to invoke function FunctionType(analytics-transformer, dispatch).
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:50) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.ReusableContext.apply(ReusableContext.java:74) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.LocalFunctionGroup.processNextEnvelope(LocalFunctionGroup.java:60) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.Reductions.processEnvelopes(Reductions.java:164) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.AsyncSink.drainOnOperatorThread(AsyncSink.java:119) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:338) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:324) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:201) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575) ~[flink-dist_2.12-1.14.3.jar:1.14.3]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: Failure forwarding a message to a remote function Address(analytics-transformer, dispatch, 77d07eb3-f499-4265-a456-b0f75d738830)
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.onAsyncResult(RequestReplyFunction.java:170) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.reqreply.RequestReplyFunction.invoke(RequestReplyFunction.java:124) ~[statefun-flink-core.jar:3.2.0]
    at org.apache.flink.statefun.flink.core.functions.StatefulFunction.receive(StatefulFunction.java:48) ~[statefun-flink-core.jar:3.2.0]
    ... 16 more
Caused by: org.apache.flink.statefun.flink.core.nettyclient.exceptions.RequestTimeoutException

I am guessing this is a load issue due to the number of incoming requests, where the worker is unable to handle them all. This is what I have configured for each of the HTTP function endpoints in the module.yaml:

spec:
  functions: <function>
  urlPathTemplate: <url>
  transport:
    type: io.statefun.transports.v1/async
    call: 15min
    connect: 15min
    pool_ttl: 45s
    pool_size: 1024
    payload_max_bytes: 33554432

I find that decreasing the pool size to a small value like 20 reduces the number of warnings, but then later on I see this warning a lot:

2022-03-18 15:44:52,566 WARN  org.apache.flink.statefun.flink.core.nettyclient.NettyRequest [] - Exception caught while trying to deliver a message: (attempt #0)ToFunctionRequestSummary(address=Address(analytics-transformer, dispatch, 7facef98-b659-442e-846f-4e4d45559555), batchSize=1, totalSizeInBytes=739, numberOfStates=0)
org.apache.flink.shaded.netty4.io.netty.channel.pool.FixedChannelPool$AcquireTimeoutException: Acquire operation took longer then configured maximum time

I assume this means that requests could not acquire a connection from the pool before the acquire timeout, because the pool is small compared to the large number of requests.
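
As a rough sanity check on that assumption, Little's law (requests in flight = arrival rate x time each request is in flight) gives a back-of-the-envelope estimate of how many connections are busy at once. This is only a sketch under stated assumptions: it assumes a steady arrival rate and a single shared pool, the helper names are made up for illustration, and the latencies are hypothetical values, not measurements from this setup. Only the 10,000 requests per minute and the pool size of 20 come from the question.

```python
import math


def required_connections(requests_per_minute, avg_latency_seconds):
    """Little's law: concurrent requests = arrival rate * time in flight."""
    rate_per_second = requests_per_minute / 60.0
    return math.ceil(rate_per_second * avg_latency_seconds)


def max_latency_for_pool(requests_per_minute, pool_size):
    """Largest average latency (seconds) a pool of this size can sustain."""
    rate_per_second = requests_per_minute / 60.0
    return pool_size / rate_per_second


# 10000 requests/minute is the upper figure quoted in the question.
# The latencies below are hypothetical, chosen only to show the trend.
for latency in (0.05, 0.5, 2.0):
    needed = required_connections(10000, latency)
    print(f"avg latency {latency:>4}s -> about {needed} connections in use")

# With pool_size 20, the average round trip must stay under this many seconds:
print(round(max_latency_for_pool(10000, 20), 3))
```

Under these assumptions, a pool of 20 only keeps up while the average round trip to the function endpoint stays around 0.12 seconds; anything slower makes requests queue for a connection, which would be consistent with the acquire timeouts. Bursts and request batching would shift these numbers.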

Here is the flink-conf.yaml:

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is the base for the Apache Flink configuration

statefun.flink-job-name: Statefun Application

#==============================================================================
# Configurations strictly required by Stateful Functions. Do not change.
#==============================================================================

classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf

#==============================================================================
# Fault tolerance, checkpointing and recovery.
# For more related configuration options, please see: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
#==============================================================================

# Uncomment the below to enable checkpointing for your application
#execution.checkpointing.mode: EXACTLY_ONCE
#execution.checkpointing.interval: 5sec

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 1sec

state.backend.local-recovery: true
state.backend: rocksdb
state.backend.rocksdb.timer-service.factory: ROCKSDB
state.backend.rocksdb.localdir: /local/state/rocksdb
state.backend.rocksdb.memory.partitioned-index-filters: true
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
state.backend.rocksdb.thread.num: 4
state.checkpoints.dir: file:///checkpoint-dir
state.backend.incremental: true

taskmanager.state.local.root-dirs: file:///local/state/recovery

#==============================================================================
# Recommended memory configurations. Users may change according to their needs.
#==============================================================================

jobmanager.memory.process.size: 1g
taskmanager.memory.process.size: 4g

#==============================================================================
# Support easy upgrades as the module.yaml file updates
#==============================================================================

pipeline.auto-generate-uids: false
execution.savepoint.ignore-unclaimed-state: true
 
statefun.async.max-per-task: 163840
execution.checkpointing.mode: EXACTLY_ONCE
execution.checkpointing.interval: 5sec

I have also tried increasing the number of gunicorn workers and setting taskmanager.network.netty.server.numThreads to 100 in the flink-conf.yaml, but this does not seem to fix the issue.

docker

apache-flink

netty

flink-statefun
