Apache Kafka Integration

Timestamps

Kafka has supported timestamps as part of the message format since version 0.10.0. This is exposed to SQLStreamBuilder via the virtual column eventTimestamp, which holds the time the message was created. SQLStreamBuilder also respects current_timestamp, which is the timestamp at query execution start time.

Thus, queries can be constructed like:

SELECT sensor_name
FROM sensors
WHERE eventTimestamp > current_timestamp;

and can be used in window queries like:

SELECT SUM(CAST(amount AS numeric)) AS payment_volume,
       CAST(TUMBLE_END(eventTimestamp, interval '1' hour) AS varchar) AS ts
FROM payments
GROUP BY TUMBLE(eventTimestamp, interval '1' hour);

Assigning keys for output

When using Kafka as a sink, keys can be assigned using the special alias _eventKey. This designates the column as the partition key via the Kafka driver. For instance:

SELECT sensor_name AS _eventKey  -- sensor_name becomes the key in the output Kafka topic
FROM sensors
WHERE eventTimestamp > current_timestamp;

Performance & Scalability

SQLStreamBuilder is highly performant and scalable, but proper configuration and design of the source Kafka topic is critical. SQLStreamBuilder uses at most one reader thread per Kafka partition, so configuring at least as many SQLStreamBuilder threads as there are partitions is generally the highest-performance configuration.

If the number of partitions is less than the number of SQLStreamBuilder threads, the extra threads will sit idle, and log messages will indicate as much.

The Kafka documentation covers partition design strategies at length.
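For illustration, a topic's partition count can be inspected and increased with the kafka-topics.sh tool that ships with Kafka. The topic name, partition count, and broker address below are placeholders, and older Kafka releases take --zookeeper instead of --bootstrap-server:

```shell
# Inspect the current partition count of the source topic
bin/kafka-topics.sh --describe --topic sensors --bootstrap-server localhost:9092

# Increase the topic to 8 partitions so 8 SQLStreamBuilder threads can read in parallel
# (Kafka only allows increasing a topic's partition count, never decreasing it)
bin/kafka-topics.sh --alter --topic sensors --partitions 8 --bootstrap-server localhost:9092
```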

Kafka message header access

KIP-82 added the ability to attach custom headers to Kafka messages. Headers are typically used for metadata such as routing or filtering information. SQLStreamBuilder can access header information using the Input Transforms functionality. See the section on Input Transforms for more details.

The following attributes are supported:

message.topic
message.key
message.value
message.headers
message.offset
message.partition

For example, an input transformation could be expressed as:

var out = JSON.parse(record);           // parse the message payload
out['topic'] = message.topic;           // attach the source topic name
out['partition'] = message.partition;   // attach the source partition
JSON.stringify(out);                    // the last expression is the transform's output

with a schema defined as:

{
  "name": "myschema",
  "type": "record",
  "namespace": "com.eventador.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    }
  ]
}

The attribute message.headers is an array that can be iterated over:

var out = JSON.parse(record);
var header = JSON.parse(message.headers);
var interested_keys = ['DC'];               // should match schema definition

out['topic'] = message.topic;
out['partition'] = message.partition;

Object.keys(header).forEach(function(key) {
    if (interested_keys.indexOf(key) > -1){  // if match found for schema, set value
        out[key] = header[key];
    }
});

JSON.stringify(out);

with a schema defined as:

{
  "name": "myschema",
  "type": "record",
  "namespace": "com.eventador.test",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "topic",
      "type": "string"
    },
    {
      "name": "partition",
      "type": "string"
    },
    {
      "name": "DC",
      "type": "string"
    }
  ]
}
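The header-copying transform above can be sanity-checked outside SQLStreamBuilder in plain JavaScript by stubbing the record and message variables that SQLStreamBuilder normally provides. The sample values below are illustrative, not part of the real runtime:

```javascript
// Stub the inputs SQLStreamBuilder would supply (values are illustrative)
var record = JSON.stringify({ id: 1 });
var message = {
  topic: 'payments',
  partition: 0,
  headers: JSON.stringify({ DC: 'us-east-1', trace: 'abc123' })
};

var out = JSON.parse(record);
var header = JSON.parse(message.headers);
var interested_keys = ['DC'];            // should match schema definition

out['topic'] = message.topic;
out['partition'] = message.partition;

Object.keys(header).forEach(function (key) {
  if (interested_keys.indexOf(key) > -1) { // copy only keys the schema defines
    out[key] = header[key];
  }
});

console.log(JSON.stringify(out));
// {"id":1,"topic":"payments","partition":0,"DC":"us-east-1"}
```

Note that the trace header is dropped because it is not in interested_keys, matching the schema restriction described above.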

Note: dynamic schema creation is not supported at this time.