1 year ago
#386378
Aprendiendo Siempre
How can I configure jaas.conf for the Kafka Inbound Endpoint in WSO2 EI 6.6.0?
I have WSO2 EI 6.6.0 and I am using the following libraries to configure the Kafka Inbound Endpoint.
dropins:
org.apache.synapse.kafka.poll-1.0.10.jar
lib:
kafka_2.12-1.0.0.jar
kafka-clients-1.0.0.jar
metrics-core-2.2.0.jar
scala-library-2.12.3.jar
zkclient-0.10.jar
zookeeper-3.4.10.jar
Then I configured the jaas.conf file as described in this tutorial (https://docs.wso2.com/display/EI660/Kafka+Inbound+Protocol+), that is, I created the folders /conf/identity inside the repository folder of WSO2.
The content of my jaas file is the following:
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=""
password=""
};
KafkaClient {
org.apache.kafka.common.security.plain.PlainLoginModule required
username=""
password="";
};
These are the connection details I have:
IP: X.X.X.X
Port: X
topic: X
user:
password:
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
Finally, I have this inbound endpoint in EI:
<?xml version="1.0" encoding="UTF-8"?>
<inboundEndpoint xmlns="http://ws.apache.org/ns/synapse" name="kafka" sequence="request" onError="fault" class="org.wso2.carbon.inbound.kafka.KafkaMessageConsumer" suspend="false">
<parameters>
<parameter name="interval">10</parameter>
<parameter name="coordination">true</parameter>
<parameter name="sequential">true</parameter>
<parameter name="inbound.behavior">polling</parameter>
<parameter name="value.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="topic.name">test</parameter>
<parameter name="poll.timeout">100</parameter>
<parameter name="bootstrap.servers">localhost:9093</parameter>
<parameter name="group.id">hello</parameter>
<parameter name="contentType">application/json</parameter>
<parameter name="key.deserializer">org.apache.kafka.common.serialization.StringDeserializer</parameter>
<parameter name="security.protocol">SASL_SSL</parameter>
<parameter name="sasl.mechanism">PLAIN</parameter>
</parameters>
</inboundEndpoint>
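The problem is that I get the following error (the JAAS message near the end is in Spanish and reads "Configuration error: Line 6: expected [option key]"):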
[2022-04-21 15:19:10,098] INFO {org.apache.kafka.clients.consumer.ConsumerConfig} - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [localhost:9093]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = prueba-kafka
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 50
reconnect.backoff.ms = 50
request.timeout.ms = 305000
retry.backoff.ms = 100
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.mechanism = PLAIN
security.protocol = SASL_SSL
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[2022-04-21 15:19:10,119] ERROR {org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter} - Error in executing task: Failed to construct kafka consumer
org.apache.synapse.SynapseException: Failed to construct kafka consumer
at org.wso2.carbon.inbound.kafka.KafkaMessageConsumer.poll(KafkaMessageConsumer.java:351)
at org.wso2.carbon.inbound.endpoint.protocol.generic.GenericTask.taskExecute(GenericTask.java:41)
at org.wso2.carbon.inbound.endpoint.common.InboundTask.execute(InboundTask.java:51)
at org.wso2.carbon.mediation.ntask.NTaskAdapter.execute(NTaskAdapter.java:98)
at org.wso2.carbon.ntask.core.impl.TaskQuartzJobAdapter.execute(TaskQuartzJobAdapter.java:67)
at org.quartz.core.JobRunShell.run(JobRunShell.java:213)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:781)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:635)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:617)
at org.wso2.carbon.inbound.kafka.KafkaMessageConsumer.poll(KafkaMessageConsumer.java:333)
... 10 more
Caused by: java.lang.SecurityException: java.io.IOException: Error de configuración:
Línea 6: se esperaba [option key]
at sun.security.provider.ConfigFile$Spi.<init>(ConfigFile.java:137)
at sun.security.provider.ConfigFile.<init>(ConfigFile.java:102)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at javax.security.auth.login.Configuration$2.run(Configuration.java:255)
at javax.security.auth.login.Configuration$2.run(Configuration.java:247)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.login.Configuration.getConfiguration(Configuration.java:246)
at org.apache.kafka.common.security.JaasContext.defaultContext(JaasContext.java:112)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:96)
at org.apache.kafka.common.security.JaasContext.load(JaasContext.java:78)
at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:103)
at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:61)
at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:86)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:702)
... 13 more
I see that sasl.jaas.config is null even though I have configured the jaas file. How can I set a value for this property?
I tried adding it directly in the inbound endpoint, and that works fine, but then the jaas file is never used.
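For illustration, setting the property in the inbound endpoint looks roughly like the fragment below. This is only a sketch: it assumes the inbound endpoint passes sasl.jaas.config through to the Kafka consumer unchanged, the value follows the standard Kafka client format for the PLAIN mechanism, and my-user and my-password are placeholders, not real credentials.

```xml
<!-- Goes inside the <parameters> element of the inbound endpoint, next to security.protocol and sasl.mechanism. -->
<!-- my-user / my-password are placeholder values (assumption). -->
<parameter name="sasl.jaas.config">org.apache.kafka.common.security.plain.PlainLoginModule required username="my-user" password="my-password";</parameter>
```

Note that in this form the login module entry ends with a single semicolon after the last option, and no jaas.conf file is involved.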
How can I configure the jaas file so that it supplies this property?
Thank you
apache-kafka
wso2
apache-zookeeper
jaas
0 Answers