Refactor partition cols (databrickslabs#31)
* intermediate commit - refactored partitionCols into class-level attribute. Still seeing errors in asOfJoin test code

* Updated test code to use new function signatures
tnixon authored Oct 12, 2020
1 parent 51cd086 commit 93c349e
Showing 2 changed files with 101 additions and 44 deletions.
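In short, this commit moves partition columns from per-method arguments to a class-level attribute set at construction time. A minimal usage sketch of the new API (illustrative only; it assumes the package layout matches the file path, an active SparkSession, and DataFrames whose columns include event_ts and symbol; quotes_df is a hypothetical right-hand DataFrame):

from tempo.tsdf import TSDF

# declare partition columns once, at construction time
tsdf = TSDF(df, ts_col="event_ts", partitionCols=["symbol"])

# methods no longer take a partitionCols argument
joined_df = tsdf.asofJoin(quotes_df).df
stats_df = tsdf.withRangeStats(rangeBackWindowSecs=1200).df

# partition columns can also be reassigned on an existing TSDF
tsdf2 = tsdf.withPartitionCols(["symbol", "exchange"])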
137 changes: 97 additions & 40 deletions tempo/tsdf.py
@@ -5,10 +5,42 @@

class TSDF:

def __init__(self, df, ts_col = "EVENT_TS"):
self.df = df
self.ts_col = ts_col

def __init__(self, df, ts_col = "EVENT_TS", partitionCols = []):
"""
Constructor
:param df:
:param ts_col:
:param partitionCols:
"""
self.df = df
self.ts_col = self.__validated_column(ts_col)
self.partitionCols = self.__validated_columns(partitionCols)

##
## Helper functions
##

def __validated_column(self,colname):
if type(colname) != str:
raise TypeError(f"Column names must be of type str; found {type(colname)} instead!")
if colname.lower() not in [col.lower() for col in self.df.columns]:
raise ValueError(f"Column {colname} not found in Dataframe")
return colname

def __validated_columns(self,colnames):
# if provided a string, treat it as a single column
if type(colnames) == str:
colnames = [ colnames ]
# otherwise we really should have a list or None
if colnames is None:
colnames = []
elif type(colnames) != list:
raise TypeError(f"Columns must be of type list, str, or None; found {type(colname)} instead!")
# validate each column
for col in colnames:
self.__validated_column(col)
return colnames

def __createTimeSeriesDF(self, df, ts_select_cols, fillLeft = True, partitionCols = []):
left_ts_val, right_ts_val = (col(self.ts_col), lit(None)) if fillLeft else (lit(None),col(self.ts_col))

@@ -55,8 +87,33 @@ def __getLastRightTs(self,UnionDF, ts_select_cols, partitionCols = []):
.filter((col(ts_select_cols[2])== col(ts_select_cols[0]).cast("double")) &
(col("is_original") == lit(1))) # Remove the overlapping partition parts in case we made use of time-partitions.
.select(partitionCols + [ts_select_cols[0],ts_select_cols[1]]))

def asofJoin(self, right_DF, right_ts_col_name = None, partitionCols = [], tsPartitionVal = None, fraction = 0.5, asof_prefix = None):

def __baseWindow(self):
w = Window().orderBy(fn.col(self.ts_col).cast("long"))
if self.partitionCols:
w = w.partitionBy([fn.col(elem) for elem in self.partitionCols])
return w

def __rangeBetweenWindow(self, range_from, range_to):
return self.__baseWindow().rangeBetween(range_from, range_to)

def __rowsBetweenWindow(self, rows_from, rows_to):
return self.__baseWindow().rowsBetween(rows_from, rows_to)

##
## TSDF API functions
##

def withPartitionCols(self, partitionCols):
"""
Sets certain columns of the TSDF as partition columns. Partition columns are those that differentiate distinct timeseries
from each other.
:param partitionCols: a list of columns used to partition distinct timeseries
:return: a TSDF object with the given partition columns
"""
return TSDF( self.df, self.ts_col, partitionCols )

def asofJoin(self, right_DF, right_ts_col_name = None, tsPartitionVal = None, fraction = 0.5, asof_prefix = None):

right_ts_col_name = self.ts_col if right_ts_col_name is None else right_ts_col_name

@@ -66,24 +123,25 @@ def asofJoin(self, right_DF, right_ts_col_name = None, partitionCols = [], tsPartitionVal = None, fraction = 0.5, asof_prefix = None):

# in order to avoid duplicate output columns from the join, we'll supply a prefix for asof fields only
prefix = asof_prefix + '_' if asof_prefix is not None else ''
right_DF = right_DF.toDF(*(prefix+c if c not in list(set().union(partitionCols,ts_select_cols, [self.ts_col, right_ts_col_name])) else c for c in right_DF.columns))
unionDF = self.__getUnionDF(right_DF.withColumnRenamed(right_ts_col_name,self.ts_col),ts_select_cols,partitionCols)
right_DF = right_DF.toDF(*(prefix+c if c not in list(set().union(self.partitionCols,ts_select_cols, [self.ts_col, right_ts_col_name])) else c for c in right_DF.columns))
unionDF = self.__getUnionDF(right_DF.withColumnRenamed(right_ts_col_name,self.ts_col),ts_select_cols,self.partitionCols)

# Only make use of time partitions if tsPartitionVal was supplied
if tsPartitionVal is None:
asofDF = self.__getLastRightTs(unionDF,ts_select_cols,partitionCols)
asofDF = self.__getLastRightTs(unionDF,ts_select_cols,self.partitionCols)
else:
tsPartitionDF = self.__getTimePartitions(unionDF,ts_select_cols,tsPartitionVal, fraction = fraction)
asofDF = self.__getLastRightTs(tsPartitionDF,ts_select_cols,partitionCols = partitionCols + ["ts_partition"])
asofDF = self.__getLastRightTs(tsPartitionDF,ts_select_cols,partitionCols = self.partitionCols + ["ts_partition"])

# Now need to join asofDF to self_df and right_df to get all the columns from the original dataframes.
joinedDF = (asofDF
.join(self.df.withColumnRenamed(self.ts_col,ts_select_cols[0]),[ts_select_cols[0]]+ partitionCols)
.join(right_DF.withColumnRenamed(right_ts_col_name, ts_select_cols[1]),[ts_select_cols[1]] + partitionCols)
.join(self.df.withColumnRenamed(self.ts_col,ts_select_cols[0]),[ts_select_cols[0]]+ self.partitionCols)
.join(right_DF.withColumnRenamed(right_ts_col_name, ts_select_cols[1]),[ts_select_cols[1]] + self.partitionCols)
)
return TSDF( joinedDF, self.ts_col )

return TSDF(joinedDF, ts_select_cols[0], self.partitionCols)

def vwap(self, frequency='m',volume_col = "volume", price_col = "price", partitionCols = ['symbol']):
def vwap(self, frequency='m',volume_col = "volume", price_col = "price"):
# set pre_vwap as self or enrich with the frequency
pre_vwap = self.df
print('input schema: ', pre_vwap.printSchema())
@@ -95,33 +153,40 @@ def vwap(self, frequency='m',volume_col = "volume", price_col = "price", partitionCols = ['symbol']):
elif frequency == 'D':
pre_vwap = self.df.withColumn("time_group", concat(lpad(day(col(self.ts_col)), 2, '0')))

vwapped = pre_vwap.withColumn("dllr_value", col(price_col) * col(volume_col)).groupby(partitionCols + ['time_group']).agg(
sum('dllr_value').alias("dllr_value"), sum(volume_col).alias(volume_col),
max(price_col).alias("_".join(["max",price_col]))).withColumn("vwap", col("dllr_value") / col(volume_col))
return TSDF( vwapped, self.ts_col )
group_cols = ['time_group']
if self.partitionCols:
group_cols.extend(self.partitionCols)
vwapped = ( pre_vwap.withColumn("dllr_value", col(price_col) * col(volume_col))
.groupby(group_cols)
.agg( sum('dllr_value').alias("dllr_value"),
sum(volume_col).alias(volume_col),
max(price_col).alias("_".join(["max",price_col])) )
.withColumn("vwap", col("dllr_value") / col(volume_col)) )

return TSDF( vwapped, self.ts_col, self.partitionCols )

def EMA(self,colName,window=30,exp_factor = 0.2,partitionCols = []):
def EMA(self,colName,window=30,exp_factor = 0.2):
# Constructs an approximate EMA in the fashion of:
# EMA = e * lag(col,0) + e * (1 - e) * lag(col, 1) + e * (1 - e)^2 * lag(col, 2) etc, up until window
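# For example, with exp_factor = 0.2 the first few weights are
#   0.2, 0.16, 0.128, 0.1024, ...
# and over a window of n lags the weights sum to 1 - (1 - 0.2)^n, so this
# truncated sum slightly damps the result relative to an infinite-horizon EMA.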

# Initialise EMA column:
emaColName = "_".join(["EMA",colName])
df = self.df.withColumn(emaColName,lit(0)).orderBy(self.ts_col)
w = self.__baseWindow()
# Generate all the lag columns:
for i in range(window):
lagColName = "_".join(["lag",colName,str(i)])
weight = exp_factor * (1 - exp_factor)**i
df = df.withColumn(lagColName, weight * (lag(col(colName),i).over(Window.partitionBy(partitionCols).orderBy(lit(1)))))
df = df.withColumn(lagColName, weight * lag(col(colName),i).over(w) )
df = df.withColumn(emaColName,col(emaColName) + when(col(lagColName).isNull(),lit(0)).otherwise(col(lagColName))).drop(lagColName) # Nulls are currently removed

return TSDF( df, self.ts_col )
return TSDF( df, self.ts_col, self.partitionCols )

def withLookbackFeatures(self,
featureCols,
lookbackWindowSize,
exactSize=True,
featureColName="features",
partitionCols=[]):
featureColName="features"):
"""
Creates a 2-D feature tensor suitable for training an ML model to predict current values from the history of
some set of features. This function creates a new column containing, for each observation, a 2-D array of the values
@@ -135,18 +200,14 @@ def withLookbackFeatures(self,
observations that occurred less than lookbackWindowSize from the start of the timeseries. If False, no truncation
occurs, and the column may contain arrays less than lookbackWindowSize in length.
:param featureColName: The name of the feature column to be generated. Defaults to "features"
:param partitionCols: The names of any partition columns (columns whose values partition the DataFrame into
independent timeseries)
:return: a TSDF with a feature column named featureColName containing the lookback feature tensor
"""
# first, join all featureCols into a single array column
tempArrayColName = "__TempArrayCol"
feat_array_tsdf = self.df.withColumn(tempArrayColName, fn.array(featureCols))

# construct a lookback array
lookback_win = Window.orderBy(self.ts_col).rowsBetween(-lookbackWindowSize, -1)
if partitionCols:
lookback_win = lookback_win.partitionBy(partitionCols)
lookback_win = self.__rowsBetweenWindow(-lookbackWindowSize, -1)
lookback_tsdf = (feat_array_tsdf.withColumn(featureColName,
fn.collect_list(fn.col(tempArrayColName)).over(lookback_win))
.drop(tempArrayColName))
@@ -155,15 +216,14 @@ def withRangeStats(self, type='range', partitionCols=[], colsToSummarize=[], rangeBackWindowSecs=1000):
if exactSize:
return TSDF( lookback_tsdf.where(fn.size(featureColName) == lookbackWindowSize), self.ts_col, self.partitionCols )

return TSDF( lookback_tsdf, self.ts_col )
return TSDF( lookback_tsdf, self.ts_col, self.partitionCols )

def withRangeStats(self, type='range', partitionCols=[], colsToSummarize=[], rangeBackWindowSecs=1000):
def withRangeStats(self, type='range', colsToSummarize=[], rangeBackWindowSecs=1000):
"""
Create a wider set of stats based on all numeric columns by default.
Users can also choose which columns they want summarized. These stats are:
mean/count/min/max/sum/std deviation/zscore
:param type - reserved for future use, in case these stats are extended to look back over a fixed number of rows instead of a time range
:param partitionCols - list of partitions columns to be used for the range windowing
:param colsToSummarize - list of user-supplied columns to compute stats for. All numeric columns are used if no list is provided
:param rangeBackWindowSecs - look back this many seconds to summarize all stats. Note this looks back from the floor of the base event timestamp (as opposed to the exact time, since we cast to long)
Assumptions:
@@ -173,27 +233,24 @@ def withRangeStats(self, type='range', partitionCols=[], colsToSummarize=[], rangeBackWindowSecs=1000):
4. There is a cast to long from timestamp, so microsecond (or finer) precision likely breaks down - this could be handled more easily with a string timestamp or by sorting on the timestamp itself. If using a 'rows preceding' window, this wouldn't be a problem
"""

# build window
w = ( Window().orderBy(col(self.ts_col).cast("long"))
.rangeBetween(-1 * rangeBackWindowSecs, 0) )
if partitionCols:
w = w.partitionBy([col(elem) for elem in partitionCols])

# identify columns to summarize if not provided
# these should include all numeric columns that
# are not the timestamp column and not any of the partition columns
if not colsToSummarize:
# columns we should never summarize
prohibited_cols = [ self.ts_col.lower() ]
if partitionCols:
prohibited_cols.extend([ pc.lower() for pc in partitionCols])
if self.partitionCols:
prohibited_cols.extend([ pc.lower() for pc in self.partitionCols])
# types that can be summarized
summarizable_types = ['int', 'bigint', 'float', 'double']
# filter columns to find summarizable columns
colsToSummarize = [datatype[0] for datatype in self.df.dtypes if
((datatype[1] in summarizable_types) and
(datatype[0].lower() not in prohibited_cols))]

# build window
w = self.__rangeBetweenWindow(-1 * rangeBackWindowSecs, 0)

# compute column summaries
selectedCols = self.df.columns
derivedCols = []
Expand All @@ -210,4 +267,4 @@ def withRangeStats(self, type='range', partitionCols=[], colsToSummarize=[], ran
#print(derivedCols)
summary_df = selected_df.select(*selected_df.columns, *derivedCols)

return TSDF( summary_df, self.ts_col )
return TSDF(summary_df, self.ts_col, self.partitionCols)
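For orientation (not part of the commit): with partitionCols = ["symbol"] and ts_col = "event_ts", the window that __rangeBetweenWindow builds for withRangeStats is roughly equivalent to this plain PySpark:

from pyspark.sql import Window
from pyspark.sql import functions as fn

# partition by each distinct timeseries, order by the epoch-seconds timestamp,
# and range back rangeBackWindowSecs (here 1200) up to the current row
w = (Window
     .partitionBy(fn.col("symbol"))
     .orderBy(fn.col("event_ts").cast("long"))
     .rangeBetween(-1200, 0))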
8 changes: 4 additions & 4 deletions tests/tests.py
@@ -137,8 +137,8 @@ def test_asof_join(self):
dfExpected = self.buildTestDF(expectedSchema, expected_data, ["EVENT_TS_right", "EVENT_TS_left"])

# perform the join
tsdf_left = TSDF(dfLeft)
joined_df = tsdf_left.asofJoin(dfRight, partitionCols=["symbol"]).df
tsdf_left = TSDF(dfLeft, partitionCols=["symbol"])
joined_df = tsdf_left.asofJoin(dfRight).df

# joined dataframe should equal the expected dataframe
self.assertDataFramesEqual( joined_df, dfExpected )
@@ -177,10 +177,10 @@ def test_range_stats(self):
dfExpected = self.buildTestDF(expectedSchema,expected_data)

# convert to TSDF
tsdf_left = TSDF(df)
tsdf_left = TSDF(df, partitionCols=["symbol"])

# using lookback of 20 minutes
featured_df = tsdf_left.withRangeStats(partitionCols=["symbol"], rangeBackWindowSecs=1200).df
featured_df = tsdf_left.withRangeStats(rangeBackWindowSecs=1200).df

# cast to decimal with precision in cents for simplicity
featured_df = featured_df.select(F.col("symbol"), F.col("event_ts"),
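For downstream callers, the migration is mechanical: drop the partitionCols keyword from each method call and pass it once to the constructor. A hypothetical before/after, mirroring the test changes above:

# before this commit
# featured_df = TSDF(df).withRangeStats(partitionCols=["symbol"], rangeBackWindowSecs=1200).df

# after this commit
featured_df = TSDF(df, partitionCols=["symbol"]).withRangeStats(rangeBackWindowSecs=1200).df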
