Reading AWS S3 File Content to Kafka Topic

How to read S3 files into a Kafka topic using the CamelAWSS3SourceConnector.

By Ramu kakarla · Sep. 03, 20 · Tutorial


Apache Camel

Apache Camel is an open-source framework for message-oriented middleware with a rule-based routing and mediation engine. It provides a Java object-based implementation of the Enterprise Integration Patterns and uses an application programming interface to configure routing and mediation rules.

Red Hat AMQ Streams

Red Hat AMQ Streams is a massively scalable, distributed, and high-performance data streaming platform based on the Apache ZooKeeper and Apache Kafka projects.

The main components are:

Kafka Broker

Messaging broker responsible for delivering records from producing clients to consuming clients.

Apache ZooKeeper

Core dependency for Kafka that provides a highly reliable cluster coordination service.

Figure: AMQ Streams architecture (Kafka Bridge)

camel-kafka-connector

Camel Kafka Connector allows you to use all Camel components as Kafka Connect connectors.

It is a "Camel Kafka connector adapter" that aims to provide a user-friendly way to use all Apache Camel components in Kafka Connect.

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems. It makes it simple to define connectors that move large collections of data into and out of Kafka. For more information, see the Kafka Connect documentation.

Prerequisites

For this demonstration, you will need the following technologies set up in your development environment:

  1. Apache Maven 3.6.3+
  2. JDK 11
  3. A Kafka cluster
  4. An AWS account with files available in an S3 bucket

In this article, we demonstrate how to read files from an S3 bucket and write their content to a Kafka topic using the CamelAWSS3SourceConnector.

Prepare the Needed Bits to Develop the Example

Set up the plugin.path property in your Kafka installation: open $KAFKA_HOME/config/connect-standalone.properties and set plugin.path to your chosen location.

```properties
plugin.path=/home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors/
```



In this example, we use /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connectors as the plugin directory.
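A quick sanity check before starting Kafka Connect is to confirm that every plugin.path entry points at an existing directory. The sketch below is a hypothetical helper (the class name PluginPathCheck is ours, not part of Kafka): it loads connect-standalone.properties with java.util.Properties, splits plugin.path on commas (Kafka Connect accepts a comma-separated list of paths), and reports entries that are not directories.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Kafka): checks that plugin.path entries exist.
final class PluginPathCheck {

    // plugin.path is a comma-separated list; split it into trimmed, non-empty entries.
    static List<String> splitPluginPath(String value) {
        List<String> entries = new ArrayList<>();
        if (value == null) {
            return entries;
        }
        for (String part : value.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                entries.add(trimmed);
            }
        }
        return entries;
    }

    // Loads the worker properties and returns the plugin.path entries that are not directories.
    static List<String> missingPluginDirs(Path workerProperties) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(workerProperties)) {
            props.load(reader);
        }
        List<String> missing = new ArrayList<>();
        for (String entry : splitPluginPath(props.getProperty("plugin.path"))) {
            if (!Files.isDirectory(Paths.get(entry))) {
                missing.add(entry);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the worker file is passed as the first argument; the default
        // path only works when this is run from $KAFKA_HOME.
        Path worker = Paths.get(args.length > 0 ? args[0] : "config/connect-standalone.properties");
        List<String> missing = missingPluginDirs(worker);
        if (missing.isEmpty()) {
            System.out.println("All plugin.path entries are directories");
        } else {
            System.out.println("Not a directory: " + missing);
        }
    }
}
```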

Download the camel-aws-s3-kafka-connector package and unzip it inside the plugin.path directory:

```shell
wget https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.4.0/camel-aws-s3-kafka-connector-0.4.0-package.zip

unzip camel-aws-s3-kafka-connector-0.4.0-package.zip
```
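The wget URL follows the standard Maven repository layout (group path, artifact, version, then a file named artifactId-version-classifier.extension), so another release can be fetched by changing only the version. The hypothetical helper below assembles that URL from the artifact coordinates; it assumes the release you pick also publishes a -package.zip artifact, which is worth confirming on Maven Central first.

```java
// Hypothetical helper: builds a Maven Central URL using the standard repository layout
// <repo>/<groupId with dots as slashes>/<artifactId>/<version>/<artifactId>-<version>[-<classifier>].<ext>
final class MavenCentralUrl {
    private static final String REPO = "https://repo1.maven.org/maven2";

    static String of(String groupId, String artifactId, String version,
                     String classifier, String extension) {
        String groupPath = groupId.replace('.', '/');
        String suffix = (classifier == null || classifier.isEmpty()) ? "" : "-" + classifier;
        String fileName = artifactId + "-" + version + suffix + "." + extension;
        return String.join("/", REPO, groupPath, artifactId, version, fileName);
    }

    public static void main(String[] args) {
        // Coordinates from the wget command above; pass another version as the first argument.
        String version = args.length > 0 ? args[0] : "0.4.0";
        System.out.println(of("org.apache.camel.kafkaconnector",
                "camel-aws-s3-kafka-connector", version, "package", "zip"));
    }
}
```

Run without arguments, it prints the same URL used in the wget command.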



Configure the connector in a file named CamelAWSS3SourceConnector.properties:

```properties
# Connector name, class, and record converters
name=CamelAWSS3SourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter

camel.source.maxPollDuration=10000

# Kafka topic the file content is written to
topics=mytopic

# AWS credentials (placeholders), region, and the source bucket
camel.component.aws-s3.access-key=xxxxx
camel.component.aws-s3.secret-key=yyyyyy
camel.component.aws-s3.region=US_EAST_2
camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector
camel.source.endpoint.autocloseBody=true
```
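A mistyped key may simply leave the intended setting at its default, so it can help to check the file before starting the worker. The hypothetical ConnectorConfigCheck below loads the file with java.util.Properties and lists which keys used in this example are absent or blank. The key list is an assumption drawn from this example, not an official list of required connector settings.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical checker (not part of camel-kafka-connector).
final class ConnectorConfigCheck {

    // Assumption: the keys this example relies on, not an official list of required settings.
    static final List<String> EXPECTED_KEYS = List.of(
            "name",
            "connector.class",
            "topics",
            "camel.component.aws-s3.access-key",
            "camel.component.aws-s3.secret-key",
            "camel.component.aws-s3.region",
            "camel.source.path.bucketNameOrArn");

    // Returns the expected keys that are absent or have a blank value.
    static List<String> missingKeys(Reader source) throws IOException {
        Properties props = new Properties();
        props.load(source);
        List<String> missing = new ArrayList<>();
        for (String key : EXPECTED_KEYS) {
            String value = props.getProperty(key);
            if (value == null || value.isBlank()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        String file = args.length > 0 ? args[0] : "CamelAWSS3SourceConnector.properties";
        try (Reader reader = Files.newBufferedReader(Paths.get(file))) {
            List<String> missing = missingKeys(reader);
            System.out.println(missing.isEmpty() ? "All expected keys set" : "Missing or blank: " + missing);
        }
    }
}
```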



Start ZooKeeper and the Kafka server from $KAFKA_HOME, each in its own terminal, since both scripts run in the foreground:

```shell
./bin/zookeeper-server-start.sh config/zookeeper.properties
./bin/kafka-server-start.sh config/server.properties
```
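Before creating the topic, you may want to wait until both processes accept connections. The hypothetical WaitForPort helper below polls a TCP port with java.net.Socket until it opens or a timeout elapses. It assumes the default ports from config/zookeeper.properties (2181) and config/server.properties (9092, matching the localhost:9092 used in this article); an open port is a reasonable signal, not a guarantee that the broker has finished starting.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: polls a TCP port until something accepts connections.
final class WaitForPort {

    // True if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    // Retries every 500 ms until the port opens or the overall timeout elapses.
    static boolean waitFor(String host, int port, long timeoutMillis) throws InterruptedException {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        while (deadline - System.nanoTime() > 0) {
            if (isOpen(host, port, 500)) {
                return true;
            }
            Thread.sleep(500);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: default ports from zookeeper.properties (2181) and server.properties (9092).
        System.out.println("ZooKeeper up: " + waitFor("localhost", 2181, 30_000));
        System.out.println("Kafka up: " + waitFor("localhost", 9092, 30_000));
    }
}
```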



Create the topic mytopic:

```shell
./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic mytopic
```
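The same name is used in the topics property of the connector configuration, so it has to be a legal Kafka topic name: 1 to 249 characters from letters, digits, '.', '_' and '-', and not "." or "..". The hypothetical TopicNames validator below mirrors those rules with a regular expression; it is a sketch, not Kafka's own validation code.

```java
import java.util.regex.Pattern;

// Hypothetical validator mirroring Kafka's topic-name rules:
// 1-249 characters from [a-zA-Z0-9._-], and not "." or "..".
final class TopicNames {
    private static final Pattern LEGAL_CHARS = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isValid(String name) {
        return name != null
                && !name.equals(".")
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && LEGAL_CHARS.matcher(name).matches();
    }

    public static void main(String[] args) {
        for (String name : new String[] {"mytopic", "my topic", "..", "orders.v1_raw-2"}) {
            System.out.println(name + " -> " + isValid(name));
        }
    }
}
```

Kafka also warns when a name contains '.' or '_', because the two can collide in metric names, so a plain name like mytopic avoids that warning.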



Now start Kafka Connect in standalone mode, passing the worker configuration and the connector configuration:

```shell
./bin/connect-standalone.sh \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/kafka_2.12-2.5.0.redhat-00003/config/connect-standalone.properties \
  /home/kkakarla/development/fuse-ocp-training-videos/kcs/amq-streams-kafka/camel-kafka-connector-examples/examples/aws-s3/CamelAWSS3SourceConnector.properties
```
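Because a wrong path to either properties file is easy to miss in a command this long, here is a hypothetical launcher sketch (not a Kafka tool) that checks both files exist, builds the same connect-standalone.sh command relative to a given KAFKA_HOME, and runs it with java.lang.ProcessBuilder, streaming the worker's output to the console.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

// Hypothetical launcher: checks both config files exist, then runs connect-standalone.sh.
final class ConnectStandaloneLauncher {

    // Builds: <kafkaHome>/bin/connect-standalone.sh <worker.properties> <connector.properties>
    static List<String> command(Path kafkaHome, Path workerProps, Path connectorProps) {
        for (Path file : List.of(workerProps, connectorProps)) {
            if (!Files.isRegularFile(file)) {
                throw new IllegalArgumentException("Not a file: " + file);
            }
        }
        return List.of(
                kafkaHome.resolve("bin").resolve("connect-standalone.sh").toString(),
                workerProps.toString(),
                connectorProps.toString());
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.err.println("usage: ConnectStandaloneLauncher <KAFKA_HOME> <worker.properties> <connector.properties>");
            System.exit(2);
        }
        List<String> cmd = command(Paths.get(args[0]), Paths.get(args[1]), Paths.get(args[2]));
        // inheritIO() sends the worker's log output to this console.
        Process process = new ProcessBuilder(cmd).inheritIO().start();
        System.exit(process.waitFor());
    }
}
```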


Now upload any file to the S3 bucket kkakarla-test-kafka-connector.

The Kafka Connect logs look like this:

```text
[2020-09-01 12:43:17,149] INFO Kafka version: 2.5.0.redhat-00003 (org.apache.kafka.common.utils.AppInfoParser:117)
[2020-09-01 12:43:17,149] INFO Kafka commitId: f960e3745ec74111 (org.apache.kafka.common.utils.AppInfoParser:118)
[2020-09-01 12:43:17,149] INFO Kafka startTimeMs: 1598944397149 (org.apache.kafka.common.utils.AppInfoParser:119)
[2020-09-01 12:43:17,156] INFO Starting CamelSourceTask connector task (org.apache.camel.kafkaconnector.CamelSourceTask:77)
[2020-09-01 12:43:17,156] INFO [Producer clientId=connector-producer-CamelAWSS3SourceConnector-0] Cluster ID: jmyQzmm2QUe12p8is0zNAQ (org.apache.kafka.clients.Metadata:280)
[2020-09-01 12:43:17,156] INFO Created connector CamelAWSS3SourceConnector (org.apache.kafka.connect.cli.ConnectStandalone:112)
[2020-09-01 12:43:17,157] INFO CamelAwss3SourceConnectorConfig values:
    camel.component.aws-s3.accelerateModeEnabled = false
    camel.component.aws-s3.accessKey = null
    camel.component.aws-s3.amazonS3Client = null
    camel.component.aws-s3.autoCreateBucket = true
    camel.component.aws-s3.autocloseBody = true
    camel.component.aws-s3.basicPropertyBinding = false
    camel.component.aws-s3.bridgeErrorHandler = false
    camel.component.aws-s3.chunkedEncodingDisabled = false
    camel.component.aws-s3.configuration = null
    camel.component.aws-s3.deleteAfterRead = true
    camel.component.aws-s3.delimiter = null
    camel.component.aws-s3.dualstackEnabled = false
    camel.component.aws-s3.encryptionMaterials = null
    camel.component.aws-s3.endpointConfiguration = null
    camel.component.aws-s3.fileName = null
    camel.component.aws-s3.forceGlobalBucketAccessEnabled = false
    camel.component.aws-s3.includeBody = true
    camel.component.aws-s3.pathStyleAccess = false
    camel.component.aws-s3.payloadSigningEnabled = false
    camel.component.aws-s3.policy = null
    camel.component.aws-s3.prefix = null
    camel.component.aws-s3.proxyHost = null
    camel.component.aws-s3.proxyPort = null
    camel.component.aws-s3.proxyProtocol = HTTPS
    camel.component.aws-s3.region = US_EAST_2
    camel.component.aws-s3.secretKey = null
    camel.component.aws-s3.useEncryption = false
    camel.component.aws-s3.useIAMCredentials = false
    camel.source.camelMessageHeaderKey = null
    camel.source.component = aws-s3
    camel.source.contentLogLevel = OFF
    camel.source.endpoint.accelerateModeEnabled = false
    camel.source.endpoint.accessKey = null
    camel.source.endpoint.amazonS3Client = null
    camel.source.endpoint.autoCreateBucket = true
    camel.source.endpoint.autocloseBody = true
    camel.source.endpoint.backoffErrorThreshold = null
    camel.source.endpoint.backoffIdleThreshold = null
    camel.source.endpoint.backoffMultiplier = null
    camel.source.endpoint.basicPropertyBinding = false
    camel.source.endpoint.bridgeErrorHandler = false
    camel.source.endpoint.chunkedEncodingDisabled = false
    camel.source.endpoint.delay = 500
    camel.source.endpoint.deleteAfterRead = true
    camel.source.endpoint.delimiter = null
    camel.source.endpoint.dualstackEnabled = false
    camel.source.endpoint.encryptionMaterials = null
    camel.source.endpoint.endpointConfiguration = null
    camel.source.endpoint.exceptionHandler = null
    camel.source.endpoint.exchangePattern = null
    camel.source.endpoint.fileName = null
    camel.source.endpoint.forceGlobalBucketAccessEnabled = false
    camel.source.endpoint.greedy = false
    camel.source.endpoint.includeBody = true
    camel.source.endpoint.initialDelay = 1000
    camel.source.endpoint.maxConnections = 60
    camel.source.endpoint.maxMessagesPerPoll = 10
    camel.source.endpoint.pathStyleAccess = false
    camel.source.endpoint.payloadSigningEnabled = false
    camel.source.endpoint.policy = null
    camel.source.endpoint.pollStrategy = null
    camel.source.endpoint.prefix = null
    camel.source.endpoint.proxyHost = null
    camel.source.endpoint.proxyPort = null
    camel.source.endpoint.proxyProtocol = HTTPS
    camel.source.endpoint.region = null
    camel.source.endpoint.repeatCount = 0
    camel.source.endpoint.runLoggingLevel = TRACE
    camel.source.endpoint.scheduledExecutorService = null
    camel.source.endpoint.scheduler = none
    camel.source.endpoint.schedulerProperties = null
    camel.source.endpoint.secretKey = null
    camel.source.endpoint.sendEmptyMessageWhenIdle = false
    camel.source.endpoint.startScheduler = true
    camel.source.endpoint.synchronous = false
    camel.source.endpoint.timeUnit = MILLISECONDS
    camel.source.endpoint.useEncryption = false
    camel.source.endpoint.useFixedDelay = true
    camel.source.endpoint.useIAMCredentials = false
    camel.source.marshal = null
    camel.source.maxBatchPollSize = 1000
    camel.source.maxPollDuration = 10000
    camel.source.path.bucketNameOrArn = arn:aws:s3:::kkakarla-test-kafka-connector
    camel.source.pollingConsumerBlockTimeout = 0
    camel.source.pollingConsumerBlockWhenFull = true
    camel.source.pollingConsumerQueueSize = 1000
    camel.source.unmarshal = null
    camel.source.url = null
    topics = mytopic
 (org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnectorConfig:347)
[2020-09-01 12:43:17,239] INFO Setting initial properties in Camel context: [{connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector, camel.source.endpoint.autocloseBody=true, camel.source.maxPollDuration=10000, topics=mytopic, camel.component.aws-s3.region=US_EAST_2, camel.source.component=aws-s3, task.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceTask, camel.source.path.bucketNameOrArn=arn:aws:s3:::kkakarla-test-kafka-connector, camel.component.aws-s3.access-key=xxxxxxxx, name=CamelAWSS3SourceConnector, value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter, camel.component.aws-s3.secret-key=xxxxxxxxxxxx, key.converter=org.apache.kafka.connect.storage.StringConverter}] (org.apache.camel.kafkaconnector.utils.CamelMainSupport:91)
[2020-09-01 12:43:17,244] INFO Using properties from: classpath:application.properties;optional=true (org.apache.camel.main.BaseMainSupport:463)
[2020-09-01 12:43:17,271] INFO No additional Camel XML routes discovered from: classpath:camel/*.xml (org.apache.camel.main.DefaultRoutesCollector:126)
[2020-09-01 12:43:17,272] INFO No additional Camel XML rests discovered from: classpath:camel-rest/*.xml (org.apache.camel.main.DefaultRoutesCollector:162)
[2020-09-01 12:43:17,285] INFO Creating Camel route from({}) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:102)
[2020-09-01 12:43:17,285] INFO .to(direct:end?pollingConsumerQueueSize=1000&pollingConsumerBlockTimeout=0&pollingConsumerBlockWhenFull=true) (org.apache.camel.kafkaconnector.utils.CamelMainSupport:130)
[2020-09-01 12:43:17,299] INFO Starting CamelContext (org.apache.camel.kafkaconnector.utils.CamelMainSupport:138)
[2020-09-01 12:43:17,360] INFO Apache Camel 3.4.2 (camel-1) is starting (org.apache.camel.impl.engine.AbstractCamelContext:2630)
[2020-09-01 12:43:17,361] INFO StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html (org.apache.camel.impl.engine.AbstractCamelContext:2773)
[2020-09-01 12:43:19,848] INFO Route: route1 started and consuming from: aws-s3://arn:aws:s3:::kkakarla-test-kafka-connector (org.apache.camel.impl.engine.InternalRouteStartupManager:158)
```
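The configuration dump shows the effective settings, including defaults that were never set in the properties file. One worth noticing is camel.source.endpoint.deleteAfterRead = true: according to the Camel aws-s3 component documentation, the consumer deletes each object from the bucket after retrieving it, so the uploaded file should not be expected to remain there. To pull a setting like that out of a long log, the hypothetical ConfigDumpParser below collects the indented "key = value" lines into a map; it assumes the dump format shown in the logs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: extracts "key = value" lines from a Kafka Connect config dump.
final class ConfigDumpParser {

    // Config lines in the dump are indented and use " = " as the separator;
    // log headers and the trailing "(...Config:347)" line are skipped.
    static Map<String, String> parse(String dump) {
        Map<String, String> values = new LinkedHashMap<>();
        for (String line : dump.split("\\R")) {
            int sep = line.indexOf(" = ");
            if (!line.isEmpty() && Character.isWhitespace(line.charAt(0)) && sep > 0) {
                values.put(line.substring(0, sep).trim(), line.substring(sep + 3).trim());
            }
        }
        return values;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the log excerpt is piped in on standard input.
        String dump = new String(System.in.readAllBytes(), StandardCharsets.UTF_8);
        Map<String, String> values = parse(dump);
        System.out.println("deleteAfterRead = " + values.get("camel.source.endpoint.deleteAfterRead"));
    }
}
```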

          


The content of the file is written to the Kafka topic mytopic, which you can check with the console consumer:

```shell
[kkakarla@kkakarla kafka_2.12-2.5.0.redhat-00003]$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mytopic
hi hello how are you
```



For the complete list of properties, refer to the camel-aws-s3-kafka-connector source configuration reference.

Summary

Camel Kafka Connector helps developers who do not want to write their own code to move data between external systems and Kafka.

See the list of supported connectors and their documentation.


Opinions expressed by DZone contributors are their own.
