MongoDB and Data Streaming: Implementing a MongoDB Kafka Consumer

This article introduces Apache Kafka and then illustrates how to use MongoDB as a source and a destination for streamed data.

By Andrew Morgan · Mar. 12, 18 · Tutorial

Data Streaming

In today's data landscape, no single system can provide all of the required perspectives to deliver real insight. Deriving the full meaning from data requires mixing huge volumes of information from many sources.

At the same time, we're impatient to get answers instantly; if the time to insight exceeds tens of milliseconds then the value is lost. Applications such as high-frequency trading, fraud detection, and recommendation engines can't afford to wait. This often means analyzing the inflow of data before it even makes it to the database of record. Add in zero tolerance for data loss and the challenge gets even more daunting.

Kafka and data streams are focused on ingesting the massive flow of data from multiple firehoses and then routing it to the systems that need it, filtering, aggregating, and analyzing en route.

This blog introduces Apache Kafka and then illustrates how to use MongoDB as a source (producer) and destination (consumer) for the streamed data. A more complete study of this topic can be found in the Data Streaming with Kafka & MongoDB white paper.

Apache Kafka

Kafka provides a flexible, scalable, and reliable method to communicate streams of event data from one or more producers to one or more consumers. Examples of events include:

  • A periodic sensor reading such as the current temperature
  • A user adding an item to the shopping cart in an online store
  • A Tweet being sent with a specific hashtag

Streams of Kafka events are organized into topics. A producer chooses a topic to send a given event to, and consumers select which topics they pull events from. For example, a financial application could pull NYSE stock trades from one topic, and company financial announcements from another in order to look for trading opportunities.

In Kafka, topics are further divided into partitions to support scale out. Each Kafka node (broker) is responsible for receiving, storing, and passing on all of the events from one or more partitions for a given topic. In this way, the processing and storage for a topic can be linearly scaled across many brokers. Similarly, an application may scale out by using many consumers for a given topic, with each pulling events from a discrete set of partitions.


Figure 1: Kafka Producers, Consumers, Topics, and Partitions
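
To make the producer side of this picture concrete, here is a minimal, hypothetical sketch (not part of the original example code) of a Java producer publishing one of the JSON event strings used later in this article. It assumes the Kafka "new" producer client that ships with Kafka 0.8.2+ and a broker listening on localhost:9092.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class FishProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        // Publish a single JSON event to the topic that the consumer reads from
        producer.send(new ProducerRecord<>(
                "clusterdb-topic1",
                "{\"internationalFishId\": 93734195, \"name\": \"Gerald\", \"breed\": \"Turbot\"}"));

        producer.close();
    }
}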

MongoDB as a Kafka Consumer: a Java Example

In order to use MongoDB as a Kafka consumer, the received events must be converted into BSON documents before they are stored in the database. In this example, the events are strings representing JSON documents. The strings are converted to Java objects so that they are easy for Java developers to work with; those objects are then transformed into BSON documents.

Complete source code, Maven configuration, and test data can be found further down, but here are some of the highlights, starting with the main loop for receiving and processing event messages from the Kafka topic:

// Connect to MongoDB and prepare a Gson deserializer for the incoming JSON
MongoClient client = new MongoClient();
MongoDatabase db = client.getDatabase("clusterdb");
MongoCollection<Document> fishCollection = db.getCollection("fish");
Gson gson = new Gson();
Type type = new TypeToken<Fish>() {}.getType();

// Receive and process all available messages from the Kafka topic
for (MessageAndOffset messageAndOffset :
        fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    ...
    readOffset = messageAndOffset.nextOffset();

    // Extract the message payload, deserialize the JSON string into a
    // Fish object, and insert it into MongoDB as a BSON document
    ByteBuffer payload = messageAndOffset.message().payload();
    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    Fish incomingFish = gson.fromJson(new String(bytes, "UTF-8"), type);
    fishCollection.insertOne(incomingFish.getFishAsDocument());
    ...
}

The Fish class includes helper methods to hide how the objects are converted into BSON documents:

public String getBreedAsString() {
    String breedString;
    switch (breed) {
        case Cod:      breedString = "Cod"; break;
        case Goldfish: breedString = "Goldfish"; break;
        case Bass:     breedString = "Bass"; break;
        case Billy:    breedString = "Billy"; break;
        case Kipper:   breedString = "Kipper"; break;
        case Turbot:   breedString = "Turbot"; break;
        default:       breedString = "Unknown breed"; break;
    }
    return breedString;
}

public void setBreed(Breed breed) {
    this.breed = breed;
}

public Document getFishAsDocument() {
    Document fishDocument = new Document("_id", getInternationalFishId())
            .append("name", getName())
            .append("breed", getBreedAsString());
    return fishDocument;
}

In a real application, more would be done with the received messages - they could be combined with reference data read from MongoDB, acted on and then passed along the pipeline by publishing to additional topics. In this example, the final step is to confirm from the mongo shell that the data has been added to the database:

db.fish.find()
{ "_id" : 93734195, "name" : "Gerald", "breed" : "Turbot" }
{ "_id" : 71858458, "name" : "Douglas", "breed" : "Turbot" }
{ "_id" : 25992945, "name" : "Louise", "breed" : "Turbot" }
{ "_id" : 95476834, "name" : "Helen", "breed" : "Kipper" }
{ "_id" : 27950146, "name" : "Ronald", "breed" : "Goldfish" }
{ "_id" : 19155329, "name" : "Jerry", "breed" : "Kipper" }
...

Full Java Code for MongoDB Kafka Consumer

Business Object - Fish.java

package com.clusterdb.kafka;

import org.bson.Document;

public class Fish {
    private int internationalFishId;
    private String name;
    private Breed breed;

    public enum Breed {
        Cod, Goldfish, Bass, Billy, Kipper, Turbot
    };

    public Fish(int internationalFishId, String name, Breed breed) {
        this.internationalFishId = internationalFishId;
        this.name = name;
        this.breed = breed;
    }

    public int getInternationalFishId() {
        return internationalFishId;
    }

    public void setInternationalFishId(int internationalFishId) {
        this.internationalFishId = internationalFishId;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Breed getBreed() {
        return breed;
    }

    public String getBreedAsString() {
        String breedString;
        switch (breed) {
            case Cod:  breedString = "Cod";
                break;
            case Goldfish:  breedString = "Goldfish";
                break;
            case Bass:  breedString = "Bass";
                break;
            case Billy:  breedString = "Billy";
                break;
            case Kipper:  breedString = "Kipper";
                break;
            case Turbot:  breedString = "Turbot";
                break;
            default: breedString = "Unknown breed";
                break;
        }
        return breedString;
    };
    public void setBreed(Breed breed) {
        this.breed = breed;
    }

    public Document getFishAsDocument() {
        Document fishDocument = new Document("_id", getInternationalFishId())
                .append("name", getName())
                .append("breed", getBreedAsString());
        return fishDocument;
    };

    @Override
    public String toString() {
        return "Fish Object: {" +
                "internationalFishId=" + internationalFishId +
                ", name='" + name + '\'' +
                ", breed=" + breed +
                '}';
    }
}

Kafka Consumer for MongoDB - MongoDBSimpleConsumer.java

Note that this example consumer is written using the Kafka Simple Consumer API; there is also a Kafka High Level Consumer API, which hides much of the complexity, including managing the offsets. The Simple API gives the application more control, but at the cost of writing extra code. (A brief sketch of the High Level approach appears after the full source below.)

package com.clusterdb.kafka;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.*;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.bson.Document;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.lang.reflect.Type;

public class MongoDBSimpleConsumer {
    public static void main(String args[]) {
        MongoDBSimpleConsumer example = new MongoDBSimpleConsumer();
        //long maxReads = Long.parseLong(args[0]);
        long maxReads = 100;
        //String topic = args[1];
        String topic = "clusterdb-topic1";
        //int partition = Integer.parseInt(args[2]);
        int partition = 0;
        List<String> seeds = new ArrayList<String>();
        //seeds.add(args[3]);
        seeds.add("127.0.0.1");
        //int port = Integer.parseInt(args[4]);
        int port = 9092;
        try {
            example.run(maxReads, topic, partition, seeds, port);
        } catch (Exception e) {
            System.out.println("Oops:" + e);
            e.printStackTrace();
        }
    }

    private List<String> m_replicaBrokers = new ArrayList<String>();

    public MongoDBSimpleConsumer() {
        m_replicaBrokers = new ArrayList<String>();
    }

    public void run(long a_maxReads, String a_topic, int a_partition,
                    List<String> a_seedBrokers, int a_port) throws Exception {
        // find the meta data about the topic and partition we are interested in
        //
        PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition);
        if (metadata == null) {
            System.out.println("Can't find metadata for Topic and Partition. Exiting");
            return;
        }
        if (metadata.leader() == null) {
            System.out.println("Can't find Leader for Topic and Partition. Exiting");
            return;
        }
        String leadBroker = metadata.leader().host();
        String clientName = "Client_" + a_topic + "_" + a_partition;

        SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
                100000, 64 * 1024, clientName);
        long readOffset = getLastOffset(consumer, a_topic, a_partition,
                kafka.api.OffsetRequest.EarliestTime(), clientName);

        // Connect to MongoDB once (rather than on every fetch iteration) and
        // prepare the Gson deserializer for the incoming JSON messages
        MongoClient client = new MongoClient();
        MongoDatabase db = client.getDatabase("clusterdb");
        MongoCollection<Document> fishCollection = db.getCollection("fish");
        Gson gson = new Gson();
        Type type = new TypeToken<Fish>() {}.getType();

        int numErrors = 0;
        while (a_maxReads > 0) {
            if (consumer == null) {
                consumer = new SimpleConsumer(leadBroker, a_port,
                        100000, 64 * 1024, clientName);
            }
            FetchRequest req = new FetchRequestBuilder()
                    .clientId(clientName)
                    .addFetch(a_topic, a_partition, readOffset, 100000)
                    .build();
            FetchResponse fetchResponse = consumer.fetch(req);

            if (fetchResponse.hasError()) {
                numErrors++;
                // Something went wrong!
                short code = fetchResponse.errorCode(a_topic, a_partition);
                System.out.println("Error fetching data from the Broker:"
                        + leadBroker + " Reason: " + code);
                if (numErrors > 5) break;
                if (code == ErrorMapping.OffsetOutOfRangeCode())  {
                    // We asked for an invalid offset. For simple case ask
                    // for the last element to reset
                    readOffset = getLastOffset(consumer,a_topic, a_partition,
                            kafka.api.OffsetRequest.LatestTime(), clientName);
                    continue;
                }
                consumer.close();
                consumer = null;
                leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port);
                continue;
            }
            numErrors = 0;
            long numRead = 0;

            for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic,
                    a_partition)) {
                long currentOffset = messageAndOffset.offset();
                if (currentOffset < readOffset) {
                    System.out.println("Found an old offset: " + currentOffset
                            + " Expecting: " + readOffset);
                    continue;
                }
                readOffset = messageAndOffset.nextOffset();
                ByteBuffer payload = messageAndOffset.message().payload();

                byte[] bytes = new byte[payload.limit()];
                payload.get(bytes);
                Fish incomingFish = gson.fromJson(new String(bytes, "UTF-8"), type);
                System.out.println(incomingFish);
                fishCollection.insertOne(incomingFish.getFishAsDocument());

                numRead++;
                a_maxReads--;
            }

            if (numRead == 0) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        if (consumer != null) consumer.close();
        client.close();
    }

    public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                     long whichTime, String clientName) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
                new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
        kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
        OffsetResponse response = consumer.getOffsetsBefore(request);

        if (response.hasError()) {
            System.out.println("Error fetching data Offset Data the Broker. Reason: "
                    + response.errorCode(topic, partition) );
            return 0;
        }
        long[] offsets = response.offsets(topic, partition);
        return offsets[0];
    }

    private String findNewLeader(String a_oldLeader, String a_topic, int a_partition,
                                 int a_port) throws Exception {
        for (int i = 0; i < 3; i++) {
            boolean goToSleep = false;
            PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port, a_topic,
                    a_partition);
            if (metadata == null) {
                goToSleep = true;
            } else if (metadata.leader() == null) {
                goToSleep = true;
            } else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
                // first time through if the leader hasn't changed give ZooKeeper
                // a second to recover second time, assume the broker did recover before failover,
                // or it was a non-Broker issue
                goToSleep = true;
            } else {
                return metadata.leader().host();
            }
            if (goToSleep) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException ie) {
                }
            }
        }
        System.out.println("Unable to find new leader after Broker failure. Exiting");
        throw new Exception("Unable to find new leader after Broker failure. Exiting");
    }

    private PartitionMetadata findLeader(List<String> a_seedBrokers, int a_port,
                                         String a_topic, int a_partition) {
        PartitionMetadata returnMetaData = null;
        loop:
        for (String seed : a_seedBrokers) {
            SimpleConsumer consumer = null;
            try {
                consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup");
                List<String> topics = Collections.singletonList(a_topic);
                TopicMetadataRequest req = new TopicMetadataRequest(topics);
                kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);

                List<TopicMetadata> metaData = resp.topicsMetadata();
                for (TopicMetadata item : metaData) {
                    for (PartitionMetadata part : item.partitionsMetadata()) {
                        if (part.partitionId() == a_partition) {
                            returnMetaData = part;
                            break loop;
                        }
                    }
                }
            } catch (Exception e) {
                System.out.println("Error communicating with Broker [" + seed + "] to find Leader for ["
                        + a_topic
                        + ", " + a_partition + "] Reason: " + e);
            } finally {
                if (consumer != null) consumer.close();
            }
        }
        if (returnMetaData != null) {
            m_replicaBrokers.clear();
            for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
                m_replicaBrokers.add(replica.host());
            }
        }
        return returnMetaData;
    }
}
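
For comparison, the sketch below shows roughly how the same receive-and-insert loop could look when written against the Kafka 0.8.x High Level Consumer API, which manages offsets and leader discovery for you. This is an illustrative sketch rather than part of the original example; the ZooKeeper address and consumer group name are assumptions.

package com.clusterdb.kafka;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import org.bson.Document;

import java.lang.reflect.Type;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class MongoDBHighLevelConsumerSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181"); // assumed ZooKeeper address
        props.put("group.id", "clusterdb-consumers");     // assumed consumer group name
        props.put("auto.offset.reset", "smallest");       // start from the earliest available offset

        ConsumerConnector connector =
                Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // One stream (thread) for the topic; offsets are tracked by Kafka/ZooKeeper
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
                connector.createMessageStreams(
                        Collections.singletonMap("clusterdb-topic1", 1));
        KafkaStream<byte[], byte[]> stream = streams.get("clusterdb-topic1").get(0);

        MongoClient client = new MongoClient();
        MongoCollection<Document> fishCollection =
                client.getDatabase("clusterdb").getCollection("fish");
        Gson gson = new Gson();
        Type type = new TypeToken<Fish>() {}.getType();

        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            // Deserialize the JSON payload into a Fish and store it as a BSON document
            Fish incomingFish = gson.fromJson(
                    new String(it.next().message(), "UTF-8"), type);
            fishCollection.insertOne(incomingFish.getFishAsDocument());
        }
    }
}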

Maven Dependencies - pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.clusterdb</groupId>
  <artifactId>M101J</artifactId>
  <version>1.0-SNAPSHOT</version>
  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
  <packaging>jar</packaging>

  <name>M101J</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.mongodb</groupId>
      <artifactId>mongodb-driver</artifactId>
      <version>3.2.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>2.6.2</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
    </dependency>
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-core</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- Just the annotations; use this dependency if you want to attach annotations
           to classes without connecting them to the code. -->
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-annotations</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- databinding; ObjectMapper, JsonNode and related classes are here -->
      <dependency>
          <groupId>com.fasterxml.jackson.core</groupId>
          <artifactId>jackson-databind</artifactId>
          <version>2.7.3</version>
      </dependency>

      <!-- smile (binary JSON). Other artifacts in this group do other formats. -->
      <dependency>
          <groupId>com.fasterxml.jackson.dataformat</groupId>
          <artifactId>jackson-dataformat-smile</artifactId>
          <version>2.7.3</version>
      </dependency>
      <!-- JAX-RS provider -->
      <dependency>
          <groupId>com.fasterxml.jackson.jaxrs</groupId>
          <artifactId>jackson-jaxrs-json-provider</artifactId>
          <version>2.7.3</version>
      </dependency>
      <!-- Support for JAX-B annotations as additional configuration -->
      <dependency>
          <groupId>com.fasterxml.jackson.module</groupId>
          <artifactId>jackson-module-jaxb-annotations</artifactId>
          <version>2.7.3</version>
      </dependency>
  </dependencies>
</project>
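
With the pom.xml above in place, building the consumer jar should be a standard Maven invocation (assuming Maven and JDK 8 are installed), for example:

mvn clean package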

Test Data - Fish.json

A sample of the test data injected into Kafka is shown below:

{"internationalFishId": 93734195, "name": "Gerald", "breed": "Turbot"}
{"internationalFishId": 71858458, "name": "Douglas", "breed": "Turbot"}
{"internationalFishId": 25992945, "name": "Louise", "breed": "Turbot"}
{"internationalFishId": 95476834, "name": "Helen", "breed": "Kipper"}
{"internationalFishId": 27950146, "name": "Ronald", "breed": "Goldfish"}
{"internationalFishId": 19155329, "name": "Jerry", "breed": "Kipper"}
{"internationalFishId": 67784636, "name": "Benjamin", "breed": "Kipper"}
{"internationalFishId": 72704562, "name": "Stephanie", "breed": "Turbot"}
{"internationalFishId": 84804136, "name": "Evelyn", "breed": "Billy"}
{"internationalFishId": 49356570, "name": "Patrick", "breed": "Bass"}
{"internationalFishId": 22463391, "name": "Gerald", "breed": "Kipper"}
{"internationalFishId": 25494987, "name": "Jonathan", "breed": "Bass"}
{"internationalFishId": 25984696, "name": "Martin", "breed": "Turbot"}
{"internationalFishId": 89316196, "name": "Joe", "breed": "Kipper"}
{"internationalFishId": 93704506, "name": "Debra", "breed": "Bass"}
{"internationalFishId": 90875449, "name": "Susan", "breed": "Billy"}
{"internationalFishId": 3302594, "name": "Bruce", "breed": "Cod"}
{"internationalFishId": 23941776, "name": "Gerald", "breed": "Billy"}
{"internationalFishId": 14868491, "name": "Diane", "breed": "Bass"}
{"internationalFishId": 15475987, "name": "Joan", "breed": "Cod"}
{"internationalFishId": 82261217, "name": "Kathleen", "breed": "Billy"}
{"internationalFishId": 88362208, "name": "Nancy", "breed": "Cod"}
{"internationalFishId": 84881229, "name": "Aaron", "breed": "Cod"}
{"internationalFishId": 68008775, "name": "Randy", "breed": "Turbot"}
{"internationalFishId": 3246036, "name": "Larry", "breed": "Goldfish"}
{"internationalFishId": 25346448, "name": "Annie", "breed": "Billy"}
{"internationalFishId": 99978187, "name": "Wanda", "breed": "Goldfish"}
{"internationalFishId": 95566251, "name": "Susan", "breed": "Goldfish"}
{"internationalFishId": 3885361, "name": "Katherine", "breed": "Goldfish"}
{"internationalFishId": 12010058, "name": "Amy", "breed": "Kipper"}
{"internationalFishId": 81095784, "name": "Gerald", "breed": "Turbot"}
{"internationalFishId": 51150986, "name": "Laura", "breed": "Cod"}
{"internationalFishId": 62232475, "name": "Walter", "breed": "Kipper"}
{"internationalFishId": 58979946, "name": "Frances", "breed": "Goldfish"}
{"internationalFishId": 43801537, "name": "Carl", "breed": "Goldfish"}
{"internationalFishId": 23888593, "name": "Jason", "breed": "Cod"}
{"internationalFishId": 49527129, "name": "Shawn", "breed": "Billy"}
{"internationalFishId": 4168540, "name": "John", "breed": "Kipper"}
{"internationalFishId": 91915998, "name": "Amanda", "breed": "Cod"}
{"internationalFishId": 84999277, "name": "Ruth", "breed": "Cod"}
{"internationalFishId": 2823005, "name": "Phyllis", "breed": "Kipper"}
{"internationalFishId": 54375889, "name": "Patricia", "breed": "Billy"}
{"internationalFishId": 50481402, "name": "Joseph", "breed": "Cod"}
{"internationalFishId": 31330205, "name": "Patricia", "breed": "Turbot"}
{"internationalFishId": 68315446, "name": "Craig", "breed": "Billy"}
{"internationalFishId": 99509344, "name": "Peter", "breed": "Goldfish"}
{"internationalFishId": 99750447, "name": "Wayne", "breed": "Turbot"}
{"internationalFishId": 35470235, "name": "Rebecca", "breed": "Bass"}
{"internationalFishId": 22131676, "name": "David", "breed": "Kipper"}
{"internationalFishId": 14579895, "name": "Rachel", "breed": "Cod"}
{"internationalFishId": 59394346, "name": "Willie", "breed": "Cod"}
{"internationalFishId": 44310345, "name": "Anthony", "breed": "Bass"}
{"internationalFishId": 80822300, "name": "Willie", "breed": "Turbot"}
{"internationalFishId": 81693909, "name": "Laura", "breed": "Kipper"}
{"internationalFishId": 60343577, "name": "Harry", "breed": "Goldfish"}
{"internationalFishId": 28041557, "name": "Peter", "breed": "Kipper"}
{"internationalFishId": 61731232, "name": "Joshua", "breed": "Billy"}
{"internationalFishId": 67890018, "name": "Christina", "breed": "Goldfish"}
{"internationalFishId": 21402806, "name": "Shirley", "breed": "Bass"}
{"internationalFishId": 97849346, "name": "Carlos", "breed": "Goldfish"}
{"internationalFishId": 98194670, "name": "Thomas", "breed": "Bass"}
{"internationalFishId": 62769667, "name": "Richard", "breed": "Cod"}
{"internationalFishId": 26442193, "name": "Mark", "breed": "Cod"}
{"internationalFishId": 50659450, "name": "Tammy", "breed": "Kipper"}
{"internationalFishId": 73450338, "name": "Adam", "breed": "Goldfish"}
{"internationalFishId": 90544847, "name": "Margaret", "breed": "Bass"}
{"internationalFishId": 9763724, "name": "Harold", "breed": "Bass"}
{"internationalFishId": 49421808, "name": "Lawrence", "breed": "Cod"}
{"internationalFishId": 90370973, "name": "Kathleen", "breed": "Kipper"}
{"internationalFishId": 83311163, "name": "Billy", "breed": "Kipper"}
{"internationalFishId": 14814821, "name": "Ann", "breed": "Turbot"}
{"internationalFishId": 60732672, "name": "Mark", "breed": "Turbot"}
{"internationalFishId": 99621807, "name": "Donna", "breed": "Turbot"}
{"internationalFishId": 21966510, "name": "Jimmy", "breed": "Kipper"}
{"internationalFishId": 57278762, "name": "Benjamin", "breed": "Kipper"}
{"internationalFishId": 39839485, "name": "Eric", "breed": "Kipper"}
{"internationalFishId": 14616731, "name": "Julia", "breed": "Bass"}
{"internationalFishId": 68590077, "name": "Nicole", "breed": "Goldfish"}
{"internationalFishId": 16595863, "name": "Dennis", "breed": "Turbot"}
{"internationalFishId": 51139271, "name": "Brian", "breed": "Goldfish"}
{"internationalFishId": 29777274, "name": "Marie", "breed": "Turbot"}
{"internationalFishId": 13574988, "name": "Sandra", "breed": "Cod"}
{"internationalFishId": 19393831, "name": "Andrea", "breed": "Turbot"}
{"internationalFishId": 33182829, "name": "Jerry", "breed": "Kipper"}
{"internationalFishId": 37685705, "name": "William", "breed": "Kipper"}
{"internationalFishId": 88965921, "name": "Janice", "breed": "Turbot"}
{"internationalFishId": 98974498, "name": "Deborah", "breed": "Goldfish"}
{"internationalFishId": 20433714, "name": "Daniel", "breed": "Billy"}
{"internationalFishId": 2721376, "name": "Antonio", "breed": "Goldfish"}
{"internationalFishId": 93776684, "name": "Brandon", "breed": "Billy"}
{"internationalFishId": 15673501, "name": "Daniel", "breed": "Goldfish"}
{"internationalFishId": 10207475, "name": "Joseph", "breed": "Bass"}
{"internationalFishId": 51689664, "name": "Jack", "breed": "Cod"}
{"internationalFishId": 87980387, "name": "Brandon", "breed": "Goldfish"}
{"internationalFishId": 84577085, "name": "Steven", "breed": "Kipper"}
{"internationalFishId": 45082334, "name": "Tina", "breed": "Cod"}
{"internationalFishId": 81295703, "name": "Charles", "breed": "Turbot"}
{"internationalFishId": 84808531, "name": "Amanda", "breed": "Turbot"}
{"internationalFishId": 49809743, "name": "Adam", "breed": "Turbot"}
{"internationalFishId": 77388184, "name": "Benjamin", "breed": "Turbot"}

For simple testing, this data can be injected into the clusterdb-topic1 topic using the kafka-console-producer.sh command.
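
As a rough illustration of that step (the broker address and file name are assumptions), the invocation might look like:

kafka-console-producer.sh --broker-list localhost:9092 --topic clusterdb-topic1 < Fish.json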

Next Steps

To learn much more about data streaming and how MongoDB fits in (including Apache Kafka and competing and complementary technologies) read the Data Streaming with Kafka & MongoDB white paper.


Published at DZone with permission of Andrew Morgan, DZone MVB. See the original article here.

Opinions expressed by DZone contributors are their own.
