Software Integration
Seamless communication — that, among other consequential advantages, is the ultimate goal when integrating your software. And today, integrating modern software means fusing various applications and/or systems — often across distributed environments — with the common goal of unifying isolated data. This effort frequently means transitioning legacy applications to cloud-based systems and messaging infrastructure via microservices and REST APIs. So what's next? Where is the path to seamless communication and nuanced architecture taking us? Dive into our 2023 Software Integration Trend Report and fill the gaps in modern integration practices by exploring trends in APIs, microservices, and cloud-based systems and migrations. You have to integrate to innovate!
The ReactAndGo project is used to compare a single page application frontend based on React and a Rest backend based on Go to Angular frontends and Spring Boot/Java backends. The goal of the project is to send out notifications to car drivers if the gas price falls below their target price. The gas prices are imported from a provider via MQTT messaging and stored in the database. For development, two test messages are provided that are sent to an Apache Artemis server to be processed in the project. The Apache Artemis server can be run as a Docker image, and the commands to download and run the image can be found in the 'docker-artemis.sh' file. As a database, Postgresql is used, and it can be run as a Docker image too. The commands can be found in the 'docker-postgres.sh' file. Architecture The system architecture looks like this: The React frontend uses the Rest interface that the Gin framework provides to communicate with the backend. The Apache Artemis Messaging Server is used in development to receive and send back the gas price test messages that are handled with the Paho-MQTT library. In production, the provider sends the MQTT messages. The Gorm framework is used to store the data in Postgresql. A push notification display is used to show the notification from the frontend if the target prices are reached. The open-source projects using Go have more of a domain-driven architecture that splits the code for each domain into packages. For the ReactAndGo project, the domain-driven architecture is combined with a layered architecture to structure the code. The common BaseController is needed to manage the routes and security of the application. The architecture is split between the gas station domain, the push notification domain, and the application user domain. The Rest request and response handling is in its own layer that includes the Rest client for the gas station import. The service layer contains the logic, database access, and other helper functions. Domain-independent functions like Cron Jobs, Jwt token handling, and message handling are implemented in separate packages that are in a utility role. Notifications From React Frontend to Go/Gin/Gorm Backend The ReactAndGo project is used to show how to display notifications with periodic requests to the backend and how to process rest requests in the backend in controllers and repositories. React Frontend In the front end, a dedicated worker is started after login that manages the notifications. The initWebWorker(...) function of the LoginModal.tsx starts the worker and handles the tokens: TypeScript-JSX const initWebWorker = async (userResponse: UserResponse) => { let result = null; if (!globalWebWorkerRefState) { const worker = new Worker(new URL('../webpush/dedicated-worker.js', import.meta.url)); if (!!worker) { worker.addEventListener('message', (event: MessageEvent) => { //console.log(event.data); if (!!event?.data?.Token && event?.data.Token?.length > 10) { setGlobalJwtToken(event.data.Token); } }); worker.postMessage({ jwtToken: userResponse.Token, newNotificationUrl: `/usernotification/new/${userResponse.Uuid}` } as MsgData); setGlobalWebWorkerRefState(worker); result = worker; } } else { globalWebWorkerRefState.postMessage({ jwtToken: userResponse.Token, newNotificationUrl: `/usernotification/new/${userResponse.Uuid}` } as MsgData); result = globalWebWorkerRefState; } return result; }; The React frontend uses the Recoil library for state management and checks if the globalWebWorkerRefState exists. 
If not, the worker in dedicated-worker.js gets created and the event listener for the Jwt tokens is created. The Jwt token is stored in a Recoil state to be used in all rest requests. Then the postMessage(...) method of the worker is called to start the requests for the notifications. Then the worker is stored in the globalWebWorkerRefState and the worker is returned. The worker is developed in the dedicated-worker.ts file. The worker is needed as .js file. To have the help of Typescript, the worker is developed in Typescript and then turned into Javascript in the Typescript Playground. That saves a lot of time for me. The refreshToken(...) function of the worker refreshes the Jwt tokens: TypeScript-JSX interface UserResponse { Token?: string Message?: string } let jwtToken = ''; let tokenIntervalRef: ReturnType<typeof setInterval>; const refreshToken = (myToken: string) => { if (!!tokenIntervalRef) { clearInterval(tokenIntervalRef); } jwtToken = myToken; if (!!jwtToken && jwtToken.length > 10) { tokenIntervalRef = setInterval(() => { const requestOptions = { method: 'GET', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${jwtToken}` }, }; fetch('/appuser/refreshtoken', requestOptions).then(response => response.json() as UserResponse) .then(result => { if ((!result.Message && !!result.Token && result.Token.length > 10)) { //console.log('Token refreshed.'); jwtToken = result.Token; /* eslint-disable-next-line no-restricted-globals */ self.postMessage(result); } else { jwtToken = ''; clearInterval(tokenIntervalRef); } }); }, 45000); } } The refreshToken(...) function first checks if another token interval has been started and stops it. Then the token is assigned and checked. If it passes the check a new interval is started to refresh the token every 45 seconds. The requestOptions are created with the token in the Authorization header field. Then the new token is retrieved with fetch(...) , and the response is checked, the token is set, and it is posted to the EventListener in the LoginModal.tsx. If the Jwt token has not been received, the interval is stopped, and the jwtToken is set to an empty string. The Eventlistener of the worker receives the token message and processes it as follows: TypeScript-JSX interface MsgData { jwtToken: string; newNotificationUrl: string; } let notificationIntervalRef: ReturnType<typeof setInterval>; /* eslint-disable-next-line no-restricted-globals */ self.addEventListener('message', (event: MessageEvent) => { const msgData = event.data as MsgData; refreshToken(msgData.jwtToken); if (!!notificationIntervalRef) { clearInterval(notificationIntervalRef); } notificationIntervalRef = setInterval(() => { if (!jwtToken) { clearInterval(notificationIntervalRef); } const requestOptions = { method: 'GET', headers: { 'Content-Type': 'application/json', 'Authorization': `Bearer ${jwtToken}` }, }; /* eslint-disable-next-line no-restricted-globals */ self.fetch(msgData.newNotificationUrl, requestOptions).then(result => result.json()).then(resultJson => { if (!!resultJson && resultJson?.length > 0) { /* eslint-disable-next-line no-restricted-globals */ self.postMessage(resultJson); //Notification //console.log(Notification.permission); if (Notification.permission === 'granted') { if(resultJson?.length > 0 && resultJson[0]?.Message?.length > 1 && resultJson[0]?.Title?.length > 1) { for(let value of resultJson) { new Notification(value?.Title, {body: value?.Message}); } } } } }); }, 60000); }); The addEventListener(...) 
method handles the MessageEvent messages with the MsgData. The jwtToken of the MsgData is used to start the refreshToken(...) function. Then it is checked to see if a notification interval has been started, and if so, it is stopped. Then a new interval is created that checks for new target-matching gas prices every 60 seconds. The jwtToken is checked, and if the check fails, the interval is stopped. Then the requestOptions are created with the Jwt token in the Authorization header field. Then fetch(...) is used to retrieve the new matching gas price updates. Then the result JSON is checked and posted back to the EventListener in the LoginModal.tsx. With Notification.permission the user gets asked for permission to send notifications, and granted means he agreed. The data for the notification is checked, and the notification is sent with new Notification(...). Backend To handle the frontend requests, the Go backend uses the Gin framework. The Gin framework provides the needed functions to handle Rest requests, like a router, context (url related stuff), TLS support, and JSON handling. The route is defined in the basecontroller.go: Go func Start(embeddedFiles fs.FS) { router := gin.Default() ... router.GET("/usernotification/new/:useruuid", token.CheckToken, getNewUserNotifications) ... router.GET("/usernotification/current/:useruuid", token.CheckToken, getCurrentUserNotifications) router.StaticFS("/public", http.FS(embeddedFiles)) router.NoRoute(func(c *gin.Context) { c.Redirect(http.StatusTemporaryRedirect, "/public") }) absolutePathKeyFile := strings.TrimSpace(os.Getenv("ABSOLUTE_PATH_KEY_FILE")) absolutePathCertFile := strings.TrimSpace(os.Getenv("ABSOLUTE_PATH_CERT_FILE")) myPort := strings.TrimSpace(os.Getenv("PORT")) if len(absolutePathCertFile) < 2 || len(absolutePathKeyFile) < 2 || len(myPort) < 2 { router.Run() // listen and serve on 0.0.0.0:3000 } else { log.Fatal(router.RunTLS(":"+myPort, absolutePathCertFile, absolutePathKeyFile)) } } The Start function gets the embedded files for the /public directory with the static frontend files. The line: Go router.GET("/usernotification/new/:useruuid", token.CheckToken, getNewUserNotifications) Creates the route /usernotification/new/:useruuid with the useruuid as parameter. The CheckToken function in the token.go file handles the Jwt Token validation. The getNewUserNotifications function in the in the uncontroller.go handles the requests. The getNewUserNotifications(...) function: Go func getNewUserNotifications(c *gin.Context) { userUuid := c.Param("useruuid") myNotifications := notification.LoadNotifications(userUuid, true) c.JSON(http.StatusOK, mapToUnResponses(myNotifications)) } ... func mapToUnResponses(myNotifications []unmodel.UserNotification) []unbody.UnResponse { var unResponses []unbody.UnResponse for _, myNotification := range myNotifications { unResponse := unbody.UnResponse{ Timestamp: myNotification.Timestamp, UserUuid: myNotification.UserUuid, Title: myNotification.Title, Message: myNotification.Message, DataJson: myNotification.DataJson, } unResponses = append(unResponses, unResponse) } return unResponses } The getNewUserNotifications(…) function uses the Gin context to get the path parameter useruuid and then calls the LoadNotifications(…) function of the repository with it. The result is turned into UserNotifications with the mapToUnResponses(…) function, which sends only the data needed by the frontend. The Gin context is used to return the HTTP status OK and to marshal the UserNotifications to JSON. 
The function LoadNotifications(...) is in the unrepo.go file and loads the notifications from the database with the Gorm framework: Go func LoadNotifications(userUuid string, newNotifications bool) []unmodel.UserNotification { var userNotifications []unmodel.UserNotification if newNotifications { database.DB.Transaction(func(tx *gorm.DB) error { tx.Where("user_uuid = ? and notification_send = ?", userUuid, !newNotifications) .Order("timestamp desc").Find(&userNotifications) for _, userNotification := range userNotifications { userNotification.NotificationSend = true tx.Save(&userNotification) } return nil }) } else { database.DB.Transaction(func(tx *gorm.DB) error { tx.Where("user_uuid = ?", userUuid).Order("timestamp desc").Find(&userNotifications) var myUserNotifications []unmodel.UserNotification for index, userNotification := range userNotifications { if index < 10 { myUserNotifications = append(myUserNotifications, userNotification) continue } tx.Delete(&userNotification) } userNotifications = myUserNotifications return nil }) } return userNotifications } The LoadNotifications(...) function checks if only new notifications are requested. Then a database transaction is created, and the new UserNotifications (notification.go) of the user file are selected, ordered newest first. The send flag is set to true to mark them as no longer new, and the UserNotifications are saved to the database. The transaction is then closed, and the notifications are returned. If the current notifications are requested, a database transaction is opened, and the UserNotifications of the user are selected, ordered newest first. The first 10 notifications of the user are appended to the myUserNotification slice, and the others are deleted from the database. Then the transaction is closed, and the notifications are returned. Conclusion This is the first React frontend for me, and I share my experience developing this frontend. React is a much smaller library than the Angular Framework and needs more extra libraries like Recoil for state management. Features like interval are included in the Angular RxJs library. React has much fewer features and needs more additional libraries to achieve the same result. Angular is better for use cases where the frontend needs more than basic features. React has the advantage of simple frontends. A React frontend that grows to medium size will need more design and architecture work to be comparable to an Angular solution and might take more effort during development due to its less opinionated design. My impression is: React is the kitplane that you have to assemble yourself. Angular is the plane that is rolled out of the factory. The Go/Gin/Gorm backend works well. The Go language is much simpler than Java and makes reading it fast. Go can be learned in a relatively short amount of time and has strict types and a multi-threading concept that Project Loom tries to add to Java. The Gin framework offers the features needed to develop the controllers and can be compared to the Spring Boot framework in features and ease of development. The Gorm framework offers the features needed to develop the repositories for database access and management and can be compared to the Spring Boot framework in terms of features and ease of development. The selling point of Go is its lower memory consumption and fast startup because it compiles to a binary and does not need a virtual machine. Go and Java have garbage collection. 
Java can catch up on startup time with Project Graal, but medium- to large-sized examples first have to be available and analyzed to compare memory consumption. A decision can then be based on developer skills, the amount of memory saved, and the expected future of Project Graal.
Sometimes you need to control access to the data in your databases in a very granular way - much more granular than most databases allow. For instance, you might want some database users to be able to read only the last few digits of some credit card number, or you may need certain columns of certain rows to be readable by certain users only. Or maybe you need to hide some rows from some users under specific circumstances. The data still needs to be stored in the database, we just need to restrict who can see certain parts of that data. This is called data masking, and I've already talked about the two main approaches: static vs. dynamic data masking in a previous article. In this article, I'll show you how to roll your own dynamic data masking solution for Cassandra and Cassandra-compatible databases such as AWS Keyspaces, Azure Cosmos DB, and DataStax DSE, using a couple of off-the-shelf tools. What Cassandra Can Do on Its Own When it comes to hiding data, Cassandra provides table-level GRANT permissions, but nothing more fine-grained than that. Other Cassandra-compatible products, such as DataStax DSE, do provide some row- and column-level security, but even that has significant limitations. To narrow down how people access some tables, most relational databases offer the concept of views. Cassandra has materialized views, which are tables that are derived from other tables. Unfortunately, for materialized views, among many other restrictions, Cassandra requires that the columns must be the naked columns from the base table. This, and the other restrictions, means that materialized views are only tangentially useful for data masking, and cannot cover the vast majority of use cases. You might think you're stuck at this point. The fine folks in the Cassandra team are in fact working on a data masking solution, but that's still some ways away, and in any case, it will be limited. There is another option: using a programmable database proxy to shape the queries and the corresponding result sets. How Would a Proxy Help? The idea is simple: we introduce a programmable proxy between the database clients and the database server(s). We can then define some simple logic in the proxy, which will enforce our data masking policies as the network traffic goes back and forth (through the proxy) between clients and servers: Standing up a database proxy is easy: it's just a Docker container, and we can set up the database connection in just a few clicks. The database clients then connect to the proxy instead of directly to the database. Other than that, the clients and the server will have no idea that they are talking to each other through a proxy. Because the proxy works at the network level, no changes are required to either side, and this works with any Cassandra-compatible implementations such as AWS Keyspaces and Azure CosmosDB. Once the proxy is in place, we can define filters in the proxy to shape the traffic. For data masking, we have three possibilities: Reject invalid queries Rewrite invalid queries Modify result sets Let's take a look at each approach. Rejecting Queries Just because a database client sends you a request doesn't mean that you have to execute it. The proxy can look at a query and decide to reject it if it does not obey our access requirements. 
There are two main ways to do this: Limiting queries to a list of pre-approved queries, possibly depending on the user, the user's IP address, or any other factor Examining the query before it gets executed, and rejecting it if it does not satisfy certain criteria Query Control Enforcing a list of pre-approved queries is called query control, and I have covered that topic in a previous article. It's a simple idea: you record all the queries during a period of time (like during testing), then you only allow those queries after that. Any query that is not on the list gets rejected (or given an empty result set if we want to be more sneaky). This is a solid approach, but it only works for some use cases. For instance, this is not a viable solution if your queries are not 100% predictable. Still, it's a good tool to have in your toolbox. Vetting Queries A more subtle approach consists of examining the queries and determining whether they are acceptable or not. This is of course trickier - people can be very clever - but Cassandra's query language CQL is not as complex as typical SQL languages, making this a practical solution. For instance, we could decide that we don't want certain users to have access to the phones column in our customers table. In that case, we could simply reject any queries on that table that either specify the phones column, or that try to use the * operator to get all columns. This is easily done thanks to Gallium Data's CQL parser service, which can parse any CQL command and tell us exactly what that command does, and which tables/columns are involved. In the proxy, our filter will: Get the CQL from the query or prepared statement Send it to the parser service to analyze it If the CQL refers to the phones column, reject the request Otherwise let the query proceed to Cassandra See the hands-on tutorial for this article for all the details. Rewriting Queries A more flexible approach is to modify incoming queries so that they satisfy our criteria. For instance, let's say we still want to restrict access to the column phones in the table customers. Again, we can use the CQL parser service to determine whether an incoming query uses this column, or uses * to request all columns. If the query does use * to request all columns, the safest thing to do is to reject the query. It would be tempting to think that we can replace the asterisk with the names of the columns, but that is actually quite difficult to do correctly, as illustrated by this perfectly valid query: CQL SELECT /* all */ * FROM credit_card_txs If the query uses the phones column, we can replace it with something that will hide the data as we wish. Let's say we want to hide the phones column completely. You might think that you can rewrite the query from: CQL SELECT id, country, first_name, last_name, phones FROM customers to: CQL SELECT id, country, first_name, last_name, '****' as phones FROM customers That seems reasonable, but sadly, Cassandra does not support this: Shell InvalidRequest: Error from server: code=2200 [Invalid query] message="Cannot infer type for term '****' in selection clause (try using a cast to force a type)" Thankfully, there is a slightly ugly workaround: CQL SELECT id, country, first_name, last_name, blobAsText(textAsBlob('****')) as phones FROM customers We could do somewhat better using user-defined functions, if your Cassandra implementation supports them. 
We can thus easily create a filter in the proxy that will rewrite the query to mask the value of the phones column (see the hands-on tutorial linked previously for all the details). Let's test that: Shell cqlsh:demo> SELECT id, country, first_name, last_name, phones FROM customers; id | country | first_name | last_name | phones ----+---------+------------+---------------+--------- 23 | WF | Wanda | Williams | **** 5 | ES | Eric | Edmunds | **** 10 | JP | Juliet | Jefferson | **** 16 | PE | Patricia | Pérez | **** etc... If you need to hide only portions of a column, and your Cassandra implementation does not allow for user-defined functions, your only option is to modify result sets - let's look at that now. Modifying Result Sets For the ultimate in power and flexibility, we can modify result sets on their way back to the database clients: We can modify individual columns in specific rows. We can remove entire rows from result sets. We can also insert new rows in result sets, change the shape of result sets, etc., but that's beyond the scope of this article. Changing a column in a row is usually trivial with a few lines of code in a filter, e.g.: JavaScript let phones = context.row.phones; if (phones && phones.home) { phones.home.phone_number = "####"; } Let's try it out: Shell cqlsh:gallium_demo> SELECT id, country, last_name, phones FROM customers; id | country | last_name | phones ----+---------+---------------+----------------------------------------------------- 23 | WF | Williams | {'home': {country_code: 123, phone_number: '####'} 5 | ES | Edmunds | {'home': {country_code: 55, phone_number: '####'} 16 | PE | Pérez | {'home': {country_code: 116, phone_number: '####'} etc... Notice how much more precise this is: we're not blotting out the entire column, we're only hiding parts of it. Removing a row from a result set is also easy. It can be done either by setting parameters in the filter, or for more complex cases, in filter code, e.g.: JavaScript // Hide customers whose home phone number is in Japan let phones = context.row.phones; if (phones && phones.home && phones.home.country_code === 81) { context.row.remove(); } Again, you can see this in action in the hands-on tutorial for this article. Nothing has changed in the database: we're only affecting the data as it travels back to the Cassandra client. In Summary We've looked at three general techniques for hiding data in Cassandra with a proxy: Rejecting queries that try to access secret data Modifying queries so they do not show secret data Modifying result sets to hide secret data Rejecting queries is a blunt but effective tool. It might be sufficient for many use cases. Modifying queries has the advantage of performance: only one packet (the query) has to be modified, and the rest can work as usual. However, this technique can only work for some types of data masking requirements. Modifying result sets, on the other hand, is slightly more expensive, but it gives you complete control: you can change literally anything in the result set, no matter how fine-grained the required changes are. These techniques are not mutually exclusive: many solutions will involve a combination, perhaps in conjunction with other approaches such as fine-grained security (if available) or the data masking solution that will someday be available in Cassandra. But for complete power and flexibility, you can't beat a programmable database proxy.
Show of hands, how many of us truly understand how our build automation tool builds its dependency tree? Now, lower your hand if you understand because you work on build automation tools. Thought so! One frustrating responsibility of software engineers is understanding your project's dependencies: what transitive dependencies were brought in and by whom; why v1.3.1 is used when v1.2.10 was declared; what resulted when the transitive dependencies changed; how did multiple versions of the same artifact occur? Every software engineer has piped a dependency tree into a text file, searched for specific artifacts, and then worked their way back up to identify its origin. For anything other than trivial projects, creating a mental map of the dependencies is extremely difficult, if not impossible. I faced this problem when starting a new job with a mature code base, where assembling the puzzle pieces was a challenge. I've previously worked with graph databases and thought a graphical view of the dependency artifacts could be created using Neo4J, which resulted in DependencyLoader. Note: this is not a tutorial on graph databases, nor does this tutorial require a background in graph databases. If interested, Neo4J has tutorials and white papers to help you get started. Set Up Environment Install Java Java 11 or later is required. If not already available, install your favorite OpenJDK flavor. Install Neo4J The tutorial requires a Neo4J database into which the dependency information is loaded, preferably unshared, as the loader purges the database before each run. You have been warned! Neo4J provides personal sandboxes, ideal for short-term projects like this tutorial. Alternatively, install Neo4J locally on your desktop or laptop. Homebrew simplifies macOS installations: Shell brew install neo4j && brew services start neo4j Before continuing, confirm access to your Neo4J database using the browser, using either the link and credentials for the Neo4J sandbox or locally at http://localhost:7474. The default credentials for a local install are neo4j/neo4j; upon successful login, you are forced to change the password. Clone Repositories The neo4j-gradle-dependencies repository contains the code for loading the dependencies into Neo4J. This tutorial will generate a dependency graph for spring-boot. You must clone these two repositories: Shell Scott.Sosna@mymachine src% git clone git@github.com:scsosna99/neo4j-gradle-dependencies.git Scott.Sosna@mymachine src% git clone git@github.com:spring-projects/spring-boot.git Note: local Gradle is not required, as both repositories use the Gradle Wrapper, which downloads all necessary components the first time the wrapper is used. Generate Dependencies DependencyLoader takes the dependency tree generated by Gradle as input. Though multiple configurations may be loaded together — i.e., compileClasspath, runtimeClasspath, testCompileClasspath, testRuntimeClasspath — starting with a single configuration is simpler to navigate, especially for a tutorial. To generate dependencies for all configurations: gradle dependencies ./gradlew dependencies To generate dependencies for a single configuration: gradle dependencies --configuration <configuration> ./gradlew dependencies --configuration <configuration> Generate Spring Boot Dependencies This tutorial creates a dependency graph in Neo4J using the compileClasspath dependencies of Spring Boot.
From the directory where the repositories were cloned, execute the following commands: Shell Scott.Sosna@mymachine src% cd spring-boot/spring-boot-project/spring-boot Scott.Sosna@mymachine spring-boot% ./gradlew dependencies --configuration compileClasspath > dependencies.out The file dependencies.out contains the compile-time classpath dependencies for Spring Boot. Load Dependencies First, confirm the connection URL and authentication credentials in DependencyLoader.java and modify them if necessary. Execute the following commands to load the Spring Boot dependencies into Neo4j: Shell Scott.Sosna@mymachine spring-boot% cd ../../../neo4j-gradle-dependencies Scott.Sosna@mymachine neo4j-gradle-dependencies% ./gradlew clean run --args="../spring-boot/spring-boot-project/spring-boot/dependencies.out" When successful, the output lines from Gradle are: Shell Scott.Sosna@PVHY32M6KG neo4j-gradle-dependencies % ./gradlew clean run --args="../spring-boot/spring-boot-project/spring-boot/dependencies.out" > Task :compileJava Note: /Users/Scott.Sosna/data/src/github/neo4j-gradle-dependencies/src/main/java/dev/scottsosna/neo4j/gradle/relationship/DependsOn.java uses unchecked or unsafe operations. Note: Recompile with -Xlint:unchecked for details. > Task :run SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder". SLF4J: Defaulting to no-operation (NOP) logger implementation SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details. Jun 02, 2023 6:19:22 AM org.neo4j.driver.internal.logging.JULogger info INFO: Direct driver instance 1606286799 created for server address localhost:7687 dependencies.out completed. Jun 02, 2023 6:19:23 AM org.neo4j.driver.internal.logging.JULogger info INFO: Closing driver instance 1606286799 Jun 02, 2023 6:19:23 AM org.neo4j.driver.internal.logging.JULogger info INFO: Closing connection pool towards localhost:7687 Deprecated Gradle features were used in this build, making it incompatible with Gradle 8.0. You can use '--warning-mode all' to show the individual deprecation warnings and determine if they come from your own scripts or plugins. See https://docs.gradle.org/7.5.1/userguide/command_line_interface.html#sec:command_line_warnings BUILD SUCCESSFUL in 3s View Dependencies Multiple tools are available for displaying Neo4J graphs, but the built-in browser tool is adequate for this tutorial. Show the Complete Tree The query MATCH(a) RETURN a is the relational equivalent of SELECT * FROM <table>. View Details of an Artifact Each artifact found creates a node whose properties identify the artifact (groupId/artifactId) and its type, shown on the right-side pane. View Details of a Dependency Each dependency is created as a relationship whose properties identify the specifics of the dependency: the configuration, the specified version, and the resolved version. The dependency selected below shows spring-security:spring-web depends on io.micrometer:micrometer-observation, but the version 1.10.7 specified by spring-web was resolved as version 1.11.0. Traverse Dependencies Neo4J allows you to explore the graph node by node, manually expanding it to examine specific areas of the dependency tree. Assume that you want to understand the dependencies for the artifact io.projectreactor.netty:reactor-netty-http. First, we'll query Neo4J for that specific node.
Cypher MATCH(a:Artifact {groupId: 'io.projectreactor.netty', artifactId: 'reactor-netty-http'}) RETURN a Double-clicking on the node shows its neighboring nodes — the artifact(s) that depend on it and the artifact(s) it depends on. This expanded graph shows one artifact that is dependent on it (the root of the project, with artifact type PROJECT) and six other artifacts on which it depends. Next, double-click on io.netty:netty-codec-http to show the next level of dependencies. Note that besides the relationships (dependencies) of the selected node, additional relationships for nodes already on the graph may be shown. Identify Version Mismatch Gradle's dependency output indicates where the specified version was not the version resolved by Gradle. The properties on the dependency (relationship) can be used in a Neo4J query, restricting the relationships shown and the attached artifacts (nodes). Cypher MATCH (a:Artifact)-[d:DEPENDS_ON]->(b:Artifact) WHERE d.specifiedVersion<>d.resolvedVersion RETURN a,b,d Neo4J can return results in a tabular format for easier review, if necessary. Cypher MATCH (a:Artifact)-[d:DEPENDS_ON]->(b:Artifact) WHERE d.specifiedVersion<>d.resolvedVersion RETURN a.name AS source, b.name AS dependency, d.specifiedVersion AS specified, d.resolvedVersion AS resolved Additional Information mappings.out The mappings.out file allows you to customize the artifact type assigned to a node based on the artifact's groupId, most commonly to specifically identify artifacts created by your organization. Input Directory The command line argument for DependencyLoader may be a directory containing multiple Gradle dependency trees loaded into the same Neo4J database. This helps in understanding the dependencies of related projects with separate build.gradle files. Constrained and Omitted Gradle identifies certain dependencies as Constrained and Omitted. Currently, those are not loaded but would be easy to include, likely by creating additional properties for the relationships.
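If you want to pull the version-mismatch report into a build script or CI check rather than reading it in the Neo4J browser, the same Cypher can also be run from Java with the official Neo4j driver (the driver that DependencyLoader itself uses). The snippet below is only a sketch: the bolt URL, credentials, and output format are illustrative assumptions and not part of the repository.

Java
import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.Result;
import org.neo4j.driver.Session;

public class VersionMismatchReport {
    public static void main(String[] args) {
        // Assumed local connection details; adjust to your Neo4J sandbox or local install.
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "your-password"));
             Session session = driver.session()) {
            // Same Cypher as the tabular version-mismatch query above.
            String cypher = "MATCH (a:Artifact)-[d:DEPENDS_ON]->(b:Artifact) "
                    + "WHERE d.specifiedVersion <> d.resolvedVersion "
                    + "RETURN a.name AS source, b.name AS dependency, "
                    + "d.specifiedVersion AS specified, d.resolvedVersion AS resolved";
            Result result = session.run(cypher);
            while (result.hasNext()) {
                Record row = result.next();
                System.out.printf("%s -> %s: specified %s, resolved %s%n",
                        row.get("source").asString(),
                        row.get("dependency").asString(),
                        row.get("specified").asString(),
                        row.get("resolved").asString());
            }
        }
    }
}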
When I was learning about writing serverless functions with AWS Lambda and Java, I came across the concept of structured logging. This made me curious about the concept of Structured Logs, so I decided to explore it further. What Is Structured Logging? Typically, any logs generated by an application would be plain text that is formatted in some way. For example, here is a typical log format from a Java application: [Sun Apr 02 09:29:16 GMT] book.api.WeatherEventLambda INFO: [locationName: London, UK temperature: 22 action: record timestamp: 1564428928] While this log is formatted, it is not structured. We can see that it is formatted with the following components: Timestamp (when it occurred) Fully Qualified Class Name (from where it occurred) Logging Level (the type of event) Message (this is the part that is typically non-standardized and therefore benefits the most from having some structure, as we will see) A structured log does not use a plain-text format but instead uses a more formal structure such as XML or, more commonly, JSON. The log shown previously would like like this if it were structured: Note that the message part of the log is what you would typically be interested in. However, there is a whole bunch of meta-data surrounding the message, which may or may not be useful in the context of what you are trying to do. Depending on the logging framework that you are using, you can customize the meta-data that is shown. The example shown above was generated from an AWS Lambda function (written in Java) via the Log4J2 logging framework. The configuration looks like this: XML <?xml version="1.0" encoding="UTF-8"?> <Configuration packages="com.amazonaws.services.lambda.runtime.log4j2"> <Appenders> <Lambda name="Lambda"> <JsonLayout compact="true" eventEol="true" objectMessageAsJsonObject="true" properties="true"/> </Lambda> </Appenders> <Loggers> <Root level="info"> <AppenderRef ref="Lambda"/> </Root> </Loggers> </Configuration> The JsonLayout tag is what tells the logger to use a structured format (i.e.) JSON in this case. Note that we are using it as an appender for INFO-level logs, which means logs at other levels, such as ERROR or DEBUG, will not be structured. This sort of flexibility, in my opinion, is beneficial as you may not want to structure all of your logs but only the parts that you think need to be involved in monitoring or analytics. Here is a snippet from the AWS Lambda function that generates the log. It reads a weather event, populates a Map with the values to be logged, and passes that Map into the logger. Java final WeatherEvent weatherEvent = objectMapper.readValue(request.getBody(), WeatherEvent.class); HashMap<Object, Object> message = new HashMap<>(); message.put("action", "record"); message.put("locationName", weatherEvent.locationName); message.put("temperature", weatherEvent.temperature); message.put("timestamp", weatherEvent.timestamp); logger.info(new ObjectMessage(message)); There are different ways of achieving this. You could write your own class that implements an interface from Log4J2 and then populate the fields of an instance of this class and pass this instance to the logger. So, What Is the Point of All This? Why would you want to structure your logs? To answer this question, consider you had a pile of logs (as in, actual logs of wood): If I were to say to you, "Inspect the log on top of the bottom left one," you would have to take a guess as to which one I am referring to. Now consider that these logs were structured into a cabin. 
Now, if I were to say to you, "Inspect the logs that make up the front door," then you know exactly where to look. This is why structure is good. It makes it easier to find things. Querying Your Logs Structured Logs can be indexed efficiently by monitoring tools such as AWS CloudWatch, Kibana, and Splunk. What this means is that it becomes much easier to find the logs that you want. These tools offer sophisticated ways of querying your logs, making it easier to do troubleshooting or perform analytics. For example, this screenshot shows how, in AWS CloudWatch Insights, you would search for logs where a weather event from Oxford occurred. We are referring to the locationName property under the message component of the log. You can do much more sophisticated queries with filtering and sorting. For example, you could say, "Show me the top 10 weather events where the temperature was greater than 20 degrees" (a rare occurrence in the UK). Triggering Events From Logs Another benefit of being able to query your logs is that you can start measuring things. These measurements (called metrics in AWS CloudWatch parlance) can then be used to trigger events such as sending out a notification. In AWS, this would be achieved by creating a metric that represents what you want to measure and then setting up a CloudWatch Alarm based on a condition on that metric and using the alarm to trigger a notification to, for instance, SNS. For example, if you wanted to send out an email whenever the temperature went over 20 degrees in London, you can create a metric for the average temperature reading from London over a period of, say, 5 hours and then create an alarm that would activate when this metric goes above 20 degrees. This alarm can then be used to trigger a notification to an SNS topic. Subscribers to the SNS topic would then be notified so that they know not to wear warm clothes. Is There a Downside To Structured Logs? The decision as to whether to use Structured Logs should be driven by the overall monitoring and analytics strategy that you envision for the system. If you have, for example, a serverless application that is part of a wider system that ties into other services, it makes sense to centralize the logs from these various services so that you have a unified view of the system. In this scenario, having your logs structured will greatly aid monitoring and analytics. If, on the other hand, you have a very simple application that just serves data from a single data source and doesn't link to other services, you may not need to structure your logs. Let's not forget the old adage: Keep it Simple, Stupid. So, to answer the question "Is there a Downside to Structured Logs?" - only if you use it where you don't need to. You don't want to spend time on additional configuration and having to think about structure when having simple logs would work just fine. Conclusion Structured Logging not only aids you in analyzing your logs more efficiently but it also aids you in building better monitoring capabilities in your system. In addition to this, business analytics can be enhanced through relevant queries and setting up metrics and notifications that can signal trends in the system. In short, Structured Logging is not just about logging. It is a tool that drives architectural patterns that enhance both monitoring and analytics.
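Coming back to the Lambda snippet earlier in this article: one lighter-weight variation on the "write your own class" idea mentioned above is to keep using ObjectMessage but hand it a small value class instead of a raw HashMap. The class and field names below are illustrative assumptions rather than code from the original example, and the sketch assumes the Log4J2 JsonLayout is configured with objectMessageAsJsonObject="true" as shown earlier.

Java
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ObjectMessage;

public class WeatherEventLogger {

    private static final Logger logger = LogManager.getLogger(WeatherEventLogger.class);

    // Plain value class; with objectMessageAsJsonObject="true", its fields are
    // serialized as the JSON "message" object instead of a formatted string.
    static class WeatherLogEntry {
        public String action;
        public String locationName;
        public Double temperature;
        public Long timestamp;

        WeatherLogEntry(String action, String locationName, Double temperature, Long timestamp) {
            this.action = action;
            this.locationName = locationName;
            this.temperature = temperature;
            this.timestamp = timestamp;
        }
    }

    public void logRecorded(String locationName, double temperature, long timestamp) {
        // The logger receives a typed object rather than a loosely keyed map.
        logger.info(new ObjectMessage(
                new WeatherLogEntry("record", locationName, temperature, timestamp)));
    }
}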
The article will cover the following topics: Why is Envoy proxy required? Introducing Envoy proxy Envoy proxy architecture with Istio Envoy proxy features Use cases of Envoy proxy Benefits of Envoy proxy Demo video - Deploying Envoy in K8s and configuring as a load balancer Why Is Envoy Proxy Required? Challenges are plenty for organizations moving their applications from monolithic to microservices architecture. Managing and monitoring the sheer number of distributed services across Kubernetes and the public cloud often exhausts app developers, cloud teams, and SREs. Below are some of the major network-level operational hassles of microservices, which shows why Envoy proxy is required. Lack of Secure Network Connection Kubernetes is not inherently secure because services are allowed to talk to each other freely. It poses a great threat to the infrastructure since an attacker who gains access to a pod can move laterally across the network and compromise other services. This can be a huge problem for security teams, as it is harder to ensure the safety and integrity of sensitive data. Also, the traditional perimeter-based firewall approach and intrusion detection systems will not help in such cases. Complying With Security Policies Is a Huge Challenge There is no developer on earth who would enjoy writing security logic to ensure authentication and authorization, instead of brainstorming business problems. However, organizations who want to adhere to policies such as HIPAA or GDPR, ask their developers to write security logic such as mTLS encryption in their applications. Such cases in enterprises will lead to two consequences: frustrated developers, and security policies being implemented locally and in silos. Lack of Visibility Due to Complex Network Topology Typically, microservices are distributed across multiple Kubernetes clusters and cloud providers. Communication between these services within and across cluster boundaries will contribute to a complex network topology in no time. As a result, it becomes hard for Ops teams and SREs to have visibility over the network, which impedes their ability to identify and resolve network issues in a timely manner. This will lead to frequent application downtime and compromised SLA. Complicated Service Discovery Services are often created and destroyed in a dynamic microservices environment. Static configurations provided by old-generation proxies are ineffective in keeping track of services in such an environment. This makes it difficult for application engineers to configure communication logic between services because they have to manually update the configuration file whenever a new service is deployed or deleted. It leads to application developers spending more of their time configuring the networking logic rather than coding the business logic. Inefficient Load Balancing and Traffic Routing It is crucial for platform architects and cloud engineers to ensure effective traffic routing and load balancing between services. However, it is a time-consuming and error-prone process for them to manually configure routing rules and load balancing policies for each service, especially when they have a fleet of them. Also, traditional load balancers with simple algorithms would result in inefficient resource utilization and suboptimal load balancing in the case of microservices. All these lead to increased latency, and service unavailability due to improper traffic routing. 
With the rise in the adoption of microservices architecture, there was a need for a fast, intelligent proxy that can handle the complex service-to-service connection across the cloud. Introducing Envoy Proxy Envoy is an open-source edge and service proxy, originally developed by Lyft to facilitate their migration from a monolith to cloud-native microservices architecture. It also serves as a communication bus for microservices (refer to Figure 1 below) across the cloud, enabling them to communicate with each other in a rapid, secure, and efficient manner. Envoy proxy abstracts network and security from the application layer to an infrastructure layer. This helps application developers simplify developing cloud-native applications by saving hours spent on configuring network and security logic. Envoy proxy provides advanced load balancing and traffic routing capabilities that are critical to run large, complex distributed applications. Also, the modular architecture of Envoy helps cloud and platform engineers to customize and extend its capabilities. Figure 1: Envoy proxy intercepting traffic between services Envoy Proxy Architecture With Istio Envoy proxies are deployed as sidecar containers alongside application containers. The sidecar proxy then intercepts and takes care of the service-to-service connection (refer to Figure 2 below) and provides a variety of features. This network of proxies is called a data plane, and it is configured and monitored from a control plane provided by Istio. These two components together form the Istio service mesh architecture, which provides a powerful and flexible infrastructure layer for managing and securing microservices. Figure 2: Istio sidecar architecture with Envoy proxy data plane Envoy Proxy Features Envoy proxy offers the following features at a high level. (Visit Envoy docs for more information on the features listed below.) Out-of-process architecture: Envoy proxy runs independently as a separate process apart from the application process. It can be deployed as a sidecar proxy and also as a gateway without requiring any changes to the application. Envoy is also compatible with any application language like Java or C++, which provides greater flexibility for application developers. L3/L4 and L7 filter architecture: Envoy supports filters and allows customizing traffic at the network layer (L3/L4) and at the application layer (L7). This allows for more control over the network traffic and offers granular traffic management capabilities such as TLS client certificate authentication, buffering, rate limiting, and routing/forwarding. HTTP/2 and HTTP/3 support: Envoy supports HTTP/1.1, HTTP/2, and HTTP/3 (currently in alpha) protocols. This enables seamless communication between clients and target servers using different versions of HTTP. HTTP L7 routing: Envoy's HTTP L7 routing subsystem can route and redirect requests based on various criteria, such as path, authority, and content type. This feature is useful for building front/edge proxies and service-to-service meshes. gRPC support: Envoy supports gRPC, a Google RPC framework that uses HTTP/2 or above as its underlying transport. Envoy can act as a routing and load-balancing substrate for gRPC requests and responses. Service discovery and dynamic configuration: Envoy supports service discovery and dynamic configuration through a layered set of APIs that provide dynamic updates about backend hosts, clusters, routing, listening sockets, and cryptographic material. 
This allows for centralized management and simpler deployment, with options for DNS resolution or static config files. Health checking: For building an Envoy mesh, service discovery is treated as an eventually consistent process. Envoy has a health-checking subsystem that can perform active and passive health checks to determine healthy load-balancing targets. Advanced load balancing: Envoy's self-contained proxy architecture allows it to implement advanced load-balancing techniques, such as automatic retries, circuit breaking, request shadowing, and outlier detection, in one place, accessible to any application. Front/edge proxy support: Using the same software at the edge provides benefits such as observability, management, and identical service discovery and load-balancing algorithms. Envoy's feature set makes it well-suited as an edge proxy for most modern web application use cases, including TLS termination, support for multiple HTTP versions, and HTTP L7 routing. Best-in-class observability: Envoy provides robust statistics support for all subsystems and supports distributed tracing via third-party providers, making it easier for SREs and Ops teams to monitor and debug problems occurring at both the network and application levels. Given its powerful set of features, Envoy proxy has become a popular choice for organizations to manage and secure multicloud and multicluster apps. In practice, it has two main use cases. Use Cases of Envoy Proxy Envoy proxy can be used as both a sidecar service proxy and a gateway. Envoy Sidecar Proxy As we have seen in the Isito architecture, Envoy proxy constitutes the data plane and manages the traffic flow between services deployed in the mesh. The sidecar proxy provides features such as service discovery, load balancing, traffic routing, etc., and offers visibility and security to the network of microservices. Envoy Gateway as API Envoy proxy can be deployed as an API Gateway and as an ingress (please refer to the Envoy Gateway project). Envoy Gateway is deployed at the edge of the cluster to manage external traffic flowing into the cluster and between multicloud applications (north-south traffic). Envoy Gateway helped application developers who were toiling to configure Envoy proxy (Istio-native) as API and ingress controller, instead of purchasing a third-party solution like NGINX. With its implementation, they have a central location to configure and manage ingress and egress traffic and apply security policies such as authentication and access control. Below is a diagram of Envoy Gateway architecture and its components. Envoy Gateway architecture (Source) Benefits of Envoy Proxy Envoy’s ability to abstract network and security layers offers several benefits for IT teams such as developers, SREs, cloud engineers, and platform teams. Following are a few of them. Effective Network Abstraction The out-of-process architecture of Envoy helps it to abstract the network layer from the application to its own infrastructure layer. This allows for faster deployment for application developers, while also providing a central plane to manage communication between services. Fine-Grained Traffic Management With its support for the network (L3/L4) and application (L7) layers, Envoy provides flexible and granular traffic routing, such as traffic splitting, retry policies, and load balancing. 
Ensure Zero Trust Security at L4/L7 Layers Envoy proxy helps to implement authentication among services inside a cluster with stronger identity verification mechanisms like mTLS and JWT. You can achieve authorization at the L7 layer with Envoy proxy easily and ensure zero trust. (You can implement AuthN/Z policies with Istio service mesh — the control plane for Envoy.) Control East-West and North-South Traffic for Multicloud Apps Since enterprises deploy their applications into multiple clouds, it is important to understand and control the traffic or communication in and out of the data centers. Since Envoy proxy can be used as a sidecar and also an API gateway, it can help manage east-west traffic and also north-south traffic, respectively. Monitor Traffic and Ensure Optimum Platform Performance Envoy aims to make the network understandable by emitting statistics, which are divided into three categories: downstream statistics for incoming requests, upstream statistics for outgoing requests, and server statistics for describing the Envoy server instance. Envoy also provides logs and metrics that provide insights into traffic flow between services, which is also helpful for SREs and Ops teams to quickly detect and resolve any performance issues. Video: Get Started With Envoy Proxy Deploying Envoy in k8s and Configuring as Load Balancer The below video discusses different deployment types and their use cases, and it shows a demo of Envoy deployment into Kubernetes and how to set it as a load balancer (edge proxy).
Cucumber is a well-known Behavior-Driven Development (BDD) framework that allows developers to implement end-to-end testing. The combination of Selenium with Cucumber provides a powerful framework that will enable you to create functional tests in an easy way. It allows you to express acceptance criteria in language that business people can read and understand, along with the steps to take to verify that they are met. The Cucumber tests are then run through a browser-like interface that allows you to see what's happening in your test at each step. This Cucumber Selenium tutorial will walk you through the basics of writing test cases with Cucumber in Selenium WebDriver. If you are not familiar with Cucumber, this Cucumber BDD tutorial will also give you an introduction to its domain-specific language (DSL) and guide you through writing your first step definitions, setting up Cucumber with Selenium WebDriver, and automating web applications using Cucumber, Selenium, and TestNG framework. Why Use Cucumber.js for Selenium Automation Testing? Cucumber.js is a tool that is often used in conjunction with Selenium for automating acceptance tests. It allows you to write tests in a natural language syntax called Gherkin, which makes it easy for non-technical team members to understand and write tests. Cucumber with Selenium is one the easiest-to-use combinations. Here are a few benefits of using Selenium Cucumber for automation testing: Improved collaboration: Since Cucumber.js tests are written in a natural language syntax, they can be easily understood by all members of the development team, including non-technical stakeholders. This can improve collaboration between different team members and ensure that everyone is on the same page. Easier to maintain: Cucumber.js tests are organized into features and scenarios, which makes it easy to understand the structure of the tests and locate specific tests when you need to make changes. Better documentation: The natural language syntax of Cucumber.js tests makes them a good source of documentation for the functionality of your application. This can be especially useful for teams that follow a behavior-driven development (BDD) approach. Greater flexibility: Cucumber.js allows you to write tests for a wide variety of applications, including web, mobile, and desktop applications. It also supports multiple programming languages, so you can use it with the language that your team is most familiar with. To sum it up, Cucumber with Selenium is a useful tool for automating acceptance tests, as it allows you to write tests in a natural language syntax that is easy to understand and maintain and provides greater flexibility and improved collaboration. Configure Cucumber Setup in Eclipse and IntelliJ [Tutorial] With the adoption of Agile methodology, a variety of stakeholders, such as Quality Assurance professionals, technical managers, and program managers, including those without technical backgrounds, have come together to work together to improve the product. This is where the need to implement Behavior Driven Development (BDD) arises. Cucumber is a popular BDD tool that is used for automated testing. In this section, we will explain how to set up Cucumber in Eclipse and IntelliJ for automated browser testing. How To Configure Cucumber in Eclipse In this BDD Cucumber tutorial, we will look at the instructions on how to add necessary JARs to a Cucumber Selenium Java project in order to set up Cucumber in Eclipse. 
It is similar to setting up the TestNG framework and is useful for those who are just starting with Selenium automation testing. Open Eclipse by double-clicking, then select workspace and change it anytime with the Browse button. Click Launch and close the Welcome window; not needed for the Cucumber Selenium setup. To create a new project, go to File > New > Java Project. Provide information or do what is requested, then enter a name for your project and click the Finish button. Right-click on the project, go to Build Path > Configure Build Path. Click on the button labeled "Add External JARs." Locate Cucumber JARs and click Open. Add required Selenium JARs for Cucumber setup in Eclipse. They will appear under the Libraries tab. To import the necessary JARs for the Cucumber setup in Eclipse, click 'Apply and Close.' The imported JARs will be displayed in the 'Referenced Libraries' tab of the project. How To Install Cucumber in IntelliJ In this part of the IntelliJ Cucumber Selenium tutorial, we will demonstrate how to set up Cucumber with IntelliJ, a widely used Integrated Development Environment (IDE) for Selenium Cucumber Java development. Here are the steps for configuring Cucumber in IntelliJ. To create a new project in IntelliJ IDEA, open the IntelliJ IDE and go to File > New > Project. To create a new Java project in IntelliJ IDEA, select Java and click Next. To complete the process of creating a new project in IntelliJ IDEA, name your project and click Finish. After creating a new project in IntelliJ IDEA, you will be prompted to choose whether to open the project in the current window or a new one. You can select 'This Window.' After creating a new project in IntelliJ IDEA, it will be displayed in the project explorer. To import the necessary JARs for Selenium and Cucumber in IntelliJ IDEA, go to File > Project Structure > Modules, similar to how it was done in Eclipse. To add dependencies to your project, click the '+' sign at the bottom of the window and select the necessary JARs or directories. To add Selenium to Cucumber, add the selenium-java and selenium dependencies JARs to the project. Click Apply and then OK. Similar to how Cucumber Selenium is set up in Eclipse, the imported JARs will be located in the "External Libraries" section once they have been added. For a step-by-step guide on using Cucumber for automated testing, be sure to check out our Cucumber testing tutorial on Cucumber in Eclipse and IntelliJ. This article provides detailed instructions and tips for setting up Cucumber in these two popular Java IDEs. Cucumber.js Tutorial With Examples for Selenium JavaScript Using a BDD framework has its own advantages, ones that can help you take your Selenium test automation a long way. Not to sideline, these BDD frameworks help all of your stakeholders to easily interpret the logic behind your test automation script. Leveraging Cucumber.js for your Selenium JavaScript testing can help you specify an acceptance criterion that would be easy for any non-programmer to understand. It could also help you quickly evaluate the logic implied in your Selenium test automation suite without going through huge chunks of code. Cucumber.js, a Behaviour Driven Development framework, has made it easier to understand tests by using a given-when-then structure. For example- Imagine you have $1000 in your bank account. You go to the ATM machine and ask for $200. If the machine is working properly, it will give you $200, and your bank account balance will be $800. 
The machine will also give your card back to you. How To Use Annotations in Cucumber Framework in Selenium [Tutorial] The Cucumber Selenium framework is popular because it uses natural language specifications to define test cases. It allows developers to write test scenarios in plain language, which makes it easier for non-technical stakeholders to understand and review. In Cucumber, test scenarios are written in a feature file using annotations, which are keywords that mark the steps in the scenario. The feature file is then translated into code using step definition files, which contain methods that correspond to each step in the feature file. Cucumber Selenium annotations are used to mark the steps in the feature file and map them to the corresponding methods in the step definition file. There are three main types of annotations in Cucumber: Given, When, and Then. Given annotations are used to set up the initial state of the system under test. When annotations are used to describe the actions that are being tested, and Then annotations are used to verify that the system is in the expected state after the actions have been performed. Cucumber Selenium also provides a number of "hooks," which are methods that are executed before or after certain events in the test execution process. For example, a "before" hook might be used to set up the test environment, while an "after" hook might be used to clean up resources or take screenshots after the test has been completed. There are several types of hooks available in Cucumber, including "before" and "after" hooks, as well as "beforeStep" and "afterStep" hooks, which are executed before and after each step in the test scenario. In this tutorial, we will go over the different types of annotations and hooks that are available in Cucumber and how to use them to write effective test scenarios. We will also look at some best practices for organizing and maintaining your Cucumber Selenium tests. How To Perform Automation Testing With Cucumber and Nightwatch JS To maximize the advantages of test automation, it is important to choose the right test automation framework and strategy so that the quality of the project is not compromised while speeding up the testing cycle, detecting bugs early, and quickly handling repetitive tasks. Automation testing is a crucial aspect of software development, allowing developers to quickly and efficiently validate the functionality and performance of their applications. Cucumber is a testing framework that uses natural language specifications to define test cases, and Nightwatch.js is a tool for automating tests for web applications. Behavior Driven Development (BDD) is a technique that clarifies the behavior of a feature using simple, user-friendly language. This approach makes it easy for anyone to understand the requirements, even those without technical expertise. DSL is also used to create automated test scripts. Cucumber allows users to write scenarios in plain text using Gherkin syntax. This syntax is made up of various keywords, including Feature, Scenario, Given, When, Then, and And. The feature is used to describe the high-level functionality, while Scenario is a collection of steps for Cucumber to execute. Each step is composed of keywords such as Given, When, Then, and And, all of which serve a specific purpose. A Gherkin document is stored in a file with a .feature extension. 
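To make the annotations and hooks described above concrete, here is a minimal, hedged sketch of Java step definitions with Before and After hooks. It assumes the cucumber-java and selenium-java dependencies discussed in the setup sections; the URL, element locators, and step text are hypothetical placeholders, not part of any real application.

Java
import io.cucumber.java.After;
import io.cucumber.java.Before;
import io.cucumber.java.en.Given;
import io.cucumber.java.en.Then;
import io.cucumber.java.en.When;
import org.openqa.selenium.By;
import org.openqa.selenium.WebDriver;
import org.openqa.selenium.chrome.ChromeDriver;
import static org.testng.Assert.assertTrue;

public class LoginSteps {

    private WebDriver driver;

    @Before
    public void setUp() {
        // Hook: runs before each scenario to prepare the browser.
        driver = new ChromeDriver();
    }

    @Given("the user is on the login page")
    public void userIsOnLoginPage() {
        // Hypothetical URL used only for illustration.
        driver.get("https://example.com/login");
    }

    @When("the user logs in with valid credentials")
    public void userLogsIn() {
        // Hypothetical locators; adjust them to the page under test.
        driver.findElement(By.id("username")).sendKeys("demo-user");
        driver.findElement(By.id("password")).sendKeys("demo-pass");
        driver.findElement(By.id("login-button")).click();
    }

    @Then("the dashboard is displayed")
    public void dashboardIsDisplayed() {
        assertTrue(driver.findElement(By.id("dashboard")).isDisplayed());
    }

    @After
    public void tearDown() {
        // Hook: runs after each scenario, even if a step failed.
        driver.quit();
    }
}

Each Given, When, and Then method is matched to the corresponding line of a .feature file by its step text, which is what lets non-technical readers follow the scenario while the code does the actual browser work.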
Automation Testing With Selenium, Cucumber, and TestNG Automation testing with Selenium, Cucumber, and TestNG is a powerful combination for testing web applications. Selenium is an open-source tool for automating web browsers, and it can be used to automate a wide range of tasks in a web application. Cucumber is a tool for writing and executing acceptance tests, and it can be used to define the expected behavior of a web application in a simple, human-readable language. TestNG is a testing framework for Java that can be used to organize and run Selenium tests. One of the benefits of using Selenium, Cucumber, and TestNG together is that they can be integrated into a continuous integration and delivery (CI/CD) pipeline. This allows developers to automatically run tests as part of their development process, ensuring that any changes to the codebase do not break existing functionality. To use Selenium, Cucumber, and TestNG together, you will need to install the necessary software and dependencies. This typically involves installing Java, Selenium, Cucumber, and TestNG. You will also need to set up a project in your development environment and configure it to use Selenium, Cucumber, and TestNG. Once you have set up your project, you can begin writing tests using Selenium, Cucumber, and TestNG. This involves creating test cases using Cucumber's Given-When-Then syntax and implementing those test cases using Selenium and TestNG. You can then run your tests using TestNG and analyze the results to identify any issues with the application. To sum it up, automation testing with Selenium, Cucumber, and TestNG can be a powerful tool for testing web applications. By integrating these tools into your CI/CD pipeline, you can ensure that your application is tested thoroughly and continuously, helping you to deliver high-quality software to your users. How To Integrate Cucumber With Jenkins Cucumber is a tool for writing and executing acceptance tests, and it is often used in conjunction with continuous integration (CI) tools like Jenkins to automate the testing process. By integrating Cucumber with Jenkins, developers can automatically run acceptance tests as part of their CI pipeline, ensuring that any changes to the codebase do not break existing functionality. To use Cucumber Selenium with Jenkins, you will need to install the necessary software and dependencies. This typically involves installing Java, Cucumber, and Jenkins. You will also need to set up a project in your development environment and configure it to use Cucumber. Once you have set up your project and configured Cucumber, you can begin writing acceptance tests using Cucumber's Given-When-Then syntax. These tests should define the expected behavior of your application in a simple, human-readable language. To run your Cucumber Selenium tests with Jenkins, you will need to set up a Jenkins job and configure it to run your tests. This typically involves specifying the location of your Cucumber test files and any necessary runtime parameters. You can then run your tests by triggering the Jenkins job and analyzing the results to identify any issues with the application. In conclusion, integrating Cucumber with Jenkins can be a powerful way to automate the acceptance testing process. By running your tests automatically as part of your CI pipeline, you can ensure that your application is tested thoroughly and continuously, helping you to deliver high-quality software to your users. 
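As a sketch of how the pieces above might be wired together, here is a minimal Cucumber runner based on the cucumber-testng library. The feature path, glue package, tag, and report location are hypothetical and would need to match your own project layout.

Java
import io.cucumber.testng.AbstractTestNGCucumberTests;
import io.cucumber.testng.CucumberOptions;

// Minimal Cucumber + TestNG runner: TestNG discovers this class and
// executes every scenario found under the configured feature path.
@CucumberOptions(
        features = "src/test/resources/features",   // hypothetical feature directory
        glue = "com.example.steps",                  // hypothetical step-definition package
        plugin = {"pretty", "html:target/cucumber-report.html"},
        tags = "@smoke"                              // optional tag filter
)
public class RunCucumberTest extends AbstractTestNGCucumberTests {
}

A Jenkins job can then execute this TestNG suite through the project's normal Maven or Gradle test task, which is typically all the pipeline configuration that is needed.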
Top Five Cucumber Best Practices for Selenium Automation Cucumber is a popular tool for writing and executing acceptance tests, and following best practices can help ensure that your tests are effective and maintainable. Here are five best practices for using Cucumber to create robust and maintainable acceptance tests. 1. Keep acceptance tests simple and focused: Cucumber tests should define the expected behavior of your application in a clear and concise manner. Avoid including unnecessary details or extraneous test steps. 2. Use the Given-When-Then syntax: Cucumber's Given-When-Then syntax is a helpful way to structure your acceptance tests. It helps to clearly define the pre-conditions, actions, and expected outcomes of each test. 3. Organize tests with tags: Cucumber allows you to use tags to label and group your tests. This can be useful for organizing tests by feature or by the level of testing (e.g., unit, integration, acceptance). 4. Avoid testing implementation details: Your acceptance tests should focus on the expected behavior of your application rather than the specific implementation details. This will make your tests more robust and easier to maintain. 5. Use data tables to test multiple scenarios: Cucumber's data tables feature allows you to test multiple scenarios with a single test, which can be a useful way to reduce the number of test cases you need to write (see the sketch after this section). By following these best practices when using Cucumber, you can create effective and maintainable acceptance tests that help ensure the quality of your application.
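To illustrate best practice #5, here is a hedged sketch of a step definition that receives a Gherkin data table as a Cucumber DataTable. The step text and the column names (username, password) are hypothetical, and the login helper is a placeholder for the Selenium interaction shown earlier.

Java
import io.cucumber.datatable.DataTable;
import io.cucumber.java.en.When;
import java.util.List;
import java.util.Map;

public class BulkLoginSteps {

    @When("the following users try to log in")
    public void usersTryToLogIn(DataTable table) {
        // Each row of the Gherkin table becomes a map keyed by the header row.
        List<Map<String, String>> rows = table.asMaps();
        for (Map<String, String> row : rows) {
            // Hypothetical helper that drives the login form via Selenium.
            attemptLogin(row.get("username"), row.get("password"));
        }
    }

    private void attemptLogin(String username, String password) {
        // Placeholder for the WebDriver calls from the earlier example.
    }
}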
The console.log function — the poor man’s debugger — is every JavaScript developer’s best friend. We use it to verify that a certain piece of code was executed or to check the state of the application at a given point in time. We may also use console.warn to send warning messages or console.error to explain what happened when things have gone wrong. Logging makes it easy to debug your app during local development. But what about debugging your Node.js app while it’s running in a hosted cloud environment? The logs are kept on the server, to which you may or may not have access. How do you view your logs then? Most companies use application performance monitoring tools and observability tools for better visibility into their hosted apps. For example, you might send your logs to a log aggregator like Datadog, Sumo Logic, or Papertrail where logs can be viewed and queried. In this article, we’ll look at how we can configure an app that is hosted on Render to send its system logs to Papertrail by using Render Log Streams. By the end, you’ll have your app up and running — and logging — in no time. Creating Our Node.JS App and Hosting It With Render Render is a cloud hosting platform made for developers by developers. With Render, you can easily host your static sites, web services, cron jobs, and more. We’ll start with a simple Node.js and Express app for our demo. You can find the GitHub repo here. You can also view the app here. To follow along on your machine, fork the repo so that you have a copy running locally. You can install the project’s dependencies by running yarn install, and you can start the app by running yarn start. Easy enough! Render Log Streams demo app Now it’s time to get our app running on Render. If you don’t have a Render account yet, create one now. It’s free! Once you’re logged in, click the “New” button and then choose the “Web Service” option from the menu. Creating a new web service This will take you to the next page where you’ll select the GitHub repo you’d like to connect. If you haven’t connected your GitHub account yet, you can do so here. And if you have connected your GitHub account but haven’t given Render access to your specific repo yet, you can click the “Configure account” button. This will take you to GitHub, where you can grant access to all your repos or just a selection of them. Connecting your GitHub repo Back on Render, after connecting to your repo, you’ll be taken to a configuration page. Give your app a name (I chose the same name as my repo, but it can be anything), and then provide the correct build command (yarn, which is a shortcut for yarn install) and start command (yarn start). Choose your instance type (free tier), and then click the “Create Web Service” button at the bottom of the page to complete your configuration setup. Configuring your app With that, Render will deploy your app. You did it! You now have an app hosted on Render’s platform. Log output from your Render app’s first deployment Creating Our Papertrail Account Let’s now create a Papertrail account. Papertrail is a log aggregator tool that helps make log management easy. You can create an account for free — no credit card is required. Once you’ve created your account, click on the “Add your first system” button to get started. Adding your first system in Papertrail This will take you to the next page which provides you with your syslog endpoint at the top of the screen. 
There are also instructions for running an install script, but in our case, we don’t actually need to install anything! So just copy that syslog endpoint, and we’ll paste it in just a bit. Syslog endpoint Connecting Our Render App to Papertrail We now have an app hosted on Render, and we have a Papertrail account for logging. Let’s connect the two! Back in the Render dashboard, click on your avatar in the global navigation, then choose “Account Settings” from the drop-down menu. Render account settings Then in the secondary side navigation, click on the “Log Streams” tab. Once on that page, you can click the “Add Log Stream” button, which will open a modal. Paste your syslog endpoint from Papertrail into the “Log Endpoint” input field, and then click “Add Log Stream” to save your changes. Adding your log stream You should now see your Log Stream endpoint shown in Render’s dashboard. Render Log Stream dashboard Great! We’ve connected Render to Papertrail. What’s neat is that we’ve set up this connection for our entire Render account, so we don’t have to configure it for each individual app hosted on Render. Adding Logs to Our Render App Now that we have our logging configured, let’s take it for a test run. In our GitHub repo’s code, we have the following in our app.js file: JavaScript app.get('/', (req, res) => { console.log('Log - home page'); console.info('Info - home page'); console.warn('Warn - home page'); console.error('Error - home page'); console.debug('Debug - home page'); return res.sendFile('index.html', { root: 'public' }); }); When a request is made to the root URL of our app, we do a bit of logging and then send the index.html file to the client. The user doesn’t see any of the logs since these are server-side rather than client-side logs. Instead, the logs are kept on our server, which, again, is hosted on Render. To generate the logs, open your demo app in your browser. This will trigger a request for the home page. If you’re following along, your app URL will be different from mine, but my app is hosted here. Viewing Logs in Papertrail Let’s go find those logs in Papertrail. After all, they were logged to our server, but our server is hosted on Render. In your Papertrail dashboard, you should see at least two systems: one for Render itself, which was used to test the account connection, and one for your Render app (“render-log-stream-demo” in my case). Papertrail systems Click on the system for your Render app, and you’ll see a page where all the logs are shown and tailed, with the latest logs appearing at the bottom of the screen. Render app logs in Papertrail You can see that we have logs for many events, not just the data that we chose to log from our app.js file. These are the syslogs, so you also get helpful log data from when Render was installing dependencies and deploying your app! At the bottom of the page, we can enter search terms to query our logs. We don’t have many logs here yet, but where you’re running a web service that gets millions of requests per day, these log outputs can get very large very quickly. Searching logs in Papertrail Best Practices for Logging This leads us to some good questions: Now that we have logging set up, what exactly should we be logging? And how should we be formatting our logs so that they’re easy to query when we need to find them? What you’re logging and why you’re logging something will vary by situation. You may be adding logs after a customer issue is reported that you’re unable to reproduce locally. 
By adding logs to your app, you can get better visibility into what’s happening live in production. This is a reactive form of logging in which you’re adding new logs to certain files and functions after you realize you need them. As a more proactive form of logging, there may be important business transactions that you want to log all the time, such as account creation or order placement. This will give you greater peace of mind that events are being processed as expected throughout the day. It will also help you see the volume of events generated in any given interval. And, when things do go wrong, you’ll be able to pinpoint when your log output changed. How you format your logs is up to you, but you should be consistent in your log structure. In our example, we just logged text strings, but it would be even better to log our data in JSON format. With JSON, we can include key-value pairs for all of our messages. For each message, we might choose to include data for the user ID, the timestamp, the actual message text, and more. The beauty of JSON is that it makes querying your logs much easier, especially when viewing them in a log aggregator tool that contains thousands or millions of other messages. Conclusion There you have it — how to host your app on Render and configure logging with Render Log Streams and Papertrail. Both platforms only took minutes to set up, and now we can manage our logs with ease. Keep in mind that Render Log Streams let you send your logs to any of several different log aggregators, giving you lots of options. For example, Render logs can be sent to Sumo Logic. You just need to create a Cloud Syslog Source in your Sumo Logic account. Or, you can send your logs to Datadog as well. With that, it’s time for me to log off. Thanks for reading, Happy coding, and happy logging!
The Java collection framework provides a variety of classes and interfaces, such as lists, sets, queues, and maps, for managing and storing collections of related objects. In this blog, we go over best practices and tips for using the Java collection framework effectively. What Is a Collection Framework? The Java collection framework is a key element of Java programming. To effectively use the Java collection framework, consider factors like utilizing the enhanced for loop, generics, avoiding raw types, and selecting the right collection. Choosing the Right Collection for the Task Each collection class has its own distinct set of qualities and is made to be used for a particular function. Following are some descriptions of each kind of collection: List: The ArrayList class is the most widely used list implementation in Java, providing resizable arrays when it is unknown how large the collection will be. Set: The HashSet class is the most popular implementation of a set in Java, providing uniqueness with a hash-table-based implementation. Queue: The LinkedList class is the most popular Java implementation of a queue, allowing elements to be accessed in a specific order. Map: The HashMap class is the most popular map implementation for storing and retrieving data based on distinct keys. Factors to Consider While Choosing a Collection Type of data: Different collections may be more suitable depending on the kind of data that will be handled and stored. Ordering: A list or queue is preferable to a set or map when the ordering of items is important. Duplicate elements: A set or map may be a better option than a list or queue if duplicate elements are not allowed. Performance: Different collections have different performance characteristics. By picking the right collection, you can improve the performance of your code. Examples of Use Cases for Different Collections Lists: Lists allow for the storage and modification of ordered data, such as a to-do list or shopping list. Set: A set can be used to store unique items, such as email addresses. Queue: A queue can be used to access elements in a specific order, such as handling jobs in the order they are received. Map: A map can be used to store and access data based on unique keys, such as user preferences. Selecting the right collection for a Java application is essential, taking into account data type, ordering, duplicate elements, and performance requirements. This will increase code effectiveness and efficiency. Using the Correct Methods and Interfaces In this section, the various methods and interfaces that the collection framework provides will be covered, along with some tips on how to effectively use them. Choosing the Right Collection: The collection framework provides a variety of collection types to improve code speed and readability, such as lists, sets, queues, maps, and deques. Using Iterators: Iterators are crucial for traversing collections, but if the underlying collection is modified during iteration, they fail fast and throw a ConcurrentModificationException. Use a CopyOnWriteArrayList or ConcurrentHashMap to avoid this. Using Lambda Expressions: Lambda expressions in Java 8 allow programmers to write code that can be used as an argument to a method and can be combined with the filter() and map() methods of the Stream API to process collections. Using the Stream API: The Stream API is a powerful feature in Java 8 that enables functional, lazy, and parallelizable collection processing, which can result in better performance. 
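A short, hedged sketch of the points above: choosing a concrete implementation behind an interface type, iterating safely, and processing the same data with the Stream API. The sample data is purely illustrative.

Java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CollectionBasics {
    public static void main(String[] args) {
        // Program to the List interface; ArrayList is the usual default implementation.
        List<String> tasks = new ArrayList<>(List.of("pay bill", "buy milk", "buy bread"));

        // Removing while iterating must go through the iterator,
        // otherwise a ConcurrentModificationException is thrown.
        Iterator<String> it = tasks.iterator();
        while (it.hasNext()) {
            if (it.next().startsWith("pay")) {
                it.remove();
            }
        }

        // A copy-on-write list tolerates modification during iteration at the cost of copying.
        List<String> concurrentSafe = new CopyOnWriteArrayList<>(tasks);

        // Stream API: declarative filter/map over the same data.
        List<String> shoppingList = concurrentSafe.stream()
                .filter(task -> task.startsWith("buy"))
                .map(String::toUpperCase)
                .toList();

        System.out.println(shoppingList); // [BUY MILK, BUY BREAD]
    }
}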
Using Generics: Generics are a powerful feature introduced in Java 5 that allow you to write type-safe code. They are especially useful when working with collections, as they allow you to specify the types of elements that a collection can contain. Wildcards (such as <? extends Number>) make generic methods and APIs more flexible when the exact element type does not matter. The Java collection framework provides methods and interfaces to improve code efficiency, readability, and maintainability. Iterators, lambda expressions, the Stream API, and generics can be used to improve performance and avoid common pitfalls. Best Practices for Collection Usage In this section, we will explore some important best practices for collection usage. Proper Initialization and Declaration of Collections Collections should be initialized correctly before use to avoid null pointer exceptions. Use the appropriate interface or class to declare the collection for uniqueness or order. Using Generics to Ensure Type Safety Generics provide type safety by allowing us to specify the type of objects that can be stored in a collection, allowing us to catch type mismatch errors at compile time. When declaring a collection, specify the type using angle brackets (<>). For example, List<String> ensures that only String objects can be added to the list. Employing the Appropriate Interfaces for Flexibility The Java collection framework provides a variety of interfaces, allowing us to easily switch implementations and take advantage of polymorphism to write code that is more modular and reusable. Understanding the Behavior of Different Collection Methods It is important to understand the behavior of collection methods to use them effectively. To gain a thorough understanding, consult the Java documentation or other reliable sources. Understanding the complexity of operations like contains() and remove() can make a difference in code performance. Handling Null Values and Empty Collections To prevent unexpected errors or undesirable behavior, it's crucial to handle null values and empty collections properly. Check that collections are not null and have the required data to prevent errors. Memory and Performance Optimization In this section, we will explore techniques and best practices to optimize memory utilization and enhance the performance of collections in Java as follows: 1. Minimizing the Memory Footprint With the Right Collection Implementation Memory usage can be significantly decreased by selecting the best collection implementation for the job. When frequent random access is required, for instance, using an array list rather than a linked list can reduce memory overhead. 2. Efficient Iteration Over Collections It is common practice to iterate over collections, so picking the most effective iteration strategy is crucial. Compared with index-based loops, iterator-based loops or enhanced for-each loops are often cleaner and, for linked structures such as LinkedList, avoid expensive positional access. 3. Considering Alternative Collection Libraries for Specific Use Cases The Java collection framework offers a wide range of collection types, but in some cases, alternative libraries like Guava or Apache Commons Collections can provide additional features and better performance for specific use cases. 4. Utilizing Parallel Processing With Collections for Improved Performance With the advent of multi-core processors, leveraging parallel processing techniques can enhance the performance of operations performed on large collections. The Java Stream API provides support for parallel execution, allowing for efficient processing of data in parallel (see the sketch below). 
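A hedged sketch tying together the generics and parallel-processing points: a bounded-wildcard method that accepts any list of numbers, and the same data summed through a parallel stream. The numbers are arbitrary example data.

Java
import java.util.List;

public class GenericsAndParallelStreams {

    // Bounded wildcard: callers may pass List<Integer>, List<Double>, etc.
    static double sum(List<? extends Number> values) {
        double total = 0;
        for (Number n : values) {
            total += n.doubleValue();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Integer> ints = List.of(1, 2, 3, 4);   // type-safe: only Integers allowed
        List<Double> doubles = List.of(1.5, 2.5);

        System.out.println(sum(ints));     // 10.0
        System.out.println(sum(doubles));  // 4.0

        // Parallel stream: the same reduction split across available cores.
        double parallelTotal = ints.parallelStream()
                .mapToDouble(Integer::doubleValue)
                .sum();
        System.out.println(parallelTotal); // 10.0
    }
}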
Tips and Tricks for Effective Collection Usage Using the Right Data Structures for Specific Tasks Every data structure comes with its own advantages and disadvantages, so weigh them and choose the right one for the task at hand to improve performance. Making Use of Utility Methods in the Collections Class The Collections class in Java provides utility methods to simplify and streamline collection operations, such as sorting, searching, shuffling, and reversing (see the sketch after this section). Leveraging Third-Party Libraries and Frameworks for Enhanced Functionality The Java collection framework provides a wide range of data structures, but third-party libraries and frameworks can provide more advanced features and unique data structures. These libraries can boost productivity, give access to more powerful collection options, and address use cases that the built-in Java collections cannot. Optimizing Collections for Specific Use Cases Immutable collections offer better thread safety and can be shared without defensive copying. Sizing dynamic collections appropriately up front can prevent frequent resizing and enhance performance. Specialized collections like HashSet or TreeMap can improve efficiency for unique or sorted elements. Optimize collections to improve performance, readability, and maintainability. Conclusion In this blog post, we have covered best practices and tips for using the Java collection framework effectively. To sum up, the Java collection framework is a crucial component of Java programming. You can use the collection framework effectively and create more maintainable code by adhering to these best practices and tips.
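As a small, hedged illustration of the utility-method and immutability tips above (the task names are arbitrary example data):

Java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CollectionTips {
    public static void main(String[] args) {
        List<String> tasks = new ArrayList<>(List.of("deploy", "review", "build"));

        // Utility methods from java.util.Collections: sort, search, reverse.
        Collections.sort(tasks);                              // [build, deploy, review]
        int idx = Collections.binarySearch(tasks, "deploy");  // 1 (the list must be sorted first)
        Collections.reverse(tasks);                           // [review, deploy, build]

        // An unmodifiable view can be shared without defensive copying;
        // writes through the view throw UnsupportedOperationException.
        List<String> readOnly = Collections.unmodifiableList(tasks);

        // Pre-sizing avoids repeated internal resizing when the size is roughly known.
        List<String> preSized = new ArrayList<>(1_000);

        System.out.println(readOnly + " " + idx + " " + preSized.size());
    }
}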
The reason why we need a Transactional Outbox is that a service often needs to publish messages as part of a transaction that updates the database. Both the database update and the sending of the message must happen within a transaction. Otherwise, if the service doesn't perform these two operations atomically, a failure could leave the system in an inconsistent state. The GitHub repository contains the source code for this article. In this article, we will implement the pattern using Reactive Spring and Kotlin with Coroutines. Here is the full list of used dependencies: Kotlin with Coroutines, Spring Boot 3, WebFlux, R2DBC, Postgres, MongoDB, Kafka, Grafana, Prometheus, Zipkin, and Micrometer for observability. The Transactional Outbox pattern solves the problem with the naive implementation, where the transaction updates the database table, then publishes a message to the broker, and finally commits. The problem is this: if the last step of the transaction fails, the transaction will roll back the database changes, but the event has already been published to the broker. So, we need a way to guarantee that the data is both written to the database and published to the broker. The idea is that, in one transaction, we save the order to the orders table and, in the same transaction, save the event to the outbox table and commit. Then, we have to publish the saved events from the outbox table to the broker. There are two ways to do that: a CDC (change data capture) tool like Debezium, which continuously monitors your database and lets your applications stream every row-level change in the same order it was committed to the database, or a polling publisher. For this project, we used a polling publisher. I highly recommend Chris Richardson's book Microservices Patterns, where the Transactional Outbox pattern is very well explained. One more important thing is that we have to be ready for cases when the same event is published more than one time, so the consumer must be idempotent. Idempotence describes the reliability of messages in a distributed system, specifically the reception of duplicated messages. Because of retries or message broker features, a message sent once can be received multiple times by consumers. A service is idempotent if processing the same event multiple times results in the same state and output as processing that event just a single time. The reception of a duplicated event does not change the application state or behavior. Most of the time, an idempotent service detects these events and ignores them. Idempotence can be implemented using unique identifiers. So, let's implement it. The business logic of our example microservice is simple: orders with product shop items; that's two tables for simplicity, plus an outbox table, of course. In a typical outbox table, the data field stores the serialized event. The most common format is JSON, but it's up to you and the concrete microservice. The data field can hold just the state changes, or it can simply hold the last updated full order domain entity every time; of course, state changes take much less space, but again, it's up to you. Other fields in the outbox table usually include the event type, timestamp, version, and other metadata. The exact set depends on each concrete implementation, but this is often the required minimum. The version field is for concurrency control. All UIs are available on their respective ports: Swagger UI, Grafana UI, Zipkin UI, Kafka UI, and Prometheus UI. 
The docker-compose file for this article has Postgres, MongoDB, zookeeper, Kafka, Kafka-ui, Zipkin, Prometheus, and Grafana, For local development run: use make local or make develop, first run only docker-compose, second same include the microservice image. YAML version: "3.9" services: microservices_postgresql: image: postgres:latest container_name: microservices_postgresql expose: - "5432" ports: - "5432:5432" restart: always environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres - POSTGRES_DB=microservices - POSTGRES_HOST=5432 command: -p 5432 volumes: - ./docker_data/microservices_pgdata:/var/lib/postgresql/data networks: [ "microservices" ] zoo1: image: confluentinc/cp-zookeeper:7.3.0 hostname: zoo1 container_name: zoo1 ports: - "2181:2181" environment: ZOOKEEPER_CLIENT_PORT: 2181 ZOOKEEPER_SERVER_ID: 1 ZOOKEEPER_SERVERS: zoo1:2888:3888 volumes: - "./zookeeper:/zookeeper" networks: [ "microservices" ] kafka1: image: confluentinc/cp-kafka:7.3.0 hostname: kafka1 container_name: kafka1 ports: - "9092:9092" - "29092:29092" - "9999:9999" environment: KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka1:19092,EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092,DOCKER://host.docker.internal:29092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT,DOCKER:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_ZOOKEEPER_CONNECT: "zoo1:2181" KAFKA_BROKER_ID: 1 KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1 KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1 KAFKA_JMX_PORT: 9999 KAFKA_JMX_HOSTNAME: ${DOCKER_HOST_IP:-127.0.0.1} KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" depends_on: - zoo1 volumes: - "./kafka_data:/kafka" networks: [ "microservices" ] kafka-ui: image: provectuslabs/kafka-ui container_name: kafka-ui ports: - "8086:8080" restart: always environment: - KAFKA_CLUSTERS_0_NAME=local - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka1:19092 networks: [ "microservices" ] zipkin-all-in-one: image: openzipkin/zipkin:latest restart: always ports: - "9411:9411" networks: [ "microservices" ] mongo: image: mongo restart: always ports: - "27017:27017" environment: MONGO_INITDB_ROOT_USERNAME: admin MONGO_INITDB_ROOT_PASSWORD: admin MONGODB_DATABASE: bank_accounts networks: [ "microservices" ] prometheus: image: prom/prometheus:latest container_name: prometheus ports: - "9090:9090" command: - --config.file=/etc/prometheus/prometheus.yml volumes: - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro networks: [ "microservices" ] node_exporter: container_name: microservices_node_exporter restart: always image: prom/node-exporter ports: - '9101:9100' networks: [ "microservices" ] grafana: container_name: microservices_grafana restart: always image: grafana/grafana ports: - '3000:3000' networks: [ "microservices" ] networks: microservices: name: microservices The Postgres database schema for this project is: Orders domain REST Controller has the following methods: Kotlin @RestController @RequestMapping(path = ["/api/v1/orders"]) class OrderController(private val orderService: OrderService, private val or: ObservationRegistry) { @GetMapping @Operation(method = "getOrders", summary = "get order with pagination", operationId = "getOrders") suspend fun getOrders( @RequestParam(name = "page", defaultValue = "0") page: Int, @RequestParam(name = "size", defaultValue 
= "20") size: Int, ) = coroutineScopeWithObservation(GET_ORDERS, or) { observation -> ResponseEntity.ok() .body(orderService.getAllOrders(PageRequest.of(page, size)) .map { it.toSuccessResponse() } .also { response -> observation.highCardinalityKeyValue("response", response.toString()) } ) } @GetMapping(path = ["{id}"]) @Operation(method = "getOrderByID", summary = "get order by id", operationId = "getOrderByID") suspend fun getOrderByID(@PathVariable id: String) = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation -> ResponseEntity.ok().body(orderService.getOrderWithProductsByID(UUID.fromString(id)).toSuccessResponse()) .also { response -> observation.highCardinalityKeyValue("response", response.toString()) log.info("getOrderByID response: $response") } } @PostMapping @Operation(method = "createOrder", summary = "create new order", operationId = "createOrder") suspend fun createOrder(@Valid @RequestBody createOrderDTO: CreateOrderDTO) = coroutineScopeWithObservation(CREATE_ORDER, or) { observation -> ResponseEntity.status(HttpStatus.CREATED).body(orderService.createOrder(createOrderDTO.toOrder()).toSuccessResponse()) .also { log.info("created order: $it") observation.highCardinalityKeyValue("response", it.toString()) } } @PutMapping(path = ["/add/{id}"]) @Operation(method = "addProductItem", summary = "add to the order product item", operationId = "addProductItem") suspend fun addProductItem( @PathVariable id: UUID, @Valid @RequestBody dto: CreateProductItemDTO ) = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation -> ResponseEntity.ok().body(orderService.addProductItem(dto.toProductItem(id))) .also { observation.highCardinalityKeyValue("CreateProductItemDTO", dto.toString()) observation.highCardinalityKeyValue("id", id.toString()) log.info("addProductItem id: $id, dto: $dto") } } @PutMapping(path = ["/remove/{orderId}/{productItemId}"]) @Operation(method = "removeProductItem", summary = "remove product from the order", operationId = "removeProductItem") suspend fun removeProductItem( @PathVariable orderId: UUID, @PathVariable productItemId: UUID ) = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation -> ResponseEntity.ok().body(orderService.removeProductItem(orderId, productItemId)) .also { observation.highCardinalityKeyValue("productItemId", productItemId.toString()) observation.highCardinalityKeyValue("orderId", orderId.toString()) log.info("removeProductItem orderId: $orderId, productItemId: $productItemId") } } @PutMapping(path = ["/pay/{id}"]) @Operation(method = "payOrder", summary = "pay order", operationId = "payOrder") suspend fun payOrder(@PathVariable id: UUID, @Valid @RequestBody dto: PayOrderDTO) = coroutineScopeWithObservation(PAY_ORDER, or) { observation -> ResponseEntity.ok().body(orderService.pay(id, dto.paymentId).toSuccessResponse()) .also { observation.highCardinalityKeyValue("response", it.toString()) log.info("payOrder result: $it") } } @PutMapping(path = ["/cancel/{id}"]) @Operation(method = "cancelOrder", summary = "cancel order", operationId = "cancelOrder") suspend fun cancelOrder(@PathVariable id: UUID, @Valid @RequestBody dto: CancelOrderDTO) = coroutineScopeWithObservation(CANCEL_ORDER, or) { observation -> ResponseEntity.ok().body(orderService.cancel(id, dto.reason).toSuccessResponse()) .also { observation.highCardinalityKeyValue("response", it.toString()) log.info("cancelOrder result: $it") } } @PutMapping(path = ["/submit/{id}"]) @Operation(method = "submitOrder", summary = "submit order", operationId = "submitOrder") 
suspend fun submitOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(SUBMIT_ORDER, or) { observation -> ResponseEntity.ok().body(orderService.submit(id).toSuccessResponse()) .also { observation.highCardinalityKeyValue("response", it.toString()) log.info("submitOrder result: $it") } } @PutMapping(path = ["/complete/{id}"]) @Operation(method = "completeOrder", summary = "complete order", operationId = "completeOrder") suspend fun completeOrder(@PathVariable id: UUID) = coroutineScopeWithObservation(COMPLETE_ORDER, or) { observation -> ResponseEntity.ok().body(orderService.complete(id).toSuccessResponse()) .also { observation.highCardinalityKeyValue("response", it.toString()) log.info("completeOrder result: $it") } } } As I mentioned earlier, the main idea of the transactional outbox implementation is that, in the first step, we write to the orders and outbox tables in one transaction and commit it. Additionally, as an optional optimization, we can, in the same methods, publish the event and delete it from the outbox table right after the transaction has committed successfully. If publishing to the broker or deleting from the outbox table fails at this point, it's ok, because the polling producer runs as a scheduled process anyway. It's a small optimization and improvement, and it's not mandatory for implementing the outbox pattern. Try both variants and choose the best one for your case. In our case, we are using Kafka, so we have to remember that producers have an acks setting. When acks=0, producers consider messages “written successfully” the moment the message was sent, without waiting for the broker to accept it at all. If the broker goes offline or an exception happens, we won't know and will lose data, so be careful with this setting and don't use acks=0. When acks=1, producers consider messages “written successfully” when the message was acknowledged only by the leader. When acks=all, producers consider messages “written successfully” when the message is accepted by all in-sync replicas (ISR). 
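For illustration only, here is a minimal Java sketch of where the acks setting lives in a plain Kafka producer configuration. In this project the producer is actually configured through Spring Boot/Spring Kafka properties, so treat the class below, the topic name, and the broker address as assumptions rather than the article's real setup.

Java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class OutboxProducerConfigSketch {

    public static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        // Hypothetical broker address (the docker-compose above exposes Kafka on 9092).
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // acks=all: the broker confirms the write only after all in-sync replicas have it.
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Idempotence avoids duplicates introduced by producer retries.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        return new KafkaProducer<>(props);
    }

    public static void main(String[] args) {
        try (KafkaProducer<String, String> producer = createProducer()) {
            // Keying by aggregate id keeps all events of one order in a single partition.
            producer.send(new ProducerRecord<>("order-events", "order-id-123", "{\"eventType\":\"ORDER_CREATED\"}"));
        }
    }
}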
In the simplified sequence diagram for service layer business logic, steps 5 and 6 are optional and not required optimization because we have polling publisher anyway: The order service implementation: Kotlin interface OrderService { suspend fun createOrder(order: Order): Order suspend fun getOrderByID(id: UUID): Order suspend fun addProductItem(productItem: ProductItem) suspend fun removeProductItem(orderID: UUID, productItemId: UUID) suspend fun pay(id: UUID, paymentId: String): Order suspend fun cancel(id: UUID, reason: String?): Order suspend fun submit(id: UUID): Order suspend fun complete(id: UUID): Order suspend fun getOrderWithProductsByID(id: UUID): Order suspend fun getAllOrders(pageable: Pageable): Page<Order> suspend fun deleteOutboxRecordsWithLock() } Kotlin @Service class OrderServiceImpl( private val orderRepository: OrderRepository, private val productItemRepository: ProductItemRepository, private val outboxRepository: OrderOutboxRepository, private val orderMongoRepository: OrderMongoRepository, private val txOp: TransactionalOperator, private val eventsPublisher: EventsPublisher, private val kafkaTopicsConfiguration: KafkaTopicsConfiguration, private val or: ObservationRegistry, private val outboxEventSerializer: OutboxEventSerializer ) : OrderService { override suspend fun createOrder(order: Order): Order = coroutineScopeWithObservation(CREATE, or) { observation -> txOp.executeAndAwait { orderRepository.insert(order).let { val productItemsEntityList = ProductItemEntity.listOf(order.productsList(), UUID.fromString(it.id)) val insertedItems = productItemRepository.insertAll(productItemsEntityList).toList() it.addProductItems(insertedItems.map { item -> item.toProductItem() }) Pair(it, outboxRepository.save(outboxEventSerializer.orderCreatedEventOf(it))) } }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun addProductItem(productItem: ProductItem): Unit = coroutineScopeWithObservation(ADD_PRODUCT, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(UUID.fromString(productItem.orderId)) order.incVersion() val updatedProductItem = productItemRepository.upsert(productItem) val savedRecord = outboxRepository.save( outboxEventSerializer.productItemAddedEventOf( order, productItem.copy(version = updatedProductItem.version).toEntity() ) ) orderRepository.updateVersion(UUID.fromString(order.id), order.version) .also { result -> log.info("addOrderItem result: $result, version: ${order.version}") } savedRecord }.run { observation.highCardinalityKeyValue("outboxEvent", this.toString()) publishOutboxEvent(this) } } override suspend fun removeProductItem(orderID: UUID, productItemId: UUID): Unit = coroutineScopeWithObservation(REMOVE_PRODUCT, or) { observation -> txOp.executeAndAwait { if (!productItemRepository.existsById(productItemId)) throw ProductItemNotFoundException(productItemId) val order = orderRepository.findOrderByID(orderID) productItemRepository.deleteById(productItemId) order.incVersion() val savedRecord = outboxRepository.save(outboxEventSerializer.productItemRemovedEventOf(order, productItemId)) orderRepository.updateVersion(UUID.fromString(order.id), order.version) .also { log.info("removeProductItem update order result: $it, version: ${order.version}") } savedRecord }.run { observation.highCardinalityKeyValue("outboxEvent", this.toString()) publishOutboxEvent(this) } } override suspend fun 
pay(id: UUID, paymentId: String): Order = coroutineScopeWithObservation(PAY, or) { observation -> txOp.executeAndAwait { val order = orderRepository.getOrderWithProductItemsByID(id) order.pay(paymentId) val updatedOrder = orderRepository.update(order) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderPaidEventOf(updatedOrder, paymentId))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun cancel(id: UUID, reason: String?): Order = coroutineScopeWithObservation(CANCEL, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(id) order.cancel() val updatedOrder = orderRepository.update(order) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCancelledEventOf(updatedOrder, reason))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun submit(id: UUID): Order = coroutineScopeWithObservation(SUBMIT, or) { observation -> txOp.executeAndAwait { val order = orderRepository.getOrderWithProductItemsByID(id) order.submit() val updatedOrder = orderRepository.update(order) updatedOrder.addProductItems(order.productsList()) Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderSubmittedEventOf(updatedOrder))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } override suspend fun complete(id: UUID): Order = coroutineScopeWithObservation(COMPLETE, or) { observation -> txOp.executeAndAwait { val order = orderRepository.findOrderByID(id) order.complete() val updatedOrder = orderRepository.update(order) log.info("order submitted: ${updatedOrder.status} for id: $id") Pair(updatedOrder, outboxRepository.save(outboxEventSerializer.orderCompletedEventOf(updatedOrder))) }.run { observation.highCardinalityKeyValue("order", first.toString()) observation.highCardinalityKeyValue("outboxEvent", second.toString()) publishOutboxEvent(second) first } } @Transactional(readOnly = true) override suspend fun getOrderWithProductsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation -> orderRepository.getOrderWithProductItemsByID(id).also { observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation -> orderMongoRepository.getAllOrders(pageable).also { observation.highCardinalityKeyValue("pageResult", it.toString()) } } override suspend fun deleteOutboxRecordsWithLock() = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation -> outboxRepository.deleteOutboxRecordsWithLock { observation.highCardinalityKeyValue("outboxEvent", it.toString()) eventsPublisher.publish(getTopicName(it.eventType), it) } } override suspend fun getOrderByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_BY_ID, or) { observation -> orderMongoRepository.getByID(id.toString()) .also { log.info("getOrderByID: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } private suspend fun publishOutboxEvent(event: OutboxRecord) = coroutineScopeWithObservation(PUBLISH_OUTBOX_EVENT, or) { observation -> try { log.info("publishing 
outbox event: $event") outboxRepository.deleteOutboxRecordByID(event.eventId!!) { eventsPublisher.publish(getTopicName(event.eventType), event.aggregateId.toString(), event) } log.info("outbox event published and deleted: $event") observation.highCardinalityKeyValue("event", event.toString()) } catch (ex: Exception) { log.error("exception while publishing outbox event: ${ex.localizedMessage}") observation.error(ex) } } } Order and product items Postgres repositories are a combination of CoroutineCrudRepository and custom implementation using DatabaseClient and R2dbcEntityTemplate, supporting optimistic and pessimistic locking, depending on method requirements. Kotlin @Repository interface OrderRepository : CoroutineCrudRepository<OrderEntity, UUID>, OrderBaseRepository @Repository interface OrderBaseRepository { suspend fun getOrderWithProductItemsByID(id: UUID): Order suspend fun updateVersion(id: UUID, newVersion: Long): Long suspend fun findOrderByID(id: UUID): Order suspend fun insert(order: Order): Order suspend fun update(order: Order): Order } @Repository class OrderBaseRepositoryImpl( private val dbClient: DatabaseClient, private val entityTemplate: R2dbcEntityTemplate, private val or: ObservationRegistry ) : OrderBaseRepository { override suspend fun updateVersion(id: UUID, newVersion: Long): Long = coroutineScopeWithObservation(UPDATE_VERSION, or) { observation -> dbClient.sql("UPDATE microservices.orders SET version = (version + 1) WHERE id = :id AND version = :version") .bind(ID, id) .bind(VERSION, newVersion - 1) .fetch() .rowsUpdated() .awaitSingle() .also { log.info("for order with id: $id version updated to $newVersion") } .also { observation.highCardinalityKeyValue("id", id.toString()) observation.highCardinalityKeyValue("newVersion", newVersion.toString()) } } override suspend fun getOrderWithProductItemsByID(id: UUID): Order = coroutineScopeWithObservation(GET_ORDER_WITH_PRODUCTS_BY_ID, or) { observation -> dbClient.sql( """SELECT o.id, o.email, o.status, o.address, o.version, o.payment_id, o.created_at, o.updated_at, |pi.id as productId, pi.price, pi.title, pi.quantity, pi.order_id, pi.version as itemVersion, pi.created_at as itemCreatedAt, pi.updated_at as itemUpdatedAt |FROM microservices.orders o |LEFT JOIN microservices.product_items pi on o.id = pi.order_id |WHERE o.id = :id""".trimMargin() ) .bind(ID, id) .map { row, _ -> Pair(OrderEntity.of(row), ProductItemEntity.of(row)) } .flow() .toList() .let { orderFromList(it) } .also { log.info("getOrderWithProductItemsByID order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun findOrderByID(id: UUID): Order = coroutineScopeWithObservation(FIND_ORDER_BY_ID, or) { observation -> val query = Query.query(Criteria.where(ID).`is`(id)) entityTemplate.selectOne(query, OrderEntity::class.java).awaitSingleOrNull()?.toOrder() .also { observation.highCardinalityKeyValue("order", it.toString()) } ?: throw OrderNotFoundException(id) } override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation -> entityTemplate.insert(order.toEntity()).awaitSingle().toOrder() .also { log.info("inserted order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation -> entityTemplate.update(order.toEntity()).awaitSingle().toOrder() .also { log.info("updated order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } } Kotlin 
interface ProductItemBaseRepository { suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity suspend fun insertAll(productItemEntities: List<ProductItemEntity>): List<ProductItemEntity> suspend fun upsert(productItem: ProductItem): ProductItem } @Repository class ProductItemBaseRepositoryImpl( private val entityTemplate: R2dbcEntityTemplate, private val or: ObservationRegistry, ) : ProductItemBaseRepository { override suspend fun upsert(productItem: ProductItem): ProductItem = coroutineScopeWithObservation(UPDATE, or) { observation -> val query = Query.query( Criteria.where("id").`is`(UUID.fromString(productItem.id)) .and("order_id").`is`(UUID.fromString(productItem.orderId)) ) val product = entityTemplate.selectOne(query, ProductItemEntity::class.java).awaitSingleOrNull() if (product != null) { val update = Update .update("quantity", (productItem.quantity + product.quantity)) .set("version", product.version + 1) .set("updated_at", LocalDateTime.now()) val updatedProduct = product.copy(quantity = (productItem.quantity + product.quantity), version = product.version + 1) val updateResult = entityTemplate.update(query, update, ProductItemEntity::class.java).awaitSingle() log.info("updateResult product: $updateResult") log.info("updateResult updatedProduct: $updatedProduct") return@coroutineScopeWithObservation updatedProduct.toProductItem() } entityTemplate.insert(ProductItemEntity.of(productItem)).awaitSingle().toProductItem() .also { productItem -> log.info("saved productItem: $productItem") observation.highCardinalityKeyValue("productItem", productItem.toString()) } } override suspend fun insert(productItemEntity: ProductItemEntity): ProductItemEntity = coroutineScopeWithObservation(INSERT, or) { observation -> val product = entityTemplate.insert(productItemEntity).awaitSingle() log.info("saved product: $product") observation.highCardinalityKeyValue("product", product.toString()) product } override suspend fun insertAll(productItemEntities: List<ProductItemEntity>) = coroutineScopeWithObservation(INSERT_ALL, or) { observation -> val result = productItemEntities.map { entityTemplate.insert(it) }.map { it.awaitSingle() } log.info("inserted product items: $result") observation.highCardinalityKeyValue("result", result.toString()) result } } The important detail here is to be able to handle the case of multiple pod instances processing in a parallel outbox table. We have idempotent consumers, but we have to avoid processing the same table events more than one time. To prevent multiple instances select and publish the same events, we use FOR UPDATE SKIP LOCKED.This combination tries to select a batch of outbox events. If some other instance has already selected these records, first, one will skip locked records and select the next available and not locked, and so on. 
Kotlin @Repository interface OutboxBaseRepository { suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) } class OutboxBaseRepositoryImpl( private val dbClient: DatabaseClient, private val txOp: TransactionalOperator, private val or: ObservationRegistry, private val transactionalOperator: TransactionalOperator ) : OutboxBaseRepository { override suspend fun deleteOutboxRecordByID(id: UUID, callback: suspend () -> Unit): Long = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_BY_ID, or) { observation -> withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) { txOp.executeAndAwait { callback() dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId") .bind("eventId", id) .fetch() .rowsUpdated() .awaitSingle() .also { log.info("outbox event with id: $it deleted") observation.highCardinalityKeyValue("id", it.toString()) } } } } override suspend fun deleteOutboxRecordsWithLock(callback: suspend (outboxRecord: OutboxRecord) -> Unit) = coroutineScopeWithObservation(DELETE_OUTBOX_RECORD_WITH_LOCK, or) { observation -> withTimeout(DELETE_OUTBOX_RECORD_TIMEOUT_MILLIS) { txOp.executeAndAwait { dbClient.sql("SELECT * FROM microservices.outbox_table ORDER BY timestamp ASC LIMIT 10 FOR UPDATE SKIP LOCKED") .map { row, _ -> OutboxRecord.of(row) } .flow() .onEach { log.info("deleting outboxEvent with id: ${it.eventId}") callback(it) dbClient.sql("DELETE FROM microservices.outbox_table WHERE event_id = :eventId") .bind("eventId", it.eventId!!) .fetch() .rowsUpdated() .awaitSingle() log.info("outboxEvent with id: ${it.eventId} published and deleted") observation.highCardinalityKeyValue("eventId", it.eventId.toString()) } .collect() } } } } The polling producer implementation is a scheduled process that does the same job for publishing and deleting events at the given interval as typed earlier and uses the same service method: Kotlin @Component @ConditionalOnProperty(prefix = "schedulers", value = ["outbox.enable"], havingValue = "true") class OutboxScheduler(private val orderService: OrderService, private val or: ObservationRegistry) { @Scheduled(initialDelayString = "\${schedulers.outbox.initialDelayMillis}", fixedRateString = "\${schedulers.outbox.fixedRate}") fun publishAndDeleteOutboxRecords() = runBlocking { coroutineScopeWithObservation(PUBLISH_AND_DELETE_OUTBOX_RECORDS, or) { log.debug("starting scheduled outbox table publishing") orderService.deleteOutboxRecordsWithLock() log.debug("completed scheduled outbox table publishing") } } companion object { private val log = LoggerFactory.getLogger(OutboxScheduler::class.java) private const val PUBLISH_AND_DELETE_OUTBOX_RECORDS = "OutboxScheduler.publishAndDeleteOutboxRecords" } } Usually, the transactional outbox is more often required to guarantee data consistency between microservices. Here, for example, consumers in the same microservice process it and save it to MongoDB. The one more important detail here, as we’re processing Kafka events in multiple consumer processes, possible use cases when the order of the events processing can be randomized. In Kafka, we have key features, and it helps us because it sends messages with the same key to one partition. But if the broker has not had this feature, we have to handle it manually. Cases when, for example, first, some of the consumers are trying to process event #6 before events #4 and #5 were processed. 
For this reason, have a domain entity version field in outbox events, so we can simply look at the version and validate if in our database we have order version #3, but now processing event with version #6, we need first wait for #4,#5 and process them first, but of course, these details depend on each concrete business logic of the application, here shows only the idea that it’s a possible case. And one more important detail — is to retry topics. If we need to retry the process of the messages, better to create a retry topic and process retry here, how much time to retry, and other advanced logic detail depending on your concrete case. In the example, we have two listeners. Where one of them is for retry topic message processing: Kotlin @Component class OrderConsumer( private val kafkaTopicsConfiguration: KafkaTopicsConfiguration, private val serializer: Serializer, private val eventsPublisher: EventsPublisher, private val orderEventProcessor: OrderEventProcessor, private val or: ObservationRegistry, ) { @KafkaListener( groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = [ "\${topics.orderCreated.name}", "\${topics.productAdded.name}", "\${topics.productRemoved.name}", "\${topics.orderPaid.name}", "\${topics.orderCancelled.name}", "\${topics.orderSubmitted.name}", "\${topics.orderCompleted.name}", ], id = "orders-consumer" ) fun process(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>) = runBlocking { coroutineScopeWithObservation(PROCESS, or) { observation -> try { observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java)) ack.acknowledge() log.info("committed record: ${getConsumerRecordInfo(consumerRecord)}") } catch (ex: Exception) { observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) observation.error(ex) if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) { log.error("ack not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") ack.acknowledge() return@coroutineScopeWithObservation } if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) { publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1) ack.acknowledge() log.warn("ack concurrency write or version exception ${ex.localizedMessage}") return@coroutineScopeWithObservation } publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, 1) ack.acknowledge() log.error("ack exception while processing record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}", ex) } } } @KafkaListener(groupId = "\${kafka.consumer-group-id:order-service-group-id}", topics = ["\${topics.retryTopic.name}"], id = "orders-retry-consumer") fun processRetry(ack: Acknowledgment, consumerRecord: ConsumerRecord<String, ByteArray>): Unit = runBlocking { coroutineScopeWithObservation(PROCESS_RETRY, or) { observation -> try { log.warn("processing retry topic record >>>>>>>>>>>>> : ${getConsumerRecordInfoWithHeaders(consumerRecord)}") observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) processOutboxRecord(serializer.deserialize(consumerRecord.value(), OutboxRecord::class.java)) ack.acknowledge() log.info("committed retry record: ${getConsumerRecordInfo(consumerRecord)}") } catch (ex: Exception) 
{ observation.highCardinalityKeyValue("consumerRecord", getConsumerRecordInfoWithHeaders(consumerRecord)) observation.error(ex) val currentRetry = String(consumerRecord.headers().lastHeader(RETRY_COUNT_HEADER).value()).toInt() observation.highCardinalityKeyValue("currentRetry", currentRetry.toString()) if (ex is InvalidVersionException || ex is NoSuchElementException || ex is OrderNotFoundException) { publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry) log.warn("ack concurrency write or version exception ${ex.localizedMessage},record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") ack.acknowledge() return@coroutineScopeWithObservation } if (currentRetry > MAX_RETRY_COUNT) { publishRetryTopic(kafkaTopicsConfiguration.deadLetterQueue.name, consumerRecord, currentRetry + 1) ack.acknowledge() log.error("MAX_RETRY_COUNT exceed, send record to DLQ: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") return@coroutineScopeWithObservation } if (ex is SerializationException || ex is UnknownEventTypeException || ex is AlreadyProcessedVersionException) { ack.acknowledge() log.error("commit not serializable, unknown or already processed record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") return@coroutineScopeWithObservation } log.error("exception while processing: ${ex.localizedMessage}, record: ${getConsumerRecordInfoWithHeaders(consumerRecord)}") publishRetryTopic(kafkaTopicsConfiguration.retryTopic.name, consumerRecord, currentRetry + 1) ack.acknowledge() } } } private suspend fun publishRetryTopic(topic: String, record: ConsumerRecord<String, ByteArray>, retryCount: Int) = coroutineScopeWithObservation(PUBLISH_RETRY_TOPIC, or) { observation -> observation.highCardinalityKeyValue("topic", record.topic()) .highCardinalityKeyValue("key", record.key()) .highCardinalityKeyValue("offset", record.offset().toString()) .highCardinalityKeyValue("value", String(record.value())) .highCardinalityKeyValue("retryCount", retryCount.toString()) record.headers().remove(RETRY_COUNT_HEADER) record.headers().add(RETRY_COUNT_HEADER, retryCount.toString().toByteArray()) mono { publishRetryRecord(topic, record, retryCount) } .retryWhen(Retry.backoff(PUBLISH_RETRY_COUNT, Duration.ofMillis(PUBLISH_RETRY_BACKOFF_DURATION_MILLIS)) .filter { it is SerializationException }) .awaitSingle() } } The role of the orders events processor at this microservice is validating the version of the events and updating MongoDB: Kotlin interface OrderEventProcessor { suspend fun on(orderCreatedEvent: OrderCreatedEvent) suspend fun on(productItemAddedEvent: ProductItemAddedEvent) suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent) suspend fun on(orderPaidEvent: OrderPaidEvent) suspend fun on(orderCancelledEvent: OrderCancelledEvent) suspend fun on(orderSubmittedEvent: OrderSubmittedEvent) suspend fun on(orderCompletedEvent: OrderCompletedEvent) } @Service class OrderEventProcessorImpl( private val orderMongoRepository: OrderMongoRepository, private val or: ObservationRegistry, ) : OrderEventProcessor { override suspend fun on(orderCreatedEvent: OrderCreatedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CREATED_EVENT, or) { observation -> orderMongoRepository.insert(orderCreatedEvent.order).also { log.info("created order: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(productItemAddedEvent: ProductItemAddedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PRODUCT_ADDED_EVENT, or) { observation -> val order = 
orderMongoRepository.getByID(productItemAddedEvent.orderId) validateVersion(order.id, order.version, productItemAddedEvent.version) order.addProductItem(productItemAddedEvent.productItem) order.version = productItemAddedEvent.version orderMongoRepository.update(order).also { log.info("productItemAddedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(productItemRemovedEvent: ProductItemRemovedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PRODUCT_REMOVED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(productItemRemovedEvent.orderId) validateVersion(order.id, order.version, productItemRemovedEvent.version) order.removeProductItem(productItemRemovedEvent.productItemId) order.version = productItemRemovedEvent.version orderMongoRepository.update(order).also { log.info("productItemRemovedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderPaidEvent: OrderPaidEvent): Unit = coroutineScopeWithObservation(ON_ORDER_PAID_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderPaidEvent.orderId) validateVersion(order.id, order.version, orderPaidEvent.version) order.pay(orderPaidEvent.paymentId) order.version = orderPaidEvent.version orderMongoRepository.update(order).also { log.info("orderPaidEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderCancelledEvent: OrderCancelledEvent): Unit = coroutineScopeWithObservation(ON_ORDER_CANCELLED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderCancelledEvent.orderId) validateVersion(order.id, order.version, orderCancelledEvent.version) order.cancel() order.version = orderCancelledEvent.version orderMongoRepository.update(order).also { log.info("orderCancelledEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderSubmittedEvent: OrderSubmittedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_SUBMITTED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderSubmittedEvent.orderId) validateVersion(order.id, order.version, orderSubmittedEvent.version) order.submit() order.version = orderSubmittedEvent.version orderMongoRepository.update(order).also { log.info("orderSubmittedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } override suspend fun on(orderCompletedEvent: OrderCompletedEvent): Unit = coroutineScopeWithObservation(ON_ORDER_COMPLETED_EVENT, or) { observation -> val order = orderMongoRepository.getByID(orderCompletedEvent.orderId) validateVersion(order.id, order.version, orderCompletedEvent.version) order.complete() order.version = orderCompletedEvent.version orderMongoRepository.update(order).also { log.info("orderCompletedEvent updatedOrder: $it") observation.highCardinalityKeyValue("order", it.toString()) } } private fun validateVersion(id: Any, currentDomainVersion: Long, eventVersion: Long) { log.info("validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") if (currentDomainVersion >= eventVersion) { log.warn("currentDomainVersion >= eventVersion validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") throw AlreadyProcessedVersionException(id, eventVersion) } if ((currentDomainVersion + 1) < eventVersion) { log.warn("currentDomainVersion + 1) < eventVersion 
validating version for id: $id, currentDomainVersion: $currentDomainVersion, eventVersion: $eventVersion") throw InvalidVersionException(eventVersion) } } } The MongoDB repository code is quite simple: Kotlin interface OrderMongoRepository { suspend fun insert(order: Order): Order suspend fun update(order: Order): Order suspend fun getByID(id: String): Order suspend fun getAllOrders(pageable: Pageable): Page<Order> } @Repository class OrderMongoRepositoryImpl( private val mongoTemplate: ReactiveMongoTemplate, private val or: ObservationRegistry, ) : OrderMongoRepository { override suspend fun insert(order: Order): Order = coroutineScopeWithObservation(INSERT, or) { observation -> withContext(Dispatchers.IO) { mongoTemplate.insert(OrderDocument.of(order)).awaitSingle().toOrder() .also { log.info("inserted order: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } } override suspend fun update(order: Order): Order = coroutineScopeWithObservation(UPDATE, or) { observation -> withContext(Dispatchers.IO) { val query = Query.query(Criteria.where(ID).`is`(order.id).and(VERSION).`is`(order.version - 1)) val update = Update() .set(EMAIL, order.email) .set(ADDRESS, order.address) .set(STATUS, order.status) .set(VERSION, order.version) .set(PAYMENT_ID, order.paymentId) .set(PRODUCT_ITEMS, order.productsList()) val options = FindAndModifyOptions.options().returnNew(true).upsert(false) val updatedOrderDocument = mongoTemplate.findAndModify(query, update, options, OrderDocument::class.java) .awaitSingleOrNull() ?: throw OrderNotFoundException(order.id.toUUID()) observation.highCardinalityKeyValue("order", updatedOrderDocument.toString()) updatedOrderDocument.toOrder().also { orderDocument -> log.info("updated order: $orderDocument") } } } override suspend fun getByID(id: String): Order = coroutineScopeWithObservation(GET_BY_ID, or) { observation -> withContext(Dispatchers.IO) { mongoTemplate.findById(id, OrderDocument::class.java).awaitSingle().toOrder() .also { log.info("found order: $it") } .also { observation.highCardinalityKeyValue("order", it.toString()) } } } override suspend fun getAllOrders(pageable: Pageable): Page<Order> = coroutineScopeWithObservation(GET_ALL_ORDERS, or) { observation -> withContext(Dispatchers.IO) { val query = Query().with(pageable) val data = async { mongoTemplate.find(query, OrderDocument::class.java).collectList().awaitSingle() }.await() val count = async { mongoTemplate.count(Query(), OrderDocument::class.java).awaitSingle() }.await() PageableExecutionUtils.getPage(data.map { it.toOrder() }, pageable) { count } .also { observation.highCardinalityKeyValue("pageResult", it.pageable.toString()) } } } } You can find more details and the source code of the full project here in the GitHub repository. In real-world applications, we have to implement many more necessary features, such as Kubernetes health checks, rate limiters, etc. Depending on the project, they can be implemented in different ways; for example, you can use Kubernetes and Istio for some of them. I hope this article is useful and helpful, and I am happy to receive any feedback or questions. Feel free to contact me by email or on any messenger :)
To get more clarity about ISR in Apache Kafka, we should first carefully examine the replication process in the Kafka broker. In short, replication means having multiple copies of our data spread across multiple brokers. Maintaining identical copies of the data on different brokers makes high availability possible: if one or more brokers in a multi-node Kafka cluster go down or become unreachable, the remaining brokers can still serve requests. For this reason, we must specify how many copies of the data we want to maintain in the multi-node Kafka cluster when creating a topic. This number is termed the replication factor, and that is why it cannot be more than one when creating a topic on a single-node Kafka cluster. The number of replicas specified at topic creation can be changed later based on node availability in the cluster.

On a single-node Kafka cluster, however, we can have more than one partition in the broker because each topic can have one or more partitions. Partitions are simply subdivisions of a topic spread across the brokers of the cluster, and each partition holds the actual data (messages). Internally, each partition is a single log file to which records are written in an append-only fashion. Based on the provided number, the topic is internally split into that many partitions at creation time. Thanks to partitioning, messages can be distributed in parallel among several brokers in the cluster; Kafka scales to accommodate many producers and consumers at once by employing this parallelism, which enables linear scaling for both. Even though more partitions in a Kafka cluster provide higher throughput, more partitions also bring pitfalls: briefly, more file handles are created as the number of partitions grows, because each partition maps to a directory in the broker's file system.

Having discussed replication and partitions, it is now easier to understand ISR. The ISR is simply the set of a partition's replicas that are "in sync" with the leader, and the leader is nothing but the replica to which all requests from clients and other Kafka brokers go. Replicas that are not the leader are termed followers, and a follower that is in sync with the leader is called an in-sync replica (ISR). For example, if we set the topic's replication factor to 3, Kafka will store the topic-partition log in three different places and will only consider a record committed once all three of these replicas have verified that they have written the record to disk successfully and have sent the acknowledgment back to the leader. In a multi-broker (multi-node) Kafka cluster (please click here to read how a multi-node Kafka cluster can be created), one broker is selected as the leader for a partition and is responsible for handling all read and write requests for that partition, while the followers (on other brokers) passively replicate the leader to achieve data consistency. Each partition can have only one leader at a time, which handles all reads and writes of records for that partition; the followers replicate the leader and take over if it dies. By leveraging Apache ZooKeeper, Kafka internally tracks the replicas of each partition, and if the leader of a partition fails (due to an outage of that broker), Kafka chooses a new leader from among the in-sync replicas (ISR).
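To make the replication factor and partition settings described above concrete, here is a minimal Kotlin sketch (not part of the original article) that creates a topic with Kafka's AdminClient. The topic name "orders", the partition count, the broker addresses, and the min.insync.replicas value are assumptions chosen purely for illustration:

Kotlin
import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

fun main() {
    // Assumed three-broker cluster running locally; adjust the bootstrap servers to your environment.
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094")
    }

    AdminClient.create(props).use { admin ->
        // 6 partitions for parallelism; replication factor 3 means each partition
        // has one leader replica plus two followers that can join the ISR.
        val topic = NewTopic("orders", 6, 3.toShort())
            .configs(mapOf("min.insync.replicas" to "2"))

        // Block until the broker confirms topic creation.
        admin.createTopics(listOf(topic)).all().get()
        println("created topic 'orders' with 6 partitions and replication factor 3")
    }
}

Run against a three-broker cluster, this would give every partition one leader and two followers, matching the replication-factor-3 example in the text.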
When all of the in-sync replicas for a partition have written a record to their log, the record is said to be "committed," and consumers can only read committed records. The minimum in-sync replica count (min.insync.replicas) specifies the minimum number of replicas that must be available for the producer to successfully send records to a partition. Although a higher minimum in-sync replica count gives better durability, it can have an adverse effect on availability: data availability is reduced whenever fewer than the minimum number of in-sync replicas are available at publish time. For example, if we have an operational three-node Kafka cluster with the minimum in-sync replicas configuration set to three and one node goes down or becomes unreachable, the remaining two nodes will not be able to accept any messages from producers, because only two in-sync replicas are active and available across the brokers. The third replica, which lived on the dead or unavailable broker, cannot acknowledge to the leader that it is synced with the latest data the way the other two live replicas on the available brokers can. Hope you have enjoyed this read. Please like and share if you found this article valuable.
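As a complementary sketch (again an illustration under assumptions, not code from the article), the producer below sends to the hypothetical "orders" topic with acks=all, so the leader only acknowledges a write once the in-sync replicas have it. With min.insync.replicas set to three on a three-node cluster, the send callback would receive a NotEnoughReplicasException as soon as one broker becomes unavailable, which is the availability trade-off described above:

Kotlin
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094")
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java.name)
        // acks=all: the leader waits for the full set of in-sync replicas before acknowledging.
        put(ProducerConfig.ACKS_CONFIG, "all")
    }

    KafkaProducer<String, String>(props).use { producer ->
        producer.send(ProducerRecord("orders", "order-1", "order created")) { metadata, exception ->
            if (exception != null) {
                // If fewer replicas than min.insync.replicas are in sync, the broker
                // rejects the write (e.g., with a NotEnoughReplicasException).
                println("send failed: ${exception.message}")
            } else {
                println("written to ${metadata.topic()}-${metadata.partition()} at offset ${metadata.offset()}")
            }
        }
        producer.flush()
    }
}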