ENH: Basic anomaly detector #393

Open · wants to merge 5 commits into master

Changes from 1 commit
ENH: Basic anomaly detector
tibkiss committed Nov 19, 2020
commit 07342c7a63fa5393b2fd3e0203e493341b873e4b
3 changes: 3 additions & 0 deletions sqlparser/registry.go
@@ -4,6 +4,7 @@ import (
 "github.com/alpacahq/marketstore/v4/contrib/candler/candlecandler"
 "github.com/alpacahq/marketstore/v4/contrib/candler/tickcandler"
 "github.com/alpacahq/marketstore/v4/uda"
+"github.com/alpacahq/marketstore/v4/uda/anomaly"
 "github.com/alpacahq/marketstore/v4/uda/avg"
 "github.com/alpacahq/marketstore/v4/uda/count"
 "github.com/alpacahq/marketstore/v4/uda/gap"
@@ -26,4 +27,6 @@ var AggRegistry = map[string]uda.AggInterface{
 "avg": &avg.Avg{},
 "Gap": &gap.Gap{},
 "gap": &gap.Gap{},
+"anomaly": &anomaly.Anomaly{},
+"Anomaly": &anomaly.Anomaly{},
 }
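
For reference, an aggregate registered here is looked up by name at query time and instantiated through its New() method. A minimal resolution sketch (assuming only the AggRegistry map above and the New() signature shown in anomaly.go below):

agg, ok := AggRegistry["anomaly"]
if !ok {
    // unknown aggregate function name
}
instance, argMap := agg.New() // fresh aggregator plus its argument map
_, _ = instance, argMap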
4 changes: 2 additions & 2 deletions tests/integ/dockerfiles/pyclient/Makefile
@@ -10,10 +10,10 @@ test-jsonrpc:
 # for json-RPC API
 docker exec -e MARKETSTORE_PORT=5993 -e USE_GRPC=false $(CONTAINER_NAME) \
 bash -c \
-"pytest -v -v -v $(TEST_FILENAME)"
+"pytest -v -v -v -s $(TEST_FILENAME)"

 test-grpc:
 # for gRPC API
 docker exec -e MARKETSTORE_PORT=5995 -e USE_GRPC=true $(CONTAINER_NAME) \
 bash -c \
-"pytest -v -v -v $(TEST_FILENAME)"
+"pytest -v -v -v -s $(TEST_FILENAME)"

(The new -s flag disables pytest's output capture, so the print() calls in the test added below show up in the test logs.)
79 changes: 79 additions & 0 deletions tests/integ/tests/test_anomaly_detector.py
@@ -0,0 +1,79 @@
"""
Integration Test for anomaly detector
"""
import pytest
import os

import numpy as np
import pandas as pd
from pandas.testing import assert_frame_equal

import pymarketstore as pymkts

# Constants
DATA_TYPE_TICK = [('Epoch', 'i8'), ('Bid', 'f4'), ('Ask', 'f4'), ('Nanoseconds', 'i4')]
Contributor:
i4 is the right type for Nanoseconds, but later in the tests, all nanosec data are given as floats, so numpy truncates all of them to 0. I don't think it's causing much trouble, but better to be on the safe side.

Contributor Author:
wow, we need to address that
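
# Illustration of the truncation discussed above (a sketch, not part of this
# diff): the i4 field silently truncates the fractional nanosecond values.
#   np.array([(0, 1.0, 2.0, 0.01)], dtype=DATA_TYPE_TICK)['Nanoseconds']
#   # -> array([0], dtype=int32)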

DATA_TYPE_CANDLE = [('Epoch', 'i8'), ('Open', 'f8'), ('High', 'f8'), ('Low', 'f8'), ('Close', 'f8'), ('Volume', 'f8')]
MARKETSTORE_HOST = "localhost"
MARKETSTORE_PORT = 5993

client = pymkts.Client(f"http://127.0.0.1:{os.getenv('MARKETSTORE_PORT',5993)}/rpc",
grpc=(os.getenv("USE_GRPC", "false") == "true"))


def timestamp(datestr):
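    # pd.Timestamp(...).value is nanoseconds since the Unix epoch; convert to whole seconds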
return int(pd.Timestamp(datestr).value / 10 ** 9)


@pytest.mark.parametrize('symbol, columns, detection_type, threshold, data, expected_df', [
('AT_SINGLE_COL_FIXED', 'Ask', 'fixed_pct', '0.045',
[(timestamp('2019-01-01 04:19:00'), 15, 11, 0.01), # epoch, bid, ask, nanosecond
(timestamp('2019-01-01 04:19:01'), 20, 11.5, 0.02),
(timestamp('2019-01-02 05:59:59'), 30, 11.6, 0.03)],
pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:01', tz='UTC')],
data=np.array([1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
('AT_MULTI_COL_FIXED', 'Bid,Ask', 'fixed_pct', '0.045',
[(timestamp('2019-01-01 04:19:00'), 15, 11, 0.01),
(timestamp('2019-01-01 04:19:01'), 20, 11.5, 0.02),
(timestamp('2019-01-02 05:59:59'), 30, 11.6, 0.03)],
pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:01', tz='UTC'),
pd.Timestamp('2019-01-02 05:59:59', tz='UTC')],
data=np.array([3, 1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
('AT_SINGLE_COL_ZSCORE', 'Bid', 'z_score', '1.0',
[(timestamp('2019-01-01 04:19:00'), 10.1, 11, 0.01),
(timestamp('2019-01-01 04:19:01'), 10.2, 11.5, 0.02),
(timestamp('2019-01-01 04:19:03'), 10.1, 11.5, 0.03),
(timestamp('2019-01-01 04:19:04'), 10.3, 11.5, 0.04),
(timestamp('2019-01-01 04:19:05'), 10.2, 11.5, 0.05),
(timestamp('2019-01-01 04:19:06'), 10.2, 11.5, 0.06),
(timestamp('2019-01-02 05:59:59'), 100.1, 11.6, 0.07)],
pd.DataFrame(index=[pd.Timestamp('2019-01-02 05:59:59', tz='UTC')],
data=np.array([1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
('AT_MULTI_COL_ZSCORE', 'Bid,Ask', 'z_score', '1.0',
[(timestamp('2019-01-01 04:19:00'), 10.1, 11, 0.01),
(timestamp('2019-01-01 04:19:01'), 10.2, 11.5, 0.02),
(timestamp('2019-01-01 04:19:03'), 10.1, 0.0015, 0.03),
(timestamp('2019-01-01 04:19:04'), 10.3, 11.5, 0.04),
(timestamp('2019-01-01 04:19:05'), 10.2, 11.5, 0.05),
(timestamp('2019-01-01 04:19:06'), 10.2, 11.5, 0.06),
(timestamp('2019-01-02 05:59:59'), 100.1, 11.6, 0.07)],
pd.DataFrame(index=[pd.Timestamp('2019-01-01 04:19:04', tz='UTC'),
pd.Timestamp('2019-01-02 05:59:59', tz='UTC')],
data=np.array([2,1], dtype='uint64'), columns=['ColumnsBitmap']).rename_axis('Epoch')),
])
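# For example, in AT_MULTI_COL_FIXED the expected bitmap 3 at 04:19:01 means
# both requested columns moved anomalously (Bid is bit 0, Ask is bit 1:
# 1 << 0 | 1 << 1 = 3), while 1 at 05:59:59 means only Bid did.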
def test_anomaly_one_symbol(symbol, columns, detection_type, threshold, data, expected_df):
# ---- given ----
tbk = "{}/1Sec/TICK".format(symbol)
client.destroy(tbk)
client.write(np.array(data, dtype=DATA_TYPE_TICK), tbk, isvariablelength=True)

# ---- when ----
params = pymkts.Params(symbol, '1Sec', 'TICK')
params.functions = [f"anomaly('{columns}', '{detection_type}', '{threshold}')"]
reply = client.query(params)

# ---- then ----
actual_df = reply.first().df()
print()
print(actual_df)
print(expected_df)
assert_frame_equal(actual_df, expected_df)
227 changes: 227 additions & 0 deletions uda/anomaly/anomaly.go
@@ -0,0 +1,227 @@
package anomaly

import (
    "fmt"
    "math"
    "sort"
    "strconv"
    "strings"

    "github.com/alpacahq/marketstore/v4/uda"
    "github.com/alpacahq/marketstore/v4/utils/functions"
    "github.com/alpacahq/marketstore/v4/utils/io"
    "gonum.org/v1/gonum/floats"
    "gonum.org/v1/gonum/stat"
)

var (
requiredColumns = []io.DataShape{}
optionalColumns = []io.DataShape{}
initArgs = []io.DataShape{}
)

const (
DetectByZScore = "z_score"
DetectByFixedPct = "fixed_pct"
DefaultZScoreThreshold = 3.0
)

type Anomaly struct {
uda.AggInterface

ArgMap *functions.ArgumentMap
Columns []string
DetectionType string
Threshold float64

// AnomalyIdxsByColumn maps an epoch to a bitmap of the columns flagged as
// anomalous at that epoch (bit i set = i-th requested column flagged).
AnomalyIdxsByColumn map[int64]uint64
// Input retains the last column batch passed to Accum (checked by Output).
Input io.ColumnInterface
}

func (a Anomaly) New() (out uda.AggInterface, am *functions.ArgumentMap) {
gx := NewAnomaly(requiredColumns, optionalColumns)
return gx, gx.ArgMap
}

func NewAnomaly(inputColumns, optionalInputColumns []io.DataShape) (g *Anomaly) {
g = new(Anomaly)
g.ArgMap = functions.NewArgumentMap(inputColumns, optionalInputColumns...)
return g
}

func (a *Anomaly) GetRequiredArgs() []io.DataShape {
return requiredColumns
}
func (a *Anomaly) GetOptionalArgs() []io.DataShape {
return optionalColumns
}
func (a *Anomaly) GetInitArgs() []io.DataShape {
return initArgs
}

func (a *Anomaly) Init(args ...interface{}) error {
a.Reset()

// select anomaly('bid,ask', 'fixed_pct', 0.15) from `ORCL/1Sec/TRADE`;
// select anomaly('price,qty', 'z_score', 3.0) from `ORCL/1Sec/TRADE`;

if len(args) != 1 || len(args[0].([]string)) != 3 {
    return fmt.Errorf("wrong number of parameters. expected: columns, detectionType, threshold")
}

argz := args[0].([]string)
columns := argz[0]
detectionType := argz[1]
threshold := argz[2]

a.Columns = strings.Split(columns, ",")

switch detectionType {
case DetectByFixedPct, DetectByZScore:
    // recognized detection types
default:
    return fmt.Errorf("invalid detection type: %v", detectionType)
}
a.DetectionType = detectionType

var err error
a.Threshold, err = strconv.ParseFloat(threshold, 64)
if err != nil {
return fmt.Errorf("error parsing threshold: %w", err)
}

return nil
}

func (a *Anomaly) Reset() {
a.AnomalyIdxsByColumn = make(map[int64]uint64)
a.Input = nil
a.DetectionType = DetectByZScore
a.Threshold = DefaultZScoreThreshold
}

func (a *Anomaly) Accum(cols io.ColumnInterface) (err error) {
a.Input = cols // store the interface value directly; no pointer-to-interface needed

if cols.Len() == 0 {
return nil
}

for columnNr, columnName := range a.Columns {
err = a.detect(cols, columnName, columnNr)
if err != nil {
return err
}
}

return nil
}

func (a *Anomaly) detect(cols io.ColumnInterface, columnName string, columnNr int) error {
epochs := cols.GetColumn("Epoch").([]int64)
columnData, err := uda.ColumnToFloat64(cols, columnName)
if err != nil {
return err
}

if columnData == nil {
return fmt.Errorf("no data available")
}

if len(columnData) < 2 {
return fmt.Errorf("not enough data available")
}

size := len(columnData)
pctChange := make([]float64, size-1)

// pctChange = (a - b)/a
// floats.SubTo(pctChange, columnData[1:], columnData[:size-1])
Contributor Author:
todo: remove

// floats.DivTo(pctChange, pctChange, columnData[:size-1])
floats.SubTo(pctChange, columnData[:size-1], columnData[1:])
floats.DivTo(pctChange, pctChange, columnData[:size-1])
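// Worked example from the integration test above: with threshold 0.045 and
// Ask values 11, 11.5, 11.6 the relative changes are (11-11.5)/11 ≈ -0.0455
// and (11.5-11.6)/11.5 ≈ -0.0087, so only the second bar is flagged.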

switch a.DetectionType {
case DetectByZScore:
a.detectByZScore(epochs[1:], pctChange, columnNr)
case DetectByFixedPct:
a.detectByFixedPct(epochs[1:], pctChange, columnNr)
default:
return fmt.Errorf("invalid detection type: %v", a.DetectionType)
}

return nil
}

func (a *Anomaly) detectByZScore(epochs []int64, series []float64, columnNr int) {
m := stat.Mean(series, nil)
s := stat.StdDev(series, nil)
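// Guard against zero variance: StdScore divides by s, so substitute 1.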
if s == 0 {
s = 1
}

for i, x := range series {
    if math.Abs(stat.StdScore(x, m, s)) >= a.Threshold {
        a.AnomalyIdxsByColumn[epochs[i]] |= 1 << columnNr
Contributor:
If columnNr is big enough (which seems very unlikely btw) then an overflow might occur.
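
One possible guard, as a sketch (not part of this diff): since ColumnsBitmap is a uint64, a bit for column index 64 or higher would be shifted out and lost, so Init could reject oversized column lists up front:

if len(a.Columns) > 64 {
    return fmt.Errorf("anomaly supports at most 64 columns")
}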

}
}
}

func (a *Anomaly) detectByFixedPct(epochs []int64, series []float64, columnNr int) {
    for i, x := range series {
        if math.Abs(x) >= a.Threshold {
            a.AnomalyIdxsByColumn[epochs[i]] |= 1 << columnNr
        }
    }
}

// Output returns `Epoch, ColumnsBitmap`, where ColumnsBitmap records which
// columns showed an anomaly at a given epoch, using each column's index as
// its bit position in the bitmap. Example:
// - Given three columns: open,high,close
// - When anomalies are found at Epochs 0,1,2 in: open, open+high, open+close
// - Then the returned bitmap looks as follows:
//   Epoch 0: 1 << 0 = 1
//   Epoch 1: 1 << 0 | 1 << 1 = 3
//   Epoch 2: 1 << 0 | 1 << 2 = 5
func (a *Anomaly) Output() *io.ColumnSeries {
cs := io.NewColumnSeries()

resultRows := len(a.AnomalyIdxsByColumn)
epochs := make([]int64, resultRows)
columns := make([]uint64, resultRows)

if len(a.AnomalyIdxsByColumn) > 0 && a.Input != nil {
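// Map iteration order is undefined in Go, so collect the epochs and sort
// them to emit rows in ascending time order.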
var anomalyEpochsOrdered []int64
for k := range a.AnomalyIdxsByColumn {
anomalyEpochsOrdered = append(anomalyEpochsOrdered, k)
}
sort.Slice(anomalyEpochsOrdered,
func(i, j int) bool {
return anomalyEpochsOrdered[i] < anomalyEpochsOrdered[j]
})

for i, epoch := range anomalyEpochsOrdered {
epochs[i] = epoch
columns[i] = a.AnomalyIdxsByColumn[epoch]
}
}

cs.AddColumn("Epoch", epochs)
cs.AddColumn("ColumnsBitmap", columns)

return cs
}
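
On the client side, a ColumnsBitmap value can be mapped back to column names with a small helper. A minimal sketch (hypothetical function, not part of this diff), assuming the same column order that was passed to anomaly():

// decodeBitmap returns the names of the columns whose bit is set, using
// each column's index as its bit position, mirroring Output() above.
func decodeBitmap(bitmap uint64, columns []string) []string {
    var flagged []string
    for i, name := range columns {
        if bitmap&(1<<uint(i)) != 0 {
            flagged = append(flagged, name)
        }
    }
    return flagged
}

// decodeBitmap(3, []string{"Bid", "Ask"}) -> [Bid Ask]
// decodeBitmap(1, []string{"Bid", "Ask"}) -> [Bid]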