Elasticsearch Search API

Documentation: Search API

Sample Request

curl --request POST \
  --header 'Content-Type: application/json' \
  --data '{"size": 100, "sort": [{"@timestamp": "asc"}], "search_after": [1589500805994]}' \
  --url 'http://domain/index_name/_search'

Sample Response

{
  "took":87,
  "timed_out":false,
  "_shards":{
    "total":4,
    "successful":4,
    "skipped":0,
    "failed":0
  },
  "hits":{
    "total":10836885,
    "max_score":null,
    "hits":[
      ...,
      {
        "_index":"index_name",
        "_type":"type_name",
        "_id":"Y2Q3MGM3OWQtNWVkZC00YzFjLTliMDAtZTMyMDAxZDg2MTI0",
        "_score":null,
        "_source":{
          ...,
          "@timestamp":"2020-05-15T00:00:06.008000000+00:00"
        },
        "sort":[
          1589500806008
        ]
      }
    ]
  }
}

Elasticsearch recommends using search_after for deep pagination, which fits our CDC use case perfectly.

All we have to do is select one or more properties to sort by; these will then be used as the scrolling mechanism between queries, as shown below.
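
For example, taking the sample response above, the next page is requested by feeding the sort value of its last hit (1589500806008) back into search_after:

curl --request POST \
  --header 'Content-Type: application/json' \
  --data '{"size": 100, "sort": [{"@timestamp": "asc"}], "search_after": [1589500806008]}' \
  --url 'http://domain/index_name/_search'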

Sample Configuration

{
    "name": "my_index.elasticsearch.http.source",
    "config": {
        "connector.class": "com.github.castorm.kafka.connect.http.HttpSourceConnector",
        "tasks.max": "1",
        "http.offset.initial": "timestamp=2020-01-01T00:00:00Z",
        "http.request.url": "http://domain/index_name/_search",
        "http.request.method": "POST",
        "http.request.headers": "Content-Type: application/json",
        "http.request.body": "{\"size\": 100, \"sort\": [{\"@timestamp\": \"asc\"}], \"search_after\": [${offset.timestamp?datetime.iso?long}]}",
        "http.response.list.pointer": "/hits/hits",
        "http.response.record.pointer": "/_source",
        "http.response.record.offset.pointer": "key=/_id, timestamp=/sort/0",
        "http.timer.interval.millis": "30000",
        "http.timer.catchup.interval.millis": "1000",
        "kafka.topic": "topic"
    }
}
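
On the connector's first poll there is no stored offset yet, so the offset is seeded from http.offset.initial. As a minimal sketch, assuming FreeMarker-style resolution of ${offset.timestamp?datetime.iso?long}, the ISO timestamp 2020-01-01T00:00:00Z renders to epoch milliseconds and the request body becomes roughly:

{"size": 100, "sort": [{"@timestamp": "asc"}], "search_after": [1577836800000]}

From then on, the timestamp captured from /sort/0 of the last record becomes the offset for the next request. Assuming a Kafka Connect worker whose REST interface listens on localhost:8083 (an assumption, adjust to your environment) and the configuration above saved as elasticsearch-http-source.json (a hypothetical filename), the connector can be registered with:

curl --request POST \
  --header 'Content-Type: application/json' \
  --data @elasticsearch-http-source.json \
  --url 'http://localhost:8083/connectors'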