Framework for building data pipelines that:
- Extract batches of payloads with hierarchical structure, such as JSON or XML;
- Transform batches of payloads into batches of rows for a table defined by its columns;
- Load batches of table rows into persistent storage.
```
pip install batchout
```
Batchout is best explained by example.

Let's say we periodically fetch a JSON message from some REST API:
```json
{
  "user": {
    "id": "someuserid",
    "last_seen": "2019-11-01T00:00:00"
  },
  "sessions": [
    {
      "id": "somesessionid",
      "created_at": "2019-10-01T00:00:00",
      "device": {
        "useragent": "chrome"
      }
    },
    {
      "id": "othersessionid",
      "created_at": "2019-11-01T00:00:00",
      "device": {
        "useragent": "firefox"
      }
    }
  ]
}
```
The fetched data has to be put into a database table `user_session` like this:

| user_id    | user_last_seen      | session_id     | session_created_at  | session_useragent |
|------------|---------------------|----------------|---------------------|-------------------|
| someuserid | 2019-11-01T00:00:00 | somesessionid  | 2019-10-01T00:00:00 | chrome            |
| someuserid | 2019-11-01T00:00:00 | othersessionid | 2019-11-01T00:00:00 | firefox           |
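Without a framework, even the transform step alone means hand-written flattening code along these lines (a plain-Python sketch, not Batchout code; `payload` stands for the JSON message above):

```python
def payload_to_rows(payload):
    # flatten the nested message: one output row per session,
    # with the user fields repeated on every row
    user = payload['user']
    return [
        {
            'user_id': user['id'],
            'user_last_seen': user['last_seen'],
            'session_id': session['id'],
            'session_created_at': session['created_at'],
            'session_useragent': session['device']['useragent'],
        }
        for session in payload['sessions']
    ]
```

And fetching the payloads, casting types, and upserting rows would still be left to write.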
With Batchout, you don't have to write any of that boilerplate. Just use `batchout.Batch` to configure and run your pipeline:
```python
from batchout import Batch

batch = Batch.from_config(
    dict(
        inputs=dict(
            some_api=dict(
                type='http',
                method='get',
                uri='https://some.api/my/user/sessions',
            ),
        ),
        extractors=dict(
            first_match_in_json=dict(
                type='jsonpath',
            ),
        ),
        indexes=dict(
            session_idx=dict(
                type='for_list',
                path='sessions',
            ),
        ),
        columns=dict(
            user_id=dict(
                type='string',
                path='user.id',
            ),
            user_last_seen=dict(
                type='string',
                path='user.last_seen',
            ),
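            # each column below is extracted once per value of session_idx,
            # yielding one row per session in the payload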
            session_id=dict(
                type='string',
                path='sessions[{session_idx}].id',  # notice usage of session_idx defined as an index above
            ),
            session_created_at=dict(
                type='timestamp',
                path='sessions[{session_idx}].created_at',
            ),
            session_useragent=dict(
                type='string',  # a useragent is a string, not a timestamp
                path='sessions[{session_idx}].device.useragent',
            ),
        ),
        selectors=dict(
            all_sessions=dict(
                type='sql',
                query='select * from some_api',
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
        ),
        outputs=dict(
            local_db=dict(
                type='postgres',
                mode='upsert',
                keys=['user_id', 'session_id'],
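                # with mode='upsert', rows matching existing ones on keys are
                # updated in place and the rest are inserted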
                host='localhost',
                port=5432,
                dbname='somedb',
                table='user_session',
                from_env=dict(
                    user='DB_USER',          # DB_USER and
                    password='DB_PASSWORD',  # DB_PASSWORD are read from the environment
                ),
            ),
        ),
        tasks=dict(
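            # the reader fetches payloads from inputs, the walker flattens
            # them into rows using indexes and columns, and the writer sends
            # selected rows to outputs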
            read_sessions=dict(
                type='reader',
                inputs=['some_api'],
            ),
            walk_sessions=dict(
                type='walker',
                inputs=['some_api'],
                indexes=['session_idx'],
                columns=[
                    'user_id',
                    'user_last_seen',
                    'session_id',
                    'session_created_at',
                    'session_useragent',
                ],
            ),
            write_sessions_to_local_db=dict(
                type='writer',
                selector='all_sessions',
                outputs=['local_db'],
            ),
        ),
    ),
    defaults=dict(
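        # applied to every column and index that does not name its own extractor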
        columns=dict(
            extractor='first_match_in_json',
        ),
        indexes=dict(
            extractor='first_match_in_json',
        ),
    ),
)
batch.run_once()
```
`Batch.run_once()` processes exactly one batch of payloads from each input. `Batch.run_forever()` does the same in an infinite loop.
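Since the `local_db` output reads its credentials via `from_env`, `DB_USER` and `DB_PASSWORD` must be present in the environment; set them before building the batch. For a quick local test you could inject placeholder values from Python (a sketch with made-up values; real deployments should export the variables outside the process):

```python
import os

# made-up credentials for a local test only; in real deployments,
# export DB_USER and DB_PASSWORD in the shell or orchestrator instead
os.environ['DB_USER'] = 'someuser'
os.environ['DB_PASSWORD'] = 'somepassword'

# ...construct the Batch exactly as shown above, then:
batch.run_forever()  # processes batch after batch until the process stops
```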