
Kafka Connect HTTP Connector


Kafka Connect connector that enables Change Data Capture from JSON/HTTP APIs into Kafka.

This connector is for you if


See examples, e.g.

Getting Started

If your Kafka Connect deployment is automated and packaged with Maven, you can unpack the artifact into the Kafka Connect plugins folder.


Otherwise, you’ll have to do it manually by downloading the package from the Releases Page.

More details on how to Install Connectors.

Source Connector


Extension points

The connector can be easily extended by implementing your own version of any of the components below.

These are better understood by looking at the source task implementation:

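public List<SourceRecord> poll() throws InterruptedException {

    // Build the next request from the current offset.
    HttpRequest request = requestFactory.createRequest(offset);

    HttpResponse response = requestExecutor.execute(request);

    List<SourceRecord> records = responseParser.parse(response);

    // Sort the records, then drop the ones already seen (see SourceRecordFilterFactory).
    List<SourceRecord> unseenRecords = recordSorter.sort(records).stream()
            // ... filtering step elided in this excerpt ...
            .collect(Collectors.toList());

    confirmationWindow = new ConfirmationWindow<>(extractOffsets(unseenRecords));

    return unseenRecords;
}

public void commitRecord(SourceRecord record, RecordMetadata metadata) {
    // ... body elided in this excerpt ...
}

public void commit() {
    offset = confirmationWindow.getLowWatermarkOffset()
            // ... remainder of the chained expression elided in this excerpt ...
}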
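The commit flow above suggests that the task only advances its offset up to the last record in the leading run of confirmed records. The class below is a minimal sketch of that idea under stated assumptions: `ConfirmationWindowSketch`, its fields, and its exact semantics are hypothetical and are not taken from the connector's own `ConfirmationWindow`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the connector's ConfirmationWindow; semantics are assumed.
final class ConfirmationWindowSketch<T> {

    // Offsets in the order they were returned by poll(), each flagged once confirmed.
    private final Map<T, Boolean> confirmed = new LinkedHashMap<>();

    ConfirmationWindowSketch(List<T> offsets) {
        offsets.forEach(offset -> confirmed.put(offset, false));
    }

    // Marks an offset as confirmed; unknown offsets are ignored.
    void confirm(T offset) {
        confirmed.replace(offset, true);
    }

    // Last offset of the leading run of confirmed entries, if any.
    Optional<T> getLowWatermarkOffset() {
        T lowWatermark = null;
        for (Map.Entry<T, Boolean> entry : confirmed.entrySet()) {
            if (!entry.getValue()) {
                break;
            }
            lowWatermark = entry.getKey();
        }
        return Optional.ofNullable(lowWatermark);
    }
}
```

With offsets 1, 2, 3, confirming only 2 leaves the watermark empty, because 1 is still pending. Once 1 is confirmed as well, the watermark moves to 2.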

Timer: Throttling HttpRequest

Controls the rate at which HTTP requests are performed by telling the task how long remains until the next execution is due.


public interface Timer extends Configurable {

    Long getRemainingMillis();

    default void reset(Instant lastZero) {
        // Do nothing
    }
}

Throttling HttpRequest with FixedIntervalThrottler

Throttles rate of requests based on a fixed interval.


Interval in between requests
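To make the fixed-interval contract concrete, here is a minimal sketch of a timer that reports the time remaining until the next request. It mirrors the two `Timer` methods shown earlier but is not the connector's implementation: the class name, the injected `Clock`, and the constructor are assumptions, and it does not implement Kafka's `Configurable`, so that it stays self-contained.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;

// Hypothetical fixed-interval timer; not the connector's FixedIntervalThrottler.
final class FixedIntervalTimerSketch {

    private final Clock clock;
    private final long intervalMillis;
    private Instant lastZero;

    FixedIntervalTimerSketch(Clock clock, long intervalMillis) {
        this.clock = clock;
        this.intervalMillis = intervalMillis;
        this.lastZero = clock.instant();
    }

    // Milliseconds left until the next request is due, never negative.
    Long getRemainingMillis() {
        long elapsed = Duration.between(lastZero, clock.instant()).toMillis();
        return Math.max(0L, intervalMillis - elapsed);
    }

    // Restarts the countdown from the given instant.
    void reset(Instant lastZero) {
        this.lastZero = lastZero;
    }
}
```

Injecting the `Clock` is a design choice made here so that the behaviour can be checked without sleeping.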

Throttling HttpRequests with AdaptableIntervalThrottler

Throttles rate of requests based on a fixed interval. It has, however, two modes of operation, with two different intervals:


Interval in between requests when up-to-date


Interval in between requests when catching up

HttpRequestFactory: Creating a HttpRequest

The first thing our connector needs to do is create a HttpRequest.


public interface HttpRequestFactory extends Configurable {

    HttpRequest createRequest(Offset offset);
}


Initial offset, comma separated list of pairs.

Creating a HttpRequest with TemplateHttpRequestFactory

This HttpRequestFactory is based on template resolution.


Http method to use in the request.


Http url to use in the request.


Http headers to use in the request, as a comma-separated list of colon-separated pairs (name: value).


Http query parameters to use in the request, as an ampersand-separated list of equals-separated pairs (name=value).


Http body to use in the request.
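The headers and query parameters above are both expressed as separated lists of separated pairs. The helper below is a minimal sketch of how such a value could be split into a map. `PairListParserSketch` is a hypothetical name, and the connector's own parsing may differ, for instance in how it trims whitespace or handles malformed entries.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper; the connector's own parsing code may differ.
final class PairListParserSketch {

    // Splits "k1<pair>v1<entry>k2<pair>v2" into an ordered map, trimming whitespace.
    static Map<String, String> parse(String value, String entrySeparator, String pairSeparator) {
        Map<String, String> pairs = new LinkedHashMap<>();
        if (value == null || value.trim().isEmpty()) {
            return pairs;
        }
        for (String entry : value.split(Pattern.quote(entrySeparator))) {
            // Split on the first separator only, so that values may contain it.
            int index = entry.indexOf(pairSeparator);
            if (index < 0) {
                throw new IllegalArgumentException("Missing '" + pairSeparator + "' in: " + entry);
            }
            String key = entry.substring(0, index).trim();
            String val = entry.substring(index + pairSeparator.length()).trim();
            pairs.put(key, val);
        }
        return pairs;
    }
}
```

Headers would be parsed with `parse(value, ",", ":")` and query parameters with `parse(value, "&", "=")`. Note that in this sketch a value cannot contain the entry separator itself.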

public interface TemplateFactory {

    Template create(String template);
}

public interface Template {

    String apply(Offset offset);
}

Class responsible for creating the templates that will be used on every request.
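As an illustration of the `TemplateFactory` and `Template` contract, the sketch below resolves placeholders by plain string substitution. It is a naive stand-in and not FreeMarker. The `${offset.name}` placeholder syntax, the class name, and the use of a `Map<String, String>` in place of the connector's `Offset` type are all assumptions made for this example.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Naive placeholder substitution for illustration; a stand-in, not FreeMarker.
final class PlaceholderTemplateFactorySketch {

    // Matches placeholders such as ${offset.timestamp} (assumed syntax).
    private static final Pattern PLACEHOLDER =
            Pattern.compile("\\$\\{offset\\.([A-Za-z0-9_]+)\\}");

    // Hypothetical counterpart of Template, with the Offset simplified to a Map.
    interface OffsetTemplate {
        String apply(Map<String, String> offset);
    }

    static OffsetTemplate create(String template) {
        return offset -> {
            Matcher matcher = PLACEHOLDER.matcher(template);
            StringBuffer resolved = new StringBuffer();
            while (matcher.find()) {
                // Unknown properties resolve to an empty string in this sketch.
                String value = offset.getOrDefault(matcher.group(1), "");
                matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
            }
            matcher.appendTail(resolved);
            return resolved.toString();
        };
    }
}
```

The template is created once and applied on every request with the latest offset, which is how state from one response can shape the next request.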

Creating a HttpRequest with FreeMarkerTemplateFactory

FreeMarker templates will have the following data model available:

Accessing any of the above within a template can be achieved like this:


For an Epoch representation of the same string, FreeMarker built-ins should be used:


For a complete understanding of the features provided by FreeMarker, please refer to the User Manual.

HttpClient: Executing a HttpRequest

Once our HttpRequest is ready, we have to execute it to get some results out of it. That’s the purpose of the HttpClient.


public interface HttpClient extends Configurable {

    HttpResponse execute(HttpRequest request) throws IOException;
}

Executing a HttpRequest with OkHttpClient

Uses an OkHttp client.


Timeout for opening a connection

Timeout for reading a response


Time to live for the connection

Hostname of the HTTP Proxy


Port of the HTTP Proxy


Username of the HTTP Proxy


Password of the HTTP Proxy

HttpAuthenticator: Authenticating a HttpRequest

When executing the request, authentication might be required. The HttpAuthenticator is responsible for resolving the Authorization header to be included in the HttpRequest.


public interface HttpAuthenticator extends Configurable {

    Optional<String> getAuthorizationHeader();
}

Authenticating with ConfigurableHttpAuthenticator

Allows selecting the authentication type via configuration property


Type of authentication

Authenticating with BasicHttpAuthenticator

Authenticates requests with HTTP Basic authentication, using the configured user and password.
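For reference, an HTTP Basic Authorization header is the word `Basic` followed by the Base64 encoding of `user:password`. The class below is a minimal sketch of an authenticator resolving that header through a method shaped like `getAuthorizationHeader()`. The class name, the constructor, and the choice to return an empty result when no user is set are assumptions, not the connector's actual behaviour.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Optional;

// Hypothetical Basic authenticator; not the connector's BasicHttpAuthenticator.
final class BasicAuthenticatorSketch {

    private final String user;
    private final String password;

    BasicAuthenticatorSketch(String user, String password) {
        this.user = user;
        this.password = password;
    }

    // Returns "Basic <base64(user:password)>", or empty when no user is configured.
    Optional<String> getAuthorizationHeader() {
        if (user == null || user.isEmpty()) {
            return Optional.empty();
        }
        String credentials = user + ":" + (password == null ? "" : password);
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return Optional.of("Basic " + encoded);
    }
}
```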


HttpResponseParser: Parsing a HttpResponse

Once our HttpRequest has been executed, we have to deal with the resulting HttpResponse and translate it into the list of SourceRecords expected by Kafka Connect.


public interface HttpResponseParser extends Configurable {

    List<SourceRecord> parse(HttpResponse response);
}

Parsing with PolicyHttpResponseParser

Vets the HTTP response deciding whether the response should be processed, skipped or failed. This decision is delegated to a HttpResponsePolicy. When the decision is to process the response, this processing is delegated to a secondary HttpResponseParser.

HttpResponsePolicy: Vetting a HttpResponse

public interface HttpResponsePolicy extends Configurable {

    HttpResponseOutcome resolve(HttpResponse response);

    enum HttpResponseOutcome {
        PROCESS, SKIP, FAIL
    }
}

Vetting with StatusCodeHttpResponsePolicy

Does response vetting based on HTTP status codes in the response and the configuration below.

Comma separated list of code ranges that will result in the parser processing the response

Comma separated list of code ranges that will result in the parser skipping the response
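The sketch below shows one way a status-code policy could map a response code to one of the three outcomes described above. It is illustrative only. The `from..to` range syntax, the class name, and the rule that any code outside both lists fails are assumptions, so check the connector's configuration reference for the actual format.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical status-code policy; the "from..to" range syntax is an assumption.
final class StatusCodePolicySketch {

    enum Outcome { PROCESS, SKIP, FAIL }

    private final List<int[]> processRanges;
    private final List<int[]> skipRanges;

    StatusCodePolicySketch(String processCodes, String skipCodes) {
        this.processRanges = parseRanges(processCodes);
        this.skipRanges = parseRanges(skipCodes);
    }

    // Process takes precedence over skip; anything unmatched fails.
    Outcome resolve(int statusCode) {
        if (matches(processRanges, statusCode)) {
            return Outcome.PROCESS;
        }
        if (matches(skipRanges, statusCode)) {
            return Outcome.SKIP;
        }
        return Outcome.FAIL;
    }

    // Parses entries such as "200..299" or a single code such as "404".
    private static List<int[]> parseRanges(String codes) {
        List<int[]> ranges = new ArrayList<>();
        for (String entry : codes.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] bounds = trimmed.split("\\.\\.");
            int from = Integer.parseInt(bounds[0].trim());
            int to = bounds.length > 1 ? Integer.parseInt(bounds[1].trim()) : from;
            ranges.add(new int[] {from, to});
        }
        return ranges;
    }

    private static boolean matches(List<int[]> ranges, int statusCode) {
        return ranges.stream().anyMatch(range -> statusCode >= range[0] && statusCode <= range[1]);
    }
}
```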

Parsing with KvHttpResponseParser

Parses the HTTP response into a key-value SourceRecord. This process is decomposed in two steps:

public interface KvRecordHttpResponseParser extends Configurable {

    List<KvRecord> parse(HttpResponse response);
}

public interface KvSourceRecordMapper extends Configurable {

    SourceRecord map(KvRecord record);
}

Parsing with JacksonKvRecordHttpResponseParser

Uses Jackson to look for the records in the response.


JsonPointer to the property in the response body containing an array of records


JsonPointer to the individual record to be used as kafka record body. Useful when the object we are interested in is under a nested structure


Comma separated list of key=/value pairs where the key is the name of the property in the offset, and the value is the JsonPointer to the value being used as offset for future requests. This is the mechanism that enables sharing state in between HttpRequests. HttpRequestFactory implementations receive this Offset.

Special properties:

Even when it is not required for preparing the next request, the offset also helps de-duplicate already seen records. Assuming consistent ordering, it provides a sense of progress, so anything prior to the last seen offset is ignored (e.g. when a response repeats results from a previous request because they share the same timestamp). See OffsetFilterFactory.


Class responsible for converting the timestamp property captured above into a java.time.Instant.


When using DateTimeFormatterTimestampParser, a custom pattern can be specified

Timezone of the timestamp. Accepts ZoneId valid identifiers


When using RegexTimestampParser, a custom regex pattern can be specified


When using RegexTimestampParser, a delegate class to parse timestamp
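The timestamp options above combine a custom pattern, a time zone, and optionally a regex that extracts the timestamp before a delegate parses it. The helpers below sketch both steps with `java.time` only. `TimestampParsingSketch` and its method names are hypothetical, and the connector's own parser classes may behave differently, for example with timestamps that already carry a zone.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helpers; not the connector's timestamp parser classes.
final class TimestampParsingSketch {

    // Parses a zone-less timestamp using a custom pattern and an explicit time zone.
    static Instant parseWithPattern(String timestamp, String pattern, String zoneId) {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern(pattern);
        return LocalDateTime.parse(timestamp, formatter)
                .atZone(ZoneId.of(zoneId))
                .toInstant();
    }

    // Extracts the first capture group with a regex, then delegates to the pattern parser.
    static Instant parseWithRegex(String raw, String regex, String pattern, String zoneId) {
        Matcher matcher = Pattern.compile(regex).matcher(raw);
        if (!matcher.find()) {
            throw new IllegalArgumentException("No timestamp found in: " + raw);
        }
        return parseWithPattern(matcher.group(1), pattern, zoneId);
    }
}
```

The time zone matters whenever the source timestamp carries no offset: the same text maps to different instants depending on the configured zone.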

Mapping a KvRecord into SourceRecord with SimpleKvSourceRecordMapper

Once we have our KvRecord we have to translate it into what Kafka Connect is expecting: SourceRecords

Embeds the record properties into a common simple envelope to enable schema evolution. This envelope simply contains a key and a value properties with customizable field names.

This is also where we tell Kafka Connect which topic and partition each record should be sent to.

Note that there are projects that allow you to infer the schema from your JSON document (e.g. expandjsonsmt).


Name of the topic where the record will be sent to

Name of the key property in the key-value envelope

Name of the value property in the key-value envelope

SourceRecordSorter: Sorting SourceRecords

Some HTTP resources not designed for CDC return snapshots with the most recent records first. In these cases de-duplication is especially important, as subsequent requests are likely to produce similar results. The de-duplication mechanisms offered by this connector are order-dependent, as they are usually based on timestamps.

To enable de-duplication in cases like this, we can instruct the connector to assume a specific order direction, either ASC, DESC, or IMPLICIT, where implicit figures it out based on records’ timestamps.


public interface SourceRecordSorter extends Configurable {

    List<SourceRecord> sort(List<SourceRecord> records);
}


Order direction of the results in the response list.
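The sketch below illustrates one possible reading of the three order directions: the goal is to hand records to the de-duplication step oldest-first, reversing the list when the response is known or detected to be newest-first. `TimedRecord` is a simplified stand-in for a SourceRecord, and the first-versus-last comparison used for IMPLICIT is an assumption rather than the connector's actual heuristic.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sorter; the direction handling is an assumed reading of the text above.
final class OrderDirectionSorterSketch {

    enum OrderDirection { ASC, DESC, IMPLICIT }

    // Simplified stand-in for a SourceRecord carrying a timestamp.
    static final class TimedRecord {
        final String key;
        final Instant timestamp;

        TimedRecord(String key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Returns the records oldest-first, given the direction the response arrived in.
    static List<TimedRecord> sort(List<TimedRecord> records, OrderDirection direction) {
        List<TimedRecord> sorted = new ArrayList<>(records);
        boolean descending = direction == OrderDirection.DESC
                || (direction == OrderDirection.IMPLICIT && looksDescending(records));
        if (descending) {
            Collections.reverse(sorted);
        }
        return sorted;
    }

    // Infers the direction by comparing the first and last timestamps.
    private static boolean looksDescending(List<TimedRecord> records) {
        if (records.size() < 2) {
            return false;
        }
        Instant first = records.get(0).timestamp;
        Instant last = records.get(records.size() - 1).timestamp;
        return first.isAfter(last);
    }
}
```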

SourceRecordFilterFactory: Filtering out SourceRecord

There are cases when we’ll be interested in filtering out certain records. One of these would be de-duplication.


public interface SourceRecordFilterFactory extends Configurable {

    Predicate<SourceRecord> create(Offset offset);
}

Filtering out SourceRecord with OffsetTimestampRecordFilterFactory

De-duplicates based on Offset’s timestamp, filtering out records with earlier or the same timestamp. Useful when timestamp is used to filter the HTTP resource, but the filter does not have full timestamp precision. Assumptions:

If the latter assumption cannot be satisfied, check OffsetRecordFilterFactory to try to prevent data loss.
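The timestamp-based filter described above can be sketched as a predicate that keeps only records strictly newer than the offset's timestamp. This is a simplified illustration: records are reduced to their timestamps, and `OffsetTimestampFilterSketch` is a hypothetical name, not the connector's class.

```java
import java.time.Instant;
import java.util.function.Predicate;

// Hypothetical filter factory; records are reduced to their timestamps for brevity.
final class OffsetTimestampFilterSketch {

    // Keeps only records strictly newer than the timestamp stored in the offset.
    static Predicate<Instant> create(Instant offsetTimestamp) {
        return recordTimestamp -> recordTimestamp.isAfter(offsetTimestamp);
    }
}
```

Because records sharing the offset's timestamp are dropped, a record that arrives later with that same timestamp would be lost, which is why the uniqueness of timestamps matters for this filter.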

Filtering out SourceRecord with OffsetRecordFilterFactory

De-duplicates based on Offset’s timestamp, key and any other custom property present in the Offset, filtering out records with earlier timestamps, or when in the same timestamp, only those up to the last seen Offset properties. Useful when timestamp alone is not unique but together with some other Offset property is. Assumptions:
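The filter described above can be sketched as a stateful predicate: older records are dropped, newer records are kept, and records sharing the offset's timestamp are dropped up to and including the last seen one. The code below is an assumed reading of that description, not the connector's implementation. `KeyedRecord` stands in for a SourceRecord, only the key is compared (no other custom Offset properties), and records are assumed to arrive oldest-first in a stable order.

```java
import java.time.Instant;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;

// Hypothetical filter; assumes records are tested in order, oldest-first.
final class OffsetRecordFilterSketch {

    // Simplified stand-in for a SourceRecord with a key and a timestamp.
    static final class KeyedRecord {
        final String key;
        final Instant timestamp;

        KeyedRecord(String key, Instant timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Stateful predicate: create a new one per poll and apply it sequentially.
    static Predicate<KeyedRecord> create(Instant offsetTimestamp, String offsetKey) {
        AtomicBoolean lastSeenReached = new AtomicBoolean(false);
        return record -> {
            if (record.timestamp.isAfter(offsetTimestamp)) {
                return true; // strictly newer: always unseen
            }
            if (record.timestamp.isBefore(offsetTimestamp)) {
                return false; // strictly older: already seen
            }
            // Same timestamp: drop everything up to and including the last seen key.
            if (lastSeenReached.get()) {
                return true;
            }
            if (Objects.equals(record.key, offsetKey)) {
                lastSeenReached.set(true);
            }
            return false;
        };
    }
}
```

In this sketch, if the last seen key no longer appears in the response, every record with the offset's timestamp is dropped, which is one reason a stable, consistent ordering matters.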



mvn package


Using Pre-configured docker setup

You can easily run a Kafka Connect cluster with kafka-connect-http pre-installed by executing:

mvn verify -Pdebug -DskipTests

It’ll run dockerized versions of kafka and kafka-connect which you can access via REST API or attach debuggers to the url printed in console:

Kafka Connect testcontainers infra is ready
  Rest API: http://localhost:33216
  Debug agent: localhost:33217

Right after, it will prompt you for the file path of your connector’s JSON configuration:

Introduce the path to your connector JSON configuration file:

It’ll subscribe to the corresponding kafka topic, printing every message going through the output topic of your connector.

Using Kafka Connect standalone

These instructions are phrased in terms of the steps needed when using IntelliJ, but other integrated development environments are likely to be similar.

Point the Kafka stand-alone plugin.path at the module compile Output path. Assuming you are using the default Maven project import, this is the ./target directory, so the config/ file would contain the line

plugin.path=<directory where git clone was executed>/kafka-connect-http/kafka-connect-http/target

In the Run/Debug Configurations dialog, create a new Remote JVM Debug configuration with the mode Attach to remote JVM. When remote debugging, some Java parameters need to be specified when the program is executed. Fortunately there are hooks in the Kafka shell scripts to accommodate this. The Remote JVM Debug configuration specifies the needed Command line arguments for remote JVM. In the terminal console where you execute the connect command line, define KAFKA_DEBUG and JAVA_DEBUG_OPTS as:

export KAFKA_DEBUG=true
export JAVA_DEBUG_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005"

Place a suitable breakpoint in the kafka-connect-http code, e.g. in HttpSourceTask.start(), and launch the standalone connect program:

bin/ config/ plugins/<kafka-connect-http properties file>

Click the Debug icon in IntelliJ and ensure the debugger console says Connected to the target VM, address: 'localhost:5005', transport: 'socket' and the breakpoint you placed becomes checked. The program should now break when the breakpoint is hit.

Running the tests

mvn test



Contributions are welcome via pull requests. Pending the definition of a code of conduct, please just follow the existing conventions.


We use SemVer for versioning.


This project is licensed under the Apache 2.0 License - see the LICENSE.txt file for details.

Built With