Establish Trust Chain From Kafka to Microservices REST APIs With JWT
Abstract
This article discusses an approach to propagating trust from Kafka event stream processing modules to microservices REST APIs with Spring Security and JSON Web Tokens (JWT) in Spring Boot applications, using the Spring Cloud Stream framework.
Apache Kafka is a distributed streaming platform used for building real-time data pipelines and streaming applications. It has become increasingly popular for event stream processing (ESP), with advantages in processing performance, data parallelism, distributed coordination, fault tolerance, and operational simplicity. It has been adopted and runs in production at thousands of companies, including LinkedIn, Microsoft, and Netflix, as well as top banks, insurers, and telecoms, processing trillions of messages each day.
Kafka has its own ecosystem; however, from a design pattern perspective, it can be simply described as Message Queue + Pub-Sub. Please see my other article on the advantages of and cautions about the ESP integration pattern.
Microservices REST APIs are another popular enterprise integration pattern, covering service composition, service clustering, and the request-response pattern for handling high concurrency. In some cases, however, people want to combine request-response APIs with event sequences where the order of updates matters. This raises a concern about how to establish, share, or propagate security among services and events.
What Does Kafka Security Provide?
Kafka provides the following security measures, which focus on message payload delivery channels, participants, and authorization of message producers and consumers (a sample client configuration follows the list):
- Authentication of connections to brokers from producers and consumers, using either SSL or SASL:
- SASL/GSSAPI (Kerberos).
- SASL/PLAIN.
- SASL/SCRAM-SHA-256 and SASL/SCRAM-SHA-512.
- SASL/OAUTHBEARER (available since version 2.0).
- Authentication of connections from brokers to ZooKeeper.
- Encryption of data transferred by using SSL.
- Authorization of read / write operations by clients.
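As an illustration, a producer or consumer client could enable SASL over SSL with the PLAIN mechanism through client properties like the following (a minimal sketch; the credentials and truststore path are placeholders, not values from this article's sample):

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="demo-client" password="demo-secret";
ssl.truststore.location=/var/private/ssl/kafka.client.truststore.jks
ssl.truststore.password=changeit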
Concerns and Suggestions
In most infrastructure environments, it can be difficult to integrate Kafka event security into an existing microservices service mesh security to achieve single sign-on. Microservice API security typically means LDAP/database basic authentication, digest authentication, API keys, cloud signatures, JWT tokens, OAuth 1.0/2.0, OpenID Connect, etc. None of these can be easily integrated with the Kafka security measures mentioned above.
One approach to passing user identity (authorization information) between microservices asynchronously is to keep Kafka security separate from the microservice service mesh. Using an API gateway to handle authentication and authorization, we can issue JWT tokens for stateless API calls. We can then pass the JWT over an asynchronous secure channel established by Kafka with SSL/SASL security settings, chaining the security token and validating it in the other microservices protected by JWT.
With this approach, the security token is shared among the microservices, and the event streaming system (Kafka) is decoupled from the microservice APIs. If the system needs to replace Kafka with another message broker, such as ActiveMQ or RabbitMQ, only configuration changes are needed, and all the security implementation and setup in the service mesh will stay.
Spring for Apache Kafka and Spring Cloud Stream
This sample uses Spring Cloud Stream and spring-cloud-stream-binder-kafka to publish and consume events. Spring-cloud-stream-binder-kafka is a little different from Spring for Apache Kafka: it is more loosely coupled to Kafka's event streaming framework. It is a binding framework that allows application code to communicate with remote brokers via message channels.
It lets you port an existing Kafka Streams workload into a standalone cloud-native application and compose coherent data pipelines using Spring Cloud Data Flow. It also leverages the framework's content-type conversion for inbound and outbound messages, which saves a lot of code for data serialization and deserialization. If we replace Kafka with RabbitMQ, the only change is the configuration in the pom.xml, as sketched below.
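As a sketch of that binder swap, the Kafka binder dependency in pom.xml would be replaced with the RabbitMQ one, while the application code and channel bindings stay unchanged:

<!-- Remove the Kafka binder -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<!-- Add the RabbitMQ binder instead -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>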
Building a Prototype Sample With Spring Boot
In this sample, I will omit the SSL security setup on Kafka, which can be found easily on the kafka.apache.org website. Let's start ZooKeeper and Kafka in separate terminal windows:
.\zookeeper-server-start.bat C:\working\software\kafka25\config\zookeeper.properties
.\kafka-server-start.bat C:\working\software\kafka25\config\server.properties
As you can see, I have defined a topic named "TestTopic" for this sample, which can be verified by listing the topics:
.\kafka-topics.bat --list --zookeeper localhost:2181
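If the topic does not already exist, it can be created first with a command like this (a sketch assuming a single local broker, matching the ZooKeeper-based style of the list command above):

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TestTopic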
In the Spring Boot project's application.properties file, a binding defines the destination for the publisher's message output. For the message consumer, just replace "output" with "input", as shown after the snippet below.
spring.cloud.stream.bindings.output.destination=TestTopic
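For the consumer project, the input binding, plus the broker address the binder connects to, would look like this (localhost:9092 is an assumption for a default local broker):

spring.cloud.stream.bindings.input.destination=TestTopic
spring.cloud.stream.kafka.binder.brokers=localhost:9092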
In the first Spring Boot project, we will set up a Kafka event producer using Spring Cloud Stream and spring-cloud-stream-binder-kafka. Additionally, we'll provide a RESTful API protected by Spring Security with JWT. It has dependencies on Spring Web, Spring Security, Spring Cloud Stream plus the Kafka binder, and the JJWT packages for JWTs. I also used the Lombok library for data object mapping, which reduces the amount of boilerplate Java code through annotations. The pom.xml dependencies are as follows:
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt-api</artifactId>
        <version>0.11.1</version>
    </dependency>
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt-impl</artifactId>
        <version>0.11.1</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>io.jsonwebtoken</groupId>
        <artifactId>jjwt-jackson</artifactId>
        <version>0.11.1</version>
        <scope>runtime</scope>
    </dependency>
</dependencies>
In the first producer API sample, the microservice's /authenticate API issues the JWT to the client for further API access.
value = "/authenticate", method = RequestMethod.POST) (
public ResponseEntity<?> createAuthenticationToken( AuthenticationRequest authenticationRequest) throws Exception {
try {
authenticationManager.authenticate(
new UsernamePasswordAuthenticationToken(authenticationRequest.getUsername(),
authenticationRequest.getPassword())
);
}
catch (BadCredentialsException e) {
throw new Exception("Incorrect username or password", e);
}
final UserDetails userDetails = userDetailsService
.loadUserByUsername(authenticationRequest.getUsername());
/* Use JWT Util to get JWT token outof UserDetails */
final String jwt = jwtTokenUtil.generateToken(userDetails);
/* Create authentication HTTP response with Token */
return ResponseEntity.ok(new AuthenticationResponse(jwt));
}
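The jwtTokenUtil internals are not shown in this article; below is a minimal sketch using the JJWT 0.11 builder API (the in-memory key and the 10-hour expiry are assumptions for the demo; a real service would load a stable, externally managed secret):

import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.Keys;
import org.springframework.security.core.userdetails.UserDetails;

import java.security.Key;
import java.util.Date;

public class JwtUtil {
    // Demo-only key generated at startup; tokens become invalid on restart
    private final Key key = Keys.secretKeyFor(SignatureAlgorithm.HS256);

    public String generateToken(UserDetails userDetails) {
        return Jwts.builder()
                .setSubject(userDetails.getUsername())
                .setIssuedAt(new Date())
                // Assumed 10-hour validity window
                .setExpiration(new Date(System.currentTimeMillis() + 1000 * 60 * 60 * 10))
                .signWith(key)
                .compact();
    }
}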
In the Spring Security configuration, the jwtRequestFilter is added to the security filter chain to check every incoming stateless API invocation. The Kafka message /publish API is protected by JWT security and requires the token in the request header before it will send a message.
@Override
protected void configure(HttpSecurity httpSecurity) throws Exception {
    httpSecurity.csrf().disable()
            .authorizeRequests().antMatchers("/authenticate").permitAll()
            .anyRequest().authenticated()
            .and()
            .exceptionHandling()
            .and().sessionManagement()
            .sessionCreationPolicy(SessionCreationPolicy.STATELESS);
    httpSecurity.addFilterBefore(jwtRequestFilter, UsernamePasswordAuthenticationFilter.class);
}
The filter then validates the JWT during stateless API access:
// If the token is valid, populate the security context for this request
if (jwtUtil.validateToken(jwt, userDetails)) {
    UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken = new UsernamePasswordAuthenticationToken(
            userDetails, null, userDetails.getAuthorities());
    usernamePasswordAuthenticationToken.setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
    SecurityContextHolder.getContext().setAuthentication(usernamePasswordAuthenticationToken);
}
chain.doFilter(request, response);
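For context, this snippet runs inside the filter's doFilterInternal method, after the raw token has been extracted from the Authorization header, roughly as follows (a sketch; the jwtUtil.extractUsername method name is an assumption):

String authorizationHeader = request.getHeader("Authorization");
String username = null;
String jwt = null;
// Strip the "Bearer " prefix to obtain the raw token
if (authorizationHeader != null && authorizationHeader.startsWith("Bearer ")) {
    jwt = authorizationHeader.substring(7);
    username = jwtUtil.extractUsername(jwt);
}
// Only authenticate if no earlier filter has done so for this request
if (username != null && SecurityContextHolder.getContext().getAuthentication() == null) {
    UserDetails userDetails = this.userDetailsService.loadUserByUsername(username);
    // ... the validateToken block shown above goes here ...
}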
When we launch Postman to invoke the producer API, the service generates a JWT for stateless API access:
{
"jwt": "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJkZW1vIiwiaWF0IjoxNTg3NzA0MzY4LCJleHAiOjE1ODc3NDAzNjh9.HwnYOMa-IFmmEAHXjjSA_13gjJRRjj-lIq99UyVd9ac"
}
Once this JWT is set in the /publish request's Authorization header, the HTTP POST call will send a message to the Kafka publisher and return HTTP status 200. In this sample, we pass the JWT in the header of the message payload into the Kafka message channel, which is protected by SSL security.
("/publish")
public CaseData publishEvent( CaseData casedata, HttpHeaders headers){
String jwtBearer = headers.getFirst("Authorization");
CasePayload casePayload = new CasePayload(jwtBearer==null?"jwt_Null":jwtBearer, casedata);
output.send(MessageBuilder.withPayload(casePayload).build());
return casedata;
}
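The output channel used above comes from Spring Cloud Stream's Source binding, and CasePayload is a small Lombok-annotated carrier that keeps the JWT next to the business data. A minimal sketch, assuming the standard Source binding (class names are illustrative, and each class would live in its own file):

import lombok.AllArgsConstructor;
import lombok.Data;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

// Binds the "output" channel from application.properties to the Kafka topic
@EnableBinding(Source.class)
public class ProducerBindingConfig {
}

// Carrier object: the JWT travels inside the message payload with the data
@Data
@AllArgsConstructor
class CasePayload {
    private String jwt;
    private CaseData casedata;
}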
On the consumer side, the service receives the message payload and extracts the JWT from the message header. With the JWT, the consumer builds a stateless remote API call to the third microservice, which is also protected by JWT security. By using RestTemplate.exchange, it chains the security and passes the token over.
CaseDocument caseDocument = null;
try {
    String theUrl = "http://localhost:8093/cases/" + casePayload.getCasedata().getId();
    // Copy the JWT from the event payload into the outgoing Authorization header
    HttpHeaders headers = createHttpJwtHeaders(casePayload.getJwt());
    HttpEntity<String> entity = new HttpEntity<String>("parameters", headers);
    ResponseEntity<CaseDocument> response = restTemplate.exchange(theUrl, HttpMethod.GET, entity, CaseDocument.class);
    caseDocument = response.getBody();
} catch (Exception e) {
    System.out.println(e.getMessage());
}
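The consumer snippet above would typically sit inside a Sink-bound listener, with a small helper that copies the JWT into the outgoing Authorization header. A sketch under those assumptions (the article names createHttpJwtHeaders; its body here is an assumption):

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.http.HttpHeaders;
import org.springframework.web.client.RestTemplate;

// Binds the "input" channel from application.properties to the Kafka topic
@EnableBinding(Sink.class)
public class CaseEventConsumer {
    private final RestTemplate restTemplate = new RestTemplate();

    @StreamListener(Sink.INPUT)
    public void onCaseEvent(CasePayload casePayload) {
        // ... the RestTemplate.exchange call shown above runs here ...
    }

    // Forward the bearer token exactly as it was received from the producer
    private HttpHeaders createHttpJwtHeaders(String jwt) {
        HttpHeaders headers = new HttpHeaders();
        headers.set("Authorization", jwt);
        return headers;
    }
}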
In the third microservice app, the security is checked via the Spring authentication filter. Once the check passes, the service returns the requested document details to the second microservice's API call.
// Same pattern as in the producer: validate the JWT and populate the security context
UsernamePasswordAuthenticationToken usernamePasswordAuthenticationToken = new UsernamePasswordAuthenticationToken(
        userDetails, null, userDetails.getAuthorities());
usernamePasswordAuthenticationToken
        .setDetails(new WebAuthenticationDetailsSource().buildDetails(request));
SecurityContextHolder.getContext().setAuthentication(usernamePasswordAuthenticationToken);
chain.doFilter(request, response);
The complete sample code can be found on GitHub.
Conclusion
With this design, Kafka provides stateless security access via the token and establishes trust with the microservice APIs. Moreover, decoupling the event streaming product from the microservice APIs gives the service mesh the flexibility to choose among different message delivery products.
Security can be configured and set up within different zones for different purposes and chained together for business applications. Usually, we don't recommend exposing the integration layer or event channels to the front end or as APIs. However, as the event stream processing pattern evolves, modern technologies may provide other, easier ways to set up trust between the service mesh and event channels.
As a next step, we can try to use OAuth 2.0/OpenID Connect with Kafka, integrate with multiple microservices, and build up a security chain for a back-end service mesh.
I hope you found this article helpful.