1 year ago
#362852
Damianos17
how to change the frequency of reporting metrics in Apache Flink?
Flink by default reports metrics to Influxdb every 10 seconds. After I added a line in the configuration file flink_conf.yaml:
metrics.reporter.rtm.interval: 20 SECONDS
metrics now are reported every 60 seconds, what I may see in Chronograf:
But my goal is to reporting every 20 seconds. Does anyone know how to set it?
Edit: flink-conf.yaml:
1 ################################################################################
2 # Licensed to the Apache Software Foundation (ASF) under one
3 # or more contributor license agreements. See the NOTICE file
4 # distributed with this work for additional information
5 # regarding copyright ownership. The ASF licenses this file
6 # to you under the Apache License, Version 2.0 (the
7 # "License"); you may not use this file except in compliance
8 # with the License. You may obtain a copy of the License at
9 #
10 # http://www.apache.org/licenses/LICENSE-2.0
11 #
12 # Unless required by applicable law or agreed to in writing, software
13 # distributed under the License is distributed on an "AS IS" BASIS,
14 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 # See the License for the specific language governing permissions and
16 # limitations under the License.
17 ################################################################################
18
19
20 #==============================================================================
21 # Common
22 #==============================================================================
23
24 # The external address of the host on which the JobManager runs and can be
25 # reached by the TaskManagers and any clients which want to connect. This setting
26 # is only used in Standalone mode and may be overwritten on the JobManager side
27 # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
28 # In high availability mode, if you use the bin/start-cluster.sh script and setup
29 # the conf/masters file, this will be taken care of automatically. Yarn/Mesos
30 # automatically configure the host name based on the hostname of the node where the
31 # JobManager runs.
32
33 jobmanager.rpc.address: rtmflinkvmd2
34
35 taskmanager.hostname: rtmflinkvmd2
36
37 # The RPC port where the JobManager is reachable.
38
39 jobmanager.rpc.port: 6123
40
41
42 # The heap size for the JobManager JVM
43
44 jobmanager.heap.mb: 1024
45
46
47 # The heap size for the TaskManager JVM
48
49 taskmanager.heap.mb: 4096
50
51
52 # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.
53
54 taskmanager.numberOfTaskSlots: 32
55
56 # Specify whether TaskManager memory should be allocated when starting up (true) or when
57 # memory is required in the memory manager (false)
58 # Important Note: For pure streaming setups, we highly recommend to set this value to `false`
59 # as the default state backends currently do not use the managed memory.
60
61 taskmanager.memory.preallocate: false
62
63 # The parallelism used for programs that did not specify and other parallelism.
64
65 parallelism.default: 1
66
67
68 #==============================================================================
69 # Web Frontend
70 #==============================================================================
71
72 # The address under which the web-based runtime monitor listens.
73 #
74 #jobmanager.web.address: 0.0.0.0
75
76 # The port under which the web-based runtime monitor listens.
77 # A value of -1 deactivates the web server.
78
79 # Must be consistent with masters file!
80 rest.port: 8182 # <- changed from default 8081
81
82 # Flag to specify whether job submission is enabled from the web-based
83 # runtime monitor. Uncomment to disable.
84
85 #jobmanager.web.submit.enable: false
86
87 #==============================================================================
88 # HistoryServer
89 #==============================================================================
90
91 # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)
92
93 # Directory to upload completed jobs to. Add this directory to the list of
94 # monitored directories of the HistoryServer as well (see below).
95 #jobmanager.archive.fs.dir: hdfs:///completed-jobs/
96
97 # The address under which the web-based HistoryServer listens.
98 #historyserver.web.address: 0.0.0.0
99
100 # The port under which the web-based HistoryServer listens.
101 #historyserver.web.port: 8082
102
103 # Comma separated list of directories to monitor for completed jobs.
104 #historyserver.archive.fs.dir: hdfs:///completed-jobs/
105
106 # Interval in milliseconds for refreshing the monitored directories.
107 #historyserver.archive.fs.refresh-interval: 10000
108
109 #==============================================================================
110 # Streaming state checkpointing
111 #==============================================================================
112
113 # The backend that will be used to store operator state checkpoints if
114 # checkpointing is enabled.
115 #
116 # Supported backends: jobmanager, filesystem, rocksdb, <class-name-of-factory>
117 #
118 # state.backend: filesystem
119
120
121 # Directory for storing checkpoints in a Flink-supported filesystem
122 # Note: State backend must be accessible from the JobManager and all TaskManagers.
123 # Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
124 # (or any local file system under Windows), or "S3://" for S3 file system.
125 #
126 # state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints
127 state.backend: filesystem
128 # Jest jawnie wypisany namenode, bo flink w tym przypadku nie zagląda do hdfs-site.xml
129 state.checkpoints.dir: hdfs:///flink/dev_plk/checkpoints
130 state.savepoints.dir: hdfs:///flink/dev_plk/savepoints
131 security.kerberos.login.keytab: /opt/esp/flink-1.9.3/conf/esp.keytab
132 security.kerberos.login.principal: ###
133
134 #Below are base settings for rocksdb metrics, that can be used for grafana dashboards
135 state.backend.rocksdb.metrics.estimate-num-keys: true
136 state.backend.rocksdb.metrics.estimate-live-data-size: true
137 state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
138 state.backend.rocksdb.metrics.size-all-mem-tables: true
139
140 #For frequent writes increase the value as needed. Currently RocksDB settings can only be changed per Flink cluster
141 state.backend.rocksdb.timer-service.factory: ROCKSDB
142 state.backend.rocksdb.writebuffer.size: 1024m
143
144 #==============================================================================
145 # Advanced
146 #==============================================================================
147
148 # The number of buffers for the network stack.
149 #
150 # taskmanager.network.numberOfBuffers: 2048
151
152
153 # Directories for temporary files.
154 #
155 # Add a delimited list for multiple directories, using the system directory
156 # delimiter (colon ':' on unix) or a comma, e.g.:
157 # /data1/tmp:/data2/tmp:/data3/tmp
158 #
159 # Note: Each directory entry is read from and written to by a different I/O
160 # thread. You can include the same directory multiple times in order to create
161 # multiple I/O threads against that directory. This is for example relevant for
162 # high-throughput RAIDs.
163 #
164 # If not specified, the system-specific Java temporary directory (java.io.tmpdir
165 # property) is taken.
166 #
167 io.tmp.dirs: /opt/esp/flink-1.9.3/tmp
168
169 blob.storage.directory: /opt/esp/flink-1.9.3/blob
170
171
172 # Path to the Hadoop configuration directory.
173 #
174 # This configuration is used when writing into HDFS. Unless specified otherwise,
175 # HDFS file creation will use HDFS default settings with respect to block-size,
176 # replication factor, etc.
177 #
178 # You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
179 # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
180 #
181 # fs.hdfs.hadoopconf: /path/to/hadoop/conf/
182 fs.hdfs.hadoopconf: /opt/esp/flink-1.9.3/conf
183
184
185 #==============================================================================
186 # High Availability
187 #==============================================================================
188
189 # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
190 #
191 # high-availability: zookeeper
192
193 # The path where metadata for master recovery is persisted. While ZooKeeper stored
194 # the small ground truth for checkpoint and leader election, this location stores
195 # the larger objects, like persisted dataflow graphs.
196 #
197 # Must be a durable file system that is accessible from all nodes
198 # (like HDFS, S3, Ceph, nfs, ...)
199 #
200 # high-availability.storageDir: hdfs:///flink/ha/
201
202 # The list of ZooKeeper quorum peers that coordinate the high-availability
203 # setup. This must be a list of the form:
204 # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
205 #
206 # high-availability.zookeeper.quorum: localhost:2181
207
208
209 # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
210 # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
211 # The default value is "open" and it can be changed to "creator" if ZK security is enabled
212 #
213 # high-availability.zookeeper.client.acl: open
214
215 high-availability: zookeeper
216 high-availability.zookeeper.quorum: rtmflinkvmd1:2181,rtmflinkvmd2:2181,rtmkafkavmd1:2181
217 high-availability.zookeeper.path.root: /flink19
218 high-availability.storageDir: hdfs:///flink/dev_plk/recovery
219 zookeeper.sasl.disable: false
220 high-availability.zookeeper.client.acl: creator
221
222 #==============================================================================
223 # Flink Cluster Security Configuration (optional configuration)
224 #==============================================================================
225
226 # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
227 # may be enabled in four steps:
228 # 1. configure the local krb5.conf file
229 # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
230 # 3. make the credentials available to various JAAS login contexts
231 # 4. configure the connector to use JAAS/SASL
232
233 # The below configure how Kerberos credentials are provided. A keytab will be used instead of
234 # a ticket cache if the keytab path and principal are set.
235
236 # security.kerberos.login.use-ticket-cache: true
237 # security.kerberos.login.keytab: /path/to/kerberos/keytab
238 # security.kerberos.login.principal: flink-user
239
240 # The configuration below defines which JAAS login contexts
241
242 # security.kerberos.login.contexts: Client,KafkaClient
243
244 #==============================================================================
245 # ZK Security Configuration (optional configuration)
246 #==============================================================================
247
248 # Below configurations are applicable if ZK ensemble is configured for security
249
250 # Override below configuration to provide custom ZK service name if configured
251 # zookeeper.sasl.service-name: zookeeper
252
253 # The configuration below must match one of the values set in "security.kerberos.login.contexts"
254 # zookeeper.sasl.login-context-name: Client
255
256 #metrics.reporters: graphite_reporter
257 #metrics.reporter.graphite_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
258 #metrics.reporter.graphite_reporter.host: rtmesppvmd1
259 #metrics.reporter.graphite_reporter.port: 2003
260 #metrics.reporter.graphite_reporter.protocol: tcp
261 #
262 #metrics.scope.jm: dev_plk.<host>.jobmanagerGlobal
263 #metrics.scope.jm.job: dev_plk.<host>.jobmanagerJob.<job_name>
264 #metrics.scope.tm: dev_plk.<host>.taskmanagerGlobal.<tm_id>
265 #metrics.scope.tm.job: dev_plk.<host>.taskmanagerJob.<tm_id>.<job_name>
266 #metrics.scope.task: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<task_name>.<subtask_index>
267 #metrics.scope.operator: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<operator_name>.<subtask_index>
268
269 metrics.reporters: rtm
270 metrics.reporter.rtm.class: com.spl.rtm.metrics.flink.Reporter
271 metrics.scope.jm: dev_plk.<host>.jobmanagerGlobal
272 metrics.scope.jm.job: dev_plk.<host>.jobmanagerJob.<job_name>
273 metrics.scope.tm: dev_plk.<host>.taskmanagerGlobal.<tm_id>
274 metrics.scope.tm.job: dev_plk.<host>.taskmanagerJob.<tm_id>.<job_name>
275 metrics.scope.task: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<task_name>.<subtask_index>
276 metrics.scope.operator: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<operator_name>.<subtask_index>
277
278 # InfluxDB configuration
279 metrics.reporter.rtm.enabled: true
280 metrics.reporter.rtm.host: rtmesppvmd1
281 metrics.reporter.rtm.port: 2003
282 metrics.reporter.rtm.protocol: TCP
283 metrics.reporter.rtm.interval: 20 SECONDS
286
287 # Kafka configuration
288 metrics.reporter.rtm.publisher.kafka.enabled: true
289 metrics.reporter.rtm.publisher.kafka.metric: F
290 metrics.reporter.rtm.publisher.kafka.node_id: 2
291 metrics.reporter.rtm.publisher.kafka.client_id: rtm-flink-metrics-publisher
292 metrics.reporter.rtm.publisher.kafka.bootstrap_servers: rtmkafkavmd1:9091
293 metrics.reporter.rtm.publisher.kafka.security.protocol: SASL_PLAINTEXT
294 metrics.reporter.rtm.publisher.kafka.topic: rtm-metric-log
295 metrics.reporter.rtm.publisher.kafka.prefixes: tech-
296 metrics.reporter.rtm.publisher.kafka.compression_type: gzip
297
298 akka.framesize: 209715200b
299 jobmanager.web.history: 50
300 akka.client.timeout: 180 s
301 akka.ask.timeout: 180 s
302
303 web.tmpdir: /opt/esp/flink-1.9.3/tmp
304
305 cluster.evenly-spread-out-slots: true
apache-flink
monitoring
influxdb
0 Answers
Your Answer