1 year ago

#362852


Damianos17

How to change the frequency of reporting metrics in Apache Flink?

By default, Flink reports metrics to InfluxDB every 10 seconds. After I added this line to the configuration file flink-conf.yaml:

metrics.reporter.rtm.interval: 20 SECONDS

metrics are now reported every 60 seconds, which I can see in Chronograf.

But my goal is to report every 20 seconds. Does anyone know how to set that?

Edit: flink-conf.yaml:

    ################################################################################
    #  Licensed to the Apache Software Foundation (ASF) under one
    #  or more contributor license agreements.  See the NOTICE file
    #  distributed with this work for additional information
    #  regarding copyright ownership.  The ASF licenses this file
    #  to you under the Apache License, Version 2.0 (the
    #  "License"); you may not use this file except in compliance
    #  with the License.  You may obtain a copy of the License at
    #
    #      http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    # limitations under the License.
    ################################################################################


    #==============================================================================
    # Common
    #==============================================================================

    # The external address of the host on which the JobManager runs and can be
    # reached by the TaskManagers and any clients which want to connect. This setting
    # is only used in Standalone mode and may be overwritten on the JobManager side
    # by specifying the --host <hostname> parameter of the bin/jobmanager.sh executable.
    # In high availability mode, if you use the bin/start-cluster.sh script and setup
    # the conf/masters file, this will be taken care of automatically. Yarn/Mesos
    # automatically configure the host name based on the hostname of the node where the
    # JobManager runs.

    jobmanager.rpc.address: rtmflinkvmd2

    taskmanager.hostname: rtmflinkvmd2

    # The RPC port where the JobManager is reachable.

    jobmanager.rpc.port: 6123


    # The heap size for the JobManager JVM

    jobmanager.heap.mb: 1024


    # The heap size for the TaskManager JVM

    taskmanager.heap.mb: 4096


    # The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

    taskmanager.numberOfTaskSlots: 32

    # Specify whether TaskManager memory should be allocated when starting up (true) or when
    # memory is required in the memory manager (false)
    # Important Note: For pure streaming setups, we highly recommend to set this value to `false`
    # as the default state backends currently do not use the managed memory.

    taskmanager.memory.preallocate: false

    # The parallelism used for programs that did not specify any other parallelism.

    parallelism.default: 1


    #==============================================================================
    # Web Frontend
    #==============================================================================

    # The address under which the web-based runtime monitor listens.
    #
    #jobmanager.web.address: 0.0.0.0

    # The port under which the web-based runtime monitor listens.
    # A value of -1 deactivates the web server.

    # Must be consistent with masters file!
    rest.port: 8182 # <- changed from default 8081

    # Flag to specify whether job submission is enabled from the web-based
    # runtime monitor. Uncomment to disable.

    #jobmanager.web.submit.enable: false

    #==============================================================================
    # HistoryServer
    #==============================================================================

    # The HistoryServer is started and stopped via bin/historyserver.sh (start|stop)

    # Directory to upload completed jobs to. Add this directory to the list of
    # monitored directories of the HistoryServer as well (see below).
    #jobmanager.archive.fs.dir: hdfs:///completed-jobs/

    # The address under which the web-based HistoryServer listens.
    #historyserver.web.address: 0.0.0.0

    # The port under which the web-based HistoryServer listens.
    #historyserver.web.port: 8082

    # Comma separated list of directories to monitor for completed jobs.
    #historyserver.archive.fs.dir: hdfs:///completed-jobs/

    # Interval in milliseconds for refreshing the monitored directories.
    #historyserver.archive.fs.refresh-interval: 10000

    #==============================================================================
    # Streaming state checkpointing
    #==============================================================================

    # The backend that will be used to store operator state checkpoints if
    # checkpointing is enabled.
    #
    # Supported backends: jobmanager, filesystem, rocksdb, <class-name-of-factory>
    #
    # state.backend: filesystem


    # Directory for storing checkpoints in a Flink-supported filesystem
    # Note: State backend must be accessible from the JobManager and all TaskManagers.
    # Use "hdfs://" for HDFS setups, "file://" for UNIX/POSIX-compliant file systems,
    # (or any local file system under Windows), or "S3://" for S3 file system.
    #
    # state.backend.fs.checkpointdir: hdfs://namenode-host:port/flink-checkpoints
    state.backend: filesystem
    # The namenode is written out explicitly, because in this case Flink does not look into hdfs-site.xml
    state.checkpoints.dir: hdfs:///flink/dev_plk/checkpoints
    state.savepoints.dir: hdfs:///flink/dev_plk/savepoints
    security.kerberos.login.keytab: /opt/esp/flink-1.9.3/conf/esp.keytab
    security.kerberos.login.principal: ###

    #Below are base settings for rocksdb metrics, that can be used for grafana dashboards
    state.backend.rocksdb.metrics.estimate-num-keys: true
    state.backend.rocksdb.metrics.estimate-live-data-size: true
    state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
    state.backend.rocksdb.metrics.size-all-mem-tables: true

    #For frequent writes increase the value as needed. Currently RocksDB settings can only be changed per Flink cluster
    state.backend.rocksdb.timer-service.factory: ROCKSDB
    state.backend.rocksdb.writebuffer.size: 1024m

    #==============================================================================
    # Advanced
    #==============================================================================

    # The number of buffers for the network stack.
    #
    # taskmanager.network.numberOfBuffers: 2048


    # Directories for temporary files.
    #
    # Add a delimited list for multiple directories, using the system directory
    # delimiter (colon ':' on unix) or a comma, e.g.:
    #     /data1/tmp:/data2/tmp:/data3/tmp
    #
    # Note: Each directory entry is read from and written to by a different I/O
    # thread. You can include the same directory multiple times in order to create
    # multiple I/O threads against that directory. This is for example relevant for
    # high-throughput RAIDs.
    #
    # If not specified, the system-specific Java temporary directory (java.io.tmpdir
    # property) is taken.
    #
    io.tmp.dirs: /opt/esp/flink-1.9.3/tmp

    blob.storage.directory: /opt/esp/flink-1.9.3/blob


    # Path to the Hadoop configuration directory.
    #
    # This configuration is used when writing into HDFS. Unless specified otherwise,
    # HDFS file creation will use HDFS default settings with respect to block-size,
    # replication factor, etc.
    #
    # You can also directly specify the paths to hdfs-default.xml and hdfs-site.xml
    # via keys 'fs.hdfs.hdfsdefault' and 'fs.hdfs.hdfssite'.
    #
    # fs.hdfs.hadoopconf: /path/to/hadoop/conf/
    fs.hdfs.hadoopconf: /opt/esp/flink-1.9.3/conf


    #==============================================================================
    # High Availability
    #==============================================================================

    # The high-availability mode. Possible options are 'NONE' or 'zookeeper'.
    #
    # high-availability: zookeeper

    # The path where metadata for master recovery is persisted. While ZooKeeper stored
    # the small ground truth for checkpoint and leader election, this location stores
    # the larger objects, like persisted dataflow graphs.
    #
    # Must be a durable file system that is accessible from all nodes
    # (like HDFS, S3, Ceph, nfs, ...)
    #
    # high-availability.storageDir: hdfs:///flink/ha/

    # The list of ZooKeeper quorum peers that coordinate the high-availability
    # setup. This must be a list of the form:
    # "host1:clientPort,host2:clientPort,..." (default clientPort: 2181)
    #
    # high-availability.zookeeper.quorum: localhost:2181


    # ACL options are based on https://zookeeper.apache.org/doc/r3.1.2/zookeeperProgrammers.html#sc_BuiltinACLSchemes
    # It can be either "creator" (ZOO_CREATE_ALL_ACL) or "open" (ZOO_OPEN_ACL_UNSAFE)
    # The default value is "open" and it can be changed to "creator" if ZK security is enabled
    #
    # high-availability.zookeeper.client.acl: open

    high-availability: zookeeper
    high-availability.zookeeper.quorum: rtmflinkvmd1:2181,rtmflinkvmd2:2181,rtmkafkavmd1:2181
    high-availability.zookeeper.path.root: /flink19
    high-availability.storageDir: hdfs:///flink/dev_plk/recovery
    zookeeper.sasl.disable: false
    high-availability.zookeeper.client.acl: creator

    #==============================================================================
    # Flink Cluster Security Configuration (optional configuration)
    #==============================================================================

    # Kerberos authentication for various components - Hadoop, ZooKeeper, and connectors -
    # may be enabled in four steps:
    # 1. configure the local krb5.conf file
    # 2. provide Kerberos credentials (either a keytab or a ticket cache w/ kinit)
    # 3. make the credentials available to various JAAS login contexts
    # 4. configure the connector to use JAAS/SASL

    # The below configure how Kerberos credentials are provided. A keytab will be used instead of
    # a ticket cache if the keytab path and principal are set.

    # security.kerberos.login.use-ticket-cache: true
    # security.kerberos.login.keytab: /path/to/kerberos/keytab
    # security.kerberos.login.principal: flink-user

    # The configuration below defines which JAAS login contexts

    # security.kerberos.login.contexts: Client,KafkaClient

    #==============================================================================
    # ZK Security Configuration (optional configuration)
    #==============================================================================

    # Below configurations are applicable if ZK ensemble is configured for security

    # Override below configuration to provide custom ZK service name if configured
    # zookeeper.sasl.service-name: zookeeper

    # The configuration below must match one of the values set in "security.kerberos.login.contexts"
    # zookeeper.sasl.login-context-name: Client

    #metrics.reporters: graphite_reporter
    #metrics.reporter.graphite_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
    #metrics.reporter.graphite_reporter.host: rtmesppvmd1
    #metrics.reporter.graphite_reporter.port: 2003
    #metrics.reporter.graphite_reporter.protocol: tcp
    #
    #metrics.scope.jm: dev_plk.<host>.jobmanagerGlobal
    #metrics.scope.jm.job: dev_plk.<host>.jobmanagerJob.<job_name>
    #metrics.scope.tm: dev_plk.<host>.taskmanagerGlobal.<tm_id>
    #metrics.scope.tm.job: dev_plk.<host>.taskmanagerJob.<tm_id>.<job_name>
    #metrics.scope.task: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<task_name>.<subtask_index>
    #metrics.scope.operator: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<operator_name>.<subtask_index>

    metrics.reporters: rtm
    metrics.reporter.rtm.class: com.spl.rtm.metrics.flink.Reporter
    metrics.scope.jm: dev_plk.<host>.jobmanagerGlobal
    metrics.scope.jm.job: dev_plk.<host>.jobmanagerJob.<job_name>
    metrics.scope.tm: dev_plk.<host>.taskmanagerGlobal.<tm_id>
    metrics.scope.tm.job: dev_plk.<host>.taskmanagerJob.<tm_id>.<job_name>
    metrics.scope.task: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<task_name>.<subtask_index>
    metrics.scope.operator: dev_plk.<host>.taskmanagerTask.<tm_id>.<job_name>.<operator_name>.<subtask_index>

    # InfluxDB configuration
    metrics.reporter.rtm.enabled: true
    metrics.reporter.rtm.host: rtmesppvmd1
    metrics.reporter.rtm.port: 2003
    metrics.reporter.rtm.protocol: TCP
    metrics.reporter.rtm.interval: 20 SECONDS

    # Kafka configuration
    metrics.reporter.rtm.publisher.kafka.enabled: true
    metrics.reporter.rtm.publisher.kafka.metric: F
    metrics.reporter.rtm.publisher.kafka.node_id: 2
    metrics.reporter.rtm.publisher.kafka.client_id: rtm-flink-metrics-publisher
    metrics.reporter.rtm.publisher.kafka.bootstrap_servers: rtmkafkavmd1:9091
    metrics.reporter.rtm.publisher.kafka.security.protocol: SASL_PLAINTEXT
    metrics.reporter.rtm.publisher.kafka.topic: rtm-metric-log
    metrics.reporter.rtm.publisher.kafka.prefixes: tech-
    metrics.reporter.rtm.publisher.kafka.compression_type: gzip

    akka.framesize: 209715200b
    jobmanager.web.history: 50
    akka.client.timeout: 180 s
    akka.ask.timeout: 180 s

    web.tmpdir: /opt/esp/flink-1.9.3/tmp

    cluster.evenly-spread-out-slots: true
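
One thing that may be worth ruling out with a file this long is a malformed or duplicated interval entry. The snippet below is only a rough sketch for that check, not part of Flink. It assumes the file consists of flat `key: value` lines and that the interval value has the form `<number> <TIME_UNIT>`, with unit names taken from `java.util.concurrent.TimeUnit`. The helper names (`read_flat_conf`, `interval_seconds`, `reporter_intervals`) are hypothetical.

```python
import re

# Unit names as defined by java.util.concurrent.TimeUnit, mapped to seconds.
TIME_UNITS = {
    "NANOSECONDS": 1e-9,
    "MICROSECONDS": 1e-6,
    "MILLISECONDS": 1e-3,
    "SECONDS": 1,
    "MINUTES": 60,
    "HOURS": 3600,
    "DAYS": 86400,
}


def read_flat_conf(text):
    """Return all (key, value) pairs in file order, duplicates included."""
    entries = []
    for raw in text.splitlines():
        line = raw.split(" #", 1)[0].strip()  # drop trailing comments
        if not line or line.startswith("#") or ":" not in line:
            continue
        key, value = line.split(":", 1)
        entries.append((key.strip(), value.strip()))
    return entries


def interval_seconds(value):
    """Parse '<number> <TIME_UNIT>' into seconds; None if it does not match."""
    match = re.fullmatch(r"(\d+)\s+([A-Z]+)", value.strip())
    if not match or match.group(2) not in TIME_UNITS:
        return None
    return int(match.group(1)) * TIME_UNITS[match.group(2)]


def reporter_intervals(text, reporter):
    """List every interval configured for the given reporter name."""
    key = f"metrics.reporter.{reporter}.interval"
    return [interval_seconds(v) for k, v in read_flat_conf(text) if k == key]


if __name__ == "__main__":
    with open("flink-conf.yaml") as handle:  # path is an assumption
        print(reporter_intervals(handle.read(), "rtm"))
```

If the printed list has more than one element, or contains `None`, the entry is duplicated or not in the assumed format. A single clean value suggests the cause lies elsewhere, for example in the reporter class itself or in how the dashboard groups data points.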

apache-flink

monitoring

influxdb

0 Answers
