From HTTP to Kafka: A Custom Source Connector
Learn how to implement a custom Kafka Connect HTTP source connector to integrate with HTTP endpoints, covering connector configuration, deployment and usage.
Recently, I came across an interesting scenario: one application had a cron job constantly polling an API for active offers, just to refresh a Redis cache that powered the offer view. So I started thinking: isn’t there a better way to handle this? Or at least a way to offload such repetitive tasks outside the application itself?
That’s when it hit me: this pattern looks way too similar to the CDC flows we already implement with Kafka Connect JDBC source connectors. So why not apply the same idea to HTTP? After a bit of digging, I found the answer was yes—we can definitely do it. But there’s a catch. The official Confluent HTTP source connector requires a license, and most open-source alternatives are either too complex or don’t quite match the use case.
So I decided to build my own HTTP source connector.
Spoiler alert: the implementation turned out to be relatively simple, tailored to this specific scenario. The goal of this article is to share that base skeleton—something you can adapt to your own needs, while also exploring some of Kafka Connect’s extensibility in practice.
Note: here is the source code in case you want to follow along.
Implementation
We are going to create a new Maven project with the following pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>inc.evil</groupId>
<artifactId>kafka-connect-http</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<kafka.version>3.0.0</kafka.version>
<httpclient.version>5.5</httpclient.version>
<slf4j.version>2.0.17</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>${httpclient.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.7.1</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
In short, we are going to use:
- The Kafka Connect API, which provides the interfaces for building the HTTP source connector that polls HTTP endpoints and streams the responses into Kafka topics.
- Apache HttpClient 5, a pure Java, standards-based implementation of the HTTP protocol. It offers a robust and flexible toolkit for building HTTP-aware client applications, and in this case it serves as the backbone for polling the API.
- The Simple Logging Facade for Java (SLF4J), which serves as a simple facade for logging.
- The Maven Assembly Plugin, which creates the fat JAR.
First we are going to tackle the configuration part of the connector.
Configuration
public class HttpSourceConfig extends AbstractConfig {
public static final String HTTP_URL = "http.url";
public static final String HTTP_QUERY_PARAMS = "http.query.params";
public static final String HTTP_REQUEST_BODY = "http.request.body";
public static final String HTTP_HEADERS = "http.headers";
public static final String HTTP_POLL_INTERVAL_MS = "http.poll.interval.ms";
public static final String HTTP_METHOD = "http.method";
public static final String HTTP_AUTH_USERNAME = "http.auth.username";
public static final String HTTP_AUTH_PASSWORD = "http.auth.password";
public static final String HTTP_AUTH_BEARER = "http.auth.bearer";
public static final String HTTP_CONNECT_TIMEOUT_MS = "http.connect.timeout.ms";
public static final String HTTP_READ_TIMEOUT_MS = "http.read.timeout.ms";
public static final String TOPIC = "topic";
public static final String VERSION = "1.0";
public static ConfigDef getConfig() {
return new ConfigDef()
.define(TOPIC,
ConfigDef.Type.STRING,
ConfigDef.Importance.HIGH,
"The Kafka topic to write the fetched data to.")
.define(HTTP_HEADERS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
"Optional HTTP request headers in 'key=value' pairs separated by commas. " +
"Example: 'Accept=application/json'.")
.define(HTTP_QUERY_PARAMS,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.LOW,
"Optional query parameters appended to the HTTP request URL in 'key=value' pairs separated by '&'. " +
"Example: 'updatedSince=2023-01-01&limit=100'.")
.define(HTTP_REQUEST_BODY,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.MEDIUM,
"The HTTP request body to be sent with the request. Only applicable for methods like POST and PUT.")
.define(HTTP_URL,
ConfigDef.Type.STRING,
"https://httpbin.org/get",
new ConfigDef.NonEmptyString(),
ConfigDef.Importance.HIGH,
"The base HTTP URL to fetch data from.",
"Connection",
1,
ConfigDef.Width.LONG,
HTTP_URL)
.define(HTTP_POLL_INTERVAL_MS,
ConfigDef.Type.INT,
60000,
ConfigDef.Range.atLeast(5000),
ConfigDef.Importance.HIGH,
"Polling interval in milliseconds between consecutive HTTP requests. " +
"Minimum allowed is 5000 ms (5 seconds). Default is 60000 ms (1 minute).")
.define(HTTP_METHOD,
ConfigDef.Type.STRING,
"GET",
ConfigDef.ValidString.in("GET", "POST", "PATCH", "PUT", "DELETE"),
ConfigDef.Importance.HIGH,
"The HTTP method to use for requests.",
"Connection",
2,
ConfigDef.Width.SHORT,
HTTP_METHOD)
.define(HTTP_AUTH_USERNAME,
ConfigDef.Type.STRING,
"",
ConfigDef.Importance.MEDIUM,
"Username for HTTP Basic Authentication. Used together with '" + HTTP_AUTH_PASSWORD + "'.")
.define(HTTP_AUTH_PASSWORD,
ConfigDef.Type.PASSWORD,
"",
ConfigDef.Importance.MEDIUM,
"Password for HTTP Basic Authentication. Used together with '" + HTTP_AUTH_USERNAME + "'.")
.define(HTTP_AUTH_BEARER,
ConfigDef.Type.PASSWORD,
"",
ConfigDef.Importance.MEDIUM,
"Bearer token for Authorization header. Example: 'eyJhbGciOi...'.")
.define(HTTP_CONNECT_TIMEOUT_MS,
ConfigDef.Type.INT,
5000,
ConfigDef.Range.atLeast(1000),
ConfigDef.Importance.MEDIUM,
"Timeout in milliseconds for establishing the HTTP connection. Default is 5000 ms.")
.define(HTTP_READ_TIMEOUT_MS,
ConfigDef.Type.INT,
10000,
ConfigDef.Range.atLeast(1000),
ConfigDef.Importance.MEDIUM,
"Timeout in milliseconds for reading the HTTP response. Default is 10000 ms.");
}
public HttpSourceConfig(Map<String, String> originals) {
super(getConfig(), originals);
}
}
In Kafka Connect, every connector needs a way to define and validate its configuration. That’s exactly what the HttpSourceConfig class does. It extends AbstractConfig and declares all the settings our HTTP source connector will support.
At the top, I define constants like http.url, http.method, http.headers, topic, etc. These are the configuration keys that users will specify when they set up the connector in JSON. For example, http.url points to the API endpoint, and topic is the Kafka topic where the data should land.
The main point of interest is the getConfig() method. Here I build a ConfigDef, which is Kafka Connect’s way of describing the connector’s parameters. For each parameter it defines:
- ConfigDef.Type: what kind of value is expected (STRING, INT, PASSWORD, …)
- Default value: what to fall back on if nothing is provided
- Validation: rules like NonEmptyString or allowed ranges via ConfigDef.Range
- ConfigDef.Importance: whether the setting is critical or optional (HIGH, MEDIUM, LOW)
- Documentation: human-friendly descriptions that show up in the connector’s docs
For example:
- http.poll.interval.ms defaults to 60 seconds, but must be at least 5 seconds.
- http.method can only be one of GET, POST, PATCH, PUT, or DELETE.
- http.auth.username and http.auth.password let you configure Basic Auth, while http.auth.bearer covers token-based authentication.
Finally, the constructor just calls super(getConfig(), originals), which parses and validates the supplied properties against our ConfigDef and applies the declared defaults for anything left unset.
HTTP Client
public class HttpApiClient implements AutoCloseable {
private static final Logger log = LoggerFactory.getLogger(HttpApiClient.class);
private final String requestParams;
private final String requestBody;
private final String headers;
private final String authUsername;
private final String authPassword;
private final String authBearer;
private final int connectTimeoutMs;
private final int readTimeoutMs;
private CloseableHttpClient httpClient;
public HttpApiClient(AbstractConfig config) {
this.requestParams = config.getString(HttpSourceConfig.HTTP_QUERY_PARAMS);
this.requestBody = config.getString(HttpSourceConfig.HTTP_REQUEST_BODY);
this.headers = config.getString(HttpSourceConfig.HTTP_HEADERS);
this.authUsername = config.getString(HttpSourceConfig.HTTP_AUTH_USERNAME);
this.authPassword = config.getPassword(HttpSourceConfig.HTTP_AUTH_PASSWORD).value();
this.authBearer = config.getPassword(HttpSourceConfig.HTTP_AUTH_BEARER).value();
this.connectTimeoutMs = config.getInt(HttpSourceConfig.HTTP_CONNECT_TIMEOUT_MS);
this.readTimeoutMs = config.getInt(HttpSourceConfig.HTTP_READ_TIMEOUT_MS);
this.httpClient = HttpClients.custom()
.setDefaultRequestConfig(RequestConfig.custom()
.setConnectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
.setResponseTimeout(readTimeoutMs, TimeUnit.MILLISECONDS)
.build())
.build();
}
public String executeRequest(String baseUri, String method) throws IOException, URISyntaxException {
log.info("Polling API at {}", baseUri);
return httpClient.execute(createHttpRequest(baseUri, Method.normalizedValueOf(method.toUpperCase())), new StringResponseHandler());
}
private HttpUriRequestBase createHttpRequest(String baseUri, Method method) throws URISyntaxException {
URI uri = buildUriWithParams(baseUri);
HttpUriRequestBase request;
switch (method) {
case GET:
request = new HttpGet(uri);
break;
case POST:
request = new HttpPost(uri);
addBody(request);
break;
case PUT:
request = new HttpPut(uri);
addBody(request);
break;
case PATCH:
request = new HttpPatch(uri);
addBody(request);
break;
case DELETE:
request = new HttpDelete(uri);
break;
default:
throw new UnsupportedOperationException("Unsupported HTTP method: " + method);
}
addHeaders(request);
addAuth(request);
log.debug("Computed HTTP request={}", request);
return request;
}
private void addBody(HttpUriRequestBase request) {
if (requestBody == null || requestBody.isEmpty()) {
return;
}
if (request instanceof HttpPost || request instanceof HttpPut || request instanceof HttpPatch) {
StringEntity entity = new StringEntity(requestBody, StandardCharsets.UTF_8);
request.setEntity(entity);
}
}
private URI buildUriWithParams(String baseUri) throws URISyntaxException {
if (requestParams == null || requestParams.isEmpty()) {
return new URI(baseUri);
}
StringBuilder sb = new StringBuilder(baseUri);
if (baseUri.contains("?")) {
sb.append("&");
} else {
sb.append("?");
}
sb.append(requestParams);
return new URI(sb.toString());
}
private void addHeaders(HttpUriRequestBase request) {
if (headers == null || headers.isEmpty()) {
return;
}
String[] headersArray = headers.split(",");
for (String header : headersArray) {
String[] parts = header.split("=", 2);
if (parts.length == 2) {
request.addHeader(parts[0].trim(), parts[1].trim());
} else {
log.warn("Skipping invalid header format: {}", header);
}
}
}
private void addAuth(HttpUriRequestBase request) {
if (authBearer != null && !authBearer.isEmpty()) {
request.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authBearer);
} else if (authUsername != null && !authUsername.isEmpty() && authPassword != null && !authPassword.isEmpty()) {
String auth = authUsername + ":" + authPassword;
String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
request.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
}
}
@Override
public void close() throws IOException {
if (httpClient != null) {
httpClient.close();
}
}
static class StringResponseHandler implements HttpClientResponseHandler<String> {
@Override
public String handleResponse(ClassicHttpResponse response) throws IOException {
int statusCode = response.getCode();
if (statusCode >= HttpStatus.SC_OK && statusCode < HttpStatus.SC_REDIRECTION) {
HttpEntity entity = response.getEntity();
try {
return EntityUtils.toString(entity);
} catch (ParseException e) {
throw new ConnectException("Failed to parse HTTP response.", e);
}
} else {
throw new ConnectException("HTTP request failed with status code: " + statusCode);
}
}
}
void setHttpClient(CloseableHttpClient httpClient) {
this.httpClient = httpClient;
}
}
The HttpApiClient is a simple wrapper around Apache HttpClient 5 that hides the boilerplate of building and executing HTTP requests. It consumes the configuration values from the HttpSourceConfig we defined earlier (headers, query params, authentication, timeouts, and request body) and uses them to construct proper HTTP calls.
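To make the http.headers format concrete, here is a minimal standalone sketch of the same comma-separated 'key=value' parsing that addHeaders performs, using only the JDK. The HeaderParsingSketch class and the parseHeaders method are hypothetical names for illustration and are not part of the connector.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderParsingSketch {

    // Hypothetical helper mirroring the split logic in HttpApiClient.addHeaders.
    // Unlike addHeader on a request, a map keeps only the last value per name.
    static Map<String, String> parseHeaders(String headers) {
        Map<String, String> result = new LinkedHashMap<>();
        if (headers == null || headers.isEmpty()) {
            return result;
        }
        for (String header : headers.split(",")) {
            // A limit of 2 keeps any further '=' characters inside the value.
            String[] parts = header.split("=", 2);
            if (parts.length == 2) {
                result.put(parts[0].trim(), parts[1].trim());
            }
            // Entries without '=' are skipped, as in the connector code.
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("Accept=application/json, X-Token=a=b"));
    }
}
```

One consequence of splitting on commas is that a header value which itself contains a comma cannot be expressed in this format.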
It supports all the common methods (GET, POST, PUT, PATCH, DELETE), attaches request bodies when needed, and adds authentication headers automatically. Responses are processed through a simple StringResponseHandler, which converts successful responses into plain strings, while surfacing errors and unexpected status codes as connector exceptions.
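The authentication precedence in addAuth (a bearer token wins over Basic credentials) can be sketched in isolation with only the JDK. AuthHeaderSketch and authorizationValue are hypothetical names for this example, and the credentials are the familiar sample pair used in the HTTP Basic authentication specification, not real secrets.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class AuthHeaderSketch {

    // Returns the Authorization header value, or null when no credentials are configured.
    static String authorizationValue(String bearer, String username, String password) {
        if (bearer != null && !bearer.isEmpty()) {
            // A bearer token takes precedence over Basic credentials.
            return "Bearer " + bearer;
        }
        if (username != null && !username.isEmpty()
                && password != null && !password.isEmpty()) {
            String credentials = username + ":" + password;
            return "Basic " + Base64.getEncoder()
                    .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(authorizationValue("", "Aladdin", "open sesame"));
        System.out.println(authorizationValue("token-123", "Aladdin", "open sesame"));
    }
}
```

Note that Basic auth is only applied when both username and password are non-empty, so a username with an empty password results in no Authorization header at all.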
This class isn’t the centerpiece of the connector, but rather a utility layer. Its role is to encapsulate HTTP-related concerns so that the connector task code can stay focused on polling data and writing records into Kafka.
Source Task
public class HttpSourceTask extends SourceTask {
private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);
private String url;
private String method;
private long pollIntervalMs;
private String topic;
private HttpApiClient apiClient;
private Map<String, String> sourcePartition;
private Map<String, Object> sourceOffset;
private long lastPollTime = 0L;
@Override
public String version() {
return HttpSourceConfig.VERSION;
}
@Override
public void start(Map<String, String> props) {
log.info("Starting HttpSourceTask with properties: {}", props);
try {
HttpSourceConfig config = new HttpSourceConfig(props);
this.url = config.getString(HttpSourceConfig.HTTP_URL);
this.method = config.getString(HttpSourceConfig.HTTP_METHOD);
this.pollIntervalMs = config.getInt(HttpSourceConfig.HTTP_POLL_INTERVAL_MS);
this.topic = config.getString(HttpSourceConfig.TOPIC);
this.apiClient = new HttpApiClient(config);
this.sourcePartition = Collections.singletonMap("url", this.url);
this.sourceOffset = context.offsetStorageReader().offset(this.sourcePartition);
if (this.sourceOffset != null) {
log.info("Found persisted offset: {}", this.sourceOffset);
Long lastPolledTimestamp = (Long) this.sourceOffset.get("last_polled_timestamp");
if (lastPolledTimestamp != null) {
this.lastPollTime = lastPolledTimestamp;
}
} else {
log.info("No previous offset found. Starting from scratch.");
}
} catch (ConfigException e) {
throw new ConnectException("Invalid connector configuration.", e);
}
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
long currentTime = System.currentTimeMillis();
if (currentTime - lastPollTime < pollIntervalMs) {
long waitTime = pollIntervalMs - (currentTime - lastPollTime);
log.info("Waiting for {} ms before next poll.", waitTime);
Thread.sleep(waitTime);
return Collections.emptyList();
}
try {
String payload = this.apiClient.executeRequest(url, method);
SourceRecord record = getSourceRecord(payload, currentTime);
log.debug("Publishing fetched data={}", record);
this.lastPollTime = currentTime;
return Collections.singletonList(record);
} catch (ConnectException e) {
log.error("API client reported an unrecoverable error.", e);
throw e;
} catch (IOException e) {
log.warn("An I/O error occurred during the HTTP request. This is likely temporary.", e);
throw new RetriableException("I/O error during HTTP request.", e);
} catch (Exception e) {
log.error("An unexpected error occurred during the HTTP request.", e);
throw new ConnectException("Unexpected error.", e);
}
}
private SourceRecord getSourceRecord(String payload, long currentTime) {
log.info("Successfully fetched data. Payload size: {}", payload.length());
return new SourceRecord(
this.sourcePartition,
Collections.singletonMap("last_polled_timestamp", currentTime),
this.topic,
Schema.STRING_SCHEMA,
Instant.now().toString(),
Schema.STRING_SCHEMA,
payload
);
}
@Override
public void stop() {
log.info("Stopping HttpSourceTask");
if (this.apiClient != null) {
try {
this.apiClient.close();
} catch (IOException e) {
log.error("Failed to close HTTP client.", e);
}
}
}
}
The HttpSourceTask is where the connector actually does its job. This class is executed by Kafka Connect workers, and it’s responsible for polling the HTTP API at a fixed interval, turning the responses into Kafka records, and tracking offsets so the connector knows where it left off.
Initialization - start()
- Reads our custom properties via HttpSourceConfig.
- Extracts the required properties: HTTP_URL, HTTP_METHOD, HTTP_POLL_INTERVAL_MS and the target TOPIC.
- Instantiates the HttpApiClient (our wrapper from before).
- Sets up partition/offset tracking:
  - In Kafka Connect, each source partition is a logical identifier that lets the framework track offsets and progress for a given data stream. There are different ways to define partitions, for example by combining multiple fields, using unique IDs, or even composite keys, depending on the system you’re integrating with. In our case, since we are polling HTTP endpoints, the url itself is enough to act as the partition key. Each URL uniquely identifies the API response we’re working with, so Kafka Connect can reliably track offsets per endpoint.
  - Uses offsetStorageReader to load the last saved offset (basically: when was this API last polled?) and reads the timestamp from it via this.sourceOffset.get("last_polled_timestamp").
  - If found, resumes from that timestamp; otherwise starts fresh.
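The partition and offset bookkeeping used in start() boils down to two small maps. The following sketch uses plain JDK collections to show their shape and the resume logic. OffsetSketch, partitionFor, offsetFor and resumeTimestamp are hypothetical names, and reading the stored value through Number rather than casting straight to Long is a defensive choice of this sketch, not something the connector does.

```java
import java.util.Collections;
import java.util.Map;

public class OffsetSketch {

    static final String OFFSET_KEY = "last_polled_timestamp";

    // Source partition: identifies the stream whose progress is tracked.
    static Map<String, String> partitionFor(String url) {
        return Collections.singletonMap("url", url);
    }

    // Source offset: the progress marker attached to each record.
    static Map<String, Object> offsetFor(long polledAtMs) {
        return Collections.<String, Object>singletonMap(OFFSET_KEY, polledAtMs);
    }

    // Returns the persisted timestamp, or 0 when nothing usable was stored.
    static long resumeTimestamp(Map<String, Object> storedOffset) {
        if (storedOffset == null) {
            return 0L;
        }
        Object value = storedOffset.get(OFFSET_KEY);
        return (value instanceof Number) ? ((Number) value).longValue() : 0L;
    }

    public static void main(String[] args) {
        System.out.println(partitionFor("https://httpbin.org/get"));
        System.out.println(resumeTimestamp(offsetFor(1700000000000L)));
        System.out.println(resumeTimestamp(null));
    }
}
```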
Polling - poll()
- Runs repeatedly, controlled by Kafka Connect’s scheduling.
- Checks if enough time has passed since the last poll (pollIntervalMs). If not, it waits via Thread.sleep(waitTime) and returns an empty list.
- When ready, it calls HttpApiClient.executeRequest() to fetch the data.
- Wraps the API response in a SourceRecord:
  - Attaches the source partition (url) and new offset (last_polled_timestamp).
  - Includes the payload body as the record’s value and Instant.now() as the record’s key, both as plain strings using Schema.STRING_SCHEMA.
  - Sets the configured Kafka topic.
- Handles errors:
  - I/O exceptions are rethrown as RetriableException.
  - A ConnectException from the API client (for example, a non-2xx status code) is logged and rethrown, which fails the task.
  - Any other unexpected exception is wrapped in a ConnectException, which also fails the task.
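The interval check at the top of poll() is simple arithmetic. As a minimal JDK-only sketch (PollThrottleSketch and remainingWaitMs are hypothetical names), the time still to wait is the poll interval minus the time elapsed since the last poll, floored at zero:

```java
public class PollThrottleSketch {

    // Milliseconds left before the next HTTP request is due; 0 means "poll now".
    static long remainingWaitMs(long nowMs, long lastPollMs, long pollIntervalMs) {
        long elapsed = nowMs - lastPollMs;
        return Math.max(0L, pollIntervalMs - elapsed);
    }

    public static void main(String[] args) {
        long interval = 10_000L;
        // 4 s after the last poll: 6 s still to wait.
        System.out.println(remainingWaitMs(14_000L, 10_000L, interval));
        // 12 s after the last poll: the request is due.
        System.out.println(remainingWaitMs(22_000L, 10_000L, interval));
        // First run: lastPollMs is 0, so the request is due immediately.
        System.out.println(remainingWaitMs(System.currentTimeMillis(), 0L, interval));
    }
}
```

Because lastPollTime starts at 0 when no offset is stored, the first call to poll() issues a request right away. After a restart with a stored offset, the task first waits out whatever remains of the interval.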
Shutdown - stop()
- Ensures the HttpApiClient is closed and resources are freed.
Source Connector
public class HttpSourceConnector extends SourceConnector {
private static final Logger log = LoggerFactory.getLogger(HttpSourceConnector.class);
private Map<String, String> configProps = null;
@Override
public String version() {
return HttpSourceConfig.VERSION;
}
@Override
public ConfigDef config() {
return HttpSourceConfig.getConfig();
}
@Override
public void start(Map<String, String> props) {
log.info("Starting HttpSourceConnector {}", props);
configProps = props;
}
@Override
public void stop() {
log.info("Stopping HttpSourceConnector");
}
@Override
public Class<? extends Task> taskClass() {
return HttpSourceTask.class;
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
taskConfigs.add(configProps);
}
return taskConfigs;
}
}
The HttpSourceConnector class is the entry point of our connector. Every Kafka Connect source connector must extend SourceConnector, and this is where we declare how the connector should behave at a high level.
- config() - exposes the configuration definition we built earlier in HttpSourceConfig.
- start() and stop() - On startup, the framework passes the configuration properties, which we store for later use. On shutdown, we simply log that the connector is stopping.
- taskClass() - tells the framework which SourceTask implementation should be used (in our case, HttpSourceTask).
- taskConfigs() - this method decides how the configuration should be split across multiple tasks. Right now, we’re not partitioning the work in any special way. For each requested task, we just pass along the same configuration properties. This means every task will behave the same and poll the same URL, so tasks.max is best left at 1 to avoid duplicate records. In more advanced connectors, this is where you might divide the work. For example, if you had multiple URLs to poll, you could assign different URLs to different tasks. But since our connector is currently built around a single URL, duplicating the same configuration is sufficient.
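As a rough illustration of the multi-URL idea, here is a JDK-only sketch that distributes URLs round-robin across at most maxTasks groups. It is not part of the connector: the TaskSplitSketch class, the splitUrls method, and the very notion of a multi-URL setting are assumptions made for this example. In a real taskConfigs() each group would then be written into that task’s configuration map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TaskSplitSketch {

    // Distributes URLs round-robin over min(maxTasks, urls.size()) groups.
    static List<List<String>> splitUrls(List<String> urls, int maxTasks) {
        List<List<String>> groups = new ArrayList<>();
        int groupCount = Math.min(maxTasks, urls.size());
        if (groupCount <= 0) {
            return groups; // nothing to assign
        }
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < urls.size(); i++) {
            groups.get(i % groupCount).add(urls.get(i));
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "https://example.com/a", "https://example.com/b", "https://example.com/c");
        System.out.println(splitUrls(urls, 2));
    }
}
```

Capping the number of groups at the number of URLs avoids creating tasks that would have nothing to poll.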
Deployment and Usage
All right, now that we have our connector—let's see it in action.
To do that, we need to build and package our connector into a fat JAR (also called an uber-jar) so that it contains not only our classes but also all the required dependencies.
We can do this with Maven by running:
mvn clean package
This will produce a JAR file under the target/ directory. If you check the folder after the build, you should see something like: kafka-connect-http-1.0-jar-with-dependencies.jar
Now for convenience I moved this jar under my user directory C:\Users\ionpa\kafka-connect-http. This way it’s easy to reference it later and mount it into our Kafka container.
Next, we’ll use the landoop/fast-data-dev Docker image, since it comes with almost everything preconfigured: ZooKeeper, Schema Registry, Kafka Connect and the broker, plus a nice UI provided by Landoop for managing everything Kafka-related. Here is the docker-compose.yml:
version: '3'
services:
  kafka-cluster:
    container_name: kafka-cluster
    image: landoop/fast-data-dev
    environment:
      ADV_HOST: 127.0.0.1 # Change to 192.168.99.100 if using Docker Toolbox
      RUNTESTS: 0 # Disable running tests so the cluster starts faster
    ports:
      - 2181:2181 # Zookeeper
      - 3030:3030 # Landoop UI
      - 8081-8083:8081-8083 # Schema Registry, REST Proxy, Kafka Connect ports
      - 9581-9585:9581-9585 # JMX Ports
      - 9092:9092 # Kafka Broker
    volumes:
      # Specify an absolute path mapping
      - C:\Users\ionpa\kafka-connect-http:/connectors/kafka-connect-http
Notice the volume mapping C:\Users\ionpa\kafka-connect-http:/connectors/kafka-connect-http. According to Landoop’s documentation, /connectors is the path for making custom connectors visible to Kafka Connect. Essentially, we’re telling Docker to mount our local folder (where the fat JAR lives) into the container under /connectors/kafka-connect-http. However, not every Kafka Docker image follows the same convention. Each distribution may expect connectors in a different location, depending on how plugin.path is configured (for example, /usr/share/java/). The plugin.path can also be changed via CONNECT_PLUGIN_PATH.
Now let’s start everything:
docker-compose up
Enter the container:
docker exec -it kafka-cluster bash
Inside the container, let’s verify that our custom connector JAR is properly mounted:
ls /connectors/
You should see kafka-connect-http listed.
Next, let’s check if Kafka Connect actually picked up the connector:
curl -s http://localhost:8083/connector-plugins \
| jq '.[] | select(.class | contains("HttpSourceConnector"))'
This should return our HttpSourceConnector entry like this:
{
"class": "inc.evil.kafka.connect.http.HttpSourceConnector",
"type": "source",
"version": "1.0"
}
Next let's create a new instance of the connector:
curl -X POST http://localhost:3030/api/kafka-connect/connectors \
-H "Content-Type: application/json" \
-d '{
"name": "my-connector",
"config": {
"connector.class": "inc.evil.kafka.connect.http.HttpSourceConnector",
"http.headers": "Content-Type=application/json",
"tasks.max": "1",
"http.query.params": "random=123",
"http.url": "https://httpbin.org/get",
"http.read.timeout.ms": "10000",
"topic": "my-topic",
"http.poll.interval.ms": "10000",
"http.connect.timeout.ms": "5000",
"http.method": "GET",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
}' | jq
And right away let's check its status:
curl -X GET http://localhost:8083/connectors/my-connector/status | jq
You should see something like this:
{
"name": "my-connector",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "source"
}
This confirms that the connector is up and polling https://httpbin.org/get every 10 seconds.
Next, we need a Kafka consumer to verify that messages are being written to our topic my-topic:
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning
You should start seeing the JSON payloads from the API appear in the console every 10 seconds.
Now all that's left is to delete the old cron job and add a simple Kafka consumer in the application to handle the offers, or maybe even better - a RedisSinkConnector to push the API responses directly to Redis? But that, of course, is a story for another time.
Conclusion
We’ve built a fully working HTTP source connector from scratch, learned how to configure it, package it as a fat JAR, and deploy it with Kafka Connect. You’ve seen how to mount the connector, verify it’s loaded, and consume the API responses in real time through Kafka.
All the code and configuration files are available for reference, so you can adapt them to your own APIs or use cases.
That’s all folks, I hope that you’ve learned something. Happy coding!