
Kafka Security With SASL and ACL

A tutorial covering authentication using SCRAM, authorization using Kafka ACLs, encryption using SSL, and using camel-kafka to produce and consume messages.

By Ramu Kakarla · Oct. 06, 2020 · Tutorial

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

This blog covers authentication using SCRAM, authorization using Kafka ACLs, encryption using SSL, and connecting to the Kafka cluster with camel-kafka to produce and consume messages from Camel routes.

Encryption and Authentication

AMQ Streams supports encryption and authentication, which is configured as part of the listener configuration.

Listener Configuration

Encryption and authentication in Kafka brokers are configured per listener.

Each listener in the Kafka broker is configured with its own security protocol. The configuration property listener.security.protocol.map defines which listener uses which security protocol: it maps each listener name to its security protocol (an example mapping is shown after the list of protocols below).

The supported security protocols are:

  • PLAINTEXT

Listener without any encryption or authentication.

  • SSL

Listener using TLS encryption and, optionally, authentication using TLS client certificates.

  • SASL_PLAINTEXT

Listener without encryption but with SASL-based authentication.

  • SASL_SSL

Listener with TLS-based encryption and SASL-based authentication.
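
When a broker has more than one listener, each listener name is mapped to one of these protocols explicitly. A minimal sketch of such a mapping (the listener names INTERNAL and EXTERNAL are illustrative and not part of this tutorial's setup):

Properties

# Illustrative: two named listeners, both using SASL over TLS
listener.security.protocol.map=INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://broker1.internal:9092,EXTERNAL://broker1.example.com:9093
inter.broker.listener.name=INTERNAL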

Given the following listener configuration for SASL_SSL:


Properties

listeners=SASL_SSL://localhost:9092
advertised.listeners=SASL_SSL://localhost:9092

security.inter.broker.protocol=SASL_SSL



TLS Encryption

In order to use TLS encryption and server authentication, a keystore containing private and public keys has to be provided. This is usually done using a file in the Java KeyStore (JKS) format. The path to this file is set in the ssl.keystore.location property, and the password protecting the keystore is set in the ssl.keystore.password property. A keytool sketch for generating the keystore and a matching client truststore follows the steps below.

Generate TLS certificates for all Kafka brokers in your cluster. The certificates should have their advertised and bootstrap addresses in their Common Name or Subject Alternative Name.

Edit the /opt/kafka/config/server.properties Kafka configuration file on all cluster nodes for the following:

  1. Change the listener.security.protocol.map field to specify the SSL protocol for the listener where you want to use TLS encryption.
  2. Set the ssl.keystore.location option to the path to the JKS keystore with the broker certificate.
  3. Set the ssl.keystore.password option to the password you used to protect the keystore.
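
As a sketch of how those files can be produced, a self-signed broker keystore and a matching client truststore can be generated with the JDK keytool (the alias, file names, and passwords below are placeholders, not the exact ones used later in this post):

Shell

# 1. Broker keystore with a self-signed key pair (CN should match the advertised host name)
keytool -genkeypair -alias localhost -keyalg RSA -validity 365 \
  -dname "CN=localhost" \
  -keystore kafka.keystore.jks -storepass changeit -keypass changeit

# 2. Export the broker certificate
keytool -exportcert -alias localhost -keystore kafka.keystore.jks -storepass changeit \
  -file broker-cert.crt

# 3. Import the certificate into the truststore that clients will use
keytool -importcert -alias localhost -file broker-cert.crt \
  -keystore kafka.truststore.jks -storepass changeit -noprompt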

Download Apache Kafka and Start ZooKeeper
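
For example, after downloading and extracting the Kafka distribution, ZooKeeper can be started with the bundled script and its default configuration:

Shell

cd ${kafka-home}
./bin/zookeeper-server-start.sh config/zookeeper.properties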

SASL Authentication

SASL authentication is configured using Java Authentication and Authorization Service (JAAS). JAAS is also used for authentication of connections between Kafka and ZooKeeper. JAAS uses its own configuration file. The recommended location for this file is /opt/kafka/config/jaas.conf 

SASL authentication is supported both through plain unencrypted connections and through TLS connections. SASL can be enabled individually for each listener. To enable it, the security protocol in listener.security.protocol.map has to be either SASL_PLAINTEXT or SASL_SSL.

SASL authentication in Kafka supports several different mechanisms:

  • PLAIN

Implements authentication based on usernames and passwords. Usernames and passwords are stored locally in the Kafka configuration.

  • SCRAM-SHA-256 and SCRAM-SHA-512

Implements authentication using Salted Challenge Response Authentication Mechanism (SCRAM). SCRAM credentials are stored centrally in ZooKeeper. SCRAM can be used in situations where ZooKeeper cluster nodes are running isolated in a private network.

  • GSSAPI

Implements authentication against a Kerberos server.

The SASL mechanisms are configured via the JAAS configuration file. Kafka uses the JAAS context named KafkaServer. After they are configured in JAAS, the SASL mechanisms have to be enabled in the Kafka configuration. This is done using the sasl.enabled.mechanisms property.
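
For example, to enable SCRAM-SHA-512 for client connections and also use it for inter-broker communication (the same values appear in the full server.properties later in this post):

Properties

sasl.enabled.mechanisms=SCRAM-SHA-512
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512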

  • SASL SCRAM

SCRAM authentication in Kafka consists of two mechanisms: SCRAM-SHA-256 and SCRAM-SHA-512. These mechanisms differ only in the hashing algorithm used - SHA-256 versus stronger SHA-512. To enable SCRAM authentication, the JAAS configuration file has to include the following configuration:

Sample ${kafka-home}/config/kafka_server_jaas.conf file:

JAAS

KafkaServer {
   org.apache.kafka.common.security.scram.ScramLoginModule required
   username="admin"
   password="admin123";
};



And in the server.properties file, enable SASL authentication:


Properties

# With SASL & SSL encryption
scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin123";



Create ssl-user-config.properties in ${kafka-home}/config:

Properties

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123


User credentials for the SCRAM mechanism are stored in ZooKeeper. The kafka-configs.sh tool can be used to manage them:

Shell

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='admin123']' --entity-type users --entity-name admin
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'admin'.

./kafka-configs.sh --zookeeper localhost:2181 --alter --add-config 'SCRAM-SHA-512=[password='ramu123']' --entity-type users --entity-name ramu
Warning: --zookeeper is deprecated and will be removed in a future version of Kafka.
Use --bootstrap-server instead to specify a broker to connect to.
Completed updating config for entity: user-principal 'ramu'.
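
To double-check that the credentials were stored, the same tool can describe the user entity (a quick sanity check; the exact output depends on the Kafka version):

Shell

./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type users --entity-name admin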



The complete ${kafka-home}/config/server.properties file looks like the following:

Properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=SASL_SSL://localhost:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
advertised.listeners=SASL_SSL://localhost:9092

security.inter.broker.protocol=SASL_SSL
ssl.endpoint.identification.algorithm=
ssl.client.auth=required
sasl.mechanism.inter.broker.protocol=SCRAM-SHA-512
sasl.enabled.mechanisms=SCRAM-SHA-512

# Broker security settings
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123
ssl.keystore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/keystore/kafka.keystore.jks
ssl.keystore.password=kkr123
ssl.key.password=kkr123

# ACLs
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
super.users=User:admin

# #zookeeper SASL
zookeeper.set.acl=false

# # With SASL & SSL encryption
scram-sha-512.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="admin123";

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0



Start Kafka with JAAS

Shell

export KAFKA_OPTS=-Djava.security.auth.login.config=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/kafka_server_jaas.conf

./bin/kafka-server-start.sh /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/server.properties



Create the topic test-topic:

Shell

./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --command-config ./config/ssl-user-config.properties --replication-factor 1 --partitions 1 --topic test-topic



The above command will fail, as the user does not yet have Create permission on the topic.

Now assign the permissions:

Shell

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --operation Create --operation Describe --topic test-topic
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
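
To inspect what has been granted so far, the ACLs on the topic can be listed with the same tool:

Shell

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --list --topic test-topic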



Similarly, grant permissions to the producer and consumer as well.

Producer

Shell

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --producer --topic test-topic
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)



Now produce some messages:

Shell

[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic --producer.config config/ssl-producer.properties
>Hi this is october
>


ssl-producer.properties file

Properties

bootstrap.servers=localhost:9092
compression.type=none
### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123



Consumer

Shell

./bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:ramu --consumer --topic test-topic --group test-consumer-group
Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=demo-consumer-group, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test-topic, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=demo-consumer-group, patternType=LITERAL)`:
    (principal=User:ramu, host=*, operation=READ, permissionType=ALLOW)



Now consume the messages:

Shell

[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --consumer.config config/ssl-consumer.properties
Hi this is october



ssl-consumer.properties

Properties

bootstrap.servers=localhost:9092

# consumer group id
group.id=test-consumer-group

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
#auto.offset.reset=

### SECURITY ######
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
ssl.truststore.location=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/truststore/kafka.truststore.jks
ssl.truststore.password=kkr123



Now, from a Spring Boot application, produce and consume messages using Camel routes:

Java

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

// Registered as a Spring bean so camel-spring-boot discovers and starts the routes
// (imports, annotation, and closing brace added for completeness)
@Component
public class KafkaRouteBuilder extends RouteBuilder {

    @Override
    public void configure() throws Exception {

        // Producer: fires on a timer and sends a constant message to the secured topic
        from("timer://foo?period={{period}}")
            .setBody(constant("Hi This is kafka example"))
            .to("kafka:{{kafka.topic}}?brokers={{kafka.bootstrap.url}}"
                + "&keySerializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&serializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&securityProtocol={{security.protocol}}&saslJaasConfig={{sasl.jaas.config}}"
                + "&saslMechanism={{sasl.mechanism}}&sslTruststoreLocation={{ssl.truststore.location}}"
                + "&sslTruststorePassword={{ssl.truststore.password}}&sslTruststoreType={{ssl.truststore.type}}")
            .log("${body}");

        // Consumer: reads the same topic back using the SASL_SSL client settings
        from("kafka:{{consumer.topic}}?brokers={{kafka.bootstrap.url}}&maxPollRecords={{consumer.max.poll.records}}"
                + "&keySerializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&serializerClass=org.apache.kafka.common.serialization.StringSerializer"
                + "&groupId={{consumer.group}}&securityProtocol={{security.protocol}}&saslJaasConfig={{sasl.jaas.config}}"
                + "&saslMechanism={{sasl.mechanism}}&sslTruststoreLocation={{ssl.truststore.location}}"
                + "&sslTruststorePassword={{ssl.truststore.password}}&sslTruststoreType={{ssl.truststore.type}}"
                + "&autoOffsetReset={{consumer.auto.offset.reset}}&autoCommitEnable={{consumer.auto.commit.enable}}")
            .log("${body}");
    }
}



application.properties file:

Properties

kafka.topic=test-topic
kafka.bootstrap.url=localhost:9092
period=10000&repeatCount=5
kafka.key.serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.value.serializer=org.apache.kafka.common.serialization.StringSerializer
consumer.topic=test-topic
consumer.group=test-consumer-group
consumer.max.poll.records=1
consumer.threads=10
consumer.consumersCount=1
consumer.auto.offset.reset=earliest
consumer.auto.commit.enable=true
consumer.receive.buffer.bytes=-1
security.protocol = SASL_SSL
ssl.truststore.location =/home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
ssl.truststore.password = kkr123
ssl.truststore.type = JKS
sasl.mechanism = SCRAM-SHA-512
#sasl.kerberos.service.name=
sasl.jaas.config = org.apache.kafka.common.security.scram.ScramLoginModule required username="ramu" password="ramu123";
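
For completeness, a minimal Spring Boot main class is all that is needed to start Camel and the routes above. This is a sketch: the package name is assumed from the abbreviated o.a.c.e.kafka.sasl.ssl.Application seen in the startup log below, and the class must sit in (or above) the package containing KafkaRouteBuilder so component scanning picks the route up.

Java

// Assumed package, matching the abbreviated class name in the startup log below
package org.apache.camel.example.kafka.sasl.ssl;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        // Boots Spring, which starts the Camel context and the two routes
        SpringApplication.run(Application.class, args);
    }
}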



Test

Shell

[kkakarla@kkakarla camel-example-kafka-sasl_ssl]$ mvn spring-boot:run
[INFO] Scanning for projects...
[INFO] 
[INFO] ---------< org.apache.camel.example:camel-example-kafka-sasl >----------
[INFO] Building Camel :: Example :: Kafka :: sasl 1.0
[INFO] --------------------------------[ jar ]---------------------------------
[INFO] 
[INFO] >>> spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) > test-compile @ camel-example-kafka-sasl >>>
[INFO] 
[INFO] --- build-helper-maven-plugin:3.0.0:add-source (add-source) @ camel-example-kafka-sasl ---
[INFO] Source directory: /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/generated-sources/sasl added.
[INFO] 
[INFO] --- maven-resources-plugin:2.6:resources (default-resources) @ camel-example-kafka-sasl ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] Copying 3 resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ camel-example-kafka-sasl ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 2 source files to /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/classes
[INFO] 
[INFO] --- maven-resources-plugin:2.6:testResources (default-testResources) @ camel-example-kafka-sasl ---
[INFO] Using 'UTF-8' encoding to copy filtered resources.
[INFO] skip non existing resourceDirectory /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/test/resources
[INFO] 
[INFO] --- maven-compiler-plugin:3.1:testCompile (default-testCompile) @ camel-example-kafka-sasl ---
[INFO] No sources to compile
[INFO] 
[INFO] <<< spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) < test-compile @ camel-example-kafka-sasl <<<
[INFO] 
[INFO] 
[INFO] --- spring-boot-maven-plugin:2.2.5.RELEASE:run (default-cli) @ camel-example-kafka-sasl ---
[INFO] Attaching agents: []

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v2.2.5.RELEASE)

2020-10-02 13:12:13.520  INFO 13586 --- [           main] o.a.c.e.kafka.sasl.ssl.Application       : Starting Application on kkakarla.pnq.csb with PID 13586 (/home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/target/classes started by kkakarla in /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl)
2020-10-02 13:12:13.522  INFO 13586 --- [           main] o.a.c.e.kafka.sasl.ssl.Application       : No active profile set, falling back to default profiles: default
2020-10-02 13:12:14.518  INFO 13586 --- [           main] o.apache.camel.support.LRUCacheFactory   : Detected and using LRUCacheFactory: camel-caffeine-lrucache
2020-10-02 13:12:14.689  INFO 13586 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML routes from: classpath:camel/*.xml
2020-10-02 13:12:14.689  INFO 13586 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML route templates from: classpath:camel-template/*.xml
2020-10-02 13:12:14.689  INFO 13586 --- [           main] o.a.c.s.boot.SpringBootRoutesCollector   : Loading additional Camel XML rests from: classpath:camel-rest/*.xml
2020-10-02 13:12:14.772  INFO 13586 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.5.0 (camel) is starting
2020-10-02 13:12:14.775  INFO 13586 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2020-10-02 13:12:14.775  INFO 13586 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Using HealthCheck: camel-health
2020-10-02 13:12:14.792  INFO 13586 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [localhost:9092]
    buffer.memory = 33554432
    client.dns.lookup = default
    client.id = producer-1
    compression.type = none
    connections.max.idle.ms = 540000
    delivery.timeout.ms = 120000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 1048576
    metadata.max.age.ms = 300000
    metadata.max.idle.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-512
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer

2020-10-02 13:12:14.918  INFO 13586 --- [           main] o.a.k.c.s.authenticator.AbstractLogin    : Successfully logged in.
2020-10-02 13:12:14.986  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2020-10-02 13:12:14.986  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2020-10-02 13:12:14.986  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1601624534985
2020-10-02 13:12:14.991  INFO 13586 --- [           main] o.a.c.i.e.InternalRouteStartupManager    : Route: route1 started and consuming from: timer://foo
2020-10-02 13:12:14.991  INFO 13586 --- [           main] o.a.camel.component.kafka.KafkaConsumer  : Starting Kafka consumer on topic: test-topic with breakOnFirstError: false
2020-10-02 13:12:14.996  INFO 13586 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    allow.auto.create.topics = true
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.dns.lookup = default
    client.id = 
    client.rack = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = test-consumer-group
    group.instance.id = null
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 1
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 40000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = [hidden]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = SCRAM-SHA-512
    security.protocol = SASL_SSL
    security.providers = null
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLSv1.2
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = /home/kkakarla/development/git/ramu-git/kafka-poc/camel-example-kafka-sasl_ssl/src/main/truststore/kafka.truststore.jks
    ssl.truststore.password = [hidden]
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2020-10-02 13:12:15.016  WARN 13586 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : The configuration 'specific.avro.reader' was supplied but isn't a known config.
2020-10-02 13:12:15.016  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.5.1
2020-10-02 13:12:15.016  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 0efa8fb0f4c73d92
2020-10-02 13:12:15.016  INFO 13586 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1601624535016
2020-10-02 13:12:15.017  INFO 13586 --- [           main] o.a.c.i.e.InternalRouteStartupManager    : Route: route2 started and consuming from: kafka://test-topic
2020-10-02 13:12:15.017  INFO 13586 --- [mer[test-topic]] o.a.camel.component.kafka.KafkaConsumer  : Subscribing test-topic-Thread 0 to topic test-topic
2020-10-02 13:12:15.018  INFO 13586 --- [mer[test-topic]] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Subscribed to topic(s): test-topic
2020-10-02 13:12:15.020  INFO 13586 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Total 2 routes, of which 2 are started
2020-10-02 13:12:15.021  INFO 13586 --- [           main] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.5.0 (camel) started in 0.246 seconds
2020-10-02 13:12:15.030  INFO 13586 --- [           main] o.a.c.e.kafka.sasl.ssl.Application       : Started Application in 1.721 seconds (JVM running for 1.985)
2020-10-02 13:12:15.034  INFO 13586 --- [extShutdownHook] o.a.c.impl.engine.AbstractCamelContext   : Apache Camel 3.5.0 (camel) is shutting down
2020-10-02 13:12:15.035  INFO 13586 --- [extShutdownHook] o.a.c.i.engine.DefaultShutdownStrategy   : Starting to graceful shutdown 2 routes (timeout 45 seconds)
2020-10-02 13:12:15.036  INFO 13586 --- [ - ShutdownTask] o.a.camel.component.kafka.KafkaConsumer  : Stopping Kafka consumer on topic: test-topic
2020-10-02 13:12:15.315  INFO 13586 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: TIW2NTETQmeyjTIzNCKdIg
2020-10-02 13:12:15.318  INFO 13586 --- [mer[test-topic]] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Cluster ID: TIW2NTETQmeyjTIzNCKdIg
2020-10-02 13:12:15.319  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Discovered group coordinator localhost:9092 (id: 2147483647 rack: null)
2020-10-02 13:12:15.321  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
2020-10-02 13:12:15.390  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2020-10-02 13:12:15.390  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] (Re-)joining group
2020-10-02 13:12:15.394  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Finished assignment for group at generation 16: {consumer-test-consumer-group-1-6f265a6e-422f-4651-b442-a48638bcc2ee=Assignment(partitions=[test-topic-0])}
2020-10-02 13:12:15.398  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Successfully joined group with generation 16
2020-10-02 13:12:15.401  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Adding newly assigned partitions: test-topic-0
2020-10-02 13:12:15.411  INFO 13586 --- [mer[test-topic]] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-consumer-group-1, groupId=test-consumer-group] Setting offset for partition test-topic-0 to the committed offset FetchPosition{offset=10, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional[localhost:9092 (id: 0 rack: null)], epoch=0}}
2020-10-02 13:12:16.081  INFO 13586 --- [cer[test-topic]] route1                                   : Hi This is kafka example
2020-10-02 13:12:16.082  INFO 13586 --- [mer[test-topic]] route2                                   : Hi This is kafka example



Enjoy!  See you with another article soon.
