This library is compatible with Go 1.11+
Please refer to CHANGELOG.md
if you encounter breaking changes.
- Motivation
- Introduction
- Tail Service
- Dispatch Service
- Getting Started
- Usage
- Data ingestion rule
- End to end testing
The goal of this project is to provide a single point of robust and cost-effective, event-driven data ingestion into BigQuery. BqTail elegantly addresses BigQuery limitations, restrictions, and quotas (load jobs per table and per project, corrupted data files) with batching and a transient project/dataset approach. In addition, it provides data transformation, enrichment, and deduplication capabilities.
A single point means that there is only one deployment instance of bqtail/dispatch/monitor to ingest data into various BigQuery projects, where each ingestion process can use one or more dedicated transient projects to control billing, reservations, or default-pipeline workload. The transient project's function is to load data into a temp table, run the optional transformation, and copy the data to a destination table.
Note that free-of-charge data ingestion takes place on the default-pipeline reservation, which is governed by a fair scheduler allocating resources among competing load jobs across various projects. To guarantee ingestion speed for critical data, it is recommended to use a project with a slot reservation.
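For illustration, the transient approach maps onto the Transient setting used in the rules later in this README; below is a minimal fragment of a rule's Dest section (project, dataset, and table names are placeholders):

Dest:
  Table: myproject:mydataset.mytable   # final destination table
  Transient:
    Dataset: temp                      # data is first loaded into this transient dataset, then copied to the destination table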
BqTail is used by Viant to ingest 70+ billion transactions daily (1.4 million files) into 100+ tables, all for under $15, as a viable alternative to the BigQuery Streaming API, BigQuery Transfer Service, or Cloud Dataflow.
The BqTail process can ingest data in async mode, using the serverless Cloud Functions based tail and dispatch services, or in sync mode, using the standalone bqtail command.
The bqtail command is a great place to start building and validating ingestion rules locally.
## to validate
bqtail -s=localSourceFolder -d='myproject:mydataset.mytable' -w=90 -p=myproject -V
## to load data
bqtail -s=localSourceFolder -d='myproject:mydataset.mytable' -w=90
## to load with rule file
bqtail -s=localSourceFolder -r='myrule.yaml'
## to stream data from s3
export AWS_SDK_LOAD_CONFIG=true
bqtail -s='s3://bybucket/dataxx/' -r='myrule.yaml' -X
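The rule file passed with -r uses the same format as the ingestion rules described in the following sections; a minimal sketch (paths and table names are placeholders):

# myrule.yaml - minimal ingestion rule sketch
When:
  Prefix: "/data/"                     # match source object path prefix
  Suffix: ".avro"                      # match source object suffix
Dest:
  Table: myproject:mydataset.mytable   # destination table
OnSuccess:
  - Action: delete                     # remove source files after a successful load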
The following defines a rule to ingest data in batches within a 30-second time window in async mode.
When:
  Prefix: "/data/"
  Suffix: ".avro"
Dest:
  Table: mydataset.mytable
Async: true
Batch:
  Window:
    DurationInSec: 30
OnSuccess:
  - Action: delete
The following defines a rule to ingest data in batches within a 60-second time window in async mode, deduplicating rows on the id column via a transient dataset, running a post-load summary query, and sending a Slack notification on failure.
Async: true
When:
  Prefix: "/data/"
  Suffix: ".avro"
Dest:
  Table: mydataset.mytable
  TempDataset: transfer
  UniqueColumns:
    - id
Batch:
  Window:
    DurationInSec: 60
OnSuccess:
  - Action: query
    Request:
      SQL: SELECT $EventID AS job_id, COUNT(1) AS row_count, CURRENT_TIMESTAMP() AS completed FROM $TempTable
      Dest: mydataset.summary
  - Action: delete
OnFailure:
  - Action: notify
    Request:
      Channels:
        - "#e2e"
      From: BqTail
      Title: bqtail.wrong_dummy ingestion
      Message: "$Error"
      Token: SlackToken
For example, if your logs are stored in gs://$bqTailTriggerBucket/mylogs/logName1/2020/01/11/, you can extract the date from the URL and use it in the destination table suffix: with the rule below, the Pattern capture groups resolve to $1=2020, $2=01, $3=11, so data lands in myproject:mydataset.mytable_20200111 (or in one of the Split mapping tables, depending on meta.eventId).
When:
  Prefix: "/mylogs/"
Async: true
Batch:
  Window:
    DurationInSec: 120
Dest:
  Pattern: "/mylogs/.+/(\\d{4})/(\\d{2})/(\\d{2})/.+"
  Table: myproject:mydataset.mytable_$1$2$3
  SourceFormat: NEWLINE_DELIMITED_JSON
  Transient:
    Dataset: temp
  Schema:
    Template: myproject:mydataset.mytemplate
    Split:
      ClusterColumns:
        - meta.eventId
      Mapping:
        - When: meta.eventId IN (101, 102)
          Then: myproject:mydataset.my_table1_$1$2$3
        - When: meta.eventId IN (103, 104)
          Then: myproject:mydataset.my_table2_$1$2$3
        - When: meta.eventId > 104
          Then: myproject:mydataset.my_table2_$Mod(10)_$1$2$3
OnSuccess:
  - Action: delete
OnFailure:
  - Action: notify
    Request:
      Channels:
        - "#my_error_channel"
      Title: My log ingestion
      Message: "$Error"
Info:
  Workflow: My log ingestion
  ProjectURL: JIRA/WIKI or any link reference
  LeadEngineer: [email protected]
[
  {
    "When": {
      "Prefix": "/data/",
      "Suffix": ".csv"
    },
    "Async": true,
    "Dest": {
      "Override": true,
      "Table": "myproject:mydataset.mytable",
      "Partition": "$Date",
      "Transient": {
        "Dataset": "temp"
      },
      "SkipLeadingRows": 1,
      "MaxBadRecords": 3,
      "FieldDelimiter": ",",
      "IgnoreUnknownValues": true
    },
    "OnSuccess": [
      {
        "Action": "delete"
      }
    ]
  }
]
When:
  Prefix: "/mypath/mysubpath"
  Suffix: ".json"
Async: true
Batch:
  Window:
    DurationInSec: 10
Dest:
  Table: bqtail.transactions
  Transient:
    Dataset: temp
    Alias: t
  Transform:
    charge: (CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)
  SideInputs:
    - Table: bqtail.fees
      Alias: f
      'On': t.fee_id = f.id
OnSuccess:
  - Action: query
    Request:
      SQL: SELECT
        DATE(timestamp) AS date,
        sku_id,
        supply_entity_id,
        MAX($EventID) AS batch_id,
        SUM(payment) payment,
        SUM((CASE WHEN type_id = 1 THEN t.payment + f.value WHEN type_id = 2 THEN t.payment * (1 + f.value) END)) charge,
        SUM(COALESCE(qty, 1.0)) AS qty
        FROM $TempTable t
        LEFT JOIN bqtail.fees f ON f.id = t.fee_id
        GROUP BY 1, 2, 3
      Dest: bqtail.supply_performance
      Append: true
    OnSuccess:
      - Action: delete
The following snapshot shows the serverless cost overhead for one day of data ingestion (70 TB, 1.6 million files).
Note that the actual data ingestion, performed with BigQuery load and copy operations, is free of charge.
The following link details generic deployment.
The following link details bqtail monitoring.
BqTail is fully end-to-end tested, including batch allocation stress testing with 2k files.
You can try out all data ingestion scenarios by simply running the e2e test cases.
BqTail is an open source project and contributors are welcome!
See TODO list
The source code is made available under the terms of the Apache License, Version 2, as stated in the file LICENSE.
Individual files may be made available under their own specific licenses, all compatible with the Apache License, Version 2. Please see individual files for details.
Library Author: Adrian Witas