datapipeline.py
import types

import click
import pandas as pd

from baskets.src.transforms import (
    chi2normal_transformation, percentile_score, dropna,
    resample_return, columns_selector, stdout)
from baskets.utils import generator, processor, apply


@click.group(chain=True)
def data_pipeline():
    """Chainable data pipeline: load, transform and dump tabular data."""


@data_pipeline.command("open")
@click.option('-i', '--inputs', 'inputs', type=click.Path(),
              multiple=True, help='Input file to open.')
@click.option('-x', '--header', 'header', multiple=True, type=int,
              help='Row number(s) to use as the column header.')
@generator
def make_open(inputs, header):
    """Source command: read each input CSV and yield it as a DataFrame."""
    for input_ in inputs:
        try:
            data = pd.read_csv(input_, header=list(header), index_col=0,
                               parse_dates=True)
            yield data
        except Exception as e:
            click.echo('Could not open file "%s": %s' % (input_, e), err=True)


# Register each transform function as its own chainable subcommand.
transforms = [chi2normal_transformation, percentile_score, dropna,
              resample_return, columns_selector, stdout]
for transform in transforms:
    g = apply(transform)
    data_pipeline.command()(g)


@data_pipeline.command("count_na")
@apply
def count_na(data):
    """Print the per-column count of missing values for each frame."""
    click.echo(data.isna().sum())


@data_pipeline.command()
@click.option("-o", "--output", "output", type=click.File("w"),
              help="File to write the resulting CSV to.")
@apply
def dump(data, output):
    content = data.to_csv()
    output.write(content)


@data_pipeline.resultcallback()
def process_commands(processors):
    """This result callback is invoked with an iterable of all the chained
    subcommands. As each subcommand returns a function, we can chain them
    together to feed one into the other, similar to how a pipe on unix
    works.
    """
    # Start with an empty iterable.
    stream = ()
    # Pipe it through all stream processors.
    for processor in processors:
        if processor is not None:
            stream = processor(stream)
    # Evaluate the stream to actually pull the data through the pipeline.
    if isinstance(stream, types.GeneratorType):
        for pipe in stream:
            pass


if __name__ == "__main__":
    data_pipeline(prog_name="data-pipeline")
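
# Example invocation (a hypothetical sketch; the file names and options
# below are placeholders, not files shipped with this repository):
#
#   python datapipeline.py open -i prices.csv -x 0 count_na dump -o out.csv
#
# Because the group is declared with chain=True, each subcommand returns a
# generator or processor, and the result callback above pipes them into one
# another in the order they appear on the command line.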