Commit 16b2424: committing multiple columns for Scala TSDF initialization (databrickslabs#65)

* committing multiple columns for Scala TSDF initialization

* add daily resample
rportilla-databricks authored Apr 14, 2021
1 parent bea4a7a commit 16b2424
Showing 9 changed files with 57 additions and 59 deletions.
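
The headline API change replaces Scala varargs (String*) with explicit Seq[String] parameters across the TSDF constructors and transformations, so call sites pass partition columns as a single collection. A minimal before/after sketch (the input path and the symbol/exchange column names are hypothetical):

import com.databrickslabs.tempo.TSDF
import org.apache.spark.sql.SparkSession

// a minimal sketch; the data source and column names are assumptions
val spark = SparkSession.builder.appName("tsdf-init").getOrCreate()
val df = spark.read.parquet("/data/trades")

// before this commit, partition columns were varargs:
//   TSDF(df, "event_ts", "symbol", "exchange")
// after this commit, they are passed as one Seq:
val tsdf = TSDF(df, "event_ts", Seq("symbol", "exchange"))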
2 changes: 1 addition & 1 deletion python/setup.py
@@ -20,4 +20,4 @@
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
],
-)
+)
2 changes: 1 addition & 1 deletion python/tempo/__init__.py
@@ -1 +1 @@
-from tempo.tsdf import TSDF
+from tempo.tsdf import TSDF
8 changes: 4 additions & 4 deletions python/tempo/resample.py
@@ -7,14 +7,15 @@
SEC = 'sec'
MIN = 'min'
HR = 'hr'
+DAY = 'day'

# define global aggregate function options for downsampling
CLOSEST_LEAD = "closest_lead"
MIN_LEAD = "min_lead"
MAX_LEAD = "max_lead"
MEAN_LEAD = "mean_lead"

-allowableFreqs = [SEC, MIN, HR]
+allowableFreqs = [SEC, MIN, HR, DAY]

def __appendAggKey(tsdf, freq = None):
"""
@@ -31,14 +32,13 @@ def __appendAggKey(tsdf, freq = None):
hour_col = f.hour(f.col(tsdf.ts_col))

if (freq == SEC):
-#agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lpad(hour_col, 2, '0'), f.lpad(min_col, 2, '0'), f.lpad(sec_col, 2, '0'))
agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lit(" "), f.lpad(hour_col, 2, '0'), f.lit(':'), f.lpad(min_col, 2, '0'), f.lit(':'), f.lpad(sec_col, 2, '0')).cast("timestamp")
elif (freq == MIN):
-#agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lpad(hour_col, 2, '0'), f.lpad(min_col, 2, '0'))
agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lit(' '), f.lpad(hour_col, 2, '0'), f.lit(':'), f.lpad(min_col, 2, '0'), f.lit(':'), f.lit('00')).cast("timestamp")
elif (freq == HR):
-#agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lpad(hour_col, 2, '0'))
agg_key = f.concat(f.col(tsdf.ts_col).cast("date"), f.lit(' '), f.lpad(hour_col, 2, '0'), f.lit(':'), f.lit('00'), f.lit(':'), f.lit('00')).cast("timestamp")
+elif (freq == DAY):
+agg_key = f.col(tsdf.ts_col).cast("date").cast("timestamp")

df = df.withColumn("agg_key", agg_key)
return tempo.TSDF(df, tsdf.ts_col, partition_cols = tsdf.partitionCols)
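
The new DAY branch builds the aggregation key by casting the timestamp to a date and back, which floors every event to midnight of its day. The same truncation, sketched in Spark's Scala API for consistency with the rest of this commit (the event_ts column name is an assumption):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// given some DataFrame with an event_ts timestamp column (assumed):
// casting to date drops the time-of-day, and casting back yields the
// timestamp at midnight, e.g. 2021-04-14 09:30:05 -> 2021-04-14 00:00:00
def withDailyAggKey(df: DataFrame): DataFrame =
  df.withColumn("agg_key", col("event_ts").cast("date").cast("timestamp"))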
@@ -30,6 +30,6 @@ object EMA {

val emaDF = Range(0,window+1).foldLeft(df)((df:DataFrame, k: Int) => emaIncrement(df,k))

-TSDF(emaDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name):_*)
+TSDF(emaDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name))
}
}
58 changes: 29 additions & 29 deletions scala/tempo/src/main/scala/com/databrickslabs/tempo/TSDF.scala
@@ -86,7 +86,7 @@ sealed trait TSDF
* @param partitionCols the names of columns used to partition the SeqDF
* @return a new SeqDF instance, partitioned according to the given columns
*/
-def partitionedBy(partitionCols: String*): TSDF
+def partitionedBy(partitionCols: Seq[String]): TSDF

def select(cols: Column*): TSDF

@@ -167,7 +167,7 @@ sealed trait TSDF
*/
private[tempo] sealed class BaseTSDF(val df: DataFrame,
val tsColumn: StructField,
-val partitionCols: StructField* )
+val partitionCols: Seq[StructField] )
extends TSDF
{
// Validate the arguments
@@ -212,11 +212,11 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,
* @param partitionCols the names of columns used to partition the SeqDF
* @return a new SeqDF instance, partitioned according to the given columns
*/
-def partitionedBy(partitionCols: String*): TSDF =
-TSDF(df, tsColumn.name, partitionCols :_*)
+def partitionedBy(partitionCols: Seq[String]): TSDF =
+TSDF(df, tsColumn.name, partitionCols)

def withColumn(colName: String, col: Column): TSDF = {
-TSDF(df.withColumn(colName, col), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.withColumn(colName, col), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def select(cols: Column*): TSDF = {
@@ -227,7 +227,7 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,

//if (!timeAndPartitionsPresent) {throw new RuntimeException("Timestamp column or partition columns are missing")}

-TSDF(df.select(cols:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.select(cols:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def select(col: String,
@@ -240,56 +240,56 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,

//if (!timeAndPartitionsPresent) {throw new RuntimeException("Timestamp column or partition columns are missing")}

-TSDF(df.select(masterList.head, masterList.tail:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.select(masterList.head, masterList.tail:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def selectExpr(exprs: String*): TSDF = {
-TSDF(df.selectExpr(exprs:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.selectExpr(exprs:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def filter(condition: Column): TSDF = {
-TSDF(df.filter(condition), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.filter(condition), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def filter(conditionExpr: String): TSDF = {
-TSDF(df.filter(conditionExpr), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.filter(conditionExpr), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def where(condition: Column): TSDF = {
-TSDF(df.where(condition), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.where(condition), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def where(conditionExpr: String): TSDF = {
-TSDF(df.where(conditionExpr), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.where(conditionExpr), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def limit(n: Int): TSDF = {
-TSDF(df.limit(n), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.limit(n), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def union(other: TSDF): TSDF = {
-TSDF(df.union(other.df), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.union(other.df), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def unionAll(other: TSDF): TSDF = {
-TSDF(df.unionAll(other.df), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.unionAll(other.df), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def withColumnRenamed(existingName: String,
newName: String): TSDF = {
-TSDF(df.withColumnRenamed(existingName, newName), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.withColumnRenamed(existingName, newName), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def drop(colName: String): TSDF = {
-TSDF(df.drop(colName), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.drop(colName), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def drop(colNames: String*): TSDF = {
-TSDF(df.drop(colNames:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.drop(colNames:_*), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}

def drop(col: Column): TSDF = {
-TSDF(df.drop(col), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString)
+TSDF(df.drop(col), tsColumnName = tsColumn.name, partitionColumnNames = partitionCols.map(_.name))
}
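
Every pair of lines above fixes the same bug: partitionCols.map(_.name).mkString concatenated all partition column names into one string, which the old varargs signature then treated as a single, nonexistent column name. Passing the Seq through keeps the names distinct (column names here are illustrative):

val names = Seq("symbol", "exchange")
names.mkString   // "symbolexchange" -- one bogus column name
names            // Seq("symbol", "exchange") -- distinct names, as intended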

// Window builder functions
@@ -361,8 +361,6 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,
tsPartitionVal: Int = 0,
fraction: Double = 0.1): TSDF = {

-if (tsPartitionVal > 0) {println("WARNING: You are using the skew version of the AS OF join. This may result in null values if there are any values outside of the maximum lookback. For maximum efficiency, choose smaller values of maximum lookback, trading off performance and potential blank AS OF values for sparse keys")}
-
if (leftPrefix == "" && tsPartitionVal == 0) {
asofJoinExec(this,rightTSDF, leftPrefix = None, rightPrefix, maxLookback, tsPartitionVal = None, fraction)
}
@@ -399,7 +397,7 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,

vwapped = vwapped.withColumnRenamed("time_group", "event_ts")

-TSDF( vwapped, tsColumn.name, partitionColumnNames = partitionCols.map(_.name).mkString )
+TSDF( vwapped, tsColumn.name, partitionColumnNames = partitionCols.map(_.name) )
}
//
def rangeStats(colsToSummarise: Seq[String] = Seq(), rangeBackWindowSecs: Int = 1000): TSDF = {
@@ -506,7 +504,7 @@ private[tempo] sealed class BaseTSDF(val df: DataFrame,
collect_list(col(tempArrayColName)).over(lookback_win))
.drop(tempArrayColName))

-val pCols = partitionCols.map(_.name).mkString
+val pCols = partitionCols.map(_.name)


// make sure only windows of exact size are allowed
@@ -549,7 +547,9 @@ object TSDF {
* otherwise a [[NoSuchElementException]] is thrown
*/
private[tempo] def colByName(df: DataFrame)(colName: String): StructField = {
-df.schema.find(_.name.toLowerCase() == colName.toLowerCase()).get
+
+df.schema.find(x => {x.name.toLowerCase().equalsIgnoreCase(colName.toLowerCase())}).get
+
}
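
The rewritten lookup behaves the same as before: String.equalsIgnoreCase is already case-insensitive, so lower-casing both sides first is redundant but harmless. For example:

// both comparisons are case-insensitive and evaluate to true
"Event_TS".toLowerCase().equalsIgnoreCase("event_ts".toLowerCase())
"Event_TS".equalsIgnoreCase("event_ts")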

// TSDF Constructors
@@ -562,12 +562,12 @@
*/
def apply(df: DataFrame,
tsColumnName: String,
-partitionColumnNames: String*): TSDF = {
+partitionColumnNames: Seq[String]): TSDF = {
val colFinder = colByName(df) _
val tsColumn = colFinder(tsColumnName)
val partitionCols = partitionColumnNames.map(colFinder)

-new BaseTSDF(df, tsColumn, partitionCols: _*)
+new BaseTSDF(df, tsColumn, partitionCols)
}

/**
Expand Down Expand Up @@ -597,7 +597,7 @@ object TSDF {
val newDF = df.withColumn(sequenceColName, row_number().cast(LongType).over(seq_win))

// construct our TSDF
-apply(newDF, sequenceColName, partitionCols: _*)
+apply(newDF, sequenceColName, partitionCols)
}

/**
@@ -629,8 +629,8 @@
* @return a [[TSDF]] based on the current [[DataFrame]]
*/
def toTSDF(tsColumnName: String,
-partitionColumnNames: String*): TSDF =
-TSDF(df, tsColumnName, partitionColumnNames: _*)
+partitionColumnNames: Seq[String]): TSDF =
+TSDF(df, tsColumnName, partitionColumnNames)
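
With the DataFrame-to-TSDF conversion, the same Seq flows through toTSDF. A usage sketch (assumes the package's implicit conversion is in scope; quotesDF and its columns are hypothetical):

import com.databrickslabs.tempo._

// quotesDF: a DataFrame with event_ts and symbol columns (assumed)
val quotesTSDF = quotesDF.toTSDF("event_ts", Seq("symbol"))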

/**
* Convert a [[DataFrame]] to [[TSDF]]
@@ -29,7 +29,7 @@ object asofJoin {
.foldLeft(Seq[String]())((newPartCols, PartCol) =>
newPartCols :+ (if (col_list.contains(PartCol)) prefix + PartCol.name else PartCol.name))

-TSDF(prefixedDF,ts_colName, partitionColstrings:_*)
+TSDF(prefixedDF,ts_colName, partitionColstrings)
}

// Add columns from other DF ahead of combining them through Union
@@ -40,7 +40,7 @@ object asofJoin {
.withColumn(colStruct.name, lit(null)))

// TODO: fix partition column names, it's not nice this way
-TSDF(newDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name):_*)
+TSDF(newDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name))
}

// union the time series A and time series B data frames
@@ -57,7 +57,7 @@ object asofJoin {
coalesce(col(leftTSDF.tsColumn.name), col(rightTSDF.tsColumn.name)))
.withColumn("rec_ind", coalesce(col(left_rec_ind), col(right_rec_ind)))

-TSDF(combinedDF, combinedTsCol, leftTSDF.partitionCols.map(_.name):_*)
+TSDF(combinedDF, combinedTsCol, leftTSDF.partitionCols.map(_.name))
}

// helper method to obtain the columns from time series B to paste onto time series A
@@ -84,7 +84,7 @@ object asofJoin {

df = df.drop("rec_ind")

-TSDF(df, left_ts_col.name, tsdf.partitionCols.map(_.name):_*)
+TSDF(df, left_ts_col.name, tsdf.partitionCols.map(_.name))
}

// Create smaller time-based ranges inside of the TSDF partition
@@ -112,7 +112,7 @@ object asofJoin {

partitionDF.unpersist()

-TSDF(df, combinedTSDF.tsColumn.name, newPartitionColNames:_*)
+TSDF(df, combinedTSDF.tsColumn.name, newPartitionColNames)
}

// wrapper method for executing the AS OF join based on various parameters
@@ -169,7 +169,7 @@ object asofJoin {
.filter(col("is_original") === lit(1))
.drop("ts_partition","is_original")

-TSDF(asofDF, prefixedLeftTSDF.tsColumn.name, leftTSDF.partitionCols.map(_.name):_*)
+TSDF(asofDF, prefixedLeftTSDF.tsColumn.name, leftTSDF.partitionCols.map(_.name))
}
case false => getLastRightRow(combinedTSDF, prefixedLeftTSDF.tsColumn,rightCols, maxLookback)
}
@@ -51,7 +51,7 @@ object rangeStats {
derivedDf
})

-TSDF(summaryDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name):_*)
+TSDF(summaryDF, tsdf.tsColumn.name, tsdf.partitionCols.map(_.name))
}


@@ -46,7 +46,7 @@ val allowableFreqs = List(SEC, MIN, HR)
}

df = df.withColumn("agg_key", agg_key)
-return TSDF(df, tsColumnName= tsdf.tsColumn.name, partitionColumnNames = tsdf.partitionCols.map(x => x.name): _*)
+return TSDF(df, tsColumnName= tsdf.tsColumn.name, partitionColumnNames = tsdf.partitionCols.map(x => x.name))
}

/**
@@ -92,7 +92,7 @@ val allowableFreqs = List(SEC, MIN, HR)
}

res = res.drop(tsdf.tsColumn.name).withColumnRenamed("agg_key", tsdf.tsColumn.name)
-return(TSDF(res, tsColumnName = tsdf.tsColumn.name, partitionColumnNames = tsdf.partitionCols.map(x => x.name): _*))
+return(TSDF(res, tsColumnName = tsdf.tsColumn.name, partitionColumnNames = tsdf.partitionCols.map(x => x.name)))
}

