SQLStreamBuilder™

Eventador SQLStreamBuilder (SSB) is a production-grade query engine for streaming data.

It is a comprehensive interface for creating stateful stream processing jobs against boundless streams of data using Structured Query Language (SQL). By using SQL, you can simply and easily declare expressions that filter, aggregate, route and otherwise mutate streams of data.
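As a minimal sketch (the sensors virtual table and its temperature column are hypothetical), a simple filtering job might look like:

SELECT sensor_name, temperature
FROM sensors
WHERE temperature > 100; -- keep only readings above a threshold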


SQLStreamBuilder is particularly well-suited for:

  • Creating entire stream processing systems without needing to code in Java/Scala.
  • Building real time ETL pipelines.
  • Pre-aggregating data before storing it into traditional datastores.
  • Testing/Hypothesizing/Reasoning about your data.
  • Allowing anyone familiar with SQL to build stream processors.
  • Filtering, splitting, routing, cleansing, and aggregating to department-specific streams/topics or datastores.
  • Joining various incoming streams into one useful dataset.

Concepts

Jobs

SSB runs in an interactive fashion where you can quickly see the results of your query and iterate on your SQL syntax. It also works in a persistent fashion where the SQL queries you execute run as jobs on the Flink cluster, operating on boundless streams of data until cancelled. This allows you to author, launch and monitor stream processing jobs within SSB. Every SQL query is a job.

Sources and Sinks

SSB processes data from a source to a sink using the SQL specified (or sends results only to the browser). When a source or sink is created, you assign it a virtual table name. You then use that virtual table name as the FROM table in your query (source) and specify the destination (sink) in the interface. This allows you to create powerful aggregations, filters, and any other SQL expression against the stream. Sources and sinks are specified in the SSB console.

It’s important to note that because this is streaming SQL, the query does not end until canceled, and results may not show up immediately if there is a long time window, or if data matching the criteria isn’t being streamed at that moment.

Kafka Sinks

A Kafka sink is an output virtual table that the data resulting from the query is sent to. Kafka sink data is in JSON format.

Amazon S3 Sinks

An Amazon S3 sink is an empty S3 bucket that will be filled with objects using the following filename layout:

2019-09-17T19:24:41.333Z

The contents of these files are one JSON message per line. The S3 sink is currently only supported in the same region as your cluster.

Key Changes

When a key is removed from the messages coming from a source, SSB will happily continue to consume messages; however, upon sinking, it will mark the missing key as NULL.

Similarly, when a key is removed from the source schema but not the messages coming from the source, SSB will ignore the key on the incoming stream.
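For illustration (the orders virtual table and its columns are hypothetical): if incoming messages stop including the amount field, a query like the following keeps running, and the sinked records carry amount as NULL:

SELECT order_id, amount -- amount is emitted as NULL once it stops appearing in incoming messages
FROM orders;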

Schema

Schema is defined for a given source when the source is created. You define a schema using JSON for the incoming data, and each virtual table has its own schema. For instance, if the source is Apache Kafka, you specify a topic name and you define a schema for the data.

For sinks, the schema is defined as the table structure in the SQL statement, both for column names and datatypes. SSB supports aliases for column names like SELECT bar AS foo FROM baz as well as CAST(value AS type) for type conversions.
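As a sketch (the payments virtual table and its columns are hypothetical), the following query produces a sink schema with columns customer_id and payment_amount:

SELECT user_id AS customer_id,                    -- the alias sets the sink column name
       CAST(amount AS numeric) AS payment_amount  -- the CAST sets the sink column type
FROM payments;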

SSB supports a couple of meta commands like show tables and describe <table> that make it easy to understand the schema if you are familiar with most database platforms.

Where do the query results go?

SQLStreamBuilder is unique in that it lets you iterate on your SQL, but it also allows you to build robust data pipelines based on that SQL. When you execute a query, the results go to the Virtual Table Sink that you selected in the SQL window. This allows you to create aggregations, filters, joins, etc and then “pipe” the results to a sink. The schema for the results is the schema that you created when you ran the query (see above).

Results are also sampled to your browser so you can inspect the data, and iterate on your query. 100 rows are sampled back at a time. You can sample 100 more rows by clicking the Sample button. If you select Results in Browser for the Sink Virtual Table then results are only sampled to the browser.


Getting Started

There are a few steps to complete before you can start executing and iterating on SQL jobs for the first time:

  1. Create a Cloud Environment
  2. Create one or more data providers
  3. Create a virtual table as a source
  4. Create a virtual table as a sink
  5. Execute SQL jobs

Once an SSB deployment is created, data providers are defined, and you have a good set of virtual tables for your use case, you don’t need to repeat these steps.

1. Create a Cloud Environment

A cloud environment is the cloud account from which processing resources are allocated. This can be your everyday AWS account, or a new account that is VPC peered back to your main account.

  • Click on Cloud Environments
  • Click on the Create Cloud Environment button
  • Enter a logical name for this environment. It’s generally good to use the region in the name like AWS_EU_CENTRAL or something similar.
  • Add a description for the environment
  • Select AWS as your provider
  • Select the AWS region you would like to deploy to
  • Enter your AWS IAM Access Key
  • Enter your AWS IAM Secret Key
  • Select a VPC CIDR Range or use the default if it doesn’t collide with your IP range.
  • Ensure Triple AZ is selected
  • Select Create Environment

AWS resources will then be allocated to the account associated with the IAM keys. Initially just a couple of resources will be built; more will be created as you create clusters to run jobs on.

2. Create a data source

A data source is a registry entry for the clusters/endpoints you wish to query. Once created, it holds the URI or connect string for a cluster or other data source, along with the corresponding credentials. Once you set up data sources, you can reference them without repeating this configuration.

For data source types and availability, see data providers.

  • Click on Data Sources on the side menu.
  • Click on Add Kafka Provider button at the top.
  • Add a unique name for the source; this can be anything that makes sense for your use case.
  • Add the Kafka brokers connect string.
  • If you are connecting to an endpoint over plaintext, leave PLAINTEXT selected for Connection Protocol; otherwise, select the connection protocol your source Kafka cluster supports.
  • If you selected SASL/SSL, fill in SASL Username and SASL Password. Typically these can be generated or found in your cloud provider control panel.
  • Click on Save Changes to create the data provider.

Currently SSB only supports Kafka data sources. We will be adding other sources soon.

3. Create a virtual table as a source

You must have at least one source to run queries on SSB.

  • Click on SQLStreamBuilder on the side menu.
  • Click on the Virtual Tables tab.
  • Click on the Source Virtual Tables tab.
  • Click Add Source and select Apache Kafka.
  • Specify the name of the source; this will be what you use in the FROM clause of your query. Name it any logical name that makes sense to you.
  • Select a Kafka cluster, and topic for the source. This is the data provider you setup in the previous step.
  • Define a data format. Data formats can be of type JSON or AVRO.
  • Define or register a schema:

If you are using JSON encoded data, you must define the schema. The format is JSON Schema-like, and a sample schema is provided as a template. Change the values to match your specific schema.


If you are using AVRO encoded data, you must register the schema by providing a Schema Registry endpoint. The schema will be used in SSB, with Schema Registry handling versioning.

  • Add the Schema Registry endpoint for Server and Port and click the plus button.


  • Click on Save Changes.

Note: You can create multiple virtual tables for the same Kafka topic.

4. Create a virtual table as a sink

(This step is optional: you can select Results in Browser to test functionality without sending the results to a sink.)

  • Click on SQLStreamBuilder on the side menu.
  • Click on the Virtual Tables tab.
  • Click on the Sink Virtual Tables tab.
  • Click Add Sink and select Apache Kafka.
  • Specify the name of the sink; this will be what you select for a sink in the SQL window. Name it any logical name that makes sense to you.
  • Select a Kafka cluster, and topic for the sink. The schema in the sink is defined by the columns and their respective datatypes as specified in the query.


5. Running SQL

  • Click on SQLStreamBuilder on the side menu
  • Set a unique SQL Job Name. Each query is a job and requires a name. If you are iterating quickly on your SQL statement, just use the default name. You can also select the Name Job button to generate a name for you.
  • Select Environment.Cluster, then select Add Cluster.
  • In the Add Deployment box, choose a logical Name for this deployment.
  • Give it a good Description.
  • Select a Region/Zone.
  • Click Create. Over the next few moments, a cluster will be provisioned; when it is complete, move to the next step.


Note: If you already have a cluster created, select that one and skip this step. Creating a cluster is typically done only a few times, not every time you run a query.

  • Under Sink Virtual Table, select the sink you want the query results to be sent to.

Execute SQL

  • To list the available virtual tables, begin editing within the SSB SQL Console and run SHOW TABLES, then click Execute or use the key combination Ctrl+Enter. A list of tables will be shown in the Results tab below the editor.
  • To view the schema for a virtual table, run DESC <tablename>, substituting the name of the table you would like to describe. Click Execute or use the key combination Ctrl+Enter. The schema will be displayed in the Results tab below the editor.
  • To run SQL, write a SQL statement into the editor and click Execute or use the key combination Ctrl+Enter (see the example after this list).
  • SQL syntax is Calcite compatible; check out the SQL Syntax Reference.
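Putting those steps together, a first session in the SQL console might look like the following sketch (the payments virtual table and its columns are hypothetical; run each statement on its own):

show tables;

desc payments;

SELECT card_type, amount
FROM payments
WHERE CAST(amount AS numeric) > 100;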

Remember: Each execution is a new job.

The log window will offer verbose output on the parse status of the statement, and the results of the query will return to the Results tab below the editor as well as be spooled to the Virtual Table Sink. Note that this paradigm is a bit different from typical database queries because it’s a streaming SQL query. The displayed results are a sample of the output from the query. If you navigate away from the page, the query will still be running as a job, and you can view it in the SQL Jobs tab.

If you don’t see results right away, this may be because of the nature of streaming SQL - data may not immediately match your criteria. The Logs tab shows the status of the SQL Job. You can also view the logs of an existing job by selecting the job in the SQL Jobs tab.

If you would like to re-sample the results from the query, select the Sample button at any time.



SQL Syntax Reference

SQLStreamBuilder utilizes Apache Calcite-compatible streaming SQL. Beyond that, there are metadata commands and some built-ins.

Some supported operations are:

and a list of data types:

Kafka Timestamps

Kafka, as of 0.10.0, has supported timestamps as part of the message format. This is exposed to SQLStreamBuilder via the keyword eventTimestamp; the time in the message is the time at message creation. SQLStreamBuilder also respects current_timestamp, which is the current timestamp at query execution start time.

Thus, queries can be constructed like:

SELECT sensor_name
FROM sensors
WHERE eventTimestamp > current_timestamp;

and can be used in window queries like:

SELECT SUM(CAST(amount AS numeric)) AS payment_volume,
CAST(TUMBLE_END(eventTimestamp, interval '1' hour) AS varchar) AS ts
FROM payments
GROUP BY TUMBLE(eventTimestamp, interval '1' hour);

Metadata Commands

  • show tables - list virtual tables.
  • desc <vtable> - describe the specified virtual table, showing columns and types.
  • show jobs - list current running SQL jobs.
  • show history - show SQL query history (only successfully parsed/executed).
  • help - show help.

Hyperjoins

Joins are considered “hyperjoins” because SQLStreamBuilder can join multiple virtual tables in a single query. Because each virtual table is created from a data provider, these joins can span multiple clusters/connect strings as well as multiple types of sources (joining Kafka and a database, for instance). So something like this is possible:

SELECT us_west.user_score+ap_south.user_score
FROM kafka_in_zone_us_west us_west
FULL OUTER JOIN kafka_in_zone_ap_south ap_south
ON us_west.user_id = ap_south.user_id;

Managing Jobs

Every time you execute SQL, it becomes a job. It runs on the deployment as an Apache Flink job. You can manage running jobs using the SQL Jobs tab.


Monitoring a job

  • Click on SQLStreamBuilder on the side menu.
  • Click on the SQL Jobs tab.
  • Select the job you would like to monitor.
  • Select the Log or Performance tab on the bottom pane.

The Logs tab shows detailed logging from the Flink job itself - any exceptions will be logged here. As the job processes messages, a count of messages processed is spooled to the log every 5 seconds:

8/13/2019, 9:42:49 AM - INFO - 2019-08-08 19:24:17.752157 : 5 Second Count: 2502

The Performance tab shows a basic performance graph based on this data.

Stopping a job

  • Click on SQLStreamBuilder on the side menu.
  • Click on the SQL Jobs tab.
  • Click on the red stop button for the job you would like to stop.

Restarting a job

  • Click on SQLStreamBuilder on the side menu.
  • Click on the SQL Jobs tab.
  • Click on the select box below the State column and select Cancelled or Failed to see stopped jobs.
  • Select the job you would like to restart.
  • Select the Details tab at the bottom.
  • Select the Edit Selected Job button - this will bring up the SQL window in Edit Mode.
  • Select Restart to restart the job.

Editing a job

Editing a job is similar to restarting a job.

  • Click on SQLStreamBuilder on the side menu.
  • Click on the SQL Jobs tab.
  • Select the job you would like to edit.
  • Select the Details tab at the bottom.
  • Select the Edit Selected Job button - this will bring up the SQL window in Edit Mode.
  • Edit/alter the Target Deployment, Sink Virtual Table, and the SQL itself as needed, then select Restart to restart the job. The job will be stopped and restarted.

Sampling data for a running job

You can sample data from a running job. This is useful if you want to inspect the data to make sure the job is producing the results you expect. To sample the data from a running job:

  • Click on SQLStreamBuilder on the side menu.
  • Click on the SQL Jobs tab.
  • Select the job you would like to edit.
  • Select the Details tab at the bottom.
  • Select the Edit Selected Job button - this will bring up the SQL window in Edit Mode.
  • Click the Sample button. Results will be sampled and displayed in the results window. If there is no data meeting the SQL query, sampling will give up after a few tries.

Data Providers

Data Providers is a catalog of data endpoints to be used as sources and sinks. It allows you to register a provider using your security credentials, then use that provider as a source or sink in SQLStreamBuilder.

AWS MSK (Managed Streaming for Apache Kafka)

To register AWS MSK as a data provider:

It’s important to note that because AWS MSK doesn’t have any public endpoints, it must be VPC peered to your Eventador VPC for connectivity to work. If you need help with this, don’t hesitate to contact support and we will lead you through getting the VPCs peered.

  • Click on the Data Providers menu option.
  • Click on Add Kafka Provider.
  • Provide a name for the provider. This can be any logical name that makes sense for your use case(s) and must be unique.
  • Enter the broker:port connection string for your cluster. You can find this information in the AWS MSK console by clicking on Client Access. Check out the AWS documentation for specifics.
  • Click the + button to add the connect string, and then the Save Changes button.
  • You should see your new provider in the list of registered providers. You can now see the provider name in the list for sources and sinks.

Confluent Cloud

To register Confluent Cloud (CC) as a data provider:

  • Ensure you have your CC SASL/SSL username and password available.
  • Ensure you have your CC bootstrap.servers value available.
  • Click on the Data Providers menu option.
  • Click on Add Kafka Provider.
  • Provide a name for the provider. This can be any logical name that makes sense for your use case(s) and must be unique.
  • Enter the bootstrap.servers value from CC into the Brokers text field.
  • Select SASL/SSL for the connection protocol.
  • Select plain for SASL Mechanism.
  • Enter the CC username and password.
  • Click Save changes. You should see the new provider in the list of registered providers.



Source and Sink availability

Available Sources

Source     Availability
Kafka      Yes
Kinesis    TBA

Available Sinks

Sink                   Availability
Kafka                  Yes
Amazon S3              Limited
Kinesis                TBA
ElasticSearch          TBA
JDBC (Relational DB)   TBA
MongoDB                TBA

Feedback and Support

  • Provide feedback and report bugs to our Zendesk system by emailing us.