Table Definitions
Kinesis streams store data as a stream of bytes and leave it to producers and consumers to define how a message should be interpreted. For Presto, this data must be mapped into columns before it can be queried. The JSON table definition files take care of this mapping.
These files can be stored in one of two places:
- A defined location on Amazon S3 (at StitchFix we use s3://stitchfix.aa.config/tools/presto-kinesis)
- The local filesystem on each node

We prefer the first choice, since it's easier to keep individual Presto nodes up to date.
A table definition file consists of a JSON definition for a table, which corresponds to one stream in Kinesis. The name of the file can be arbitrary but must end in .json. The structure of the table definition is as follows:
{
  "tableName": ...,
  "schemaName": ...,
  "streamName": ...,
  "message": {
    "dataFormat": ...,
    "fields": [
      ...
    ]
  }
}
Field | Required | Type | Description |
---|---|---|---|
tableName | required | string | Presto table name defined by this file |
schemaName | optional | string | Schema which will contain the table. If omitted, the default schema name is used |
streamName | required | string | Name of the Kinesis stream that is mapped |
message | optional | JSON object | Field definitions for data columns mapped to the message itself |
Every message in a Kinesis stream can be decoded using the definition provided in the message object.
The JSON object message in the table definition contains two fields:
Field | Required | Type | Description |
---|---|---|---|
dataFormat | required | string | Selects the decoder for this group of fields (json, csv, or raw) |
fields | required | JSON array | A list of field definitions. Each field definition creates a new column in the Presto table and maps that column to a part of the Kinesis message |
Each field definition is a JSON object. At a minimum, you'll want to provide a name, type, and a mapping. The overall structure looks like this:
{
  "name": ...,
  "type": ...,
  "dataFormat": ...,
  "mapping": ...,
  "formatHint": ...,
  "hidden": ...,
  "comment": ...
}
Field | Required | Type | Description |
---|---|---|---|
name | required | string | Name of the column in the Presto table |
type | required | string | Presto type of the column |
dataFormat | optional | string | Selects the column decoder for this field. Defaults to the default decoder for the message data format and column type |
mapping | optional | string | Mapping information for the column. This is explained below |
formatHint | optional | string | Sets a column-specific format hint for the column decoder |
hidden | optional | boolean | Hides the column from DESCRIBE <table name> and SELECT *. Defaults to false |
comment | optional | string | Adds a column comment, which is shown with DESCRIBE <table name> |
There is no limit on the number of field definitions in message.
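The required keys listed in the tables above can be checked before a file is deployed. The following is a minimal sketch, not part of the connector: the function name check_table_definition and its error strings are hypothetical, and it only tests for the presence of keys, not their types or values.

```python
import json

# Required keys, taken from the tables above.
REQUIRED_TABLE_KEYS = ("tableName", "streamName")
REQUIRED_MESSAGE_KEYS = ("dataFormat", "fields")
REQUIRED_FIELD_KEYS = ("name", "type")


def check_table_definition(text):
    """Return a list of problems found in a table definition (empty if none)."""
    problems = []
    definition = json.loads(text)

    for key in REQUIRED_TABLE_KEYS:
        if key not in definition:
            problems.append(f"missing required key: {key}")

    message = definition.get("message")
    if message is None:
        return problems  # "message" itself is optional

    for key in REQUIRED_MESSAGE_KEYS:
        if key not in message:
            problems.append(f"message: missing required key: {key}")

    for index, field in enumerate(message.get("fields", [])):
        for key in REQUIRED_FIELD_KEYS:
            if key not in field:
                problems.append(f"message.fields[{index}]: missing required key: {key}")

    return problems


# Hypothetical definition that misspells "fields" as "field".
sample = '{"tableName": "t", "streamName": "s", "message": {"dataFormat": "json", "field": []}}'
print(check_table_definition(sample))
```

A check like this could run in whatever step uploads the files to S3, so that a misspelled key is caught before any Presto node reads the definition.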
The name field is exposed to Presto as the column name, while the mapping field identifies the portion of the message that gets mapped to that column. For JSON object messages, the mapping refers to the field name of an object and can be a slash-separated path that drills into the object structure of the message (for example, body/profile/clientId). You can also map a field of the JSON object to a string column type; if the value is a more complex type (JSON array or JSON object), the JSON itself becomes the field value.
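To make the mapping behaviour concrete, here is a rough Python sketch of how a slash-separated mapping selects part of a JSON message. It only mimics the description above and is not the connector's decoder: resolve_mapping, its as_string flag, and the sample message are all hypothetical, and returning None for a missing path is an assumption.

```python
import json


def resolve_mapping(message_text, mapping, as_string=False):
    """Follow a slash-separated mapping into a JSON message.

    Illustrative only: this is not the connector's implementation.
    """
    value = json.loads(message_text)
    for part in mapping.split("/"):
        if not isinstance(value, dict) or part not in value:
            return None  # assumption: a missing path yields no value
        value = value[part]
    if as_string and isinstance(value, (dict, list)):
        # A complex value mapped to a string column keeps its JSON text.
        return json.dumps(value)
    return value


# Hypothetical message, shaped like the paths used in this page.
message = '{"body": {"profile": {"clientId": 42, "tags": ["a", "b"]}}}'
print(resolve_mapping(message, "body/profile/clientId"))
print(resolve_mapping(message, "body/profile", as_string=True))
print(resolve_mapping(message, "body/missing"))
```

The second call shows the string-column case: the whole profile object comes back as JSON text rather than as a scalar.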
It is possible to add other fields to the JSON object for documentation or other metadata. These should be added as additional fields in the root object after the four main fields described above (tableName, schemaName, streamName, message) and not within those fields. The extra fields will simply be ignored by the connector and it will only read the four fields it knows about.
{
  "tableName": ...,
  "schemaName": ...,
  "streamName": ...,
  "message": {
    "dataFormat": ...,
    "fields": [
      ...
    ]
  },
  "comment": "This stream is used to capture new signups",
  "client-meta": {
    "color": ...
  }
}
The following is a complete example using a JSON message.
{
  "tableName": "test_table",
  "schemaName": "otherworld",
  "streamName": "test_kinesis_stream",
  "message": {
    "dataFormat": "json",
    "fields": [
      {
        "name": "client_id",
        "type": "BIGINT",
        "mapping": "body/profile/clientId",
        "comment": "The client ID field"
      },
      {
        "name": "acct_balance",
        "type": "DOUBLE",
        "mapping": "body/acct_balance",
        "comment": "Current account balance"
      },
      {
        "name": "service_type",
        "mapping": "body/profile/service_type",
        "type": "VARCHAR(20)"
      },
      {
        "name": "routing_time",
        "mapping": "header/routing_time",
        "type": "DATE",
        "dataFormat": "iso8601"
      }
    ]
  }
}
If you want to see what's going on with your table definition, you can query all the fields with the following:
SELECT * FROM kinesis.SCHEMA.TABLE LIMIT 10;
This will show all of the fields, including several prefixed by an underscore, such as _message and _shard_id. Most of these are self-explanatory, but the most useful one is _message, which is what the mapping in the field definition refers to. If you're having issues getting a field to show up, you can run something along the lines of the following to help figure it out:
-- assuming your mapping looks like PATH/TO/FIELD
SELECT
  PROBLEM_FIELD,
  json_extract_scalar(_message, '$.PATH.TO.FIELD') AS PROBLEM_FIELD_msg,
  _message
FROM kinesis.SCHEMA.TABLE
LIMIT 10;
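The translation from a mapping such as PATH/TO/FIELD to the JSON path $.PATH.TO.FIELD is mechanical, so the debugging query can be generated. The helper below is a sketch under stated assumptions: mapping_to_json_path and debug_query are hypothetical names, the mapping is assumed to contain only simple keys (no dots, quotes, or array indexes), and the plain string interpolation is only suitable for identifiers you trust.

```python
def mapping_to_json_path(mapping):
    """Turn a mapping such as 'body/profile/clientId' into '$.body.profile.clientId'."""
    return "$." + ".".join(mapping.split("/"))


def debug_query(schema, table, column, mapping, limit=10):
    """Build the comparison query shown above for one column."""
    json_path = mapping_to_json_path(mapping)
    return (
        f"SELECT {column}, "
        f"json_extract_scalar(_message, '{json_path}') AS {column}_msg, "
        f"_message "
        f"FROM kinesis.{schema}.{table} LIMIT {limit}"
    )


# Uses the names from the complete example earlier on this page.
print(debug_query("otherworld", "test_table", "client_id", "body/profile/clientId"))
```

If the column is NULL but the extracted value is not, the problem is more likely in the column's type or dataFormat than in the mapping path.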