From HTTP to Kafka: A Custom Source Connector

Learn how to implement a custom Kafka Connect HTTP source connector to integrate with HTTP endpoints, covering connector configuration, deployment and usage.

By 
Ion Pascari
·
Sep. 10, 25 · Tutorial

Recently, I came across an interesting scenario: one application had a cron job constantly polling an API for active offers, just to refresh a Redis cache that powered the offer view. So, I started thinking—isn’t there a better way to handle this? Or at least a way to offload such repetitive tasks outside the application itself?

That’s when it hit me: this pattern looks way too similar to the CDC flows we already implement with Kafka Connect JDBC source connectors. So why not apply the same idea to HTTP? After a bit of digging, I found the answer was yes—we can definitely do it. But there’s a catch. The official Confluent HTTP source connector requires a license, and most open-source alternatives are either too complex or don’t quite match the use case.

So I decided to build my own HTTP source connector.

Spoiler alert: the implementation turned out to be relatively simple, tailored to this specific scenario. The goal of this article is to share that base skeleton—something you can adapt to your own needs, while also exploring some of Kafka Connect’s extensibility in practice. 

Note: here is the source code in case you want to follow along.

Implementation

We are going to create a new Maven project with the following pom.xml:

XML
 
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>inc.evil</groupId>
    <artifactId>kafka-connect-http</artifactId>
    <version>1.0</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <kafka.version>3.0.0</kafka.version>
        <httpclient.version>5.5</httpclient.version>
        <slf4j.version>2.0.17</slf4j.version>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>connect-api</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
            <version>${httpclient.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.7.1</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>


Basically we are going to use:

  • The Kafka Connect API, which provides the interfaces for building the HTTP source connector that polls data from HTTP endpoints and streams it directly into Kafka topics.
  • Apache HttpClient 5, a pure Java, standards-based implementation of the HTTP protocol. It offers a robust and flexible toolkit for building HTTP-aware client applications, and in this case, it serves as the backbone for polling the API.
  • The Simple Logging Facade for Java (SLF4J), which serves as a simple facade for logging.
  • The Maven Assembly Plugin, which enables the creation of a fat JAR.

First we are going to tackle the configuration part of the connector.

Configuration

Java
 
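import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;

import java.util.Map;

public class HttpSourceConfig extends AbstractConfig {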

    public static final String HTTP_URL = "http.url";
    public static final String HTTP_QUERY_PARAMS = "http.query.params";
    public static final String HTTP_REQUEST_BODY = "http.request.body";
    public static final String HTTP_HEADERS = "http.headers";
    public static final String HTTP_POLL_INTERVAL_MS = "http.poll.interval.ms";
    public static final String HTTP_METHOD = "http.method";
    public static final String HTTP_AUTH_USERNAME = "http.auth.username";
    public static final String HTTP_AUTH_PASSWORD = "http.auth.password";
    public static final String HTTP_AUTH_BEARER = "http.auth.bearer";
    public static final String HTTP_CONNECT_TIMEOUT_MS = "http.connect.timeout.ms";
    public static final String HTTP_READ_TIMEOUT_MS = "http.read.timeout.ms";
    public static final String TOPIC = "topic";
    public static final String VERSION = "1.0";

    public static ConfigDef getConfig() {
        return new ConfigDef()
                .define(TOPIC,
                        ConfigDef.Type.STRING,
                        ConfigDef.Importance.HIGH,
                        "The Kafka topic to write the fetched data to.")
                .define(HTTP_HEADERS,
                        ConfigDef.Type.STRING,
                        "",
                        ConfigDef.Importance.LOW,
                        "Optional HTTP request headers in 'key=value' pairs separated by commas. " +
                        "Example: 'Accept=application/json'.")
                .define(HTTP_QUERY_PARAMS,
                        ConfigDef.Type.STRING,
                        "",
                        ConfigDef.Importance.LOW,
                        "Optional query parameters appended to the HTTP request URL in 'key=value' pairs separated by '&'. " +
                        "Example: 'updatedSince=2023-01-01&limit=100'.")
                .define(HTTP_REQUEST_BODY,
                        ConfigDef.Type.STRING,
                        "",
                        ConfigDef.Importance.MEDIUM,
                        "The HTTP request body to be sent with the request. Only applicable for methods like POST and PUT.")
                .define(HTTP_URL,
                        ConfigDef.Type.STRING,
                        "https://httpbin.org/get",
                        new ConfigDef.NonEmptyString(),
                        ConfigDef.Importance.HIGH,
                        "The base HTTP URL to fetch data from.",
                        "Connection",
                        1,
                        ConfigDef.Width.LONG,
                        HTTP_URL)
                .define(HTTP_POLL_INTERVAL_MS,
                        ConfigDef.Type.INT,
                        60000,
                        ConfigDef.Range.atLeast(5000),
                        ConfigDef.Importance.HIGH,
                        "Polling interval in milliseconds between consecutive HTTP requests. " +
                        "Minimum allowed is 5000 ms (5 seconds). Default is 60000 ms (1 minute).")
                .define(HTTP_METHOD,
                        ConfigDef.Type.STRING,
                        "GET",
                        ConfigDef.ValidString.in("GET", "POST", "PATCH", "PUT", "DELETE"),
                        ConfigDef.Importance.HIGH,
                        "The HTTP method to use for requests.",
                        "Connection",
                        2,
                        ConfigDef.Width.SHORT,
                        HTTP_METHOD)
                .define(HTTP_AUTH_USERNAME,
                        ConfigDef.Type.STRING,
                        "",
                        ConfigDef.Importance.MEDIUM,
                        "Username for HTTP Basic Authentication. Used together with '" + HTTP_AUTH_PASSWORD + "'.")
                .define(HTTP_AUTH_PASSWORD,
                        ConfigDef.Type.PASSWORD,
                        "",
                        ConfigDef.Importance.MEDIUM,
                        "Password for HTTP Basic Authentication. Used together with '" + HTTP_AUTH_USERNAME + "'.")
                .define(HTTP_AUTH_BEARER,
                        ConfigDef.Type.PASSWORD,
                        "",
                        ConfigDef.Importance.MEDIUM,
                        "Bearer token for Authorization header. Example: 'eyJhbGciOi...'.")
                .define(HTTP_CONNECT_TIMEOUT_MS,
                        ConfigDef.Type.INT,
                        5000,
                        ConfigDef.Range.atLeast(1000),
                        ConfigDef.Importance.MEDIUM,
                        "Timeout in milliseconds for establishing the HTTP connection. Default is 5000 ms.")
                .define(HTTP_READ_TIMEOUT_MS,
                        ConfigDef.Type.INT,
                        10000,
                        ConfigDef.Range.atLeast(1000),
                        ConfigDef.Importance.MEDIUM,
                        "Timeout in milliseconds for reading the HTTP response. Default is 10000 ms.");
    }

    public HttpSourceConfig(Map<String, String> originals) {
        super(getConfig(), originals);
    }


}


In Kafka Connect, every connector needs a way to define and validate its configuration. That’s exactly what the HttpSourceConfig class does. It extends AbstractConfig and declares all the settings our HTTP source connector will support.

At the top, I define constants like http.url, http.method, http.headers, topic, etc. These are the configuration keys that users will specify when they set up the connector in JSON. For example, http.url points to the API endpoint, and topic is the Kafka topic where the data should land.

The main point of interest is the getConfig() method. Here I build a ConfigDef, which is Kafka Connect’s way of describing the connector's parameters. For each parameter it defines:

  • ConfigDef.Type : what kind of value is expected (STRING, INT, PASSWORD, …)
  • Default value: what to fall back on if nothing is provided
  • Validation: rules like NonEmptyString or allowed ranges via ConfigDef.Range
  • ConfigDef.Importance : whether the setting is critical or optional (HIGH, MEDIUM, LOW)
  • Documentation: human-friendly descriptions that show up in the connector’s docs

For example:

  • http.poll.interval.ms defaults to 60 seconds, but must be at least 5 seconds.
  • http.method can only be one of GET, POST, PATCH, PUT, or DELETE.
  • http.auth.username and http.auth.password let you configure Basic Auth, while http.auth.bearer covers token-based authentication.

Finally, in the constructor I just call super(getConfig(), originals), which parses and validates the supplied properties against our ConfigDef and applies the defaults.

HTTP Client

Java
 
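import org.apache.hc.client5.http.classic.methods.HttpDelete;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.classic.methods.HttpPatch;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.classic.methods.HttpPut;
import org.apache.hc.client5.http.classic.methods.HttpUriRequestBase;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpStatus;
import org.apache.hc.core5.http.Method;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.HttpClientResponseHandler;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.concurrent.TimeUnit;

public class HttpApiClient implements AutoCloseable {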

    private static final Logger log = LoggerFactory.getLogger(HttpApiClient.class);

    private final String requestParams;
    private final String requestBody;
    private final String headers;
    private final String authUsername;
    private final String authPassword;
    private final String authBearer;
    private final int connectTimeoutMs;
    private final int readTimeoutMs;

    private CloseableHttpClient httpClient;

    public HttpApiClient(AbstractConfig config) {
        this.requestParams = config.getString(HttpSourceConfig.HTTP_QUERY_PARAMS);
        this.requestBody = config.getString(HttpSourceConfig.HTTP_REQUEST_BODY);
        this.headers = config.getString(HttpSourceConfig.HTTP_HEADERS);
        this.authUsername = config.getString(HttpSourceConfig.HTTP_AUTH_USERNAME);
        this.authPassword = config.getPassword(HttpSourceConfig.HTTP_AUTH_PASSWORD).value();
        this.authBearer = config.getPassword(HttpSourceConfig.HTTP_AUTH_BEARER).value();
        this.connectTimeoutMs = config.getInt(HttpSourceConfig.HTTP_CONNECT_TIMEOUT_MS);
        this.readTimeoutMs = config.getInt(HttpSourceConfig.HTTP_READ_TIMEOUT_MS);
        this.httpClient = HttpClients.custom()
                .setDefaultRequestConfig(RequestConfig.custom()
                        .setConnectTimeout(connectTimeoutMs, TimeUnit.MILLISECONDS)
                        .setResponseTimeout(readTimeoutMs, TimeUnit.MILLISECONDS)
                        .build())
                .build();
    }

    public String executeRequest(String baseUri, String method) throws IOException, URISyntaxException {
        log.info("Polling API at {}", baseUri);
        return httpClient.execute(createHttpRequest(baseUri, Method.normalizedValueOf(method.toUpperCase())), new StringResponseHandler());
    }

    private HttpUriRequestBase createHttpRequest(String baseUri, Method method) throws URISyntaxException {
        URI uri = buildUriWithParams(baseUri);
        HttpUriRequestBase request;
        switch (method) {
            case GET:
                request = new HttpGet(uri);
                break;
            case POST:
                request = new HttpPost(uri);
                addBody(request);
                break;
            case PUT:
                request = new HttpPut(uri);
                addBody(request);
                break;
            case PATCH:
                request = new HttpPatch(uri);
                addBody(request);
                break;
            case DELETE:
                request = new HttpDelete(uri);
                break;
            default:
                throw new UnsupportedOperationException("Unsupported HTTP method: " + method);
        }
        addHeaders(request);
        addAuth(request);
        log.debug("Computed HTTP request={}", request);
        return request;
    }

    private void addBody(HttpUriRequestBase request) {
        if (requestBody == null || requestBody.isEmpty()) {
            return;
        }
        if (request instanceof HttpPost || request instanceof HttpPut || request instanceof HttpPatch) {
            StringEntity entity = new StringEntity(requestBody, StandardCharsets.UTF_8);
            request.setEntity(entity);
        }
    }

    private URI buildUriWithParams(String baseUri) throws URISyntaxException {
        if (requestParams == null || requestParams.isEmpty()) {
            return new URI(baseUri);
        }

        StringBuilder sb = new StringBuilder(baseUri);
        if (baseUri.contains("?")) {
            sb.append("&");
        } else {
            sb.append("?");
        }
        sb.append(requestParams);

        return new URI(sb.toString());
    }

    private void addHeaders(HttpUriRequestBase request) {
        if (headers == null || headers.isEmpty()) {
            return;
        }
        String[] headersArray = headers.split(",");
        for (String header : headersArray) {
            String[] parts = header.split("=", 2);
            if (parts.length == 2) {
                request.addHeader(parts[0].trim(), parts[1].trim());
            } else {
                log.warn("Skipping invalid header format: {}", header);
            }
        }
    }

    private void addAuth(HttpUriRequestBase request) {
        if (authBearer != null && !authBearer.isEmpty()) {
            request.addHeader(HttpHeaders.AUTHORIZATION, "Bearer " + authBearer);
        } else if (authUsername != null && !authUsername.isEmpty() && authPassword != null && !authPassword.isEmpty()) {
            String auth = authUsername + ":" + authPassword;
            String encodedAuth = Base64.getEncoder().encodeToString(auth.getBytes(StandardCharsets.UTF_8));
            request.addHeader(HttpHeaders.AUTHORIZATION, "Basic " + encodedAuth);
        }
    }

    @Override
    public void close() throws IOException {
        if (httpClient != null) {
            httpClient.close();
        }
    }

    static class StringResponseHandler implements HttpClientResponseHandler<String> {

        @Override
        public String handleResponse(ClassicHttpResponse response) throws IOException {
            int statusCode = response.getCode();
            if (statusCode >= HttpStatus.SC_OK && statusCode < HttpStatus.SC_REDIRECTION) {
                HttpEntity entity = response.getEntity();
                try {
                    return EntityUtils.toString(entity);
                } catch (ParseException e) {
                    throw new ConnectException("Failed to parse HTTP response.", e);
                }
            } else {
                throw new ConnectException("HTTP request failed with status code: " + statusCode);
            }
        }
    }

    void setHttpClient(CloseableHttpClient httpClient) {
        this.httpClient = httpClient;
    }

}


The HttpApiClient is a simple wrapper around Apache HttpClient 5 that hides the boilerplate of building and executing HTTP requests. It consumes the configuration values from the HttpSourceConfig we defined earlier (headers, query params, authentication, timeouts, and request body) and uses them to construct proper HTTP calls.

It supports all the common methods (GET, POST, PUT, PATCH, DELETE), attaches request bodies when needed, and adds authentication headers automatically. Responses are processed through a simple StringResponseHandler, which converts successful responses into plain strings, while surfacing errors and unexpected status codes as connector exceptions.

This class isn’t the centerpiece of the connector, but rather a utility layer. Its role is to encapsulate HTTP-related concerns so that the connector task code can stay focused on polling data and writing records into Kafka.
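To make the header and authentication handling easier to follow in isolation, here is a minimal standalone sketch that mirrors the rules of addHeaders() and addAuth() using only the JDK. The class HeaderAuthSketch and its method names are hypothetical and not part of the connector.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the connector: mirrors the parsing rules
// of addHeaders() and the Basic Auth branch of addAuth().
final class HeaderAuthSketch {

    // Parses "k1=v1,k2=v2" into an ordered map; entries without '=' are skipped.
    static Map<String, String> parseHeaders(String headers) {
        Map<String, String> result = new LinkedHashMap<>();
        if (headers == null || headers.isEmpty()) {
            return result;
        }
        for (String header : headers.split(",")) {
            String[] parts = header.split("=", 2); // limit 2 keeps any '=' inside the value
            if (parts.length == 2) {
                result.put(parts[0].trim(), parts[1].trim());
            }
        }
        return result;
    }

    // Builds the value of a Basic Authorization header from username and password.
    static String basicAuth(String username, String password) {
        String credentials = username + ":" + password;
        return "Basic " + Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        System.out.println(parseHeaders("Accept=application/json, X-Filter=a=b, broken"));
        System.out.println(basicAuth("user", "pass"));
    }
}
```

One consequence of splitting on commas is that a header value which itself contains a comma cannot be expressed in http.headers with this format.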

Source Task

Java
 
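import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HttpSourceTask extends SourceTask {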

    private static final Logger log = LoggerFactory.getLogger(HttpSourceTask.class);

    private String url;
    private String method;
    private long pollIntervalMs;
    private String topic;
    private HttpApiClient apiClient;
    private Map<String, String> sourcePartition;
    private Map<String, Object> sourceOffset;
    private long lastPollTime = 0L;

    @Override
    public String version() {
        return HttpSourceConfig.VERSION;
    }

    @Override
    public void start(Map<String, String> props) {
        log.info("Starting HttpSourceTask with properties: {}", props);
        try {
            HttpSourceConfig config = new HttpSourceConfig(props);
            this.url = config.getString(HttpSourceConfig.HTTP_URL);
            this.method = config.getString(HttpSourceConfig.HTTP_METHOD);
            this.pollIntervalMs = config.getInt(HttpSourceConfig.HTTP_POLL_INTERVAL_MS);
            this.topic = config.getString(HttpSourceConfig.TOPIC);

            this.apiClient = new HttpApiClient(config);

            this.sourcePartition = Collections.singletonMap("url", this.url);
            this.sourceOffset = context.offsetStorageReader().offset(this.sourcePartition);
            if (this.sourceOffset != null) {
                log.info("Found persisted offset: {}", this.sourceOffset);
                Long lastPolledTimestamp = (Long) this.sourceOffset.get("last_polled_timestamp");
                if (lastPolledTimestamp != null) {
                    this.lastPollTime = lastPolledTimestamp;
                }
            } else {
                log.info("No previous offset found. Starting from scratch.");
            }

        } catch (ConfigException e) {
            throw new ConnectException("Invalid connector configuration.", e);
        }
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        long currentTime = System.currentTimeMillis();
        if (currentTime - lastPollTime < pollIntervalMs) {
            long waitTime = pollIntervalMs - (currentTime - lastPollTime);
            log.info("Waiting for {} ms before next poll.", waitTime);
            Thread.sleep(waitTime);
            return Collections.emptyList();
        }

        try {

            String payload = this.apiClient.executeRequest(url, method);
            SourceRecord record = getSourceRecord(payload, currentTime);
            log.debug("Publishing fetched data={}", record);
            this.lastPollTime = currentTime;

            return Collections.singletonList(record);

        } catch (ConnectException e) {
            log.error("API client reported an unrecoverable error.", e);
            throw e;
        } catch (IOException e) {
            log.warn("An I/O error occurred during the HTTP request. This is likely temporary.", e);
            throw new RetriableException("I/O error during HTTP request.", e);
        } catch (Exception e) {
            log.error("An unexpected error occurred during the HTTP request.", e);
            throw new ConnectException("Unexpected error.", e);
        }
    }

    private SourceRecord getSourceRecord(String payload, long currentTime) {
        log.info("Successfully fetched data. Payload size: {}", payload.length());
        return new SourceRecord(
                this.sourcePartition,
                Collections.singletonMap("last_polled_timestamp", currentTime),
                this.topic,
                Schema.STRING_SCHEMA,
                Instant.now().toString(),
                Schema.STRING_SCHEMA,
                payload
        );
    }

    @Override
    public void stop() {
        log.info("Stopping HttpSourceTask");
        if (this.apiClient != null) {
            try {
                this.apiClient.close();
            } catch (IOException e) {
                log.error("Failed to close HTTP client.", e);
            }
        }
    }
}


The HttpSourceTask is where the connector actually does its job. This class is executed by Kafka Connect workers, and it’s responsible for polling the HTTP API at a fixed interval, turning the responses into Kafka records, and tracking offsets so the connector knows where it left off.

Initialization - start()

  • Reads our custom properties via HttpSourceConfig.
  • Extracts the required properties: HTTP_URL, HTTP_METHOD, HTTP_POLL_INTERVAL_MS and target TOPIC.
  • Instantiates the HttpApiClient (our wrapper from before).
  • Sets up partition/offset tracking:
    • In Kafka Connect, each source partition is a logical identifier that lets the framework track offsets and progress for a given data stream. There are different ways to define partitions, for example by combining multiple fields, using unique IDs, or using composite keys, depending on the system you’re integrating with. In our case, since we are polling HTTP endpoints, the url itself is enough to act as the partition key. Each URL uniquely identifies the API response we’re working with, so Kafka Connect can reliably track offsets per endpoint.
    • Uses offsetStorageReader to load the last saved offset (basically: when was this API last polled?) and reads the timestamp from it via this.sourceOffset.get("last_polled_timestamp").
    • If found, resumes from that timestamp; otherwise starts fresh.
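The offset lookup described above can be sketched as a small pure function. This is a simplified illustration, not the connector's code: OffsetRestoreSketch and restoreLastPollTime are hypothetical names, and accepting any Number (rather than casting to Long) is a defensive assumption in case the stored value comes back as a different numeric type.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical helper illustrating the offset lookup performed in start().
final class OffsetRestoreSketch {

    static final String OFFSET_KEY = "last_polled_timestamp";

    // Returns the stored timestamp, or 0 when no usable offset exists.
    static long restoreLastPollTime(Map<String, ?> storedOffset) {
        if (storedOffset == null) {
            return 0L; // first run: nothing has been persisted yet
        }
        Object value = storedOffset.get(OFFSET_KEY);
        // Assumption: tolerate Integer as well as Long instead of a hard (Long) cast.
        return value instanceof Number ? ((Number) value).longValue() : 0L;
    }

    public static void main(String[] args) {
        System.out.println(restoreLastPollTime(null));
        System.out.println(restoreLastPollTime(
                Collections.singletonMap(OFFSET_KEY, 1757500000000L)));
    }
}
```

Returning 0 for a missing offset matches the task's initial lastPollTime, so a fresh connector polls immediately.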

Polling - poll()

  • Runs repeatedly, controlled by Kafka Connect’s scheduling.
  • Checks if enough time has passed since the last poll (pollIntervalMs). If not, it waits before trying again via Thread.sleep(waitTime).
  • When ready, it calls HttpApiClient.executeRequest() to fetch the data.
  • Wraps the API response in a SourceRecord:
    • Attaches the source partition (url) and new offset (last_polled_timestamp).
    • Includes the payload body as the record’s value and Instant.now() as the record's key, both as plain strings using Schema.STRING_SCHEMA.
    • Sets the configured Kafka topic.
  • Handles errors:
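    • A ConnectException raised by the API client (for example, on a non-2xx status code) is logged and rethrown, which fails the task.
    • I/O exceptions are wrapped in a RetriableException, signaling to Kafka Connect that the poll can be retried.
    • Any other unexpected exception is wrapped in a ConnectException, which also fails the task.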
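The interval check at the top of poll() boils down to a small piece of arithmetic. The sketch below extracts it into a pure function so the behavior is easy to reason about; PollGateSketch and remainingWaitMs are hypothetical names used only for illustration.

```java
// Hypothetical helper: the interval check from poll(), extracted as a pure function.
final class PollGateSketch {

    // Milliseconds still to wait before the next request; 0 means "poll now".
    static long remainingWaitMs(long nowMs, long lastPollMs, long pollIntervalMs) {
        long elapsed = nowMs - lastPollMs;
        return elapsed < pollIntervalMs ? pollIntervalMs - elapsed : 0L;
    }

    public static void main(String[] args) {
        long interval = 10_000L;
        // 5 seconds since the last poll: the task would sleep for the remainder.
        System.out.println(remainingWaitMs(15_000L, 10_000L, interval));
        // A full interval has elapsed: the task would call the API right away.
        System.out.println(remainingWaitMs(25_000L, 10_000L, interval));
    }
}
```

Because lastPollTime starts at 0 when no offset is stored, the first call to poll() always fetches immediately. After a restart with a persisted offset, the task only waits for whatever is left of the interval.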

Shutdown - stop()

  • Ensures the HttpApiClient is closed and resources are freed.

Source Connector

Java
 
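import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class HttpSourceConnector extends SourceConnector {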

    private static final Logger log = LoggerFactory.getLogger(HttpSourceConnector.class);

    private Map<String, String> configProps = null;

    @Override
    public String version() {
        return HttpSourceConfig.VERSION;
    }

    @Override
    public ConfigDef config() {
        return HttpSourceConfig.getConfig();
    }

    @Override
    public void start(Map<String, String> props) {
        log.info("Starting HttpSourceConnector {}", props);
        configProps = props;
    }

    @Override
    public void stop() {
        log.info("Stopping HttpSourceConnector");
    }

    @Override
    public Class<? extends Task> taskClass() {
        return HttpSourceTask.class;
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            taskConfigs.add(configProps);
        }
        return taskConfigs;
    }

}


The HttpSourceConnector class is the entry point of our connector. Every Kafka Connect source connector must extend SourceConnector, and this is where we declare how the connector should behave at a high level. 

  • config() - exposes the configuration definition we built earlier in HttpSourceConfig. 
  • start() and stop() - On startup, the framework passes the configuration properties, which we store for later use. On shutdown, we simply log that the connector is stopping.
  • taskClass() - tells the framework which SourceTask implementation should be used (in our case, HttpSourceTask).
  • taskConfigs() – this method decides how the configuration should be split across multiple tasks. Right now, we’re not partitioning the work in any special way. For each requested task, we just pass along the same configuration properties. This means every task will behave the same. In more advanced connectors, this is where you might divide the work. For example, if you had multiple URLs to poll, you could assign different URLs to different tasks. But since our connector is currently built around a single URL, duplicating the same configuration is sufficient. 
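As a rough illustration of how work could be divided if the connector were extended to several URLs, the sketch below distributes a list of URLs round-robin across at most maxTasks groups. Everything here is hypothetical: the connector in this article supports a single http.url, and UrlAssignmentSketch is not part of it. Each resulting group would become one task configuration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: spreads URLs round-robin over at most maxTasks groups.
final class UrlAssignmentSketch {

    // Assumes maxTasks >= 1, which is what Kafka Connect passes to taskConfigs().
    static List<List<String>> assign(List<String> urls, int maxTasks) {
        int groups = Math.min(urls.size(), maxTasks); // never create empty groups
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < urls.size(); i++) {
            result.get(i % groups).add(urls.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "https://httpbin.org/get",
                "https://httpbin.org/uuid",
                "https://httpbin.org/headers");
        System.out.println(assign(urls, 2));
    }
}
```

Kafka Connect also ships ConnectorUtils.groupPartitions in org.apache.kafka.connect.util for a similar purpose, which may be preferable to hand-rolled grouping.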

Deployment and Usage

All right, now that we have our connector—let's see it in action.

To do that, we need to build and package our connector into a fat JAR (also called an uber-jar) so that it contains not only our classes but also all the required dependencies. 

We can do this with Maven by running:

Shell
 
mvn clean package

This will produce a JAR file under the target/ directory. If you check the folder after the build, you should see something like: kafka-connect-http-1.0-jar-with-dependencies.jar

Now for convenience I moved this jar under my user directory C:\Users\ionpa\kafka-connect-http. This way it’s easy to reference it later and mount it into our Kafka container.

Next, we'll use the landoop/fast-data-dev Docker image, since it comes with almost everything preconfigured: ZooKeeper, Schema Registry, Kafka Connect, and the broker, plus a nice UI provided by Landoop for managing everything Kafka-related. Here is the docker-compose.yml:

YAML
 
version: '3'
services:
    kafka-cluster:
        container_name: kafka-cluster
        image: landoop/fast-data-dev
        environment:
          ADV_HOST: 127.0.0.1         # Change to 192.168.99.100 if using Docker Toolbox
          RUNTESTS: 0                 # Disable Running tests so the cluster starts faster
        ports:
          - 2181:2181                 # Zookeeper
          - 3030:3030                 # Landoop UI
          - 8081-8083:8081-8083       # Schema Registry, REST Proxy, Kafka Connect ports
          - 9581-9585:9581-9585       # JMX Ports
          - 9092:9092                 # Kafka Broker
        volumes:
          # Specify an absolute path mapping
          - C:\Users\ionpa\kafka-connect-http:/connectors/kafka-connect-http


Notice the volume mapping C:\Users\ionpa\kafka-connect-http:/connectors/kafka-connect-http. According to Landoop’s documentation, /connectors is the path for making custom connectors visible to Kafka Connect. Essentially, we’re telling Docker to mount our local folder (where the fat JAR lives) into the container under /connectors/kafka-connect-http. However, not every Kafka Docker image follows the same convention. Each distribution may expect connectors in a different location, depending on how plugin.path is configured, e.g. /usr/share/java/. The plugin.path can also be changed via CONNECT_PLUGIN_PATH.

Now let’s start everything:  

Shell
 
docker-compose up


Enter the container:

Shell
 
docker exec -it kafka-cluster bash


Inside the container, let’s verify that our custom connector JAR is properly mounted:

Shell
 
ls /connectors/


You should see kafka-connect-http listed. 

Next, let’s check if Kafka Connect actually picked up the connector:

Shell
 
curl -s http://localhost:8083/connector-plugins \
  | jq '.[] | select(.class | contains("HttpSourceConnector"))'


This should return our HttpSourceConnector entry like this:

JSON
 
{
  "class": "inc.evil.kafka.connect.http.HttpSourceConnector",
  "type": "source",
  "version": "1.0"
}


Next let's create a new instance of the connector:

Shell
 
    curl -X POST http://localhost:3030/api/kafka-connect/connectors \
      -H "Content-Type: application/json" \
      -d '{
        "name": "my-connector",
        "config": {
          "connector.class": "inc.evil.kafka.connect.http.HttpSourceConnector",
          "http.headers": "Content-Type=application/json",
          "tasks.max": "1",
          "http.query.params": "random=123",
          "http.url": "https://httpbin.org/get",
          "http.read.timeout.ms": "10000",
          "topic": "my-topic",
          "http.poll.interval.ms": "10000",
          "http.connect.timeout.ms": "5000",
          "http.method": "GET",
          "value.converter": "org.apache.kafka.connect.storage.StringConverter",
          "key.converter": "org.apache.kafka.connect.storage.StringConverter"
        }
      }' | jq


And right away let's check its status:

Shell
 
curl -X GET http://localhost:8083/connectors/my-connector/status | jq


You should see something like this: 

JSON
 
{
  "name": "my-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}


This confirms that the connector is up and polling https://httpbin.org/get every 10 seconds.

Next, we need a Kafka consumer to verify that messages are being written to our topic my-topic:

Shell
 
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning


You should start seeing the JSON payloads from the API appear in the console every 10 seconds.

Now all that's left is to delete the old cron job and add a simple Kafka consumer in the application to handle the offers, or maybe even better - a RedisSinkConnector to push the API responses directly to Redis? But that, of course, is a story for another time.

Conclusion

We’ve built a fully working HTTP source connector from scratch, learned how to configure it, package it as a fat JAR, and deploy it with Kafka Connect. You’ve seen how to mount the connector, verify it’s loaded, and consume the API responses in real time through Kafka. 

All the code and configuration files are available for reference, so you can adapt them to your own APIs or use cases.

That’s all folks, I hope that you’ve learned something. Happy coding!
