1 year ago

#307941

KevinMT

Kafka Connect to AKS Kafka Cluster Security Config

Brief intro:

I am having difficulty understanding exactly what needs to be done to allow my on-prem Kafka Connect client to connect successfully to a Kafka cluster running on Azure Kubernetes Service (AKS).

Full detail:

Starting with Azure: I have a fully configured Kafka cluster running on AKS, set up using Helm and Strimzi. The Kafka cluster was created with this custom resource:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-kafka-cluster
  namespace: kafka
spec:
  kafka:
    version: 3.0.0
    replicas: 1
    listeners:
    - name: plain
      port: 9092
      type: internal
      tls: false
    - name: tls
      port: 9093
      type: internal
      tls: true
      authentication:
        type: tls
    - name: external
      port: 9094
      type: loadbalancer
      tls: true
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      log.message.format.version: "3.0"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

As you can see, this has an external listener of type loadbalancer on port 9094, which, from everything I can find, should be all that is needed to expose the cluster to the internet. The services appear in my cluster and look correct, and I have confirmed in the Azure Portal that the load balancer exists. So I think the Azure side is ready.

For the on-prem client, I am running a Linux box with Docker and, for ease of use for now, the debezium/connect image.

The Connect container's bootstrap.servers is set to <azure-external-IP:9094>, the external load balancer that Strimzi created.
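Before touching TLS settings, it can help to confirm that the on-prem host can open TCP connections at all. With a Strimzi listener of type loadbalancer, a client first contacts the bootstrap load balancer and then connects directly to each broker's own load balancer address returned in the cluster metadata, so every one of those addresses must be reachable. A minimal sketch using only the Python standard library; the function names are illustrative and the addresses are placeholders.

```python
import socket
from typing import Dict, Iterable, Tuple


def tcp_reachable(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Refused connections, timeouts and DNS failures are all OSError subclasses.
        return False


def check_endpoints(
    endpoints: Iterable[Tuple[str, int]], timeout: float = 3.0
) -> Dict[str, bool]:
    """Check each (host, port) pair, e.g. the bootstrap and every broker address."""
    return {f"{host}:{port}": tcp_reachable(host, port, timeout) for host, port in endpoints}
```

For example, check_endpoints([("<bootstrap-ip>", 9094), ("<broker-0-ip>", 9094)]), taking the EXTERNAL-IP values from kubectl get svc -n kafka. If any entry is False, the problem is network reachability rather than SSL configuration.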

As far as I am aware, the only other step needed to connect successfully is to change connect-distributed.properties so that the client uses SSL. These changes are:

  • Add the necessary certificates to a keystore/truststore.
  • Add SSL configuration for the client, producer and consumer in connect-distributed.properties.

For the first bullet point: which certificate from Kubernetes do I need? Is it ca.crt? Where does this certificate need to go, and is there a specific setup I need to follow because I am using Docker?
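Because the external listener in the resource above has tls: true but no authentication block, the client should only need to trust the cluster's CA; a client keystore would only be needed if TLS client authentication were enabled on that listener. Strimzi stores the cluster CA public certificate in a secret named <cluster-name>-cluster-ca-cert (here, my-kafka-cluster-cluster-ca-cert) under the key ca.crt. The sketch below assumes that default naming and decodes the certificate from the JSON printed by kubectl get secret; the function name and output path are hypothetical.

```python
import base64
import json
from pathlib import Path
from typing import Optional


def extract_cluster_ca(secret_json: str, out_path: Optional[str] = None) -> str:
    """Return the PEM cluster CA certificate from a Strimzi CA secret.

    secret_json is assumed to be the output of:
        kubectl get secret my-kafka-cluster-cluster-ca-cert -n kafka -o json
    with the certificate stored under the data key "ca.crt".
    """
    secret = json.loads(secret_json)
    # Kubernetes stores secret values base64-encoded in the "data" map.
    pem = base64.b64decode(secret["data"]["ca.crt"]).decode("ascii")
    if "-----BEGIN CERTIFICATE-----" not in pem:
        raise ValueError("ca.crt does not contain a PEM certificate")
    if out_path is not None:
        # Write the file so its directory can be mounted into the Connect container.
        Path(out_path).write_text(pem)
    return pem
```

After saving the kubectl output to secret.json, extract_cluster_ca(Path("secret.json").read_text(), "ca.crt") writes the certificate, and the directory holding it can be mounted into the container with docker run -v. The same secret also holds ca.p12 and ca.password, which can be used directly as a PKCS12 truststore instead.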

For the second bullet point: which configuration do I need for this to work? I assume it is similar to https://docs.confluent.io/platform/current/kafka/encryption.html#kconnect-long, but I am not sure.
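In a distributed Connect worker, the top-level security settings apply to the worker's own connections, including the admin client that looks up the cluster ID at startup (the listNodes call that times out in the log). Source-connector producers, sink-connector consumers and connector admin clients read producer., consumer. and admin. prefixed copies of the same settings. The sketch below generates all four sets. It assumes the CA certificate is mounted at a hypothetical container path and uses the PEM truststore type, which Kafka clients support from 2.7 onward (KIP-651); for Strimzi's ca.p12, pass PKCS12 and the ca.password value instead.

```python
from typing import Dict, Optional

# "" = the worker itself; the prefixed copies configure connector clients.
PREFIXES = ("", "producer.", "consumer.", "admin.")


def ssl_client_properties(
    truststore_location: str,
    truststore_password: Optional[str] = None,
    truststore_type: str = "PEM",
) -> Dict[str, str]:
    """Build server-auth-only SSL settings for connect-distributed.properties."""
    if truststore_type == "PEM" and truststore_password is not None:
        # Kafka rejects a truststore password when the truststore type is PEM.
        raise ValueError("PEM truststores must not have a password")

    base = {
        "security.protocol": "SSL",
        "ssl.truststore.type": truststore_type,
        "ssl.truststore.location": truststore_location,
    }
    if truststore_password is not None:
        base["ssl.truststore.password"] = truststore_password

    # Repeat every setting once per prefix.
    return {prefix + key: value for prefix in PREFIXES for key, value in base.items()}


def to_properties_text(props: Dict[str, str]) -> str:
    """Render settings as key=value lines."""
    return "".join(f"{key}={value}\n" for key, value in props.items())
```

For example, print(to_properties_text(ssl_client_properties("/kafka/certs/ca.crt"))) prints twelve lines to append to connect-distributed.properties, where /kafka/certs/ca.crt is only an assumed mount point.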

There are many different tutorials online, and I am struggling to tell which one applies to this exact situation. Any help would be greatly appreciated.

Thanks!

Edit: added Docker details, debezium/connect image details, the Strimzi version and error logs.

Docker: version 20.10.12, API version 1.41

debezium/connect image: I am using this base image as-is and starting it as described in that link, with my BOOTSTRAP_SERVERS value added.

Strimzi: version 0.28.0

Kafka: 3.0.0 on AKS; Kafka Connect: 2.7.1
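When the worker runs in the debezium/connect container, the properties file is generated at startup rather than edited by hand. My understanding is that the image's entrypoint script turns any environment variable starting with CONNECT_ into a worker property by dropping the prefix, lowercasing and replacing underscores with dots; treat that as an assumption and check the entrypoint of the image version in use. The sketch below goes the other way, turning worker properties into -e flags for docker run and mounting a host directory holding ca.crt at a hypothetical /kafka/certs path. The function names are illustrative.

```python
from typing import Dict, List


def to_connect_env(props: Dict[str, str]) -> Dict[str, str]:
    """Map worker properties to CONNECT_* variables (assumed image convention)."""
    env = {}
    for key, value in props.items():
        if "_" in key:
            # The entrypoint would turn this underscore into a dot, so it cannot round-trip.
            raise ValueError(f"cannot express {key!r} as a CONNECT_ variable")
        env["CONNECT_" + key.upper().replace(".", "_")] = value
    return env


def docker_run_args(
    image: str, props: Dict[str, str], cert_dir: str, mount_point: str = "/kafka/certs"
) -> List[str]:
    """Build a docker run argument list with the CA directory mounted read-only."""
    args = ["docker", "run", "-d", "-v", f"{cert_dir}:{mount_point}:ro"]
    for name, value in to_connect_env(props).items():
        args += ["-e", f"{name}={value}"]
    args.append(image)
    return args
```

These flags would sit alongside BOOTSTRAP_SERVERS and the other variables already passed to the container; the image tag is left to whatever is currently in use.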

The error output is below:

2022-03-20 13:36:42,170 INFO   ||  [AdminClient clientId=adminclient-1] Metadata update failed   [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1647783402169, tries=1, nextAllowedTryMs=1647783402270) timed out at 1647783402170 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
2022-03-20 13:37:12,170 INFO   ||  App info kafka.admin.client for adminclient-1 unregistered   [org.apache.kafka.common.utils.AppInfoParser]
2022-03-20 13:37:12,171 INFO   ||  [AdminClient clientId=adminclient-1] Metadata update failed   [org.apache.kafka.clients.admin.internals.AdminMetadataManager]
org.apache.kafka.common.errors.TimeoutException: Call(callName=fetchMetadata, deadlineMs=1647783432170, tries=1, nextAllowedTryMs=-9223372036854775709) timed out at 9223372036854775807 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting to send the call. Call: fetchMetadata
2022-03-20 13:37:12,180 INFO   ||  Metrics scheduler closed   [org.apache.kafka.common.metrics.Metrics]
2022-03-20 13:37:12,180 INFO   ||  Closing reporter org.apache.kafka.common.metrics.JmxReporter   [org.apache.kafka.common.metrics.Metrics]
2022-03-20 13:37:12,180 INFO   ||  Metrics reporters closed   [org.apache.kafka.common.metrics.Metrics]
2022-03-20 13:37:12,180 ERROR  ||  Stopping due to error   [org.apache.kafka.connect.cli.ConnectDistributed]
org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:70)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:51)
        at org.apache.kafka.connect.cli.ConnectDistributed.startConnect(ConnectDistributed.java:97)
        at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:80)
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1647783432168, tries=1, nextAllowedTryMs=1647783432269) timed out at 1647783432169 after 1 attempt(s)
        at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
        at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
        at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
        at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
        at org.apache.kafka.connect.util.ConnectUtils.lookupKafkaClusterId(ConnectUtils.java:64)
        ... 3 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listNodes, deadlineMs=1647783432168, tries=1, nextAllowedTryMs=1647783432269) timed out at 1647783432169 after 1 attempt(s)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: listNodes
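The listNodes timeout at the end of the log shows that the worker's admin client never received cluster metadata. One common cause (an assumption here, not a diagnosis of this cluster) is a client using the default security.protocol=PLAINTEXT against a TLS listener. To test the TLS side independently of Connect, a handshake can be attempted against the bootstrap address using the extracted ca.crt. A minimal sketch with Python's ssl module; the function name, host, port and CA path are placeholders.

```python
import socket
import ssl
from typing import Optional


def tls_handshake_ok(host: str, port: int, cafile: Optional[str], timeout: float = 5.0) -> bool:
    """Return True if a TLS handshake with host:port succeeds and the server
    certificate verifies against cafile (the system CA store if None)."""
    context = ssl.create_default_context(cafile=cafile)
    try:
        with socket.create_connection((host, port), timeout=timeout) as raw:
            # server_hostname turns on hostname/IP checks against the certificate.
            with context.wrap_socket(raw, server_hostname=host) as tls:
                return tls.version() is not None
    except OSError:
        # ssl.SSLError and socket timeouts are both OSError subclasses.
        return False
```

For example, tls_handshake_ok("<azure-external-IP>", 9094, "ca.crt"). If this returns True while Connect still times out, the worker's security.protocol and truststore settings are the next thing to check; if it returns False, the problem is reachability or the certificate.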

Tags: kubernetes, apache-kafka, apache-kafka-connect, strimzi
