htmengine
is a python package that comprises a set of long-running services, upon which a scalable, real-time, and production-ready scalar anomaly detection application may be built. The ideal use-case for htmengine
is a scenario where there is a continuous stream of data for many metrics, each with a relatively low sample rate e.g. 5-minute intervals.
An example application of HTM Engine is available with an accompanying screencast.
A walkthrough of the HTM Engine with an example application of New York City live traffic speeds.
First, install nta.utils
. Then, to install htmengine
:
python setup.py develop [OPTIONS]
- See
python setup.py develop --help
for options that may be helpful for your specific environment.
In order to use htmengine
, there are some additional steps you MUST do to create a fully functional application. The below instructions include the details for scaffolding an application for use with htmengine
. For a TL;DR version and concrete example in which the following has been done for you, see https://github.com/nupic-community/skeleton-htmengine-app for a simple, fully-functional application upon which you may base your own.
Create a conf/
directory, and use the following files as templates for your own configuration. Carefully review each file, and fill in or replace any values that are specific to your application.
-
conf/application.conf
Replace
APPLICATION_NAME
in the template below with the name of your application. Add database credentials in the[repository]
section with values that match your own MySQL database.[debugging] # Controls whether to log performance profiling information: true or false profiling = false # MySQL database connection parameters [repository] db = host = user = passwd = port = [metric_streamer] # Exchange to push model results results_exchange_name = APPLICATION_NAME.model.results # Max records per batch to stream to model chunk_size = 1440 [metric_collector] # How often to poll metrics for data in seconds poll_interval = 60 # Metric error grace period seconds after which the metric will be promoted to # ERROR state if it continues to encounter errors metric_error_grace_period = 10800 [metric_listener] # Port to listen on for plaintext protocol messages plaintext_port = 2003 queue_name = APPLICATION_NAME.metric.custom.data [anomaly_likelihood] # Minimal sample size for statistic calculation statistics_min_sample_size=100 # How often to refresh the anomaly statistics in rows # We refresh once every two hours (ideally we would do this every record) statistics_refresh_rate=24 # Sample size to be used for the statistic calculation # We keep a max of one month of history (assumes 5 min metric period) statistics_sample_size=8640
-
conf/model-checkpoint.conf
Replace
/ABSOLUTE/PATH/ON/LOCAL/FILESYSTEM/model_checkpoints
with the absolute path to a location where you will store model checkpoints created byhtmengine
.# Model Checkpoint Manager Configuration [storage] # The root directory of the model checkpoint archive. # May use environment variables; MUST expand to absolute path root = /ABSOLUTE/PATH/ON/LOCAL/FILESYSTEM/model_checkpoints
-
conf/model-swapper.conf
Replace
APPLICATION_NAME
with the name of your application in the template below.# Model Swapper Configuration [debugging] # Controls whether to log performance profiling information: true or false profiling = false # Model Swapper Interface Bus [interface_bus] # Name of the queue for model command and inference results results_queue = APPLICATION_NAME.mswapper.results # A model's input queue name is the concatenation of this prefix and model id model_input_queue_prefix = APPLICATION_NAME.mswapper.model.input. # Name of the Model Scheduler notification queue scheduler_notification_queue = APPLICATION_NAME.mswapper.scheduler.notification [model_runner] # The target number of model input request objects to be processed per # checkpoint. This may span multiple batches, until the number of requests # reaches or exceeds this value. Since it always processes all requests in a # batch, the actual number of requests processed before checkpointing the model # may be higher than this number. target_requests_per_checkpoint = 500
-
conf/supervisord.conf
Replace
/ABSOLUTE/PATH/TO/THIS/APPLICATION/conf
in the template below to the location of the applicationconf/
directory containing the files you just created. Replace/ABSOLUTE/PATH/TO/htmengine
with the absolute path to the htmengine source.; Sample supervisor config file. ; ; For more information on the config file, please see: ; http://supervisord.org/configuration.html ; ; Note: shell expansion ("~" or "$HOME") is not supported. Environment ; variables can be expanded using this syntax: "%(ENV_HOME)s". [unix_http_server] file=%(here)s/../supervisor.sock ; (the path to the socket file) [inet_http_server] port=127.0.0.1:9001 [supervisord] environment=APPLICATION_CONFIG_PATH=/ABSOLUTE/PATH/TO/THIS/APPLICATION/conf pidfile=%(here)s/../supervisord.pid identifier=APPLICATION_NAME-supervisor logfile=%(here)s/../logs/supervisord.log # NOTE: logfile_maxbytes=0 turns off supervisor log rotation logfile_maxbytes=50MB logfile_backups=10 loglevel=info nodaemon=false minfds=1024 minprocs=200 [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface [supervisorctl] serverurl=http://: [include] files = /ABSOLUTE/PATH/TO/htmengine/conf/supervisord-base.conf
In your own environment, you MUST set the APPLICATION_CONFIG_PATH
environment variable to the absolute path to the application config path. This must match the value in conf/supervisord.conf
!
htmengine
uses MySQL to cache data locally at various stages of processing and has a very specific schema (included below) that it requires to be implemented:
CREATE TABLE instance_status_history (
server VARCHAR(100) DEFAULT '' NOT NULL,
timestamp DATETIME NOT NULL DEFAULT '0000-00-00 00:00:00',
status VARCHAR(32) DEFAULT '' NOT NULL,
PRIMARY KEY (server, timestamp)
);
CREATE TABLE `lock` (
name VARCHAR(40) NOT NULL,
PRIMARY KEY (name)
);
INSERT INTO `lock` (`name`) VALUES('metrics');
CREATE TABLE metric (
uid VARCHAR(40) NOT NULL,
datasource VARCHAR(100),
name VARCHAR(255),
description VARCHAR(200),
server VARCHAR(100),
location VARCHAR(200),
parameters TEXT,
status INTEGER DEFAULT '0',
message TEXT,
collector_error TEXT,
last_timestamp DATETIME NULL DEFAULT NULL,
poll_interval INTEGER DEFAULT '60',
tag_name VARCHAR(200),
model_params TEXT,
last_rowid INTEGER,
PRIMARY KEY (uid)
);
CREATE INDEX datasource_idx ON metric (datasource);
CREATE INDEX location_idx ON metric (location);
CREATE INDEX server_idx ON metric (server);
CREATE TABLE metric_data (
uid VARCHAR(40) DEFAULT '' NOT NULL,
rowid INTEGER DEFAULT '0' NOT NULL,
timestamp DATETIME NOT NULL,
metric_value DOUBLE NOT NULL,
raw_anomaly_score DOUBLE,
anomaly_score DOUBLE,
display_value INTEGER,
PRIMARY KEY (uid, rowid),
CONSTRAINT metric_data_to_metric_fk FOREIGN KEY(uid) REFERENCES metric (uid) ON DELETE CASCADE ON UPDATE CASCADE
);
CREATE INDEX anomaly_score_idx ON metric_data (anomaly_score);
CREATE INDEX timestamp_idx ON metric_data (timestamp);
htmengine
services write out log data to logs/
from the base of the application directory. Create it, if it does not exist.
mkdir -p logs
Supervisor is used to start, and keep running, a set of processes that implement a processing pipeline for incoming scalar data for the purpose of calculating and reporting on NuPIC Anomaly Scores. RabbitMQ brokers messages between these processes.
htmengine.runtime.metric_listener
implements a Graphite-compatible API to accept plain-text metric data over a TCP connection on port 2003. Metric data is accepted in the following format:
name value timestamp\n
name
is the metric name, typically composed of a hierarchical period-delimited path.value
is the metric value, which can be any scalar valuetimestamp
is the unix timestamp associated with the metric sample.
For example:
foo.bar.stats 21 1443197361
Multiple metric samples may be sent over a single connection, as long as the above format is used, and metric samples are sent one-per-line. Metric samples will be processed in the order they are sent, so if you are sending a batch of data at once, be sure to sort it by timestamp!
htmengine.runtime.metric_storer
stores the metric samples in a local MySQL database, and is separate from the Metric Listener so as to not block clients of the API on database I/O.
htmengine.model_swapper.model_scheduler_service
implements the "Model Swapping" process that allows htmengine
to scale to hundreds, and possibly even thousands, of NuPIC anomaly models on a single instance. This works by regularly checkpointing models to disk, and reloading them on-demand. Incoming samples are buffered until the model can be loaded and as a result, there is a short, but undefined latency associated with the processing of any given metric sample.
htmengine.runtime.anomaly_service
calculates the final Anomaly Likelihood score and publishes the results to an exchange for later processing by the user-defined application, if necessary. The results are also saved to the local MySQL database.
You may also programatticaly send data directly to RabbitMQ, bypassing the Metric Listener TCP service:
import json
from nta.utils import message_bus_connector
msg = json.dumps(dict(protocol="plain", data=["foo.bar.stats 21 1443197361"]))
bus = message_bus_connector.MessageBusConnector()
bus.publish(mqName=MESSAGE_QUEUE_NAME, body=msg, persistent=True)
Inbound data is only cached until a model is specifically created for that metric. There is an API for monitoring a specific metric:
from htmengine.adapters.datasource import createDatasourceAdapter
modelSpec = {
"datasource": "custom",
"metricSpec": {
"metric": "foo.bar.stats"
},
"modelParams": {
"min": 0, # optional
"max": 100 # optional
}
}
adapter = createDatasourceAdapter(modelSpec["datasource"])
modelId = adapter.monitorMetric(modelSpec)
The results are cached in the MySQL database, in the metric_data
table. htmengine
uses sqlalchemy to access the database, and provides a high-level function to query the data directly from the database. For example, the following python snippet queries the repository for, and prints metric data:
import calendar
from htmengine import repository
from htmengine.repository import schema
from nta.utils.config import Config
METRIC_NAME = "foo.bar.stats"
appConfig = Config("application.conf", os.environ["APPLICATION_CONFIG_PATH"])
with repository.engineFactory(appConfig).connect() as conn:
metricObj = repository.getCustomMetricByName(conn,
METRIC_NAME,
fields=[schema.metric.c.uid])
fields = (schema.metric_data.c.metric_value,
schema.metric_data.c.timestamp,
schema.metric_data.c.rowid,
schema.metric_data.c.anomaly_score)
sort = schema.metric_data.c.timestamp.asc()
result = repository.getMetricData(conn,
metricId=metricObj.uid,
fields=fields,
sort=sort)
for row in result:
print METRIC_NAME,
print str(row.metric_value),
print str(calendar.timegm(row.timestamp.timetuple())),
print str(row.anomaly_score)))
You may also process the results in real-time by creating a queue bound to the results exchange:
import os
from nta.utils import amqp
from nta.utils.config import Config
from htmengine import htmengineerrno
from htmengine.runtime.anomaly_service import AnomalyService
appConfig = Config("application.conf", os.environ["APPLICATION_CONFIG_PATH"])
modelResultsExchange = appConfig.get("metric_streamer",
"results_exchange_name")
queueName = "APPLICATION_NAME_results"
def declareExchanges(amqpClient):
""" Declares model results and non-metric data exchanges
"""
amqpClient.declareExchange(exchange=modelResultsExchange,
exchangeType="fanout",
durable=True)
def declareQueueAndBindToExchanges(amqpClient):
""" Declares queue and binds to model results.
"""
result = amqpClient.declareQueue(queueName, durable=True)
amqpClient.bindQueue(exchange=modelResultsExchange,
queue=result.queue, routingKey="")
def configChannel(amqpClient):
amqpClient.requestQoS(prefetchCount=1)
def handleModelInferenceResults(body):
""" Model results batch handler.
:param body: Serialized message payload; the message is compliant with
htmengine/runtime/json_schema/model_inference_results_msg_schema.json.
:type body: str
"""
try:
batch = AnomalyService.deserializeModelResult(body)
except Exception:
print "Error deserializing model result"
raise
metricId = batch["metric"]["uid"]
metricName = batch["metric"]["name"]
print "Handling %d model result(s) for %s - %s" % (len(batch["results"]),
metricId,
metricName)
if not batch["results"]:
print "Empty results in model inference results batch; model=%s" % metricId
return
print metricId, batch["results"]
def handleModelCommandResult(body):
""" ModelCommandResult handler. Handles model creation/deletion events
:param body: Incoming message payload
:type body: str
"""
try:
modelCommandResult = AnomalyService.deserializeModelResult(body)
except Exception:
print "Error deserializing model command result"
raise
if modelCommandResult["status"] != htmengineerrno.SUCCESS:
return # Ignore...
if modelCommandResult["method"] == "defineModel":
print "Handling `defineModel` for %s" % modelCommandResult.get("modelId")
print modelCommandResult
elif modelCommandResult["method"] == "deleteModel":
print "Handling `deleteModel` for %s" % modelCommandResult.get("modelId")
print modelCommandResult
def messageHandler(message):
""" Inspect all inbound model results
We will key off of routing key to determine specific handler for inbound
message. If routing key is `None`, attempt to decode message using
`AnomalyService.deserializeModelResult()`.
:param amqp.messages.ConsumerMessage message: ``message.body`` is one of:
Serialized batch of model inference results generated in
``AnomalyService`` and must be deserialized using
``AnomalyService.deserializeModelResult()``. Per
htmengine/runtime/json_schema/model_inference_results_msg_schema.json
Serialized ``ModelCommandResult`` generated in ``AnomalyService``
per model_command_result_amqp_message.json and must be deserialized
using ``AnomalyService.deserializeModelResult()``
"""
if message.methodInfo.routingKey is None:
print "Unrecognized routing key."
else:
dataType = (message.properties.headers.get("dataType")
if message.properties.headers else None)
if not dataType:
handleModelInferenceResults(message.body)
elif dataType == "model-cmd-result":
handleModelCommandResult(message.body)
else:
print "Unexpected message header dataType=%s" % dataType
message.ack()
if __name__ == "__main__":
with amqp.synchronous_amqp_client.SynchronousAmqpClient(
amqp.connection.getRabbitmqConnectionParameters(),
channelConfigCb=configChannel) as amqpClient:
declareExchanges(amqpClient)
declareQueueAndBindToExchanges(amqpClient)
consumer = amqpClient.createConsumer(queueName)
# Start consuming messages
for evt in amqpClient.readEvents():
if isinstance(evt, amqp.messages.ConsumerMessage):
messageHandler(evt)
elif isinstance(evt, amqp.consumer.ConsumerCancellation):
# Bad news: this likely means that our queue was deleted externally
msg = "Consumer cancelled by broker: %r (%r)" % (evt, consumer)
raise Exception(msg)
else:
print "Unexpected amqp event=%r" % evt
The HTM engine has a basic skeleton application exclusively for running tests. To run the tests, you must have MySQL, RabbitMQ, and Supervisor running.
Make sure that APPLICATION_CONFIG_PATH
is set to point to tests/support/skeleton-app/conf
For example:
export APPLICATION_CONFIG_PATH=/Users/{name}/nta/numenta-apps/htmengine/tests/support/skeleton-app/conf
(alternatively you can use the APPLICATION_CONFIG_PATH
from an app based off
of the HTMEngine, just be careful to not reset an existing MySQL database)
With MySQL already started start/restart RabbitMQ:
rabbitmq-server -detached
Setup the htmengine MySQL database:
(don't do this if you're testing on HTM-IT, Taurus, or another HTMEngine application)
./tests/support/skeleton-app/reset_db.py
Start supervisord
:
cd tests/support/skeleton-app
supervisord -c conf/supervisord.conf
NOTE: Be sure to run mkdir logs
within the skeleton-app
directory so
supervisor has a place to store its logs
Run the integration tests:
py.test tests/integration