Big data comprises datasets that are massive, varied, and complex, and that can't be handled with traditional tools. Big data can include both structured and unstructured data, and it is often stored in data lakes or data warehouses. As organizations grow, big data becomes increasingly crucial for gathering business insights and analytics. The Big Data Zone contains the resources you need for understanding data storage, data modeling, ELT, ETL, and more.
Logs often take up the majority of a company's data assets. Examples of logs include business logs (such as user activity logs) and the operation and maintenance logs of servers, databases, and network or IoT devices.

Logs are the guardian angels of a business. On the one hand, they provide system risk alerts and help engineers quickly locate root causes during troubleshooting. On the other hand, if you zoom out to a longer time range, you can identify helpful trends and patterns, not to mention that business logs are the cornerstone of user insights. However, logs can be a handful because:

They flow in like crazy. Every system event or user click generates a log. A company often produces tens of billions of new logs per day.
They are bulky. Logs need to be retained, because you never know when you will need them. A company can accumulate petabytes of log data, much of which is seldom visited but takes up huge storage space.
They must be quick to write and find. Locating the target log for troubleshooting is literally like looking for a needle in a haystack. People long for real-time log writing and real-time responses to log queries.

Now you can see a clear picture of what an ideal log processing system looks like. It should support the following:

High-throughput real-time data ingestion: It should be able to write logs in bulk and make them visible immediately.
Low-cost storage: It should be able to store substantial amounts of logs without consuming too many resources.
Real-time text search: It should be capable of quick text search.

Common Solutions: Elasticsearch and Grafana Loki

There are two common log processing solutions in the industry, exemplified by Elasticsearch and Grafana Loki, respectively.

Inverted index (Elasticsearch): It is well-embraced due to its support for full-text search and its high performance. The downside is the low throughput in real-time writing and the huge resource consumption in index creation.
Lightweight index / no index (Grafana Loki): It is the opposite of an inverted index: it boasts high real-time write throughput and low storage cost but delivers slow queries.

Introduction to Inverted Index

A prominent strength of Elasticsearch in log processing is quick keyword search across a sea of logs. This is enabled by inverted indexes. Inverted indexing was originally used to retrieve words or phrases in texts. The figure below illustrates how it works:

Upon data writing, the system tokenizes texts into terms and stores these terms in a posting list, which maps each term to the IDs of the rows where it appears. In text queries, the database finds the corresponding row IDs of the keyword (term) in the posting list and fetches the target rows based on those row IDs. By doing so, the system does not have to traverse the whole dataset and thus improves query speed by orders of magnitude.

In the inverted indexing of Elasticsearch, quick retrieval comes at the cost of writing speed, writing throughput, and storage space. Why? Firstly, tokenization, dictionary sorting, and inverted index creation are all CPU- and memory-intensive operations. Secondly, Elasticsearch has to store the original data, the inverted index, and an extra copy of the data stored in columns for query acceleration. That's triple redundancy. But without an inverted index, Grafana Loki, for example, hurts the user experience with its slow queries, which is the biggest pain point for engineers in log analysis.
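To make the posting-list mechanics concrete, here is a minimal, hedged sketch of an inverted index in Python: it tokenizes a few log rows, builds a term-to-row-ID mapping, and answers a keyword query without scanning every row. The data and helper names are made up for illustration; real engines such as Elasticsearch or Doris add analyzers, dictionary sorting, compression, and relevance scoring on top of this core idea.

Python
from collections import defaultdict
import re

# Toy "table" of log rows: row ID -> raw log text
logs = {
    0: "user login failed: bad password",
    1: "disk usage at 91 percent on node-3",
    2: "user login succeeded from 10.0.0.7",
    3: "password reset requested by user 42",
}

def tokenize(text: str):
    """Lowercase and split on non-alphanumeric characters (a stand-in for a real analyzer)."""
    return [t for t in re.split(r"[^a-z0-9]+", text.lower()) if t]

# Build the inverted index: term -> posting list (set of row IDs)
index = defaultdict(set)
for row_id, line in logs.items():
    for term in tokenize(line):
        index[term].add(row_id)

def search_all(*terms):
    """Return rows containing every query term, touching only the posting lists."""
    postings = [index.get(t.lower(), set()) for t in terms]
    if not postings:
        return []
    hit_ids = set.intersection(*postings)
    return [(rid, logs[rid]) for rid in sorted(hit_ids)]

if __name__ == "__main__":
    # Only the posting lists for "user" and "login" are consulted, not every row.
    print(search_all("user", "login"))
    # [(0, 'user login failed: bad password'), (2, 'user login succeeded from 10.0.0.7')]

The lookup path in a production engine is essentially this term-to-ID indirection, which is why keyword queries can skip most of the dataset entirely.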
Simply put, Elasticsearch and Grafana Loki represent different trade-offs between high write throughput, low storage cost, and fast query performance. What if I told you there is a way to have all three? We have introduced inverted indexes in Apache Doris 2.0.0 and further optimized them to realize log query performance twice as fast as Elasticsearch while using only 1/5 of the storage space. Both factors combined, it is roughly a 10 times more cost-effective solution.

Inverted Index in Apache Doris

Generally, there are two ways to implement indexes: an external indexing system or built-in indexes.

External indexing system: You connect an external indexing system to your database. During data ingestion, data is imported into both systems. After the indexing system creates its indexes, it deletes the original data within itself. When data users issue a query, the indexing system provides the IDs of the relevant data, and then the database looks up the target data based on those IDs. Building an external indexing system is easier and less intrusive to the database, but it comes with some annoying flaws:

The need to write data into two systems can result in data inconsistency and storage redundancy.
Interaction between the database and the indexing system brings overhead, so when the target data is huge, queries across the two systems can be slow.
It is exhausting to maintain two systems.

In Apache Doris, we opted for the other way. Built-in inverted indexes are more difficult to build, but once they are in place, they are faster, more user-friendly, and trouble-free to maintain.

In Apache Doris, data is arranged in the following format. Indexes are stored in the Index Region:

We implement inverted indexes in a non-intrusive manner:

Data ingestion and compaction: As a segment file is written into Doris, an inverted index file is written, too. The index file path is determined by the segment ID and the index ID. Rows in segments correspond to docs in indexes, and so do the RowIDs and DocIDs.
Query: If the WHERE clause includes a column with an inverted index, the system looks up the index file, returns a DocID list, and converts the DocID list into a RowID bitmap. Under the RowID filtering mechanism of Apache Doris, only the target rows are read. This is how queries are accelerated.

Such a non-intrusive method separates the index file from the data files, so you can make any changes to the inverted indexes without worrying about affecting the data files themselves or other indexes.

Optimizations for Inverted Index

General Optimizations

C++ Implementation and Vectorization

Unlike Elasticsearch, which uses Java, Apache Doris implements its storage modules, query execution engine, and inverted indexes in C++. Compared to Java, C++ provides better performance, allows easier vectorization, and produces no JVM GC overhead. We have vectorized every step of inverted indexing in Apache Doris, such as tokenization, index creation, and queries. To put this in perspective, in inverted indexing, Apache Doris writes data at a speed of 20 MB/s per core, which is four times that of Elasticsearch (5 MB/s).

Columnar Storage and Compression

Apache Lucene lays the foundation for inverted indexes in Elasticsearch. As Lucene itself was built to support file storage, it stores data in a row-oriented format. In Apache Doris, inverted indexes for different columns are isolated from each other, and the inverted index files adopt columnar storage to facilitate vectorization and data compression.
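Before the concrete numbers below, here is a small, hedged Python sketch comparing Zstandard and gzip compression on repetitive, log-like text. It assumes the third-party zstandard package is installed; the exact ratios depend entirely on the data, so treat the output as an illustration rather than a benchmark.

Python
import gzip
import zstandard as zstd  # third-party package; assumed installed (pip install zstandard)

# Repetitive, machine-generated sample data, similar in spirit to columnar log fields
lines = [
    f"2023-04-0{d % 9 + 1} 12:{d % 60:02d}:00 INFO request_id={d} path=/api/v1/items status=200\n"
    for d in range(50_000)
]
raw = "".join(lines).encode("utf-8")

gz = gzip.compress(raw, compresslevel=6)
zs = zstd.ZstdCompressor(level=3).compress(raw)

print(f"raw:  {len(raw):>10,} bytes")
print(f"gzip: {len(gz):>10,} bytes  (ratio {len(raw) / len(gz):.1f}:1)")
print(f"zstd: {len(zs):>10,} bytes  (ratio {len(raw) / len(zs):.1f}:1)")

On highly repetitive machine-generated logs, zstd typically compresses at least as well as gzip while running considerably faster, which is the kind of trade-off the next paragraph quantifies for Doris.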
By utilizing Zstandard compression, Apache Doris realizes a compression ratio ranging from 5:1 to 10:1, faster compression speeds, and 50% less space usage than GZIP compression. BKD Trees for Numeric / Datetime Columns Apache Doris implements BKD trees for numeric and datetime columns. This not only increases the performance of range queries but is a more space-saving method than converting those columns to fixed-length strings. Other benefits of it include: Efficient range queries: It is able to quickly locate the target data range in numeric and datetime columns. Less storage space: It aggregates and compresses adjacent data blocks to reduce storage costs. Support for multi-dimensional data: BKD trees are scalable and adaptive to multi-dimensional data types, such as GEO points and ranges. In addition to BKD trees, we have further optimized the queries on numeric and datetime columns. Optimization for low-cardinality scenarios: We have fine-tuned the compression algorithm for low-cardinality scenarios, so decompressing and de-serializing large amounts of inverted lists will consume fewer CPU resources. Pre-fetching: For high-hit-rate scenarios, we adopt pre-fetching. If the hit rate exceeds a certain threshold, Doris will skip the indexing process and start data filtering. Tailored Optimizations to OLAP Usually, log analysis is a simple kind of query with no need for advanced features (e.g., relevance scoring in Apache Lucene). The bread and butter capability of a log processing tool is quick queries and low storage costs. Therefore, in Apache Doris, we have streamlined the inverted index structure to meet the needs of an OLAP database. In data ingestion, we prevent multiple threads from writing data into the same index and thus avoid overheads brought by lock contention. We discard forward index files and Norm files to clear storage space and reduce I/O overheads. We simplify the computation logic of relevance scoring and ranking to further reduce overheads and increase performance. In light of the fact that logs are partitioned by time range and historical logs are visited less frequently, we plan to provide more granular and flexible index management in future versions of Apache Doris: Create an inverted index for a specified data partition: create an index for logs of the past seven days, etc. Delete inverted index for a specified data partition: delete index for logs from over one month ago, etc. (so as to clear out index space). Benchmarking We tested Apache Doris on publicly available datasets against Elasticsearch and ClickHouse. For a fair comparison, we ensure uniformity of testing conditions, including benchmarking tool, datasets, and hardware. Apache Doris vs. Elasticsearch Benchmarking tool: ES Rally, the official testing tool for Elasticsearch Dataset: 1998 World Cup HTTP Server Logs (self-contained dataset in ES Rally) Data Size (Before Compression): 32G, 247 million rows, 134 bytes per row (on average) Query: 11 queries, including keyword search, range query, aggregation, and ranking; Each query is serially executed 100 times. Environment: 3 × 16C 64G cloud virtual machines Results of Apache Doris: Writing Speed: 550 MB/s, 4.2 times that of Elasticsearch Compression Ratio: 10:1 Storage Usage: 20% that of Elasticsearch Response Time: 43% that of Elasticsearch Apache Doris vs. 
ClickHouse

As ClickHouse launched an inverted index as an experimental feature in v23.1, we tested Apache Doris with the same dataset and SQL described in the ClickHouse blog and compared the performance of the two under the same testing resources, cases, and tools.

Data: 6.7 GB, 28.73 million rows, the Hacker News dataset, Parquet format
Query: 3 keyword searches, counting the occurrences of the keywords "ClickHouse", "OLAP" OR "OLTP", and "avx" AND "sve"
Environment: 1 × 16C 64G cloud virtual machine
Result: Apache Doris was 4.7 times, 18.5 times, and 12 times faster than ClickHouse in the three queries, respectively.

Usage and Example

Dataset: one million comment records from Hacker News

Step 1: Specify the inverted index on the data table upon table creation. Parameters:

INDEX idx_comment (comment): create an index named "idx_comment" for the "comment" column
USING INVERTED: specify the inverted index type
PROPERTIES("parser" = "english"): set the tokenization language to English

SQL
CREATE TABLE hackernews_1m
(
    `id` BIGINT,
    `deleted` TINYINT,
    `type` String,
    `author` String,
    `timestamp` DateTimeV2,
    `comment` String,
    `dead` TINYINT,
    `parent` BIGINT,
    `poll` BIGINT,
    `children` Array<BIGINT>,
    `url` String,
    `score` INT,
    `title` String,
    `parts` Array<INT>,
    `descendants` INT,
    INDEX idx_comment (`comment`) USING INVERTED PROPERTIES("parser" = "english") COMMENT 'inverted index for comment'
)
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES ("replication_num" = "1");

(Note: You can add an index to an existing table via ADD INDEX idx_comment ON hackernews_1m(comment) USING INVERTED PROPERTIES("parser" = "english"). Unlike the smart index and secondary index, the creation of an inverted index only involves reading the comment column, so it can be much faster.)

Step 2: Retrieve the words "OLAP" and "OLTP" in the comment column with MATCH_ALL. The response time here was 1/10 of that of hard matching with LIKE. (The performance gap widens as data volume increases.)

mysql> SELECT count() FROM hackernews_1m WHERE comment LIKE '%OLAP%' AND comment LIKE '%OLTP%';
+---------+
| count() |
+---------+
|      15 |
+---------+
1 row in set (0.13 sec)

mysql> SELECT count() FROM hackernews_1m WHERE comment MATCH_ALL 'OLAP OLTP';
+---------+
| count() |
+---------+
|      15 |
+---------+
1 row in set (0.01 sec)

For more feature introductions and usage guides, see the documentation: Inverted Index

Wrap-up

In short, what contributes to Apache Doris' 10 times higher cost-effectiveness than Elasticsearch is its OLAP-tailored optimizations for inverted indexing, supported by the columnar storage engine, massively parallel processing framework, vectorized query engine, and cost-based optimizer of Apache Doris. As proud as we are of our own inverted indexing solution, we understand that self-published benchmarks can be controversial, so we are open to feedback from third-party testers and keen to see how Apache Doris works in real-world cases.
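As a closing usage note, Apache Doris speaks the MySQL protocol, so the MATCH_ALL query above can also be issued from application code with any MySQL client library. Below is a hedged Python sketch using the third-party pymysql package; the host, port (9030 is assumed here as the usual FE query port, so verify it for your deployment), credentials, and database name are placeholders.

Python
import pymysql  # third-party MySQL client; Doris is reachable over the MySQL protocol

# Placeholder connection details for a Doris frontend (FE) node
conn = pymysql.connect(
    host="127.0.0.1",
    port=9030,              # assumed default FE query port; adjust to your cluster
    user="root",
    password="",
    database="demo",        # database that holds hackernews_1m
)

try:
    with conn.cursor() as cur:
        # Full-text search on the inverted-indexed column via MATCH_ALL
        cur.execute(
            "SELECT count() FROM hackernews_1m WHERE comment MATCH_ALL 'OLAP OLTP'"
        )
        (count,) = cur.fetchone()
        print(f"comments mentioning both OLAP and OLTP: {count}")
finally:
    conn.close()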
Today, in the modern developer world, it is hard to imagine life without technologies such as React, Node.js, and GraphQL. They are firmly established and hold leading positions in data delivery: 70% of the cases I come across are projects that are already integrated with GraphQL or are about to migrate to it. More and more companies prefer the GraphQL data query syntax, and today it is a piece of must-have knowledge.

GraphQL is a typed query language for APIs that is widely used for requesting data from the server side to the client side in an optimized manner. Clients request exactly what they need using a typed schema, so the server sends only what was requested instead of a fixed dataset. Apollo Server gives you tools for responding to client requests, and Apollo Client gives you the ability to consume a GraphQL API, including caching and linking.

What Is It About?

We are going to create two Apollo Servers and merge their GraphQL schemas. This is the situation where one external service exposes a GraphQL API and another service uses its own GraphQL schema while also including the external one. On the Node layer, we are going to wrap the results from the external server into one schema and send it to the client. Literally, we will just merge two schemas into one and return it to the client.

Let's Dive Into the Code

For the implementation, we are going to use the Node.js environment, Koa middleware, and Apollo Server with GraphQL Tools. We have to run two servers, and both have to host a GraphQL Apollo Server. Here is the diagram.

Time to create the boilerplates and run them both. For that, we need to create two folders: name the first one boilerplate-graphql-koa-server-external and the second one boilerplate-graphql-koa-server.

Before starting, please take a look at the folder structure, which is the same in both projects and pretty straightforward. The difference between the two repos is going to be in the code.

Plain Text
├── package.json
└── src
    ├── index.js
    ├── resolvers.js
    └── schema.js

External GraphQL Server

Now, let's set up the boilerplate-graphql-koa-server-external package.json:

JSON
{
  "name": "boilerplate-graphql-koa-server-external",
  "version": "1.0.0",
  "description": "Boilerplate GraphQL Koa server external",
  "main": "src/index.js",
  "scripts": {
    "start": "PORT=4000 node src/index.js"
  },
  "engines": {
    "node": "16.17.x"
  },
  "dependencies": {
    "@graphql-tools/schema": "^9.0.2",
    "@koa/cors": "^3.4.1",
    "apollo-server-core": "^3.10.2",
    "apollo-server-koa": "^3.10.2",
    "graphql": "^15.8.0",
    "koa": "^2.13.4",
    "koa-graphql": "^0.12.0"
  }
}

Then let's create the server itself.
In the src folder, add the server setup to index.js:

JavaScript
const Koa = require('koa');
const http = require('http');
const cors = require('@koa/cors');
const { ApolloServer } = require('apollo-server-koa');
const { makeExecutableSchema } = require('@graphql-tools/schema');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');

async function server({ typeDefs, resolvers }) {
  const app = new Koa();
  const httpServer = http.createServer();
  const apolloServer = new ApolloServer({
    introspection: true,
    schema: makeExecutableSchema({
      typeDefs,
      resolvers,
    }),
  });
  await apolloServer.start();
  apolloServer.applyMiddleware({ app, path: '/api/v1/graphql' });
  httpServer.on('request', app.callback());
  await new Promise(resolve => httpServer.listen({ port: process.env.PORT }, resolve));
  console.log(
    `External Server ready at http://localhost:${process.env.PORT}${apolloServer.graphqlPath}`
  );
  return { apolloServer, app };
}

server({ typeDefs, resolvers }).then(({ app }) => {
  app.use(cors());
});

The async function server takes care of the Koa app itself, and we create the Apollo Server with an executable schema, where we have to provide the types from the schema and the resolvers. According to the official docs, we must call apolloServer.start() before apolloServer.applyMiddleware. This lets Apollo Server identify potential issues and crash the process during startup instead of starting to serve requests.

The second part of boilerplate-graphql-koa-server-external is setting up the schema and resolvers.

JavaScript
const { gql } = require('apollo-server-koa');

module.exports = gql`
  type Query {
    getItemsExternal: [DataExternalExample]
  }

  type DataExternalExample {
    id: ID
    label: String
  }

  type Mutation {
    updateDataExternal(label: String!): DataExternalExample!
  }
`;

Resolvers for the schema:

JavaScript
const fakeData = {
  id: 223421,
  label: 'Some Label From External',
};

module.exports = {
  Query: {
    getItemsExternal: () => [fakeData],
  },
  Mutation: {
    updateDataExternal: (_, { label }) => {
      return {
        ...fakeData,
        label,
      };
    },
  },
};

Now it's time to check the server responses. Before that, don't forget to install the packages with npm i, then run npm run start and open http://localhost:4000/api/v1/graphql in the Chrome browser. Click the "Query your server" button, and you will get the Apollo GraphQL interface, which lets you inspect the schema exposed by the server. Open the Introspection Schema page, and you will see our schema there. If you were able to introspect the schema, we are done with boilerplate-graphql-koa-server-external.

GraphQL Server for Merging Schemas

Let's move on to the boilerplate-graphql-koa-server setup. The package.json is almost the same as the external one, but with additional packages and a different PORT, name, and description.

JSON
{
  "name": "boilerplate-graphql-koa-server",
  "description": "Boilerplate GraphQL Koa server",
  "scripts": {
    "start": "PORT=3000 node src/index.js"
  },
  "dependencies": {
    "@graphql-tools/load": "^7.7.5",
    "@graphql-tools/url-loader": "^7.14.1"
  }
}

Let's set up the new schema right away. It is pretty much the same, just with slightly different data.

JavaScript
const { gql } = require('apollo-server-koa');

module.exports = gql`
  type Query {
    getFakeDataExample: DataExample
  }

  type DataExample {
    id: ID
    value: String
  }

  type Mutation {
    updateFakeData(value: String!): DataExample!
  }
`;

And the resolvers:

JavaScript
const fakeData = {
  id: 4838745,
  value: 'Some Random String',
};

module.exports = {
  Query: {
    getFakeDataExample: () => fakeData,
  },
  Mutation: {
    updateFakeData: (_, { value }) => {
      return {
        ...fakeData,
        value,
      };
    },
  },
};

And now, let's take a look at the server file. You will notice that it is mostly the same, except for a few lines of code. First of all, we use loadSchema to fetch the external schema by request from EXTERNAL_ENDPOINT, which is our first launched server, together with UrlLoader as the loader for that schema. Most importantly, we have to be sure that the schema has been loaded and that the external server doesn't throw any errors; we have to catch that situation. As you can see in the code, we keep an array of schemas. By default, it contains only our own internalSchema; then, if the external server is available, we push externalSchema into that array and use mergeSchemas, which provides the merged schema directly to ApolloServer.

JavaScript
const Koa = require('koa');
const http = require('http');
const cors = require('@koa/cors');
const { ApolloServer } = require('apollo-server-koa');
const { loadSchema } = require('@graphql-tools/load');
const { UrlLoader } = require('@graphql-tools/url-loader');
const { makeExecutableSchema, mergeSchemas } = require('@graphql-tools/schema');
const typeDefs = require('./schema');
const resolvers = require('./resolvers');

const EXTERNAL_ENDPOINT = 'http://localhost:4000/api/v1/graphql';

async function server({ typeDefs, resolvers }) {
  const app = new Koa();
  const httpServer = http.createServer();

  const internalSchema = makeExecutableSchema({
    typeDefs,
    resolvers,
  });

  const schemas = [internalSchema];

  try {
    const externalSchema = await loadSchema(EXTERNAL_ENDPOINT, {
      loaders: [new UrlLoader()],
    });
    schemas.push(externalSchema);
  } catch {
    console.warn('⚠️️ External Schema has not been loaded');
  }

  const apolloServer = new ApolloServer({
    introspection: true,
    schema: mergeSchemas({
      schemas,
    }),
  });

  await apolloServer.start();
  apolloServer.applyMiddleware({ app, path: '/api/v1/graphql' });
  httpServer.on('request', app.callback());
  await new Promise(resolve => httpServer.listen({ port: process.env.PORT }, resolve));
  console.log(`Server ready at http://localhost:${process.env.PORT}${apolloServer.graphqlPath}`);
  return { apolloServer, app };
}

server({ typeDefs, resolvers }).then(({ app }) => {
  app.use(cors());
});

Install all the packages and run the server, which will be available on PORT=3000. Open the same Apollo GraphQL interface, but with the proper port in the URL: http://localhost:3000/api/v1/graphql. Now, if we open the Introspection Schema page, we will be able to see the merged schemas: one from the external server and one from the server we just created.

Keep in mind that if your servers define the same field, the GraphQL server will raise an error like this:

Plain Text
Error: Unable to merge GraphQL type “Query”: Field “getFakeDataExample” already defined with a different type. Declared as “DataExample”, but you tried to override with “DataExternalExample”

It means that we have to be very careful with the fields and type definitions in a GraphQL schema in order not to end up in an awkward situation where the type or field already exists.

Conclusion

Numerous organizations are adopting a microservice architecture and attempting to isolate the data logic flow.
The approach outlined above is particularly useful in situations where microservices communicate with each other within a company. Specifically, when there is a primary global service with a default schema and a secondary microservice with extra fields that may be used by the client in the future, this method allows developers to manage and scale their microservices more efficiently, thereby increasing the overall performance and agility of the system.

GitHub Repos

https://github.com/antonkalik/boilerplate-graphql-koa-server
https://github.com/antonkalik/boilerplate-graphql-koa-server-external
A message broker has very different characteristics and use cases than a data streaming platform like Apache Kafka. A business process requires more than just sending data in real time from a data source to a data sink. Data integration, processing, governance, and security must be reliable and scalable end-to-end across the business process. This blog post explores the capabilities of message brokers, their relation to the JMS standard, trade-offs compared to data streaming with Apache Kafka, and typical integration and migration scenarios. A case study explores the migration from IBM MQ to Apache Kafka. The last section contains a complete slide deck that covers all these aspects in more detail.

Message Broker vs. Apache Kafka -> Apples and Oranges

TL;DR: Message brokers send data from a data source to one or more data sinks in real time. Data streaming provides the same capability but adds long-term storage, integration, and processing capabilities. Most message brokers implement the JMS (Java Message Service) standard, and most commercial messaging solutions add proprietary features on top. Data streaming has no standard, but the de facto standard is Apache Kafka.

I already did a detailed comparison of (JMS-based) message brokers and Apache Kafka based on the following characteristics:

Message broker vs. data streaming platform
API specification vs. open-source protocol implementation
Transactional vs. analytical workloads
Storage for durability vs. true decoupling
Push vs. pull message consumption
Simple vs. powerful and complex API
Server-side vs. client-side data processing
Complex operations vs. serverless cloud
Java/JVM vs. any programming language
Single deployment vs. multi-region

I will not explore the trade-offs again; just check out the other blog post. I have also already covered implementing event-driven design patterns from messaging solutions with data streaming and Apache Kafka. The slide deck below covers all these topics in much more detail. Before that, I want to highlight the related themes covered in its sections:

Case study: How the retailer Advance Auto Parts modernized its enterprise architecture
Integration between a message broker (like IBM MQ) and Apache Kafka
Mainframe deployment options for data streaming
Migration from a message broker to data streaming

Case Study: IT Modernization From IBM MQ and Mainframe to Kafka and Confluent Cloud

Advance Auto Parts is North America's largest automotive aftermarket parts provider. The retailer ensures the right parts from its 30,000 vendors are in stock for professional installers and do-it-yourself customers across all of its 5,200 stores.
Advance Auto Parts implement data streaming and stream processing use cases that deliver immediate business value, including: Real-time invoicing for a large commercial supplier Dynamic pricing updates by store and location Modernization of the company’s business-critical merchandising system Here are a few details about Advance Auto Parts' success story: Started small with Confluent Cloud and scaled as needed while minimizing administrative overhead with a fully managed, cloud-native Kafka service Integrated data from more than a dozen different sources, including IBM Mainframe and IBM MQ through Kafka to the company’s ERP supply chain platform Used stream processing and ksqlDB to filter events for specific customers and to enrich product data Linked Kafka topics with Amazon S3 and Snowflake using Confluent Cloud sink connectors Set the stage for continued improvements in operational efficiency and customer experience for years to come Integration Architecture Before and After at Advance Auto Parts BEFORE — Legacy infrastructure and architecture, including IBM MQ and IBM AS/400 Mainframe, did not meet real-time performance requirements: AFTER — Future-proof data architecture, including Kafka-powered Confluent Cloud, for real-time invoicing and dynamic pricing: Integration Between a Message Broker and Apache Kafka The most common scenario for message brokers and data streaming is a combination of both. These two technologies are built for very different purposes. Point-to-point application integration (at a limited scale) is easy to build with a message broker. Systems are tightly coupled and combine messaging with web services and databases. This is how most spaghetti architectures originated. A data streaming platform usually has various data sources (including message brokers) and independent downstream consumers. Hence, data streaming is a much more strategic platform in enterprise architecture. It enables a clean separation of concerns and decentralized decoupling between domains and business units. Integrating messaging solutions like IBM MQ, TIBCO EMS, RabbitMQ, ActiveMQ, or Solace and data streaming platforms around Apache Kafka leverage Kafka Connect as integration middleware. Connectors come from data streaming vendors like Confluent. Or from the messaging provider. For instance, IBM also provides Kafka connectors for IBM MQ. The integration is straightforward and enables uni- or bi-directional communication. An enormous benefit of using Kafka Connect instead of another ETL tool, or ESB is that you don’t need another middleware for the integration. Mainframe Integration With IBM MQ and Kafka Connect The mainframe is a unique infrastructure using the IBM MQ message broker. Cost, connectivity, and security look different from other traditional enterprise components. The most common approaches for the integration between mainframe and data streaming are Change Data Capture (CDC) from the IBM DB2 database (e.g., via IBM IIDR), direct VSAM file integration (e.g., via Precisely), or connectivity via IBM MQ. Publishing messages from IBM MQ to Kafka improves data reliability, accessibility, and offloading to cloud services. This integration requires no changes to the existing mainframe applications. But it dramatically reduces MQ-related MIPS to move data off the Mainframe. However, it raises an interesting question: Where should Kafka Connect be deployed? On the mainframe or in the traditional IT environment? 
At Confluent, we support deploying Kafka Connect and the IBM MQ connectors on the mainframe, specifically on zLinux / the System z Integrated Information Processor (zIIP). This shows vast benefits for the customer, like 10x better performance and MQ MIPS cost reductions of up to 90%. Migration From a Message Broker to Data Streaming Because of Cost or Scalability Issues Migration can mean two things: Completely migrating away from the existing messaging infrastructure, including client and server-side Replacing message brokers (like TIBCO EMS or IBM MQ) because of licensing or scalability issues but keeping the JMS-based messaging applications running The first option is a big bang, which often includes a lot of effort and risk. If the main points are license costs or scalability problems, another alternative is viable with the Confluent Accelerator “JMS Bridge” to migrate from JMS to Kafka on the server side: The Confluent JMS Bridge enables the migration from JMS brokers to Apache Kafka (while keeping the JMS applications). Key features include: The JMS Bridge is built around a modified open-source JMS engine Supports full JMS 2.0 spec as well as Confluent/Kafka Publish and subscribe anywhere data is intended for end users JMS-specific message formats can be supported via Schema Registry (Avro) Existing JMS applications replace current JMS implementation jars with Confluent jars that embed an open-source JMS implementation and wrap it to the Kafka API Slide Deck and Video: Message Broker vs. Apache Kafka The attached slide deck explores the trade-offs, integration options, and migration scenarios for JMS-based message brokers and data streaming with Apache Kafka. Even if you use a message broker that is not JMS-based (like RabbitMQ), most of the aspects are still valid for comparison. And here is the on-demand video recording for the above slide deck. Data Streaming Is Much More Than Messaging! This blog post explored when to use a message broker or a data streaming platform, how to integrate both, and how to migrate from JMS-based messaging solutions to Apache Kafka. Advance Auto Parts is a great success story from the retail industry that solves the spaghetti architecture with a decentralized, fully managed data streaming platform. TL;DR: Use a JMS broker for simple, low-volume messaging from A to B. Apache Kafka is usually a data hub between many data sources and data sinks, enabling real-time data sharing for transactional and high-volume analytical workloads. Kafka is also used to build applications; it is more than just a producer and consumer API. The data integration and data processing capabilities of Kafka at any scale with true decoupling and event replayability are significant differences from JMS-based MQ systems. What message brokers do you use in your architecture? What is the mid-term strategy? Integration, migration, or just keeping the status quo? Let’s connect on LinkedIn and discuss it! Stay informed about new blog posts by subscribing to my newsletter.
Non-volatile memory access (NVM) is a technology that has gained significant attention over the past few years. It has brought a revolution in the memory storage industry by providing various benefits over traditional storage technologies. NVM refers to a type of memory that retains data even after the power has been turned off. This means that data stored in NVM is non-volatile and is suitable for use in a wide range of applications. In this article, we will explore the various forms of NVM, their advantages, limitations, and their potential applications. Forms of NVM NVM comes in various forms, including flash memory, magnetic RAM (MRAM), nanotube RAM (NRAM), resistive RAM (RRAM), and phase-change memory (PCM). Flash memory is the most commonly used form of NVM, and it is used in USB drives, digital cameras, and smartphones. Flash memory works by trapping electrons in a transistor’s floating gate. This makes it non-volatile as the information is retained when the power is off. The limitations of flash memory include wear-out and a limited number of write cycles. Magnetic RAM (MRAM) is another form of NVM. MRAM uses magnetic charges to store memory. It is fast, energy-efficient, and unlike traditional RAM, does not require a constant supply of power to maintain its contents. It is used in applications that require high-speed access to memory, such as in hard disk drives. MRAM has a much longer lifespan compared to flash memory and can withstand a higher number of write cycles. Nanotube RAM (NRAM) is a newer form of NVM technology that uses carbon nanotubes to store data. NRAM is predicted to have the speed of DRAM and the non-volatility of flash memory. It has been claimed that NRAM is capable of providing 100 times faster read-write speed than DRAM and flash memory while consuming less power. NRAM has been found to be less susceptible to wear and is more durable than flash memory. Resistive RAM (RRAM) is another new form of NVM. RRAM works by using the resistance of a material to represent the value of a bit. It has been found that RRAM has the potential to replace both DRAM and flash memory. One of the major advantages of RRAM over traditional memory technologies is its ability to store both data and code. This makes RRAM ideal for applications such as artificial intelligence and big data analytics. Phase-change memory (PCM) is another form of NVM. PCM stores data by altering the state of a material from an amorphous state to a crystalline state. This makes it possible for PCM to store data much more densely than flash memory. PCM is believed to be faster and more energy-efficient than flash memory, making it an attractive option for a range of applications, including wireless communication and high-performance computing. Advantages of NVM One of the main advantages of NVM is its speed. Unlike traditional disk-based storage, which relies on spinning disks and mechanical parts, NVM is much faster because it uses solid-state storage. This makes it ideal for use in applications that require fast access to large amounts of data, such as in real-time data analytics, financial trading, and scientific computing. Another advantage of NVM is its durability. Because it does not rely on moving parts, it is much more resistant to physical damage and wear and tear, making it ideal for use in rugged environments. Additionally, because it is non-volatile, data stored in NVM is less susceptible to corruption and loss due to power outages and other disruptions. 
NVM is also more energy-efficient than traditional storage technologies. Because it does not require constant power to maintain its contents, it can consume much less power than other types of memory. This makes it ideal for use in portable devices, such as smartphones and wearables, where battery life is a critical factor. Limitations of NVM Despite its many advantages, NVM has some limitations. One of the primary limitations of NVM is its cost. NVM is typically more expensive than traditional storage technologies. This has limited its adoption in some applications, particularly those where cost is a critical factor. Another limitation of NVM is its capacity. While NVM has the potential to store large amounts of data, it currently lags behind traditional storage technologies in terms of capacity. This means that in applications requiring vast amounts of storage space, traditional storage technologies may be more appropriate. Potential Applications of NVM NVM has the potential to revolutionize a wide range of industries, from healthcare and finance to transportation and entertainment. Because of its speed, durability, and non-volatile nature, it is well-suited for use in a range of applications. One potential application of NVM is in the healthcare industry. NVM could be used to store patient data securely and access it quickly, making it easier for doctors to make informed decisions about patient care. It could also be used for medical devices that require fast access to memory, such as ventilators and surgical robots. Another potential application of NVM is in the transportation industry. NVM could be used to store and process data in real-time for self-driving cars and smart traffic management systems. This would require fast access to data and the ability to store large amounts of data securely. In the entertainment industry, NVM could be used to store music, video, and other media on portable devices. Its durability would make it ideal for use in rugged environments, such as at concerts and festivals. Conclusion NVM is a technology that has the potential to transform the memory storage industry. Its speed, durability, and non-volatile nature make it well-suited for a range of applications, from healthcare and finance to transportation and entertainment. NVM comes in various forms, including flash memory, MRAM, NRAM, RRAM, and PCM. Each form of NVM has unique advantages and limitations, and its potential applications will rely on finding a balance between these factors. As NVM continues to develop and evolve, it is likely to become even more ubiquitous, providing faster, more energy-efficient, and more secure storage for a wide range of applications.
Today's data solutions can quickly become expensive. NetApp reports that organizations see 30% data growth every 12 months and risk seeing their solutions fail if this growth isn’t managed. Gartner echoes the concerns, stating that data solutions costs tend to be an afterthought, not addressed until they’re already a problem. If you're a developer that builds solutions for a data-driven organization, then you know that resource costs for data solutions can balloon quickly. So what can you do? In this article, we’ll look at: Why data solutions are expensive Techniques that developers can employ to improve the efficiency of their data solutions while also reducing costs How stream processing can play a major role in this improvement Through it all, we’ll work toward crafting an effective plan that leads to a more efficient data solution. Why Are Data Solutions Expensive? First, let’s look at the factors that contribute to the increasing costs of data solutions. You probably already face many, if not all, of these situations: Increasing volume of data: Data is being generated at an accelerated rate, and the cost of storing and managing that data has also increased. A myriad of data sources and their associated complexity: Disparate data sources make data integration a herculean task that leads to increased data quality checks and management costs. Frequently updated technology: Inevitable technological advancements lead to frequent upgrades in hardware and software, resulting in higher costs. Data security and privacy concerns: Not only do we have to worry about how to store and manage data, but also how to protect it from bad actors. Data privacy is critical, demanding hefty investments in encryption, security, and maintenance. Data consolidation: Many organizations hoard a tremendous amount of data without first thinking through what data is actually needed for long-term storage and analysis. This leads to unnecessary storage costs, incompatible systems, and inefficient and unscalable solutions. Lack of skilled data professionals: According to Bloomberg, the big data market will grow to $229.4 billion by 2025. With that growth comes the need for specialized—and expensive—skills, such as Apache Kafka, Apache Flink, Hadoop, Docker, containerization, DevOps, and others. How Do You Reduce Your Data Solution Costs? What can you do to help your organization keep these costs down? There are many strategies, including: Data compression and archiving (reduce data size and archive unused data) Data partitioning (make it easier and faster to process data) Data caching (keep frequently used data in high-speed storage) On-demand autoscaling Data governance measures (ensure accurate, complete, and well-managed data) Efficient data movement (improve routing from cloud and on-prem in order to speed insights) Cloud-agnostic deployment (reduce vendor lock-in, optimize costs) Using a multi-cloud approach Let’s add one more to this list, the solution that we’ll look at in more detail in this article: stream processing. What Is Stream Processing? Stream processing, as the name suggests, is a data management strategy that involves ingesting (and acting on) continuous data streams—such as the user’s clickstream journey, sensor data, or sentiment from a social media feed—in real-time. By using stream processing, developers can build data solutions with improved scalability and real-time processing for increased data quality. 
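As a minimal illustration of the idea (not tied to any particular platform), the hedged Python sketch below treats a generator as a continuous event stream and validates, transforms, and aggregates each clickstream event as it arrives, instead of storing everything first and batch-processing it later. All names and data are made up for the example.

Python
import random
import time
from typing import Iterator

def click_stream() -> Iterator[dict]:
    """Simulated continuous source of clickstream events (stand-in for Kafka, sensors, etc.)."""
    pages = ["/home", "/pricing", "/checkout", None]   # None simulates a malformed event
    while True:
        yield {"user": random.randint(1, 5), "page": random.choice(pages), "ts": time.time()}

def process(events: Iterator[dict], limit: int = 20) -> None:
    """Validate, transform, and aggregate events one at a time, as they arrive."""
    views_per_page: dict = {}
    for i, event in enumerate(events):
        if i >= limit:                       # bounded run for the example
            break
        if event["page"] is None:            # filter bad data at the source, before it flows downstream
            continue
        page = event["page"].lower()         # lightweight transformation
        views_per_page[page] = views_per_page.get(page, 0) + 1
        print(f"user {event['user']} viewed {page}; running totals: {views_per_page}")

if __name__ == "__main__":
    process(click_stream())

Because each event is handled the moment it arrives, there is no intermediate batch to store, and bad records never reach downstream systems; these are exactly the cost and quality benefits discussed next.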
Stream processing solves many of the issues encountered with other data solutions, including rising costs, by giving you: Reduced storage costs: You handle the data in real-time. Real-time scalability: Stream processing systems can handle large-scale data using a distributed computing architecture. Preventing the downstream flow of bad data: Stream processing enables real-time data validation, transformation, and filtering at the source. Faster decision-making to identify high-impact opportunities: Real-time analysis gives you real-time analytics—and real-time insights. Team efficiency through automation: Automated workflows in stream processing platforms reduce complexity by abstracting the low-level details of data processing. This enables your data team to focus on data analytics, visualization, and reporting. Manage Stream Processing Effectively Stream processing is powerful and opens many use cases, but managing the implementation is no easy task! Let’s look at some challenges associated with implementing stream processing from scratch. Storage and computing resources: Anomalies in streaming data can be difficult and time-consuming to detect and address without specialized tools. Continuous delivery: Deploying updates to streaming applications can be challenging without disrupting the production version of the application. Database integration: Integrating data from different databases can be complex and time-consuming for developers. Handling schema management and scalability frequently becomes cumbersome when you're integrating databases. Migrating data from one database to another is costly and time-consuming, requiring additional infrastructure. Developer productivity: Developing streaming applications requires you to invest a significant amount of time in managing the infrastructure and configuration. Such efforts increase with the complex data processing logic, taking time away from actual application development. Stream processing when built from scratch Developers can circumvent all of these issues by using a stream processing framework, such as Turbine. A stream processing framework helps you to: Connect to an upstream resource to stream data in real-time Receive the data Process the data Send the data on to a destination. Here's an example write-up on what this looks like at a high level. Turbine supports data applications written in JavaScript, Go, Ruby, and Python. Let's look at an example of how a Go developer might interact with Turbine. 
The following code comes from this example application: Go func (a App) Run(v turbine.Turbine) error { source, err := v.Resources("demopg") if err != nil { return err } // a collection of records, which can't be inspected directly records, err := source.Records("user_activity", nil) if err != nil { return err } // second return is dead-letter queue result := v.Process(records, Anonymize{}) dest, err := v.Resources("s3") if err != nil { return err } err = dest.Write(result, "data-app-archive") if err != nil { return err } return nil } func (f Anonymize) Process(records []turbine.Record) []turbine.Record { for i, r := range records { hashedEmail := consistentHash(r.Payload.Get("email").(string)) err := r.Payload.Set("email", hashedEmail) if err != nil { log.Println("error setting value: ", err) break } records[i] = r } return records } func consistentHash(s string) string { h := md5.Sum([]byte(s)) return hex.EncodeToString(h[:]) } In the above code, we see the following steps in action: Create an upstream source (named source), from a PostgreSQL database (named demopg). Fetch records (from the user_activity table) from that upstream source. Call v.Process, which performs the stream processing. This process iterates through the list of records and overwrites the email of each record with an encoded hash. Create a downstream destination (named dest), using AWS S3. Write the resulting stream-processed records to the destination. As we can see, you only need a little bit of code for Turbine to process new records and stream them to the destination. Using a framework such as Turbine for stream processing brings several benefits, including: It reduces the need for storage and computing resources by using stream processing instead of batch processing. It provides a powerful mechanism to identify anomalous behavior from large volumes of streaming data, assisting with a quick fix from highly specialized real-time anomaly detection tools. Because continuous delivery is a big challenge for streaming applications, Turbine streaming data apps offer Feature Branch Deploys, allowing you to deploy the branch whenever it is ready—without impacting the production version of the application. It integrates any source database to any destination database by leveraging Change Data Capture (CDC) that receives real-time streams and publishes them downstream. Data transformation, processing logic, and orchestration are handled by the platform through the Turbine app. As a developer, you no longer need to worry about schema management or scalability issues. Ultimately, you can focus your time and effort on your core business needs. Turbine's code-first integration is aimed at helping you to focus on building applications, leading to faster development while also reducing the infrastructure typically needed to support stream processing. Conclusion Data-driven organizations are betting high on data assets to fuel their transformation journeys. As a developer, your mandate is to deliver better—and more cost-efficient—applications that can handle fast-growing datasets. Stream processing helps you to achieve this, bringing improved data quality and faster decision-making all while reducing storage costs and infrastructure complexity. With the right plan and the right stream processing platform to handle the data efficiently, you'll be well on your way!
What Is Elasticsearch? Elasticsearch is a highly scalable and distributed search and analytics engine that is built on top of the Apache Lucene search library. It is designed to handle large volumes of structured, semi-structured, and unstructured data, making it well-suited for a wide range of use cases, including search engines, log analysis, e-commerce, and security analytics. Elasticsearch uses a distributed architecture that allows it to store and process large volumes of data across multiple nodes in a cluster. Data is indexed and stored in shards, which are distributed across nodes for improved scalability and fault tolerance. Elasticsearch also supports real-time search and analytics, allowing users to query and analyze data in near real time. One of the key features of Elasticsearch is its powerful search capabilities. It supports a wide range of search queries, including full-text search, geospatial search, etc. It also provides support for advanced analytics features such as aggregations, metrics, and data visualization. Elasticsearch is often used in conjunction with other tools in the Elastic Stack, including Logstash for data collection and processing and Kibana for data visualization and analysis. Together, these tools provide a comprehensive solution for search and analytics that can be used for a wide range of applications and use cases. What Is Apache Lucene? Apache Lucene is an open-source search library that provides powerful text search and indexing capabilities. It is widely used by developers and organizations to build search applications, ranging from search engines to e-commerce platforms. Lucene works by indexing the text content of documents and storing the index in a structured format that can be searched efficiently. The index is composed of a series of inverted lists, which provide mappings between terms and the documents that contain them. When a search query is submitted, Lucene uses the index to quickly retrieve the documents that match the query. In addition to its core search and indexing capabilities, Lucene provides a range of advanced features, including support for fuzzy search and spatial search. It also provides tools for highlighting search results and ranking search results based on relevance. Lucene is used by a wide range of organizations and projects, including Elasticsearch. Its rich set of features, flexibility, and extensibility make it a popular choice for building search applications of all kinds. What Is Inverted Index? Lucene's Inverted Index is a data structure used to efficiently search and retrieve text data from a collection of documents. The Inverted Index is a central feature of Lucene, and it is used to store the terms and their associated documents that make up the index. The Inverted Index provides several benefits over other search strategies. First, it allows for fast and efficient retrieval of documents based on search terms. Second, it can handle a large amount of text data, making it well-suited for use cases with large collections of documents. Finally, it supports a wide range of advanced search features, such as fuzzy matching and stemming, that can improve the accuracy and relevance of search results. Why Elasticsearch? There are several reasons why Elasticsearch is a popular choice for building search and analytics applications: Easy to scale (Distributed): Elasticsearch is built to scale horizontally out of the box. 
Whenever you need to increase capacity, just add more nodes and let the cluster reorganize itself to take advantage of the extra hardware. One server can hold one or more parts of one or more indexes, and whenever new nodes are introduced to the cluster, they simply join the party. Every such index, or part of it, is called a shard, and Elasticsearch shards can be moved around the cluster very easily.

Everything is one JSON call away (RESTful API): Elasticsearch is API-driven. Almost any action can be performed using a simple RESTful API with JSON over HTTP, and responses are always in JSON format. (A minimal client sketch appears at the end of this article.)

Unleashed power of Lucene under the hood: Elasticsearch uses Lucene internally to build its state-of-the-art distributed search and analytics capabilities. Lucene is a stable, proven technology that continuously gains new features and best practices, which makes it a solid underlying engine to power Elasticsearch.

Excellent Query DSL: The REST API exposes a very complex and capable query DSL that is easy to use. Every query is just a JSON object that can contain practically any type of query, or even several of them combined. Using filtered queries, with some queries expressed as Lucene filters, helps leverage caching and thus speeds up common queries or complex queries with reusable parts.

Multi-Tenancy: Multiple indexes can be stored on one Elasticsearch installation, whether a node or a cluster, and you can query multiple indexes with one simple query.

Support for advanced search features (Full Text): Elasticsearch uses Lucene under the covers to provide the most powerful full-text search capabilities available in any open-source product. Search comes with multi-language support, a powerful query language, support for geolocation, context-aware did-you-mean suggestions, autocomplete, and search snippets, plus script support in filters and scorers.

Configurable and Extensible: Many Elasticsearch configurations can be changed while Elasticsearch is running, but some require a restart (and, in some cases, re-indexing). Most configurations can be changed using the REST API, too.

Document Oriented: Store complex real-world entities in Elasticsearch as structured JSON documents. All fields are indexed by default, and all the indices can be used in a single query to return results at breathtaking speed.

Schema Free: Elasticsearch lets you get started easily. Send a JSON document, and it will try to detect the data structure, index the data, and make it searchable.

Conflict Management: Optimistic version control can be used where needed to ensure that data is never lost due to conflicting changes from multiple processes.

Active Community: Beyond creating nice tools and plugins, the community is very helpful and supportive. The overall vibe is great, and this is an important metric for any OSS project. There are also books being written by community members and many blog posts around the net sharing experience and knowledge.

Elasticsearch Architecture

The main components of the Elasticsearch architecture are:

Node: A node is an instance of Elasticsearch that stores data and provides search and indexing capabilities. Nodes can be configured to be a master node, a data node, or both. Master nodes are responsible for cluster-wide management, while data nodes store the data and perform search operations.

Cluster: A cluster is a group of one or more nodes working together to store and process data.
A cluster can contain multiple indices (collections of documents) and shards (a way to distribute data across multiple nodes).

Index: An index is a collection of documents that share a similar structure. Each document is represented as a JSON object and contains one or more fields. Elasticsearch indexes all fields by default, making it easy to search and analyze data.

Shards: An index can be split into multiple shards, which are essentially smaller subsets of the index. Sharding allows for parallel processing of data and distributed storage across multiple nodes.

Replicas: Elasticsearch can create replicas of each shard to provide fault tolerance and high availability. Replicas are copies of the original shard and can be located on different nodes.

Data Node Cluster Architecture

Data nodes are responsible for storing and indexing data, as well as performing search and aggregation operations. The architecture is designed to be scalable and distributed, allowing for horizontal scaling by adding more nodes to the cluster. Here are the main components of an Elasticsearch data node cluster architecture:

Data Node: A data node is an instance of Elasticsearch that stores data and provides search and indexing capabilities. In a data node cluster, each node is responsible for storing a portion of the index data and serving search queries against that data.

Cluster State: The cluster state is a data structure that holds information about the cluster, including the list of nodes, indices, shards, and their locations. The master node is responsible for maintaining the cluster state and distributing it to all other nodes in the cluster.

Discovery and Transport: Nodes in an Elasticsearch cluster communicate with each other using two protocols: discovery and transport. The discovery protocol is responsible for discovering new nodes joining the cluster or nodes that have left the cluster. The transport protocol is responsible for sending and receiving data between nodes.

Index Request

An index request is executed in Elasticsearch as shown in the block diagram below.

Who Is Using Elasticsearch?

A few companies and organizations that use Elasticsearch:

Netflix: Netflix uses Elasticsearch to power its search and recommendations engine, allowing users to quickly find content to watch.
GitHub: GitHub uses Elasticsearch to provide fast and efficient search capabilities across its code repositories, issues, and pull requests.
Uber: Uber uses Elasticsearch to power its real-time analytics platform, allowing it to track and analyze data on its ride-hailing service in real time.
Wikipedia: Wikipedia uses Elasticsearch to power its search engine and provide fast and accurate search results to users.
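To illustrate the "one JSON call away" point referenced earlier, here is a hedged sketch using the official Python client (the elasticsearch package). The URL, index name, and documents are placeholders, and the exact client keyword arguments can vary slightly between client versions, so treat this as a sketch rather than the definitive API.

Python
from elasticsearch import Elasticsearch  # official Python client; assumed installed

# Placeholder endpoint; in production you would also pass credentials/TLS options
es = Elasticsearch("http://localhost:9200")

# Index a JSON document (the index is created on the fly, schema-free)
es.index(
    index="articles",
    id="1",
    document={"title": "Inverted indexes in practice", "tags": ["search", "lucene"]},
)

# Make the document visible to search immediately (normally refresh happens periodically)
es.indices.refresh(index="articles")

# A full-text match query expressed as a plain JSON-like dict (the Query DSL)
resp = es.search(index="articles", query={"match": {"title": "inverted"}})
for hit in resp["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["title"])

Everything the client does here maps one-to-one onto plain HTTP calls with JSON bodies, which is why any language with an HTTP library can talk to Elasticsearch.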
Data synchronization is one of the most important aspects of any product. Apache Kafka is one of the most popular choices when designing a system that expects near-real-time propagation of large volumes of data. Even though Kafka has simple yet powerful semantics, working with it requires insight into its architecture. This article summarizes the most important design aspects of Kafka as a broker and of applications that act as data producers or consumers. About Kafka Apache Kafka originated at LinkedIn and was developed as a highly scalable system for distributing telemetry and usage data. Over time, Kafka evolved into a general-purpose streaming data backbone that combines high throughput with low data delivery latencies. Internally, Kafka is a distributed log. A (commit) log is an append-only data structure: producers append data (log records) to its end, and subscribers read the log from the beginning to replay the records. This data structure is used, for example, in a database write-ahead log. A distributed log means that the actual data structure is not hosted on a single node but is distributed across many nodes to achieve both high availability and high performance. Internals and Terminology Before we jump into how Kafka is used by applications, let's quickly go through the basic terminology and architecture so we understand the guarantees that Kafka provides to its users. A single Kafka node is called a broker. The broker receives messages from producers and distributes them to consumers. Producers send the messages into distributed logs, which are called topics (in traditional messaging, this corresponds to a queue). To scale the performance of a single topic beyond the capacity of a single node, each topic may be split into multiple partitions. To achieve high availability and durability of the data stored, each partition has a leader (performing all read and write operations) and multiple followers. Partitions are assigned to brokers automatically, and the failover of a broker is also automatic and transparent to developers using Kafka. On the backend, the assignment of leader/replica roles is orchestrated using leader election in Apache ZooKeeper or, in newer versions of Kafka, using the KRaft protocol. In the diagram, we can see a Kafka cluster, which consists of five brokers. In this scenario, two topics (A and B) were created. Topic A has two partitions, while topic B has only a single partition. The cluster was set up with replication factor 3 — this means there are always three copies of the data stored, allowing two nodes to fail without losing the data. The replication factor of 3 is a sane default since it guarantees tolerance of a node failure even during the maintenance of one other broker. You may ask why topic A was divided into two partitions; what is the benefit? First, please notice that the leader of Partition 1 is on a different node than the leader of Partition 2. This means that if clients produce/consume data to/from this topic, they may use the disk throughput and performance of two nodes instead of one. On the other hand, there is a cost to this decision: message ordering is guaranteed only within a single partition. Producers and Consumers Now that we have some understanding of how Kafka works internally, let's take a look at how the situation looks from the perspective of producers/consumers. Producer Let's start with the producer.
As mentioned above, the replication and assignment of topics/partitions is a concern of Kafka itself and is not visible to producers or consumers. So the producer only needs to know which topics it wishes to send data to and whether these topics have multiple partitions. In case the topic is partitioned (entity-1), the producer may create as part of its code a "partitioner," which is a simple class that decides to which partition a given record belongs. So in Kafka, the partitioning is driven by the producer. In case the producer does not specify any partitioner (but the topic is partitioned), a round-robin strategy is used. Round-robin is completely fine for entities where the exact ordering is not important — there is no causal relation between the records. For example, if you have a topic with sensor measurements, these measurements may be sent by the sensors on a scheduled basis — hence there is no particular order of the records. And round-robin provides an easy way to balance the records among the individual partitions. Our example with sensors also shows another important detail: there may be multiple producers sending records into one topic. In the diagram above, we see that we have many sensors (producers) creating two types of records: humidity (in green) and CO2 concentration (in red). Each of the records also contains information about the sensor itself (such as its serial number; in this example, an integer is used for the sake of simplicity). Because every sensor ever produced can measure humidity, while only some sensors support CO2 measurements, the designers of the system have decided to split the humidity records into two partitions using the serial number of the sensor. Notice that there is strict ordering within each of the humidity partitions (and within the CO2 partition), but there is no ordering of records between the partitions — in other words, B will always be processed before D and E, and A will always be processed before C, but there is no ordering guarantee between B and A (or C). Consumer A Kafka consumer is an application that reads records from a topic. In Kafka, there is one more concept: the consumer group — a set of consumers that cooperate. When there are multiple consumers from the same group subscribed to the same topic, Kafka always distributes the partitions among the consumers in that group in a way that each partition is read by exactly one of them (a single consumer may read multiple partitions, but one partition will not be read by multiple consumers from the group). In case some of the consumers fail, Kafka will automatically reassign partitions to other consumers (please note that consumers do not need to subscribe to all topics). But in case of a failover or switchover, how does Kafka know where to continue? We have already said that a topic contains all the messages (even the messages that were already read). Does this mean that the consumer must read the whole topic again? The answer is that the consumer is able to continue where the previous one stopped. Kafka uses a concept called an offset, essentially a pointer to a message in the partition, which stores the position of the last message processed by a given consumer group. Offset While it may seem trivial, the concept of offsets and distributed logs is extremely powerful. It is possible to dynamically add new consumers, and these consumers (starting from offset=0) are able to catch up with the full history of data.
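Here is a minimal sketch of exactly that: a newly added consumer (Python, confluent-kafka client) that rewinds its assigned partitions to the very first offset and replays the full history. The broker address, group ID, and topic name are assumptions for illustration.

```python
from confluent_kafka import Consumer, OFFSET_BEGINNING

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",   # assumed broker address
    "group.id": "new-analytics-consumer",    # a brand-new consumer group
    "enable.auto.commit": True,
})

def rewind(consumer, partitions):
    # Force every partition assigned to this consumer back to the first offset.
    for p in partitions:
        p.offset = OFFSET_BEGINNING
    consumer.assign(partitions)

consumer.subscribe(["measurements"], on_assign=rewind)  # assumed topic name

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        print(msg.topic(), msg.partition(), msg.offset(), msg.value())
finally:
    consumer.close()
```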
With traditional queues, by contrast, a new consumer would need to somehow fetch all the historical data from somewhere else (because messages are deleted once read in classic messaging). This data sync is more complex because either the producer re-produces the messages into the one queue used for increments (and affects all other consumers), or the consumer needs to use some other mechanism (such as REST or another dedicated queue), which creates data synchronization issues (as two independent, unsynchronized mechanisms are used). Another huge benefit is that the consumer may at any time decide to reset the offset and read from the beginning of time. Why would one do that? Firstly, there is a class of analytical applications (such as machine learning) that requires processing the whole dataset, and an offset reset provides exactly such a mechanism. Secondly, it may happen that there is a bug in the consumer, which corrupts the data. In this case, the consumer product team may fix the issue and reset the offset – effectively reprocessing the whole dataset and replacing the corrupt data with correct data. This mechanism is heavily used in Kappa architecture. Retention and Compaction We have stated above that the commit log is append-only, but this does not imply that the log is immutable. In fact, this is true only for certain types of deployments, where it is necessary to hold the full history of all changes (for auditing purposes or to have a real Kappa architecture). This strategy is powerful but also has a price. Firstly, performance: the consumer needs to go through huge volumes of data in order to catch up with the end of the log. Secondly, if the log contains any sensitive information, it is hard to get rid of it (which makes this type of log unfriendly to regulations that require data to be erased on request). But in many cases, the logs have some fixed retention — either size- or time-based. In this case, the log contains only a window of messages (and any overflow is automatically erased). Using a log as a buffer makes the log size reasonable and also ensures that the data does not stay in the log forever (making it easier to adhere to compliance requirements). However, this also makes the log unusable for certain use cases — one of these use cases is when you want to have all the records available to newly subscribed consumers. The last type of log is the so-called compacted log. In a compacted log, each record has not only a value but also a key. Whenever a new record is appended to the topic, and there is already a record with the same key, Kafka will eventually compact the log and erase the original record. Be aware that this means that, for a certain time, there will be multiple records with the same key, and the up-to-date value is always in the most recently inserted record — this does not require any additional handling if you go with at-least-once semantics (it is guaranteed that the message will be delivered, but in case of any uncertainty, for example due to network issues, the message may be delivered multiple times). You can picture the compacted log as a streaming form of a database that allows anyone to subscribe to the newest data. This picture of Kafka is quite accurate because there is a duality between a stream and a table. Both concepts are merely different views of the same thing — in a SQL database, we also use tables, but under the hood, there is a commit log. Similarly, any Kafka topic (compacted included) can be viewed as a table. In fact, the Kafka Streams library builds on this duality.
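As a small, concrete illustration of the compacted log described above, the sketch below (Python, confluent-kafka AdminClient) creates a topic with cleanup.policy=compact. The broker address, topic name, and sizing are assumptions for illustration only.

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})  # assumed broker

# A keyed topic that Kafka will eventually compact: only the latest record
# per key survives once compaction has run.
topic = NewTopic(
    "room-trends",            # assumed topic name
    num_partitions=3,
    replication_factor=3,
    config={"cleanup.policy": "compact"},
)

for name, future in admin.create_topics([topic]).items():
    try:
        future.result()       # raises if topic creation failed
        print(f"created compacted topic {name}")
    except Exception as exc:
        print(f"failed to create {name}: {exc}")
```

Producers must then send keyed records (for example, keyed by room ID) for compaction to be meaningful; the topic then converges to the latest value per key, which is what makes the "topic as a table" view practical.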
There is even ksqlDB (Kafka SQL), which allows you to issue SQL statements over records in Kafka. In the topology above, we see that the inbound measurement topics (temperature, humidity, co2…) are normal topics with retention set to seven days. The retention allows the developers to time travel a week back in case they find a bug in their implementation. From these inbound topics, the data is read by two services (each in a separate consumer group). The measurements history service stores the telemetry into a time-series database (long-term storage), which may be used as a source for graphs and widgets in the UI of the system. The trends service aggregates the data (it creates 24h windows of the measurements in a given room) so that these can be used by downstream controllers, and sends the results through a compacted topic. The topic is compacted because there is no need to keep any historical records (only the latest trend is valid). On the other hand, the customer may add a new device (and associated controller) at any time, so we want to ensure that the latest readings for the given room are always present. Patterns and Principles In the previous paragraphs, we presented basic concepts. In this section, we'll expand on those and discuss a few other Kafka patterns. Eventually Consistent Architecture In a data synchronization architecture based on messaging, we want to ensure that whenever new data is produced in one product, it will be available to all relevant products in near real time. This means that if the user creates/modifies some entity in product A and navigates to product B, he/she should (ideally) see the up-to-date version of this entity. However, since the individual products use multiple independent databases, it is not practical to have a distributed transaction mechanism and atomic consistency between these databases. Instead, we go with eventual consistency. In this model, the data producer is responsible for publishing any record it creates/updates/deletes to Kafka, from which an interested consumer may retrieve the record and store it locally. This propagation between systems takes some time: less than a second (expected) between the publishing of the record and the moment when the record is available to subscribers. Also, the consumer may optimize writes to its database (e.g., batch writes). During this time period, some of the systems (the replicas) have slightly stale data. It may also happen that some of the replicas will not be able to catch up for some time (downtime, network partition). But structurally, it is guaranteed that all the systems will eventually converge to the same results and will hold a consistent dataset — hence the term "eventual consistency." Optimizing Writes to the Local Database As alluded to in the previous paragraph, consumers may want to optimize writes to their local database. For example, it is highly undesirable to commit on a per-record basis in relational databases because a transaction commit is a relatively expensive operation. It may be much wiser to commit in batches (commit every 5,000 records or at a maximum of 500 ms intervals — whichever comes first). Kafka is well able to support this (because committing an offset is in the hands of the consumer). Another example is AWS Redshift, which is a data warehouse/OLAP database in which commits are very expensive. Also, in Redshift, every commit invalidates its query caches.
As a result, the cluster takes the hit of the commit twice — once to perform the commit itself and a second time when all previously cached queries must be re-evaluated. For these reasons, you may want to commit to Redshift (and similar technologies) on a scheduled basis, every X minutes, to limit the blast radius of this action. The last example may be NoSQL databases that do not support transactions. It may be just fine to stream the data on a per-record basis (obviously, depending on the capabilities of the DB engine). There is one takeaway: different replicas may use a slightly different persistence strategy, even if they consume the same data. Always assume that there is a possibility that the other side does not have the data available yet. Referential Integrity Between Topics It is important to understand that since Kafka-based data synchronization is eventually consistent, there is no implicit referential integrity or causal integrity between the individual topics (or partitions). When it comes to referential integrity, the consumers should be written so that they expect to receive, for example, measurements for a room whose details have not arrived yet. Some of the consumers may overcome this situation by not showing the data at all until all the dimensions are present (for example, you can't turn on ventilation when you do not know the room). For other systems, the missing reference is not really an issue: the average temperature in the house will be the same, regardless of the presence of room details. For these reasons, it may be impractical to impose any strict restrictions centrally. Stateful Processing Kafka consumers may require stateful processing — such as aggregations, window functions, and deduplication. Also, the state itself may not be of a trivial size, or there may be a requirement that in case of a failure, the replica is able to continue almost instantly. In these cases, storing the results in the RAM of the consumer is not the best choice. Luckily, the Kafka Streams library has out-of-the-box support for RocksDB — a high-performance embedded key-value store. RocksDB is able to store the results both in RAM and on disk. Caching Strategy and Parallelism Closely related to stateful processing is the caching strategy. Kafka is, by its design, not well suited for the competing-consumers style of work because each partition is assigned to exactly one processor. If one wants to implement competing consumers, one needs to create significantly more partitions than there are consumers within the system to emulate the behavior. However, this is not the way parallelism should be handled in Kafka-based systems (in case you really need a job queue of unrelated jobs, you will be much better off with RabbitMQ, SQS, or ActiveMQ…). Kafka is a stream processing system, and it is expected that the records in one partition somehow relate to each other and should be processed together. The individual partitions act as data shards, and since Kafka guarantees that each of these partitions will be assigned to one and exactly one consumer, the consumer can be sure that there is no other competing processor — so it can cache the results as it sees fit in its local cache and does not need to implement any distributed caching (such as Redis). In case the processor fails/crashes, Kafka will just reassign the partition to another consumer, which will populate its local caches and continue.
This design of stream processing is significantly easier than competing consumers. There is one consideration, though: the partitioning strategy, because it is defined by default by the producer, while different consumers may have mutually incompatible needs. For this reason, it is common in Kafka's world to re-partition the topic. In our scenario, it would work the following way: In the diagram, we can see that the Trends service produces trends into its own topic. This topic is round-robin partitioned and compacted. ProductX, which focuses on large industrial customers, needs to partition the data in some other way, for example, by customerId. In this case, ProductX may write a simple application that re-partitions the data (re-partitioning is often managed under the hood by the Kafka Streams library). In other words, it reads the data from the source topic and writes it into another topic, managed by ProductX, which partitions the data differently (per business unit in this case). With this partitioning, ProductX is able to shard the non-overlapping business units to dedicated processing nodes, massively increasing the processing parallelism. The internal ProductX topic may have just a short retention (such as 24h) because it does not hold the authoritative copy of the data, and the data can easily be replayed from the original topic if necessary. In Kafka Streams, you may want to join several entities in order to combine the data (this is a common use case). Beware that in case you have multiple consumers, you need to have the inbound topics partitioned in the exact same way (same partitioner (join key based), same number of partitions). Only this way is it guaranteed that entities with matching join keys will be received by the same consumer (processor). Summary In this article, we have discussed the overall architecture of Kafka and how this low-level architecture allows the broker to scale horizontally with ease (thanks to partitioning) and to ensure high availability and durability (thanks to the leader/replica design and leader election). We also went through the basics of designing Kafka-based topologies, explained eventual consistency and how that affects the guarantees given to our applications, and learned how to use Kafka's different types of logs and their retention. While Kafka may seem overwhelming at first, it is important to realize that internally it is based on the good old distributed log. This relatively simple internal structure is what gives Kafka its straightforward semantics, high throughput, and low data propagation latencies: qualities that are crucial for building any data pipeline.
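As a closing illustration of the "Optimizing Writes to the Local Database" pattern discussed earlier, here is a minimal sketch (Python, confluent-kafka) of a consumer that flushes to its local store and commits offsets every 5,000 records or every 500 ms, whichever comes first. The broker, consumer group, topic, and write_batch_to_db function are assumptions for illustration.

```python
import time
from confluent_kafka import Consumer

def write_batch_to_db(batch):
    # Placeholder: flush the whole batch to the local database in one transaction.
    pass

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",  # assumed broker
    "group.id": "measurements-history",     # assumed consumer group
    "enable.auto.commit": False,            # offsets are committed only after a DB flush
})
consumer.subscribe(["measurements"])        # assumed topic

batch, last_flush = [], time.monotonic()
try:
    while True:
        msg = consumer.poll(0.1)
        if msg is not None and not msg.error():
            batch.append(msg.value())
        # Flush on size or on time, whichever comes first.
        if len(batch) >= 5000 or (batch and time.monotonic() - last_flush >= 0.5):
            write_batch_to_db(batch)
            consumer.commit(asynchronous=False)  # offsets advance only after the write succeeds
            batch, last_flush = [], time.monotonic()
finally:
    consumer.close()
```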
As the digital age progresses, the need for efficient and secure data governance practices becomes more crucial than ever. This article delves into the concept of User Data Governance and its implementation using serverless streaming. We will explore the benefits of using serverless streaming for processing user data and how it can lead to improved data governance and increased privacy protection. Additionally, we will provide code snippets to illustrate the practical implementation of serverless streaming for user data governance. Introduction User Data Governance refers to the management of user data, including its collection, storage, processing, and protection. With the ever-increasing amount of data generated daily, organizations must develop robust and efficient data governance practices to ensure data privacy, security, and compliance with relevant regulations. In recent years, serverless computing has emerged as a promising solution to the challenges of data governance. This paradigm shift allows organizations to build and run applications without managing the underlying infrastructure, enabling them to focus on their core business logic. Serverless streaming, in particular, has shown great potential in processing large volumes of user data in real time, with minimal latency and scalable performance. Serverless Streaming for User Data Processing Serverless streaming is a cloud-based architecture that enables real-time data processing without the need to provision or manage servers. It provides on-demand scalability and cost-effectiveness, making it an ideal choice for processing large volumes of user data. This section examines the key components of serverless streaming for user data governance. 1.1. Event Sources An event source is any system or application that generates data in real time. These sources can include user activity logs, IoT devices, social media feeds, and more. By leveraging serverless streaming, organizations can ingest data from these diverse sources without worrying about infrastructure management. For example, consider an AWS Kinesis data stream that ingests user activity logs:

```python
import boto3

kinesis_client = boto3.client('kinesis', region_name='us-west-2')

response = kinesis_client.create_stream(
    StreamName='UserActivityStream',
    ShardCount=1
)
```

1.2. Stream Processing Stream processing involves the real-time analysis of data as it is generated by event sources. Serverless platforms, such as AWS Lambda, Google Cloud Functions, and Azure Functions, enable developers to create functions that process data streams without managing the underlying infrastructure. These functions can be triggered by specific events, allowing for the real-time processing of user data. For instance, an AWS Lambda function that processes user activity logs from the Kinesis data stream (note that Kinesis record data arrives base64-encoded in the Lambda event and must be decoded before parsing):

```python
import base64
import json

def lambda_handler(event, context):
    for record in event['Records']:
        # Kinesis record data is base64-encoded in the Lambda event
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        process_user_activity(payload)

def process_user_activity(activity):
    # Process user activity data here
    pass
```

1.3. Data Storage The processed data must be stored securely to ensure proper data governance. Serverless storage solutions, such as Amazon S3, Google Cloud Storage, and Azure Blob Storage, offer scalable and secure storage options that automatically scale with the size of the data.
For example, storing processed user activity data in an Amazon S3 bucket:

```python
import json

import boto3

s3_client = boto3.client('s3')

def store_processed_data(data, key):
    s3_client.put_object(
        Bucket='my-processed-data-bucket',
        Key=key,
        Body=json.dumps(data)
    )
```

Benefits of Serverless Streaming for User Data Governance The serverless streaming architecture offers several benefits for user data governance, including: 2.1. Scalability One of the main advantages of serverless streaming is its ability to scale automatically based on the volume of incoming data. This ensures that organizations can handle fluctuating workloads, such as seasonal trends or unexpected surges in user activity, without the need to over-provision resources. 2.2. Cost-Effectiveness Serverless streaming follows a pay-as-you-go pricing model, meaning organizations only pay for the resources they actually consume. This eliminates the need for upfront investments in infrastructure and reduces overall operational costs. 2.3. Flexibility Serverless streaming allows organizations to process data from multiple event sources and adapt their data processing pipelines to changing business requirements quickly. This flexibility enables them to stay agile and responsive to evolving user data governance needs. 2.4. Security With serverless streaming, organizations can implement various security measures, such as encryption, data masking, and access control, to protect user data at rest and in transit. Additionally, serverless platforms typically offer built-in security features, such as automatic patching and monitoring, to ensure the highest level of data protection. Compliance and Privacy in Serverless Streaming As organizations adopt serverless streaming for user data governance, they must address several privacy and compliance concerns, including: 3.1. Data Sovereignty Data sovereignty refers to the concept that data should be stored and processed within the borders of the country where it was generated. Serverless streaming platforms must support multi-region deployment to comply with data sovereignty requirements and ensure proper user data governance. 3.2. GDPR and Other Data Protection Regulations Organizations must adhere to the General Data Protection Regulation (GDPR) and other data protection laws when processing user data. Serverless streaming platforms should provide features to facilitate compliance, such as data anonymization, deletion, and consent management. 3.3. Privacy by Design Privacy by Design is a proactive approach to data privacy that embeds privacy considerations into the design and architecture of systems and processes. Serverless streaming platforms should support Privacy by Design principles, enabling organizations to implement privacy-enhancing techniques and best practices. Best Practices for Implementing User Data Governance With Serverless Streaming To ensure robust user data governance using serverless streaming, organizations should follow these best practices: 4.1. Assess Data Sensitivity Before processing user data, organizations should evaluate the sensitivity of the data and apply appropriate security measures based on the data classification. 4.2. Encrypt Data at Rest and in Transit Data should be encrypted both at rest (when stored) and in transit (during processing and transmission) to protect against unauthorized access. 4.3. Implement Access Control Organizations should implement strict access control policies to limit who can access and process user data.
This includes role-based access control (RBAC) and the principle of least privilege (POLP). 4.4. Monitor and Audit Continuous monitoring and auditing of serverless streaming platforms are essential to ensure data governance, detect security incidents, and maintain compliance with relevant regulations. 4.5. Leverage Data Retention Policies Organizations should implement data retention policies to ensure that user data is stored only for the duration necessary and is deleted when no longer needed. Conclusion User Data Governance is an essential aspect of modern digital businesses, and serverless streaming offers a promising approach to address its challenges. By leveraging the scalability, cost-effectiveness, and flexibility of serverless streaming, organizations can process and manage large volumes of user data more efficiently and securely. By adhering to best practices and regulatory requirements, organizations can ensure robust user data governance and privacy protection using serverless streaming.
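As a closing illustration of best practice 4.2 (encrypt data at rest), the earlier S3 write could request server-side encryption. This is a minimal sketch; the bucket name is an assumption for illustration, and a customer-managed KMS key could be used instead of the S3-managed key shown here.

```python
import json

import boto3

s3_client = boto3.client('s3')

def store_processed_data_encrypted(data, key):
    # Ask S3 to encrypt the object at rest with an S3-managed key (SSE-S3).
    # For a customer-managed key, use ServerSideEncryption='aws:kms'
    # together with SSEKMSKeyId instead.
    s3_client.put_object(
        Bucket='my-processed-data-bucket',  # assumed bucket name
        Key=key,
        Body=json.dumps(data),
        ServerSideEncryption='AES256',
    )
```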
Apache Kafka and Apache Flink are increasingly joining forces to build innovative real-time stream processing applications. This blog post explores the benefits of combining both open-source frameworks, shows unique differentiators of Flink versus Kafka, and discusses when to use a Kafka-native streaming engine like Kafka Streams instead of Flink. The Tremendous Adoption of Apache Kafka and Apache Flink Apache Kafka became the de facto standard for data streaming. The core of Kafka is messaging at any scale in combination with distributed storage (= commit log) for reliable durability, decoupling of applications, and replayability of historical data. Kafka also includes a stream processing engine in Kafka Streams, and KSQL is another successful Kafka-native streaming SQL engine built on top of Kafka Streams. Both are fantastic tools. In parallel, Apache Flink became a very successful stream-processing engine. The first prominent Kafka + Flink case study I remember is the fraud detection use case of ING Bank. The first publications came up in 2017, i.e., over five years ago: "StreamING Machine Learning Models: How ING Adds Fraud Detection Models at Runtime with Apache Kafka and Apache Flink." This is just one of many Kafka fraud detection case studies. One of the more recent case studies I blogged about goes in the same direction: "Why DoorDash migrated from Cloud-native Amazon SQS and Kinesis to Apache Kafka and Flink." The adoption of Kafka is already outstanding, and Flink is getting into enterprises more and more, very often in combination with Kafka. This article is not an introduction to Apache Kafka or Apache Flink. Instead, I explore why these two technologies are a perfect match for many use cases and when other Kafka-native tools are the appropriate choice instead of Flink. Top Reasons Apache Flink Is a Perfect Complementary Technology for Kafka Stream processing is a paradigm that continuously correlates events of one or more data sources. Data is processed in motion, in contrast to traditional processing at rest with a database and request-response API (e.g., a web service or a SQL query). Stream processing is either stateless (e.g., filter or transform a single message) or stateful (e.g., an aggregation or sliding window). State management, in particular, is very challenging in a distributed stream processing application. A vital advantage of the Apache Flink engine is its efficiency in stateful applications. Flink has expressive APIs, advanced operators, and low-level control. But Flink is also scalable in stateful applications, even for relatively complex streaming JOIN queries. Flink's scalable and flexible engine is fundamental to providing a tremendous stream processing framework for big data workloads. But there is more. The following aspects are my favorite features and design principles of Apache Flink:
- Unified streaming and batch APIs
- Connectivity to one or multiple Kafka clusters
- Transactions across Kafka and Flink
- Complex Event Processing
- Standard SQL support
- Machine Learning with Kafka, Flink, and Python
But keep in mind that every design approach has pros and cons. While there are a lot of advantages, the same characteristic can sometimes also be a drawback. Unified Streaming and Batch APIs Apache Flink's DataStream API unifies batch and streaming APIs. It supports different runtime execution modes for stream processing and batch processing, from which you can choose the right one for your use case and the characteristics of your job.
In the case of the SQL/Table API, the switch happens automatically based on the characteristics of the sources: if all sources are bounded, the job runs in batch execution mode; at least one unbounded source means streaming execution mode. The unification of streaming and batch brings a lot of advantages:
- Reuse of logic/code for real-time and historical processing
- Consistent semantics across stream and batch processing
- A single system to operate
- Applications mixing historical and real-time data processing
This sounds similar to Apache Spark. But there is a significant difference: contrary to Spark, the foundation of Flink is data streaming, not batch processing. Hence, streaming is the default execution runtime mode in Apache Flink. Continuous stateless or stateful processing enables real-time streaming analytics using an unbounded stream of events. Batch execution is more efficient for bounded jobs (i.e., a bounded subset of a stream) for which you have a known fixed input and which do not run continuously. This executes jobs in a way that is more reminiscent of batch processing frameworks, such as MapReduce in the Hadoop and Spark ecosystems. Apache Flink makes moving from a Lambda to a Kappa enterprise architecture easier. The foundation of the architecture is real-time, with Kafka as its heart. But batch processing is still possible out-of-the-box with Kafka and Flink using consistent semantics. However, this combination will likely not (try to) replace traditional ETL batch tools, e.g., for a one-time lift-and-shift migration of large workloads. Connectivity to One or Multiple Kafka Clusters Apache Flink is a separate infrastructure from the Kafka cluster. This has various pros and cons. First, I often emphasize the vast benefit of Kafka-native applications: you only need to operate, scale, and support one infrastructure for end-to-end data processing. A second infrastructure adds additional complexity, cost, and risk. However, imagine a cloud vendor taking over that burden, so you consume the end-to-end pipeline as a single cloud service. With that in mind, let's look at a few benefits of separate clusters for the data hub (Kafka) and the stream processing engine (Flink):
- Focus on data processing in a separate infrastructure with dedicated APIs and features, independent of the data streaming platform.
- More efficient streaming pipelines before hitting the Kafka topics again; the data exchange happens directly between the Flink workers.
- Data processing across different Kafka topics of independent Kafka clusters of different business units.
- If it makes sense from a technical and organizational perspective, you can connect directly to non-Kafka sources and sinks. But be careful: this can quickly become an anti-pattern in the enterprise architecture and create complex and unmanageable "spaghetti integrations".
- Implement new fail-over strategies for applications.
I emphasize Flink is usually NOT the recommended choice for implementing your aggregation, migration, or hybrid integration scenario. Multiple Kafka clusters for hybrid and global architectures are the norm, not an exception. Flink does not change these architectures. Kafka-native replication tools like MirrorMaker 2 or Confluent Cluster Linking are still the right choice for disaster recovery. It is still easier to do such a scenario with just one technology. Tools like Cluster Linking solve challenges like offset management out-of-the-box.
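Returning to the unified streaming and batch APIs described at the start of this section, here is a minimal PyFlink sketch that runs the same DataStream pipeline in batch execution mode for a bounded input; streaming remains Flink's default. It assumes a recent PyFlink version, and the toy in-memory source stands in for a bounded Kafka topic or file; it is an illustrative sketch, not taken from the article.

```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode

env = StreamExecutionEnvironment.get_execution_environment()

# Streaming is the default execution mode; for a bounded input we opt into batch mode.
env.set_runtime_execution_mode(RuntimeExecutionMode.BATCH)

# A bounded, in-memory source standing in for a bounded Kafka topic or file.
env.from_collection([("sensor-1", 412), ("sensor-2", 388), ("sensor-1", 455)]) \
   .map(lambda r: (r[0], float(r[1]))) \
   .print()

env.execute("bounded-co2-job")
```

The same pipeline code runs unchanged in streaming mode against an unbounded source, which is exactly the reuse-of-logic advantage listed above.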
Transactions Across Kafka and Flink Workloads for analytics and transactions have very different characteristics and requirements. The use cases differ significantly. SLAs are very different, too. Many people think that data streaming is not built for transactions and should only be used for big data analytics. However, Apache Kafka and Apache Flink are deployed in many resilient, mission-critical architectures. The concept of exactly-once semantics (EOS) allows stream processing applications to process data through Kafka without loss or duplication. This ensures that computed results are always accurate. Transactions are possible across Kafka and Flink. The feature is mature and battle-tested in production. Operating separate clusters is still challenging for transactional workloads. However, a cloud service can take over this risk and burden. Many companies already use EOS in production with Kafka Streams. But EOS can even be used if you combine Kafka and Flink. That is a massive benefit if you choose Flink for transactional workloads. So, to be clear: EOS is not a differentiator in Flink (vs. Kafka Streams), but it is an excellent option to use EOS across Kafka and Flink, too. Complex Event Processing With FlinkCEP The goal of complex event processing (CEP) is to identify meaningful events in real-time situations and respond to them as quickly as possible. CEP usually does not send continuous events to other systems but detects when something significant occurs. A common use case for CEP is handling late-arriving events or the non-occurrence of events. The big difference between CEP and event stream processing (ESP) is that CEP generates new events to trigger action based on situations it detects across multiple event streams with events of different types (situations that build up over time and space). ESP detects patterns over event streams with homogeneous events (i.e., patterns over time). Pattern matching is a technique to implement either approach, but the features look different. FlinkCEP is an add-on for Flink to do complex event processing. The powerful pattern API of FlinkCEP allows you to define complex pattern sequences you want to extract from your input stream. After specifying the pattern sequence, you apply it to the input stream to detect potential matches. This is also possible with SQL via the MATCH_RECOGNIZE clause. Standard SQL Support Structured Query Language (SQL) is a domain-specific language used in programming and designed for managing data held in a relational database management system (RDBMS). However, it is so predominant that other technologies like non-relational databases (NoSQL) and streaming platforms adopt it, too. SQL became a standard of the American National Standards Institute (ANSI) in 1986 and the International Organization for Standardization (ISO) in 1987. Hence, if a tool supports ANSI SQL, it ensures that any 3rd-party tool can easily integrate using standard SQL queries (at least in theory). Apache Flink supports ANSI SQL, including the Data Definition Language (DDL), Data Manipulation Language (DML), and Query Language. Flink's SQL support is based on Apache Calcite, which implements the SQL standard. This is great because many personas, including developers, architects, and business analysts, already use SQL in their daily jobs. The SQL integration is based on the so-called Flink SQL Gateway, which is part of the Flink framework and allows other applications to interact with a Flink cluster through a REST API.
User applications (e.g., a Java/Python/shell program or Postman) can use the REST API to submit queries, cancel jobs, retrieve results, etc. This enables a possible integration of Flink SQL with traditional business intelligence tools like Tableau, Microsoft Power BI, or Qlik. However, to be clear, ANSI SQL was not built for stream processing. Incorporating Streaming SQL functionality into the official SQL standard is still in the works. The Streaming SQL working group includes database vendors like Microsoft, Oracle, and IBM, cloud vendors like Google and Alibaba, and data streaming vendors like Confluent. More details: "The History and Future of SQL: Databases Meet Stream Processing". Having said this, Flink supports continuous sliding windows and various streaming joins via ANSI SQL. There are things that require additional, non-standard SQL keywords, but continuous sliding windows and streaming joins, in general, are possible. Machine Learning with Kafka, Flink, and Python In conjunction with data streaming, machine learning solves the impedance mismatch of reliably bringing analytic models into production for real-time scoring at any scale. I explored ML deployments within Kafka applications in various blog posts, e.g., embedded models in Kafka Streams applications or using a machine learning model server with streaming capabilities like Seldon. PyFlink is a Python API for Apache Flink that allows you to build scalable batch and streaming workloads, such as real-time data processing pipelines, large-scale exploratory data analysis, Machine Learning (ML) pipelines, and ETL processes. If you're already familiar with Python and libraries such as Pandas, then PyFlink makes it simpler to leverage the full capabilities of the Flink ecosystem. PyFlink is the missing piece for an ML-powered data streaming infrastructure, as almost every data engineer uses Python. The combination of Tiered Storage in Kafka and data streaming with Flink in Python is excellent for model training without the need for a separate data lake. When To Use Kafka Streams Instead of Apache Flink Don't underestimate the power and use cases of Kafka-native stream processing with Kafka Streams. The adoption rate is massive, as Kafka Streams is easy to use. And it is part of Apache Kafka. To be clear: Kafka Streams is already included if you download Kafka from the Apache website. Kafka Streams Is a Library, Apache Flink Is a Cluster The most significant difference between Kafka Streams and Apache Flink is that Kafka Streams is a Java library, while Flink is a separate cluster infrastructure. Developers can deploy the Flink infrastructure in session mode for bigger workloads (e.g., many small, homogeneous workloads like SQL queries) or in application mode for fewer, bigger, heterogeneous data processing tasks (e.g., isolated applications running in a Kubernetes cluster). No matter your deployment option, you still need to operate a complex cluster infrastructure for Flink (including separate metadata management on a ZooKeeper cluster or an etcd cluster in a Kubernetes environment). TL;DR: Apache Flink is a fantastic stream processing framework and a top-five Apache open-source project. But it is also complex to deploy and difficult to manage. Benefits of Using the Lightweight Library of Kafka Streams Kafka Streams is a single Java library.
This adds a few benefits:
- Kafka-native integration supports critical SLAs and low latency for end-to-end data pipelines and applications with a single cluster infrastructure, instead of operating separate messaging and processing engines with Kafka and Flink. Kafka Streams apps still run in their own VMs or Kubernetes containers, but high availability and persistence are guaranteed via Kafka topics.
- Very lightweight, with no other dependencies (Flink needs S3 or similar storage as the state backend).
- Easy integration into testing/CI/DevOps pipelines.
- Embedded stream processing in any existing JVM application, like a lightweight Spring Boot app or a legacy monolith built with old Java EE technologies like EJB.
- Interactive Queries allow leveraging the state of your application from outside your application. The Kafka Streams API enables your applications to be queryable. Flink's similar feature, "queryable state," is approaching the end of its life due to a lack of maintainers.
- Kafka Streams is well-known for building independent, decoupled, lightweight microservices. This differs from submitting a processing job into the Flink (or Spark) cluster; each data product team controls its own destiny (e.g., it does not depend on the central Flink team for upgrades or get forced to upgrade). Flink's application mode enables a similar deployment style for microservices.
But: Kafka Streams and Apache Flink Live In Different Parts of a Company Today, Kafka Streams and Flink are usually used for different applications. While Flink provides an application mode to build microservices, most people use Kafka Streams for this today. Interactive queries are available in Kafka Streams and Flink, but they were deprecated in Flink as there is not much demand from the community. These are two examples that show that there is no clear winner. Sometimes Flink is the better choice, and sometimes Kafka Streams makes more sense. "In summary, while there certainly is an overlap between the Streams API in Kafka and Flink, they live in different parts of a company, largely due to differences in their architecture and thus we see them as complementary systems." That's a quote from a "Kafka Streams vs. Flink comparison" article written in 2016 (!) by Stephan Ewen, former CTO of Data Artisans, and Neha Narkhede, former CTO of Confluent. While some details changed over time, this old blog post is still pretty accurate today and a good read for a more technical audience. The domain-specific language (DSL) of Kafka Streams differs from Flink's but is also very similar. How can both be true? It depends on who you ask. This (legitimate) subject for debate often divides the Kafka Streams and Flink communities. Kafka Streams has Stream and Table APIs. Flink has DataStream, Table, and SQL APIs. I guess 95% of use cases can be built with both technologies. APIs, infrastructure, experience, history, and many other factors are relevant for choosing the proper stream processing framework. Some architectural aspects are very different in Kafka Streams and Flink. These need to be understood and can be a pro or con for your use case. For instance, Flink's checkpointing has the advantage of producing a consistent snapshot, but the disadvantage is that every local error always stops the whole job, and everything has to be rolled back to the last checkpoint. Kafka Streams does not have this concept. Local errors can be recovered locally (move the corresponding tasks somewhere else; the tasks/threads without errors just continue normally).
Another example is Kafka Streams' hot standby for high availability versus Flink's fault-tolerant checkpointing system. Kafka + Flink = A Powerful Combination for Stream Processing Apache Kafka is the de facto standard for data streaming. It includes Kafka Streams, a widely used Java library for stream processing. Apache Flink is an independent and successful open-source project offering a stream processing engine for real-time and batch workloads. The combination of Kafka (including Kafka Streams) and Flink is already widespread in enterprises across all industries. Both Kafka Streams and Flink have benefits and tradeoffs for stream processing. The freedom of choice between these two leading open-source technologies and the tight integration of Kafka with Flink enable any kind of stream processing use case. This includes hybrid, global, and multi-cloud deployments, mission-critical transactional workloads, and powerful analytics with embedded machine learning. As always, understand the different options and choose the right tool for your use case and requirements. What is your favorite for stream processing: Kafka Streams, Apache Flink, or another open-source or proprietary engine? In which use cases do you leverage stream processing? Let's connect on LinkedIn and discuss it!
This is part 2 of a multi-part sample application that I've been building using Kafka and Apache Pinot. In the first part, we built an NFC Badge Reader to record when badges are swiped. In this part, we will begin ingesting some environmental data from a sensor into the mix. I'm not going to spoil the fun by telling you where this is ultimately headed, but feel free to reach out to me on Twitter and see if you can guess! As always, all of the code is available on GitHub. Hardware Hardware is part of this demo because I like building hardware, and it's always more fun to have something that you can actually interact with. The hardware we'll be building is a CO2 sensor that reads the CO2 concentration in the air, the temperature, and the relative humidity, and sends it to a Kafka topic. The sensor is based on the SCD-30 sensor, which is a non-dispersive infrared (NDIR) sensor that measures the CO2 concentration in the air. It's a bit expensive, but it is also an industrial-quality sensor that can be relied on to give accurate readings. You will see a lot of projects based on cheaper "eCO2" sensors, but they are not as accurate as the SCD-30. I have done extensive testing of a variety of CO2 sensors, and the eCO2 sensors can be off by as much as 200%, which is not something you want to rely on if accuracy matters. The sensor is connected to a SparkFun ESP32 Thing Plus - Qwiic, which reads the sensor data and sends it to a Kafka topic. I used these two components specifically because they both implement the Qwiic connector, which makes it easy to connect them together. The Qwiic connector is a standard 4-pin connector that uses I2C and is used by a number of different sensors and boards. It eliminates the need for soldering since you can just plug the sensor into the board and start coding. The ESP-32 Thing Plus is what's referred to as a "Feather" because of the form factor. Really, you can use any ESP-32 board for this, but the Feather is a great board for this project because it has built-in WiFi and Bluetooth, which makes it easy to connect to the Internet. It also has a built-in battery charger, which makes it easy to power the sensor with a battery if you want to place it somewhere out of reach. I programmed it using the Arduino IDE, which is a great tool for prototyping.

```cpp
#include <ArduinoJson.h>                     // Serialize and deserialize JSON
#include "EspMQTTClient.h"                   // Handle MQTT
#include <Wire.h>                            // 2Wire (I2C) protocol
#include "SparkFun_SCD30_Arduino_Library.h"  // The CO2 sensor

EspMQTTClient client(
  "SSID",         // Your SSID
  "PASSWD",       // Your WiFi password
  "mqtt-broker",  // MQTT broker server IP or hostname
  "PinotClient",  // Client name that uniquely identifies your device
  8883            // The MQTT port, defaults to 1883. This line can be omitted
);

SCD30 airSensor;  // our sensor

void setup() {
  Serial.begin(115200);

  // Optional functionalities of EspMQTTClient
  client.enableDebuggingMessages();  // Enable debugging messages sent to serial output

  Wire.begin();
  if (airSensor.begin() == false) {
    Serial.println("Air sensor not detected. Please check wiring. Freezing...");
    while (1)  // infinite loop of nothingness
      ;
  }
}

// This function is called once everything is connected (WiFi and MQTT)
// WARNING: YOU MUST IMPLEMENT IT IF YOU USE EspMQTTClient
void onConnectionEstablished() {
  while (true) {  // do this forever
    while (airSensor.dataAvailable()) {
      int co2 = airSensor.getCO2();
      float temp = airSensor.getTemperature();
      float hum = airSensor.getHumidity();

      StaticJsonDocument<96> doc;
      doc["sensor"] = "SCD-30";
      doc["co2"] = co2;
      doc["temp"] = temp;
      doc["humid"] = hum;

      char buff[128];
      serializeJson(doc, buff);
      serializeJson(doc, Serial);  // this is for debugging
      Serial.println();

      client.publish("co2", buff);  // publish the data to the co2 topic
    }
  }
}

void loop() {
  client.loop();
}
```

That's the entire Arduino sketch. It connects to WiFi, connects to an MQTT broker, and then reads the sensor data and publishes it to the co2 topic. You can find the code on GitHub. The SCD-30 sensor can really only provide about one reading per second, so there's no need to do anything fancy to make it faster. Getting the Readings Into Kafka As it turns out, there isn't a good way to get data straight from an Arduino device into Kafka, which is why we sent it to the MQTT broker above. Now that it's in the MQTT broker, we have to get it out and feed it into our Kafka topic. One of the other drawbacks of using an Arduino device is that they are not very good at keeping time. It is possible to use Network Time Protocol (NTP) to keep the time, but it's not very reliable. To get around these two problems, I wrote a small program in Go that reads the data from the MQTT broker, gives it an accurate timestamp, and then publishes it to a Kafka topic. To make things easier, we will reuse some code from our Badge Reader project.

```go
package main

import (
	"bufio"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"os"
	"strings"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	MQTT "github.com/eclipse/paho.mqtt.golang"
)

/* Struct to hold the data from the MQTT message */
type env struct {
	Sensor string  `json:"sensor"`
	CO2    int32   `json:"co2"`
	Temp   float32 `json:"temp"`
	Humid  float32 `json:"humid"`
	Time   int64   `json:"time"` // int64: a Unix timestamp in milliseconds would overflow int32
}
```

That struct should look familiar, as it's almost identical to the one we used in the Arduino code. The only difference is the Time field, which we will use to store the timestamp. We will also re-use all the code from the Badge Reader project to connect to Kafka and publish the data to the topic, so I won't reproduce it here.
```go
/* Read MQTT messages from the specified broker and topic and send them to the Kafka broker */
func ReadMQTTMessages(brokerUri string, username string, password string, topic string, qos byte) error {
	// Create an MQTT client options object
	opts := MQTT.NewClientOptions()

	// Set the broker URI, username, and password
	opts.AddBroker(brokerUri)
	opts.SetUsername(username)
	opts.SetPassword(password)
	opts.SetTLSConfig(&tls.Config{InsecureSkipVerify: true})

	// Create an MQTT client
	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	// Subscribe to the specified topic with the specified QoS level
	if token := client.Subscribe(topic, qos, func(client MQTT.Client, message MQTT.Message) {
		envData := env{}
		json.Unmarshal(message.Payload(), &envData)
		envData.Time = time.Now().UnixMilli() // add an accurate timestamp (int64 milliseconds)
		mess, _ := json.Marshal(envData)
		fmt.Printf("Received message: %s\n", mess)
		sendToKafka("co_2", string(mess))
	}); token.Wait() && token.Error() != nil {
		return token.Error()
	}

	// Wait for messages to arrive
	for {
		select {}
	}
}

func main() {
	err := ReadMQTTMessages("tcp://broker-address:8883", "", "", "co2", 0)
	if err != nil {
		fmt.Printf("Error reading MQTT messages: %v\n", err)
	}
}
```

The main() function calls the ReadMQTTMessages() function, which connects to the MQTT broker and subscribes to the co2 topic. When a message is received, it is parsed into the env struct, the timestamp is added, and then it is published to the Kafka topic co_2. Consuming the Data Now that we have the data in Kafka, we can begin consuming it into StarTree Cloud. Since this project is building on the previous Badge Reader project, I'm going to reuse that StarTree Cloud instance and add a new table to it. Since I've already been sending data to the Kafka topic, once I add the source (with the right credentials), StarTree Cloud should show me some data that it has read from the topic. After we click through to the table, we can see the data that has been read from the Kafka topic. From there, we can go to the Pinot Query Editor and start writing queries. Conclusion In this post, we looked at how to use an Arduino device to read data from a sensor and send it to an MQTT broker. We then used a Go program to read the data from the MQTT broker, add a timestamp, and send it to a Kafka topic. Finally, we used StarTree Cloud to read the data from the Kafka topic and make it available for querying in Apache Pinot.
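As a small follow-up to the "start writing queries" step above, here is a minimal sketch of querying the ingested readings through a Pinot broker's SQL endpoint with Python's requests library. The broker URL and the table name (co_2) are assumptions for illustration; in StarTree Cloud you would use the Query Editor or your cluster's own broker endpoint instead.

```python
import requests

BROKER = "http://localhost:8099"  # assumed Pinot broker address

# Aggregate the latest sensor readings per sensor (table name assumed to be co_2).
sql = "SELECT sensor, AVG(co2) AS avg_co2, MAX(temp) AS max_temp FROM co_2 GROUP BY sensor LIMIT 10"

resp = requests.post(f"{BROKER}/query/sql", json={"sql": sql})
resp.raise_for_status()

result = resp.json()["resultTable"]
print(result["dataSchema"]["columnNames"])
for row in result["rows"]:
    print(row)
```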