Exploring Apache NiFi 1.10: Parameters and Stateless Engine

By Tim Spann · Nov. 26, 19 · Tutorial

Apache NiFi 1.10 Is Now Available!

https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12316020&version=12344993

You can now use JDK 8 or JDK 11! I am running on JDK 11, and it seems a bit faster. A huge feature is the addition of Parameters! You can use them to pass values into Apache NiFi Stateless flows.

A few lesser-used Processors have been removed from the main download. See the following links for migration hints:

https://cwiki.apache.org/confluence/display/NIFI/Migration+Guidance

Release Notes:   https://cwiki.apache.org/confluence/display/NIFI/Release+Notes#ReleaseNotes-Version1.10.0

Example Source Code:   https://github.com/tspannhw/stateless-examples

More New Features:

  • ParquetReader/Writer (See: https://www.datainmotion.dev/2019/10/migrating-apache-flume-flows-to-apache_7.html.)
  • Prometheus Reporting Task. Expect more Prometheus features coming soon.
  • Experimental Encrypted content repository. People asked me for this one before.
  • Parameters!! Time to replace Variables/Variable Registry. Parameters are better in every way.
  • Toolkit module to generate and build Swagger API library for NiFi.
  • PostSlack processor.
  • PublishKafka Partition Support.
  • GeoEnrichIPRecord Processor.
  • Remote Input Port in a Process Group.
  • Command Line Diagnostics.
  • RocksDB FlowFile Repository.
  • PutBigQueryStreaming Processor.
  • nifi.analytics.predict.enabled — Turn on Back Pressure Prediction.
  • More Lookup Services for ETL/ELT: DatabaseRecordLookupService, KuduLookupService, and HBase_2_ListLookupService.

Stateless

First, we will run a flow from the command line, pulling it straight from the NiFi Registry. Then, we will run from YARN! Yes, you can now run your Apache NiFi flows on your giant Cloudera CDH/HDP/CDP YARN clusters! Let's make use of your hundreds of Hadoop nodes.

Stateless Examples

Let's Build A Stateless Flow

The first thing to keep in mind is that anything that might change should be a parameter that we can pass in with our JSON file. It's very easy to set parameters, even for dropdowns! You even get prompted to pick a parameter from a selection list. Before parameters are available, you will need to add them to a Parameter Context and assign that Parameter Context to your Process Group.

A Parameter in a Processor Configuration is shown as #{broker}
Configuring processor
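
To make the #{broker} notation concrete, here is a minimal Python sketch of how a parameter reference inside a property value could be resolved against a set of parameter values. It only illustrates the idea and is not NiFi's implementation; the function name resolve_parameters, the sample values, and the set of characters accepted in a parameter name are assumptions made for this example.

```python
import re

# Matches references such as #{broker}. The characters allowed in a name
# here are an assumption made for this sketch.
PARAM_REF = re.compile(r"#\{([A-Za-z0-9_.\- ]+)\}")


def resolve_parameters(value, parameters):
    """Replace every #{name} in value with the matching parameter value."""

    def substitute(match):
        name = match.group(1)
        if name not in parameters:
            raise KeyError(f"No value supplied for parameter '{name}'")
        return str(parameters[name])

    return PARAM_REF.sub(substitute, value)


# Hypothetical values, similar in shape to the "parameters" block of the
# JSON configuration files used later in this article.
params = {"broker": "localhost:9092", "topic": "iot"}
print(resolve_parameters("#{broker}", params))
print(resolve_parameters("/tmp/#{topic}/output", params))
```

A property can mix literal text and references, which is why the sketch substitutes inside the string rather than replacing the whole value.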

Parameter Context Connected to a Process Group, Controller Service, ...

Updating parameter context

Apply those parameters

Applying parameter context

Param(eter) is now an option for properties

Parameter now an option for properties

Pop-up Hint for Using Parameters

Pop-up parameter hint

Edit a Parameter in a Parameter Context

Editing parameter context

We can configure parameters in Controller Services as well.

Configure controller service

So easy to choose an existing one.
Choosing existing parameter

Use parameters for anything that can change or that you don't want to hardcode.

Apache Kafka Consumer to Sink

This is a simple two-step Apache NiFi flow that reads from Kafka and sends output to a sink, for example, a File.

Consuming Kafka 2.0 messages

Let's make sure we use that Parameter Context 

Using parameter context

To build your JSON configuration file, you will need the bucket ID and flow ID from your Apache NiFi Registry. You will also need the URL for that registry. You can browse that registry at a URL similar to http://tspann-mbp15-hw14277:18080.

Stateless Kafka — Production

My Command Line Runner

/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Continuous --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafkaconsumer.json

RunFromRegistry [Once|Continuous] --file <File Name>

This is the basic use case of running from the command line using a file. The flow must exist in the referenced Apache NiFi Registry.
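
If you launch stateless flows from a script, the usage line above can be wrapped in a small helper. The following Python sketch only assembles the argument list in the order shown in this article; the function name build_stateless_command and the paths in the example are hypothetical.

```python
import shlex

# The two run modes shown in the usage line: RunFromRegistry [Once|Continuous]
VALID_MODES = ("Once", "Continuous")


def build_stateless_command(nifi_sh, mode, config_file):
    """Return the argument list for a RunFromRegistry invocation."""
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}, got {mode!r}")
    return [nifi_sh, "stateless", "RunFromRegistry", mode, "--file", config_file]


# Hypothetical install location and configuration file.
cmd = build_stateless_command("/opt/nifi/bin/nifi.sh", "Once", "/tmp/kafka.json")
print(shlex.join(cmd))
```

The returned list can be handed to subprocess.run(cmd, check=True) when you actually want to start the flow; validating the mode first catches typos before NiFi is launched.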

JSON Configuration File (kafkaconsumer.json)

{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "0540e1fd-c7ca-46fb-9296-e37632021945",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker": "4.317.852.100:9092",
    "topic": "iot",
    "group_id": "nifi-stateless-kafka-consumer",
    "DestinationDirectory": "/tmp/nifistateless/output2/",
    "output_dir": "/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/output"
  }
}
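
A file like this can also be generated rather than hand-edited, which helps when the same flow runs against several environments. The Python sketch below uses only the key names that appear in the JSON files in this article (registryUrl, bucketId, flowId, flowVersion, ssl, parameters). The function name build_config and every value in the example are placeholders, and whether the ssl block may be omitted is an assumption you should check against your own setup.

```python
import json


def build_config(registry_url, bucket_id, flow_id, parameters,
                 flow_version=None, ssl=None):
    """Assemble a stateless configuration as a plain dictionary."""
    config = {
        "registryUrl": registry_url,
        "bucketId": bucket_id,
        "flowId": flow_id,
    }
    if flow_version is not None:
        # The article's second example passes the version as a string.
        config["flowVersion"] = str(flow_version)
    if ssl is not None:
        # Assumption: the ssl block is passed through unchanged when given.
        config["ssl"] = dict(ssl)
    config["parameters"] = dict(parameters)
    return config


# Placeholder values; substitute the IDs shown in your own NiFi Registry.
config = build_config(
    registry_url="http://localhost:18080",
    bucket_id="your-bucket-id",
    flow_id="your-flow-id",
    parameters={"broker": "localhost:9092", "topic": "iot"},
)
print(json.dumps(config, indent=2))
```

Writing the result with json.dump to a file gives you something to pass to the --file option.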


Example Run

12:25:38.725 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles

12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Node 8 sent an incremental fetch response for session 1943199939 with 0 response partition(s), 10 implied partition(s)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-6 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-7 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-4 at offset 18 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-5 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-2 at offset 17 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-3 at offset 19 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-0 at offset 16 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Added READ_UNCOMMITTED fetch request for partition iot-1 at offset 20 to node ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.728 [main] DEBUG org.apache.kafka.clients.FetchSessionHandler - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Built incremental fetch (sessionId=1943199939, epoch=5) for node 8. Added 0 partition(s), altered 0 partition(s), removed 0 partition(s) out of 10 partition(s)

12:25:38.729 [main] DEBUG org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=nifi-stateless-kafka-consumer] Sending READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), implied=(iot-8, iot-9, iot-6, iot-7, iot-4, iot-5, iot-2, iot-3, iot-0, iot-1)) to broker ip-10-0-1-244.ec2.internal:9092 (id: 8 rack: null)

12:25:38.737 [main] DEBUG org.apache.nifi.processors.kafka.pubsub.ConsumeKafka_2_0 - ConsumeKafka_2_0[id=e405df7f-87ca-305a-95a9-d25e3c5dbb56] Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles
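
With DEBUG logging this verbose, a quick way to confirm that the consumer is fetching from every partition is to pull the partition and offset out of the "fetch request" lines. The Python sketch below relies only on the wording of the log lines shown above; the function name fetch_offsets is hypothetical, the sample lines are abbreviated, and the pattern may need adjusting for other Kafka client versions.

```python
import re

# Matches e.g. "fetch request for partition iot-8 at offset 15".
FETCH_LINE = re.compile(r"fetch request for partition (\S+)-(\d+) at offset (\d+)")


def fetch_offsets(log_lines):
    """Map (topic, partition) to the most recent offset seen in the log."""
    offsets = {}
    for line in log_lines:
        match = FETCH_LINE.search(line)
        if match:
            topic, partition, offset = match.groups()
            offsets[(topic, int(partition))] = int(offset)
    return offsets


# Abbreviated lines in the same format as the example run.
sample = [
    "DEBUG Fetcher - Added READ_UNCOMMITTED fetch request for partition iot-8 at offset 15 to node n1",
    "DEBUG Fetcher - Added READ_UNCOMMITTED fetch request for partition iot-9 at offset 16 to node n1",
    "DEBUG ConsumeKafka_2_0 - Running ConsumeKafka_2_0.onTrigger with 0 FlowFiles",
]
for (topic, partition), offset in sorted(fetch_offsets(sample).items()):
    print(topic, partition, offset)
```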


Example Output

cat output/247361879273711.statelessFlowFile

{"id":"20191105113853_350b493f-9308-4eb2-b71f-6bcdbaf5d6c1_Timer-Driven Process Thread-13","te":"0.5343","diskusage":"0.2647115097153814.3 MB","memory":57,"cpu":132.87,"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72","macaddress":"dd73eadf-1ac1-4f76-aecb-14be86ce46ce","end":"48400221819907","systemtime":"11/05/2019 11:38:53"}
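
Each output file in this example holds one JSON record, so downstream checks are easy to script. The Python sketch below assumes that one-record-per-file layout and the field names visible in the record above; the function names parse_record and read_flowfiles are hypothetical, and the sample is a trimmed copy of the record shown.

```python
import json
from pathlib import Path


def parse_record(text):
    """Parse one record and normalize a few fields to numeric types."""
    record = json.loads(text)
    return {
        "host": record["host"],
        "cpu": float(record["cpu"]),
        "memory": int(record["memory"]),
        # temperature arrives as a string in the example output
        "temperature": int(record["temperature"]),
    }


def read_flowfiles(output_dir):
    """Parse every *.statelessFlowFile in output_dir, in name order."""
    paths = sorted(Path(output_dir).glob("*.statelessFlowFile"))
    return [parse_record(path.read_text()) for path in paths]


# Trimmed copy of the example record, keeping only the fields used here.
sample = (
    '{"memory":57,"cpu":132.87,'
    '"host":"192.168.1.249/tspann-MBP15-HW14277","temperature":"72"}'
)
print(parse_record(sample))
```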


Command line output

Example output

We can also run with Once instead of Continuous. The next example does this to send a single Kafka message.

Generator to Apache Kafka Producer

Generator to Producer flow

My Command Line Runner

/Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/bin/nifi.sh stateless RunFromRegistry Once --file /Users/tspann/Documents/nifi-1.10.0-SNAPSHOT/logs/kafka.json

JSON Configuration File (kafka.json)

{
  "registryUrl": "http://tspann-mbp15-hw14277:18080",
  "bucketId": "140b30f0-5a47-4747-9021-19d4fde7f993",
  "flowId": "402814a2-fb7a-4b19-a641-9f4bb191ed67",
  "flowVersion": "1",
  "ssl": {
    "keystoreFile": "",
    "keystorePass": "",
    "keyPass": "",
    "keystoreType": "",
    "truststoreFile": "/Library/Java/JavaVirtualMachines/amazon-corretto-11.jdk/Contents/Home/lib/security/cacerts",
    "truststorePass": "changeit",
    "truststoreType": "JKS"
  },
  "parameters": {
    "broker": "3.218.152.236:9092"
  }
}


Example Output

12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.network.Selector - [Producer clientId=producer-1] Created socket with SO_RCVBUF = 33304, SO_SNDBUF = 131768, SO_TIMEOUT = 0 to node 8

12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Completed connection to node 8. Fetching API versions.

12:32:37.717 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Initiating API versions fetch from node 8.

12:32:37.732 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Recorded API versions for node 8: (Produce(0): 0 to 7 [usable: 6], Fetch(1): 0 to 10 [usable: 8], ListOffsets(2): 0 to 5 [usable: 3], Metadata(3): 0 to 7 [usable: 6], LeaderAndIsr(4): 0 to 2 [usable: 1], StopReplica(5): 0 to 1 [usable: 0], UpdateMetadata(6): 0 to 5 [usable: 4], ControlledShutdown(7): 0 to 2 [usable: 1], OffsetCommit(8): 0 to 6 [usable: 4], OffsetFetch(9): 0 to 5 [usable: 4], FindCoordinator(10): 0 to 2 [usable: 2], JoinGroup(11): 0 to 4 [usable: 3], Heartbeat(12): 0 to 2 [usable: 2], LeaveGroup(13): 0 to 2 [usable: 2], SyncGroup(14): 0 to 2 [usable: 2], DescribeGroups(15): 0 to 2 [usable: 2], ListGroups(16): 0 to 2 [usable: 2], SaslHandshake(17): 0 to 1 [usable: 1], ApiVersions(18): 0 to 2 [usable: 2], CreateTopics(19): 0 to 3 [usable: 3], DeleteTopics(20): 0 to 3 [usable: 2], DeleteRecords(21): 0 to 1 [usable: 1], InitProducerId(22): 0 to 1 [usable: 1], OffsetForLeaderEpoch(23): 0 to 2 [usable: 1], AddPartitionsToTxn(24): 0 to 1 [usable: 1], AddOffsetsToTxn(25): 0 to 1 [usable: 1], EndTxn(26): 0 to 1 [usable: 1], WriteTxnMarkers(27): 0 [usable: 0], TxnOffsetCommit(28): 0 to 2 [usable: 1], DescribeAcls(29): 0 to 1 [usable: 1], CreateAcls(30): 0 to 1 [usable: 1], DeleteAcls(31): 0 to 1 [usable: 1], DescribeConfigs(32): 0 to 2 [usable: 2], AlterConfigs(33): 0 to 1 [usable: 1], AlterReplicaLogDirs(34): 0 to 1 [usable: 1], DescribeLogDirs(35): 0 to 1 [usable: 1], SaslAuthenticate(36): 0 to 1 [usable: 0], CreatePartitions(37): 0 to 1 [usable: 1], CreateDelegationToken(38): 0 to 1 [usable: 1], RenewDelegationToken(39): 0 to 1 [usable: 1], ExpireDelegationToken(40): 0 to 1 [usable: 1], DescribeDelegationToken(41): 0 to 1 [usable: 1], DeleteGroups(42): 0 to 1 [usable: 1], UNKNOWN(43): 0)

12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.records-per-batch

12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.bytes

12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.compression-rate

12:32:37.739 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-retries

12:32:37.740 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.iot.record-errors

12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []

12:32:37.745 [main] DEBUG org.apache.nifi.parameter.ExpressionLanguageAwareParameterParser - For input iot found 0 Parameter references: []

Flow Succeeded


Other Runtime Options:

RunYARNServiceFromRegistry        <YARN RM URL> <Docker Image Name> <Service Name> <# of Containers> --file <File Name>

RunOpenwhiskActionServer          <Port>


References:

  • Awesome Article on NiFi 1.10 Error Handling:   https://medium.com/@abdelkrim.hadjidj/apache-nifi-1-10-series-simplifying-error-handling-7de86f130acd
  • https://www.datainmotion.dev/2019/08/find-cacerts-from-java-jre-lib-security.html
  • https://github.com/apache/nifi/tree/master/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-stateless
  • https://nifi.apache.org/docs/nifi-docs/html/user-guide.html
  • Parameters Added to API:   https://nifi.apache.org/docs/nifi-docs/rest-api/index.html
  • http://bit.ly/cdf-platform 
  • https://www.mtnfog.com/blog/apache-nifi-phi-processing
  • https://www.slideshare.net/BryanBende/apache-nifi-sdlc-improvements
  • https://nifi.apache.org/registry

Add A S2S Port Inside Process Group

Adding input port

ParquetReader

Adding controller service

ParquetRecordSetWriter

Adding controller service

Example Output

Example output


Further Reading

  • A Kafka Tutorial for Everyone, no Matter Your Stage in Development.
  • Real-Time Stream Processing With Apache Kafka Part 3: Setup a Single Node Kafka Cluster.
  • Kafka Technical Overview.