Skip to content

Commit

Permalink
Metrics API Rework (#1180)
Browse files Browse the repository at this point in the history
* added new metrics

* fixed interval issue for aggregate metric

* unstaged panther_config changes

* addressed initial PR feedback

* addressed pr feedback

* final pr feedback changes
  • Loading branch information
nhakmiller authored Jul 17, 2020
1 parent f0a44a4 commit 941701a
Show file tree
Hide file tree
Showing 12 changed files with 646 additions and 128 deletions.
43 changes: 24 additions & 19 deletions api/lambda/metrics/models/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,37 +31,42 @@ type LambdaInput struct {

// GetMetricsInput is used to request data points for a number of metrics over a given time frame
type GetMetricsInput struct {
MetricNames []string `json:"metricNames" validate:"required"`
Namespace string `json:"namespace"`
FromDate time.Time `json:"fromDate" validate:"required"`
ToDate time.Time `json:"toDate" validate:"required,gtfield=FromDate"`
IntervalHours int64 `json:"intervalHours" validate:"required,gt=0"`
MetricNames []string `json:"metricNames" validate:"required"`
Namespace string `json:"namespace"`
FromDate time.Time `json:"fromDate" validate:"required"`
ToDate time.Time `json:"toDate" validate:"required,gtfield=FromDate"`
IntervalMinutes int64 `json:"intervalMinutes" validate:"required,gt=0"`
}

// GetMetricsOutput contains data points for a number of metrics over the specified time frame
type GetMetricsOutput struct {
MetricResults []MetricResult `json:"metricResults"`
FromDate time.Time `json:"fromDate"`
ToDate time.Time `json:"toDate"`
IntervalHours int64 `json:"intervalHours"`
EventsProcessed *MetricResult `json:"eventsProcessed,omitempty"`
TotalAlertsDelta *MetricResult `json:"totalAlertsDelta,omitempty"`
AlertsBySeverity *MetricResult `json:"alertsBySeverity,omitempty"`
FromDate time.Time `json:"fromDate"`
ToDate time.Time `json:"toDate"`
IntervalMinutes int64 `json:"intervalMinutes"`
}

// MetricResult is either a single data point or a series of timestamped data points
type MetricResult = struct {
MetricName string
SingleValue []SingleMetricValue `json:"singleValue,omitempty"`
SeriesData []TimeSeriesResponse `json:"seriesData,omitempty"`
SingleValue []SingleMetric `json:"singleValue,omitempty"`
SeriesData TimeSeriesMetric `json:"seriesData,omitempty"`
}

type SingleMetricValue struct {
Label *string
Value int64
type SingleMetric struct {
Label *string `json:"label"`
Value *float64 `json:"value"`
}

// TimeSeriesResponse contains the pertinent fields from the GetMetricData response to be passed
// back to the frontend
type TimeSeriesResponse struct {
Label *string
Timestamps []*time.Time
Values []*float64
type TimeSeriesMetric struct {
Timestamps []*time.Time `json:"timestamps"`
Series []TimeSeriesValues `json:"series"`
}

// TimeSeriesValues is a single labeled series of data points; TimeSeriesMetric carries a slice
// of these in its Series field next to a shared Timestamps slice.
//
// NOTE(review): presumably Values[i] corresponds to the i-th shared timestamp — confirm against
// the code that populates it.
type TimeSeriesValues struct {
Label *string `json:"label"`
Values []*float64 `json:"values"`
}
8 changes: 4 additions & 4 deletions internal/core/alert_delivery/delivery/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func send(alert *alertmodels.Alert, output *outputmodels.AlertOutput, statusChan
//
// Returns true if the alert was sent successfully, false if it needs to be retried.
func dispatch(alert *alertmodels.Alert) bool {
outputs, err := getAlertOutputs(alert)
alertOutputs, err := getAlertOutputs(alert)

if err != nil {
zap.L().Warn("failed to get the outputs for the alert",
Expand All @@ -108,7 +108,7 @@ func dispatch(alert *alertmodels.Alert) bool {
return false
}

if len(outputs) == 0 {
if len(alertOutputs) == 0 {
zap.L().Info("no outputs configured",
zap.String("policyId", alert.AnalysisID),
zap.String("severity", alert.Severity),
Expand All @@ -119,13 +119,13 @@ func dispatch(alert *alertmodels.Alert) bool {
// Dispatch all outputs in parallel.
// This ensures one slow or failing output won't block the others.
statusChannel := make(chan outputStatus)
for _, output := range outputs {
for _, output := range alertOutputs {
go send(alert, output, statusChannel)
}

// Wait until all outputs have finished, gathering any that need to be retried.
var retryOutputs []string
for range outputs {
for range alertOutputs {
status := <-statusChannel
if status.needsRetry {
retryOutputs = append(retryOutputs, status.outputID)
Expand Down
2 changes: 1 addition & 1 deletion internal/core/alert_delivery/main/lambda.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func lambdaHandler(ctx context.Context, event events.SQSEvent) (err error) {
var alerts []*models.Alert

lc, _ := lambdalogger.ConfigureGlobal(ctx, nil)
operation := oplog.NewManager("cloudsec", "alert_delivery").Start(lc.InvokedFunctionArn).WithMemUsed(lambdacontext.MemoryLimitInMB)
operation := oplog.NewManager("core", "alert_delivery").Start(lc.InvokedFunctionArn).WithMemUsed(lambdacontext.MemoryLimitInMB)
defer func() {
operation.Stop().Log(err, zap.Int("numEvents", len(event.Records)), zap.Int("numAlerts", len(alerts)))
}()
Expand Down
153 changes: 153 additions & 0 deletions internal/core/metrics_api/api/alert_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
package api

/**
* Panther is a Cloud-Native SIEM for the Modern Security Team.
* Copyright (C) 2020 Panther Labs Inc
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

import (
"math"
"strconv"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/cloudwatch"
"go.uber.org/zap"

analysismodels "github.com/panther-labs/panther/api/gateway/analysis/models"
"github.com/panther-labs/panther/api/lambda/metrics/models"
)

const alertsMetric = "AlertsCreated"

// getAlertsBySeverity collects the number of alerts created for the "Rule" analysis type, broken
// down by severity, and stores the result on output.AlertsBySeverity.
//
// One CloudWatch query is built per severity (CRITICAL, HIGH, MEDIUM, LOW, INFO); each query sums
// the AlertsCreated metric over periods of input.IntervalMinutes. Any error returned by
// getMetricData is returned unchanged and output is left untouched.
//
// This is a time series metric.
func getAlertsBySeverity(input *models.GetMetricsInput, output *models.GetMetricsOutput) error {
	// Every severity gets its own query; the severity name doubles as the query label.
	severityLevels := []*string{
		aws.String(string(analysismodels.SeverityCRITICAL)),
		aws.String(string(analysismodels.SeverityHIGH)),
		aws.String(string(analysismodels.SeverityMEDIUM)),
		aws.String(string(analysismodels.SeverityLOW)),
		aws.String(string(analysismodels.SeverityINFO)),
	}

	queries := make([]*cloudwatch.MetricDataQuery, len(severityLevels))
	for idx := range severityLevels {
		level := severityLevels[idx]

		// Restrict the metric to rule alerts of this particular severity.
		dimensions := []*cloudwatch.Dimension{
			{Name: aws.String("AnalysisType"), Value: aws.String("Rule")},
			{Name: aws.String("Severity"), Value: level},
		}

		stat := &cloudwatch.MetricStat{
			Metric: &cloudwatch.Metric{
				Dimensions: dimensions,
				MetricName: aws.String(alertsMetric),
				Namespace:  aws.String(input.Namespace),
			},
			Period: aws.Int64(input.IntervalMinutes * 60), // number of seconds, must be multiple of 60
			Stat:   aws.String("Sum"),
			Unit:   aws.String("Count"),
		}

		queries[idx] = &cloudwatch.MetricDataQuery{
			Id:         aws.String("query" + strconv.Itoa(idx)),
			Label:      level,
			MetricStat: stat,
		}
	}
	zap.L().Debug("prepared metric queries", zap.Any("queries", queries), zap.Any("toDate", input.ToDate), zap.Any("fromDate", input.FromDate))

	metricData, err := getMetricData(input, queries)
	if err != nil {
		return err
	}

	series, timestamps := normalizeTimeStamps(input, metricData)
	result := models.MetricResult{
		SeriesData: models.TimeSeriesMetric{
			Timestamps: timestamps,
			Series:     series,
		},
	}
	output.AlertsBySeverity = &result
	return nil
}

// getTotalAlertsDelta returns the total count of alerts in the given time period, and additionally
// the total count of alerts in the previous time period of equal size to the requested time period.
//
// It queries a window twice as long as the requested one, using a period equal to the requested
// window length, and stores the result on output.TotalAlertsDelta as two single values labeled
// "Current Period" and "Previous Period". Any error returned by getMetricData is returned
// unchanged. A data point that is absent from the normalized result is reported as zero instead
// of causing an index-out-of-range panic.
//
// This is a single value metric.
func getTotalAlertsDelta(input *models.GetMetricsInput, output *models.GetMetricsOutput) error {
	// Whatever time frame we're supposed to look at, actually look at double that time frame.
	// timeFrame is FromDate minus ToDate (negative when ToDate is after FromDate), so adding it to
	// FromDate moves the start of the query back by the length of the requested window.
	timeFrame := input.FromDate.Sub(input.ToDate)
	dStart := input.FromDate.Add(timeFrame)

	// Construct a new input model here because we want to overwrite the FromDate and
	// IntervalMinutes fields without potentially affecting any other metrics that need to be
	// generated.
	dInput := &models.GetMetricsInput{
		FromDate:        dStart,
		ToDate:          input.ToDate,
		IntervalMinutes: int64(math.Abs(timeFrame.Minutes())),
	}

	// Build the query based on the applicable metric dimensions
	queries := []*cloudwatch.MetricDataQuery{
		{
			Id:    aws.String("query"),
			Label: aws.String("alerts"),
			MetricStat: &cloudwatch.MetricStat{
				Metric: &cloudwatch.Metric{
					Dimensions: []*cloudwatch.Dimension{
						{
							Name:  aws.String("AnalysisType"),
							Value: aws.String("Rule"),
						},
					},
					MetricName: aws.String(alertsMetric),
					Namespace:  aws.String(input.Namespace),
				},
				Period: aws.Int64(dInput.IntervalMinutes * 60), // number of seconds, must be multiple of 60
				Stat:   aws.String("Sum"),
				Unit:   aws.String("Count"),
			},
		},
	}

	// Log the doubled range that is actually queried, not the caller's original range.
	zap.L().Debug("prepared metric queries", zap.Any("queries", queries), zap.Any("toDate", dInput.ToDate), zap.Any("fromDate", dInput.FromDate))

	metricData, err := getMetricData(dInput, queries)
	if err != nil {
		return err
	}

	values, _ := normalizeTimeStamps(dInput, metricData)

	// Only one query was issued, so only the first series is relevant. It may be missing or hold
	// fewer than two points, so never index into it unchecked.
	var points []*float64
	if len(values) > 0 {
		points = values[0].Values
	}
	pointAt := func(i int) *float64 {
		if i < len(points) {
			return points[i]
		}
		zap.L().Warn("missing data point for total alerts delta, defaulting to zero", zap.Int("index", i))
		return aws.Float64(0)
	}

	output.TotalAlertsDelta = &models.MetricResult{
		SingleValue: []models.SingleMetric{
			{
				Label: aws.String("Current Period"),
				Value: pointAt(0),
			},
			{
				Label: aws.String("Previous Period"),
				Value: pointAt(1),
			},
		},
	}

	return nil
}
Loading

0 comments on commit 941701a

Please sign in to comment.