Reading AWS S3 File Content to Kafka Topic

How to read files from an S3 bucket into a Kafka topic using the CamelAWSS3SourceConnector.

By Ramu kakarla · Sep. 03, 20 · Tutorial

Apache Camel

Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine. It provides a Java object-based implementation of the Enterprise Integration Patterns, using an application programming interface to configure routing and mediation rules.
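To make "rule-based routing" more concrete, here is a toy content-based router (one of the Enterprise Integration Patterns) written in plain Java. It is not Camel's API: the class `ContentBasedRouter` and its methods are hypothetical. In a real Camel route you would express the same idea with the Java DSL's `choice()`, `when(...)`, `to(...)` and `otherwise()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy content-based router in plain Java (illustration only, not Camel's API).
class ContentBasedRouter {
    // One routing rule: if the predicate matches, send the message to destination.
    private static final class Rule {
        final Predicate<String> condition;
        final String destination;

        Rule(Predicate<String> condition, String destination) {
            this.condition = condition;
            this.destination = destination;
        }
    }

    private final List<Rule> rules = new ArrayList<>();
    private final String defaultDestination;

    ContentBasedRouter(String defaultDestination) {
        this.defaultDestination = defaultDestination;
    }

    // Add a rule; rules are evaluated in the order they were added.
    ContentBasedRouter when(Predicate<String> condition, String destination) {
        rules.add(new Rule(condition, destination));
        return this;
    }

    // Return the destination of the first matching rule, or the default one.
    String route(String message) {
        for (Rule rule : rules) {
            if (rule.condition.test(message)) {
                return rule.destination;
            }
        }
        return defaultDestination;
    }
}
```

For example, `new ContentBasedRouter("other").when(m -> m.startsWith("ORDER"), "orders").route("ORDER 42")` returns `"orders"`. The first matching rule wins, and unmatched messages fall through to the default destination, which is the role `otherwise()` plays in a Camel route.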

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively scalable, distributed, high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

The main components are:

  • Kafka Broker: the messaging broker responsible for delivering records from producing clients to consuming clients.
  • Apache ZooKeeper: a core dependency for Kafka that provides a cluster coordination service for highly reliable distributed coordination.
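A simple way to picture what the broker stores is a single topic partition modeled as an append-only log: every record gets the next offset, and each consumer reads from an offset it chooses. The sketch below is a conceptual model only. `PartitionLog` is a hypothetical class, not part of the Kafka client API, and it ignores replication, retention and ZooKeeper coordination.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Conceptual model of one topic partition as an append-only log.
// Hypothetical class for illustration; it is not the Kafka client API.
class PartitionLog {
    private final List<String> records = new ArrayList<>();

    // A producer appends a record; the "broker" assigns the next offset.
    synchronized long append(String value) {
        records.add(value);
        return records.size() - 1;
    }

    // A consumer reads every record at or after the given offset.
    // Reading from offset 0 mirrors the console consumer's --from-beginning.
    synchronized List<String> readFrom(long offset) {
        if (offset < 0 || offset > records.size()) {
            throw new IllegalArgumentException("offset out of range: " + offset);
        }
        return Collections.unmodifiableList(
                new ArrayList<>(records.subList((int) offset, records.size())));
    }
}
```

Records are never modified in place, so two consumers reading from different offsets see a consistent history; that property is what lets Kafka deliver the same records to many independent consumers.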

[Figure: AMQ Streams architecture]

camel-kafka-connector

Camel Kafka Connector lets you use Camel components as Kafka Connect connectors. It is an adapter layer that aims to provide a user-friendly way to use Apache Camel components in Kafka Connect.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to define connectors that move large collections of data into and out of Kafka. For more information, see the Kafka Connect documentation.
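Kafka Connect repeatedly polls a source connector's task, which hands back batches of records. The sketch below models one such poll under two limits that appear in this article's connector configuration, `camel.source.maxBatchPollSize` and `camel.source.maxPollDuration`. It is a simplified, hypothetical model (`BatchPoller` is not a real class), and the actual CamelSourceTask logic may differ.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.LongSupplier;

// Simplified, hypothetical model of a source task's poll() call.
class BatchPoller {
    private final int maxBatchPollSize;   // cf. camel.source.maxBatchPollSize
    private final long maxPollDurationMs; // cf. camel.source.maxPollDuration
    private final LongSupplier clockMs;   // injectable clock so the loop is testable

    BatchPoller(int maxBatchPollSize, long maxPollDurationMs, LongSupplier clockMs) {
        this.maxBatchPollSize = maxBatchPollSize;
        this.maxPollDurationMs = maxPollDurationMs;
        this.clockMs = clockMs;
    }

    // Drain pending items into one batch. Stops when the batch is full, the
    // time budget is spent, or nothing is pending right now (a real task might
    // instead keep waiting for new data until the deadline).
    List<String> poll(Deque<String> pending) {
        long deadline = clockMs.getAsLong() + maxPollDurationMs;
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatchPollSize && clockMs.getAsLong() < deadline) {
            String next = pending.pollFirst();
            if (next == null) {
                break;
            }
            batch.add(next);
        }
        return batch;
    }
}
```

Bounding both size and time keeps a single poll from holding the task indefinitely, so the framework can regularly commit offsets and react to shutdown requests between batches.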

Prerequisites

For this demonstration, you need the following set up in your development environment:

  1. Apache Maven 3.6.3+
  2. JDK 11
  3. A Kafka cluster
  4. An AWS account with files available in an S3 bucket

In this article, we demonstrate how to read files from an S3 bucket and write their contents to a Kafka topic using the CamelAWSS3SourceConnector.

Prepare the Needed Bits to Develop the Example

Set up the plugin.path property in your Kafka installation: open $KAFKA_HOME/config/connect-standalone.properties and set plugin.path to your chosen location.

```properties
plugin.path=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/
```



In this example, we use /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors.
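Kafka Connect treats plugin.path as a comma-separated list of directories, and a misspelled entry is an easy way to end up with a connector class that cannot be found. As a sketch, assuming you load the worker file into `java.util.Properties`, a hypothetical `PluginPathCheck` helper could report entries that are not existing directories:

```java
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight helper: list plugin.path entries that are not directories.
class PluginPathCheck {
    static List<String> missingDirectories(Properties workerConfig) {
        String value = workerConfig.getProperty("plugin.path", "");
        List<String> missing = new ArrayList<>();
        // plugin.path is a comma-separated list; tolerate spaces around commas.
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            if (!Files.isDirectory(Paths.get(trimmed))) {
                missing.add(trimmed);
            }
        }
        return missing;
    }
}
```

For example, load $KAFKA_HOME/config/connect-standalone.properties with `Properties.load(Reader)` and print the result of `missingDirectories` before starting the worker; an empty list means every entry exists.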

Download the camel-aws-s3-kafka-connector package and unzip it inside the plugin.path directory:

```shell
wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.4.0/camel-aws-s3-kafka-connector-0.4.0-package.zip
unzip camel-aws-s3-kafka-connector-0.4.0-package.zip
```
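The download URL follows the standard Maven repository layout: the groupId with dots turned into slashes, then the artifactId, the version, and a file named artifactId-version-classifier.extension. The hypothetical helper below rebuilds the URL used above, which makes it easy to point at another release, assuming that release is also published as a zip with the `package` classifier.

```java
// Hypothetical helper that builds a Maven Central URL from artifact coordinates.
class MavenCentralUrl {
    static final String REPO = "https://repo1.maven.org/maven2";

    // Layout: <repo>/<groupId as path>/<artifactId>/<version>/<artifactId>-<version>[-<classifier>].<extension>
    static String of(String groupId, String artifactId, String version,
                     String classifier, String extension) {
        String fileName = artifactId + "-" + version
                + (classifier == null || classifier.isEmpty() ? "" : "-" + classifier)
                + "." + extension;
        return String.join("/", REPO, groupId.replace('.', '/'), artifactId, version, fileName);
    }
}
```

Calling `MavenCentralUrl.of("org.apache.camel.kafkaconnector", "camel-aws-s3-kafka-connector", "0.4.0", "package", "zip")` yields the same URL passed to wget above.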



Configure the connector in the file CamelAWSS3SourceConnector.properties:

```properties
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter

camel.source.maxPollDuration=10000

topics=mytopic

# Replace with your own AWS credentials
camel.component.aws-s3.access-key=xxxxx
camel.component.aws-s3.secret-key=yyyyyy
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.endpoint.autocloseBody=true
```
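A missing or blank key in this file tends to surface only when the connector starts, so a quick pre-flight check can help. The sketch below assumes the keys in `REQUIRED` are the ones this example depends on; that list is our choice, not the connector's own validation rules, and `ConnectorConfigCheck` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the connector properties used in this example.
class ConnectorConfigCheck {
    // Keys this example relies on (an assumption, not an official list).
    static final List<String> REQUIRED = Arrays.asList(
            "name",
            "connector.class",
            "topics",
            "camel.component.aws-s3.region",
            "camel.source.path.bucketNameOrArn");

    // Return required keys that are absent or blank, in REQUIRED order.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }
}
```

Load CamelAWSS3SourceConnector.properties with `Properties.load(Reader)` and fail fast if `missingKeys` returns anything.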



Start ZooKeeper and the Kafka server:

```shell
# Each command runs in the foreground, so start them in separate terminals
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
```



Create the topic mytopic:

```shell
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
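Kafka accepts topic names made only of ASCII letters, digits, `.`, `_` and `-`, up to 249 characters, and rejects `.` and `..`. A minimal validator following those rules might look like this (the `TopicNames` class is illustrative, not part of Kafka):

```java
import java.util.regex.Pattern;

// Illustrative topic-name check mirroring Kafka's documented naming rules.
class TopicNames {
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isValid(String name) {
        return name != null
                && !name.equals(".")
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches(); // also rejects the empty string
    }
}
```

`mytopic` passes; a name with a space, such as `my topic`, would be rejected by kafka-topics.sh.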



Now start Kafka Connect in standalone mode, passing the worker configuration followed by the connector configuration:

```shell
./bin/connect-standalone.sh \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/connect-standalone.properties \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connector-examples/examples/aws-s3/CamelAWSS3SourceConnector.properties
```


Now upload any file to the S3 bucket 'kkakarla-test-kafka-connector'.
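The connector is configured with the bucket ARN (arn:aws:s3:::kkakarla-test-kafka-connector), while uploads target the plain bucket name. S3 bucket ARNs in the standard `aws` partition have the form arn:aws:s3:::bucket-name, so a hypothetical helper can recover the name from either form. It assumes the `aws` partition and treats anything after the first `/` as an object key.

```java
// Hypothetical helper: accept a bucket name or an S3 ARN and return the bucket name.
class S3BucketNames {
    private static final String ARN_PREFIX = "arn:aws:s3:::"; // standard "aws" partition only

    static String bucketName(String bucketNameOrArn) {
        if (bucketNameOrArn.startsWith(ARN_PREFIX)) {
            String rest = bucketNameOrArn.substring(ARN_PREFIX.length());
            // Object ARNs look like arn:aws:s3:::bucket/key; keep only the bucket part.
            int slash = rest.indexOf('/');
            return slash >= 0 ? rest.substring(0, slash) : rest;
        }
        return bucketNameOrArn; // already a plain bucket name
    }
}
```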

Kafka Connect logs the connector configuration and starts the Camel route:

```text
[2020-09-01 12:43:17,149] INFO Kafka version: 2.5.0.redhat-00003 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-09-01 12:43:17,149] INFO Kafka commitId: f960e3745ec74111 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-09-01 12:43:17,149] INFO Kafka startTimeMs: 1598944397149 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-09-01 12:43:17,156] INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask:77)
[2020-09-01 12:43:17,156] INFO [Producer clientId=connector-producer-CamelAWSS3SourceConnector-0] Cluster ID: jmyQzmm2QUe12p8is0zNAQ (org.apache.kafka.clients.Metadata:280)
[2020-09-01 12:43:17,156] INFO Created connector CamelAWSS3SourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-09-01 12:43:17,157] INFO CamelAwss3SourceConnectorConfig values: 
    camel.component.aws-s3.accelerateModeEnabled = false
    camel.component.aws-s3.accessKey = null
    camel.component.aws-s3.amazonS3Client = null
    camel.component.aws-s3.autoCreateBucket = true
    camel.component.aws-s3.autocloseBody = true
    camel.component.aws-s3.basicPropertyBinding = false
    camel.component.aws-s3.bridgeErrorHandler = false
    camel.component.aws-s3.chunkedEncodingDisabled = false
    camel.component.aws-s3.configuration = null
    camel.component.aws-s3.deleteAfterRead = true
    camel.component.aws-s3.delimiter = null
    camel.component.aws-s3.dualstackEnabled = false
    camel.component.aws-s3.encryptionMaterials = null
    camel.component.aws-s3.endpointConfiguration = null
    camel.component.aws-s3.fileName = null
    camel.component.aws-s3.forceGlobalBucketAccessEnabled = false
    camel.component.aws-s3.includeBody = true
    camel.component.aws-s3.pathStyleAccess = false
    camel.component.aws-s3.payloadSigningEnabled = false
    camel.component.aws-s3.policy = null
    camel.component.aws-s3.prefix = null
    camel.component.aws-s3.proxyHost = null
    camel.component.aws-s3.proxyPort = null
    camel.component.aws-s3.proxyProtocol = HTTPS
    camel.component.aws-s3.region = US_EAST_2
    camel.component.aws-s3.secretKey = null
    camel.component.aws-s3.useEncryption = false
    camel.component.aws-s3.useIAMCredentials = false
    camel.source.camelMessageHeaderKey = null
    camel.source.component = aws-s3
    camel.source.contentLogLevel = OFF
    camel.source.endpoint.accelerateModeEnabled = false
    camel.source.endpoint.accessKey = null
    camel.source.endpoint.amazonS3Client = null
    camel.source.endpoint.autoCreateBucket = true
    camel.source.endpoint.autocloseBody = true
    camel.source.endpoint.backoffErrorThreshold = null
    camel.source.endpoint.backoffIdleThreshold = null
    camel.source.endpoint.backoffMultiplier = null
    camel.source.endpoint.basicPropertyBinding = false
    camel.source.endpoint.bridgeErrorHandler = false
    camel.source.endpoint.chunkedEncodingDisabled = false
    camel.source.endpoint.delay = 500
    camel.source.endpoint.deleteAfterRead = true
    camel.source.endpoint.delimiter = null
    camel.source.endpoint.dualstackEnabled = false
    camel.source.endpoint.encryptionMaterials = null
    camel.source.endpoint.endpointConfiguration = null
    camel.source.endpoint.exceptionHandler = null
    camel.source.endpoint.exchangePattern = null
    camel.source.endpoint.fileName = null
    camel.source.endpoint.forceGlobalBucketAccessEnabled = false
    camel.source.endpoint.greedy = false
    camel.source.endpoint.includeBody = true
    camel.source.endpoint.initialDelay = 1000
    camel.source.endpoint.maxConnections = 60
    camel.source.endpoint.maxMessagesPerPoll = 10
    camel.source.endpoint.pathStyleAccess = false
    camel.source.endpoint.payloadSigningEnabled = false
    camel.source.endpoint.policy = null
    camel.source.endpoint.pollStrategy = null
    camel.source.endpoint.prefix = null
    camel.source.endpoint.proxyHost = null
    camel.source.endpoint.proxyPort = null
    camel.source.endpoint.proxyProtocol = HTTPS
    camel.source.endpoint.region = null
    camel.source.endpoint.repeatCount = 0
    camel.source.endpoint.runLoggingLevel = TRACE
    camel.source.endpoint.scheduledExecutorService = null
    camel.source.endpoint.scheduler = none
    camel.source.endpoint.schedulerProperties = null
    camel.source.endpoint.secretKey = null
    camel.source.endpoint.sendEmptyMessageWhenIdle = false
    camel.source.endpoint.startScheduler = true
    camel.source.endpoint.synchronous = false
    camel.source.endpoint.timeUnit = MILLISECONDS
    camel.source.endpoint.useEncryption = false
    camel.source.endpoint.useFixedDelay = true
    camel.source.endpoint.useIAMCredentials = false
    camel.source.marshal = null
    camel.source.maxBatchPollSize = 1000
    camel.source.maxPollDuration = 10000
    camel.source.path.bucketNameOrArn = arn:aws:s3:::kkakarla-test-kafka-connector
    camel.source.pollingConsumerBlockTimeout = 0
    camel.source.pollingConsumerBlockWhenFull = true
    camel.source.pollingConsumerQueueSize = 1000
    camel.source.unmarshal = null
    camel.source.url = null
    topics = mytopic
 (org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnectorConfig:347)
[2020-09-01 12:43:17,239] INFO Setting initial properties in Camel context: [{connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector, camel.source.endpoint.autocloseBody=true, camel.source.maxPollDuration=10000, topics=mytopic, camel.component.aws-s3.region=US_EAST_2, camel.source.component=aws-s3, task.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceTask, camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector, camel.component.aws-s3.access-key=xxxxxxxx, name=CamelAWSS3SourceConnector, value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter, camel.component.aws-s3.secret-key=xxxxxxxxxxxx, key.converter=org.apache.kafka.connect.storage.StringConverter}] (org.apache.camel.kafkaconnector.utils.CamelMainSupport:91)
[2020-09-01 12:43:17,244] INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport:463)
[2020-09-01 12:43:17,271] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector:126)
[2020-09-01 12:43:17,272] INFO No additional Camel XML rests discovered from: classpath:camel-rest/*.xml (org.apache.camel.main.DefaultRoutesCollector:162)
[2020-09-01 12:43:17,285] INFO Creating Camel route from({}) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:102)
[2020-09-01 12:43:17,285] INFO .to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:130)
[2020-09-01 12:43:17,299] INFO Starting CamelContext (org.apache.camel.kafkaconnector.utils.CamelMainSupport:138)
[2020-09-01 12:43:17,360] INFO Apache Camel 3.4.2 (camel-1) is starting (org.apache.camel.impl.engine.AbstractCamelContext:2630)
[2020-09-01 12:43:17,361] INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html (org.apache.camel.impl.engine.AbstractCamelContext:2773)
[2020-09-01 12:43:19,848] INFO Route: route1 started and consuming from: aws-s3://arn:aws:s3:::kkakarla-test-kafka-connector (org.apache.camel.impl.engine.InternalRouteStartupManager:158)
```
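The indented key = value block in this log is the effective connector configuration, and it is worth checking. For instance, camel.source.endpoint.deleteAfterRead = true means the S3 consumer deletes objects from the bucket after reading them. The hypothetical `ConfigDumpParser` below is a sketch for pulling those pairs out of a saved log so you can inspect settings like this one; it assumes the indentation and the " = " separator shown in this log.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extract "key = value" pairs from a logged config dump.
class ConfigDumpParser {
    // An indented line with a dotted key, " = ", then the value (may be "null").
    private static final Pattern ENTRY = Pattern.compile("^\\s+([\\w.\\-]+) = (.*)$");

    static Map<String, String> parse(String logText) {
        Map<String, String> values = new LinkedHashMap<>(); // keeps log order
        for (String line : logText.split("\\R")) {
            Matcher m = ENTRY.matcher(line);
            if (m.matches()) {
                values.put(m.group(1), m.group(2).trim());
            }
        }
        return values;
    }
}
```

Timestamped log lines and the trailing "(org.apache.camel...)" line do not match the pattern, so only the configuration entries end up in the map.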

          


The content of the file is written to the Kafka topic mytopic. You can verify this with the console consumer:

```shell
[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mytopic
hi hello how are you
```


For the complete list of properties, refer to CAMEL-AWS-S3-KAFKA-CONNECTOR SOURCE CONFIGURATION.

Summary

Camel Kafka connectors help those who do not want to write their own code to move data between external systems and Kafka.

See the list of SUPPORTED CONNECTORS AND DOCUMENTATION.
