Merge pull request #1546 from jertel/jertel/not
System notification improvements
nsano-rururu authored Oct 5, 2024
2 parents 3b0dde6 + 82e8e73 commit e3fba4c
Showing 8 changed files with 110 additions and 10 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,8 @@
- None

## New features
- None
- [Notifications] System error notifications can now be delivered via the same set of alerters as rule alerts - [#1546](https://github.com/jertel/elastalert2/pull/1546) - @jertel
- [Notifications] New config option `notify_all_errors` supports all system errors, including loss of data connectivity - [#1546](https://github.com/jertel/elastalert2/pull/1546) - @jertel

## Other changes
- [Docs] Mention the two available Spike-rule metrics that are added into the match record - [#1542](https://github.com/jertel/elastalert2/pull/1542) - @ulmako
2 changes: 2 additions & 0 deletions docs/source/alerts.rst
@@ -1,3 +1,5 @@
.. _Alerts:

Alerts
******

34 changes: 31 additions & 3 deletions docs/source/configuration.rst
@@ -91,9 +91,37 @@ rule will no longer be run until either ElastAlert 2 restarts or the rule file h

``show_disabled_rules``: If true, ElastAlert 2 will show the list of disabled rules when it finishes execution. This defaults to True.

``notify_email``: An email address, or list of email addresses, to which notification emails will be sent. Currently,
only an uncaught exception will send a notification email. The from address, SMTP host, and reply-to header can be set
using ``from_addr``, ``smtp_host``, and ``email_reply_to`` options, respectively. By default, no emails will be sent.
``notify_alert``: List of alerters to execute upon encountering a system error. System errors occur when an unexpected exception is thrown during rule processing. For additional notifications, such as when ElastAlert 2 background tests encounter problems, or when connectivity to the data storage system is lost, enable ``notify_all_errors``.

See the :ref:`Alerts` section for the list of available alerters and their parameters.

Included fields in a system notification are:

- message: The details about the error
- timestamp: The time that the error occurred
- rule: Rule object if the error occurred during the processing of a rule, otherwise will be empty/None.
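
For illustration, a minimal sketch of the match document that each configured alerter receives for a single system notification; the values below are hypothetical placeholders, not output from a real error.

.. code-block:: yaml

    # Hypothetical system notification document (placeholder values)
    message: "Uncaught exception running rule Example Rule: division by zero"
    timestamp: "2024-10-05T12:00:00Z"   # time the error occurred
    rule: null                          # the rule object for rule-specific errors, otherwise empty/None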

The following example shows how all ElastAlert 2 system errors can be delivered to both a Matrix chat server and an email address.

.. code-block:: yaml

    notify_alert:
    - email
    - matrixhookshot
    notify_all_errors: true
    email:
    - [email protected]
    smtp_host: some-mail-host.com
    from_addr: "ElastAlert 2 <[email protected]>"
    smtp_auth_file: /opt/elastalert2/smtp.auth
    matrixhookshot_webhook_url: https://some-matrix-server/webhook/xyz

``notify_all_errors``: If true, notifications will be sent for additional system errors, such as loss of connectivity to Elasticsearch. This can result in a large number of notifications while connectivity is down. When set to false, only unexpected, rule-specific errors will trigger notifications.

``notify_email``: (DEPRECATED) An email address, or list of email addresses, to which notification emails will be sent upon encountering an unexpected rule error. The from address, SMTP host, and reply-to header can be set
using ``from_addr``, ``smtp_host``, and ``email_reply_to`` options, respectively. By default, no emails will be sent. NOTE: This is a legacy method with limited email delivery support. Use the newer ``notify_alert`` setting to gain the full flexibility of ElastAlert 2's alerter library for system notifications.

single address example::

39 changes: 35 additions & 4 deletions elastalert/elastalert.py
@@ -37,6 +37,7 @@
from elastalert.opensearch_external_url_formatter import create_opensearch_external_url_formatter
from elastalert.prometheus_wrapper import PrometheusWrapper
from elastalert.ruletypes import FlatlineRule
from elastalert.ruletypes import RuleType
from elastalert.util import (add_keyword_postfix, cronite_datetime_to_timestamp, dt_to_ts, dt_to_unix, EAException,
elastalert_logger, elasticsearch_client, format_index, lookup_es_key, parse_deadline,
parse_duration, pretty_ts, replace_dots_in_field_names, seconds, set_es_key,
@@ -137,6 +138,13 @@ def __init__(self, args):
self.old_query_limit = self.conf['old_query_limit']
self.disable_rules_on_error = self.conf['disable_rules_on_error']
self.notify_email = self.conf.get('notify_email', [])
self.notify_all_errors = self.conf.get('notify_all_errors', False)
self.notify_alert = self.conf.get('notify_alert', [])
alert_conf_obj = self.conf.copy()
alert_conf_obj['name'] = 'ElastAlert 2 System Error Notification'
alert_conf_obj['alert'] = self.notify_alert
alert_conf_obj['type'] = RuleType({})
self.notify_alerters = self.rules_loader.load_alerts(alert_conf_obj, self.notify_alert)
self.from_addr = self.conf.get('from_addr', 'ElastAlert')
self.smtp_host = self.conf.get('smtp_host', 'localhost')
self.max_aggregation = self.conf.get('max_aggregation', 10000)
@@ -1500,6 +1508,8 @@ def writeback(self, doc_type, body, rule=None, match_body=None):
return res
except ElasticsearchException as e:
elastalert_logger.exception("Error writing alert info to Elasticsearch: %s" % (e))
if self.notify_all_errors:
self.handle_notify_error(e, None)

def find_recent_pending_alerts(self, time_limit):
""" Queries writeback_es to find alerts that did not send
@@ -1521,6 +1531,8 @@ def find_recent_pending_alerts(self, time_limit):
return res['hits']['hits']
except ElasticsearchException as e:
elastalert_logger.exception("Error finding recent pending alerts: %s %s" % (e, query))
if self.notify_all_errors:
self.handle_notify_error(e, None)
return []

def send_pending_alerts(self):
@@ -1788,6 +1800,23 @@ def is_silenced(self, rule_name):
return True
return False

def handle_notify_error(self, message, rule, exception=None):
if self.notify_email:
self.send_notification_email(exception=exception, rule=rule)
if self.notify_alerters:
alert_pipeline = {"alert_time": ts_now()}
details = [{
'timestamp': ts_now(),
'message': message,
'rule': rule,
}]
for alerter in self.notify_alerters:
alerter.pipeline = alert_pipeline
try:
alerter.alert(details)
except Exception as e:
elastalert_logger.error('Error while running notify alert %s: %s' % (alerter.get_info()['type'], e))

def handle_error(self, message, data=None):
''' Logs message at error level and writes message, data and traceback to Elasticsearch. '''
elastalert_logger.error(message)
@@ -1797,18 +1826,20 @@ def handle_error(self, message, data=None):
if data:
body['data'] = data
self.writeback('elastalert_error', body)
if self.notify_all_errors:
self.handle_notify_error(message, None)

def handle_uncaught_exception(self, exception, rule):
""" Disables a rule and sends a notification. """
elastalert_logger.error(traceback.format_exc())
self.handle_error('Uncaught exception running rule %s: %s' % (rule['name'], exception), {'rule': rule['name']})
msg = 'Uncaught exception running rule %s: %s' % (rule['name'], exception)
self.handle_error(msg, {'rule': rule['name']})
if self.disable_rules_on_error:
self.rules = [running_rule for running_rule in self.rules if running_rule['name'] != rule['name']]
self.disabled_rules.append(rule)
self.scheduler.pause_job(job_id=rule['name'])
elastalert_logger.info('Rule %s disabled', rule['name'])
if self.notify_email:
self.send_notification_email(exception=exception, rule=rule)
self.handle_notify_error(msg, rule, exception=exception)

def send_notification_email(self, text='', exception=None, rule=None, subject=None, rule_file=None):
email_body = text
@@ -1846,7 +1877,7 @@ def send_notification_email(self, text='', exception=None, rule=None, subject=No
smtp = SMTP(self.smtp_host)
smtp.sendmail(self.from_addr, recipients, email.as_string())
except (SMTPException, error) as e:
self.handle_error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body})
elastalert_logger.error('Error connecting to SMTP host: %s' % (e), {'email_body': email_body})

def get_top_counts(self, rule, starttime, endtime, keys, number=None, qk=None):
""" Counts the number of events for each unique value for each key field.
11 changes: 10 additions & 1 deletion elastalert/schema.yaml
@@ -6,11 +6,16 @@ definitions:
type: [string, array]
items: {type: string}

# Either a single string OR an array of strings OR an array of ararys
# Either a single string OR an array of strings OR an array of arrays
arrayOfStringsOrOtherArrays: &arrayOfStringsOrOtherArray
type: [string, array]
items: {type: [string, array]}

# Either a single string OR an array of strings OR an array of objects
stringOrArrayOfStringsOrObjects: &stringOrArrayOfStringsOrObjects
type: [string, array]
items: {type: [string, object]}

timedelta: &timedelta
type: object
additionalProperties: false
@@ -304,6 +309,10 @@ properties:
replace_dots_in_field_names: {type: boolean}
scan_entire_timeframe: {type: boolean}

## System Error Notifications
notify_alert: *stringOrArrayOfStringsOrObjects
notify_all_errors: {type: boolean}

### summary table
summary_table_fields: {type: array, items: {type: string}}
summary_table_type: {type: string, enum: ['ascii', 'html', 'markdown']}
29 changes: 29 additions & 0 deletions tests/base_test.py
@@ -9,6 +9,7 @@
from elasticsearch.exceptions import ConnectionError
from elasticsearch.exceptions import ElasticsearchException

from elastalert.alerts import Alerter
from elastalert.enhancements import BaseEnhancement
from elastalert.enhancements import DropMatchException
from elastalert.enhancements import TimeEnhancement
@@ -1288,6 +1289,34 @@ def test_uncaught_exceptions(ea):
assert mock_email.call_args_list[0][1] == {'exception': e, 'rule': ea.disabled_rules[0]}


def test_handle_notify_error_unconfigured(ea):
testmsg = "testing"
testrule = {}
testex = Exception()

with mock.patch.object(ea, 'send_notification_email') as mock_email:
ea.handle_notify_error(testmsg, testrule, testex)
assert not mock_email.called


def test_handle_notify_error_alerts(ea):
testmsg = "testing"
testrule = {}
testex = Exception()

fake_alert = Alerter(testrule)

ea.notify_alerters = [fake_alert]
with mock.patch.object(fake_alert, 'alert') as mock_alert:
ea.handle_notify_error(testmsg, testrule, testex)
assert mock_alert.called
actual = mock_alert.call_args_list[0][0][0]
details = actual[0]
assert details['timestamp'] is not None
assert details['message'] == testmsg
assert details['rule'] == testrule


def test_get_top_counts_handles_no_hits_returned(ea):
with mock.patch.object(ea, 'get_hits_terms') as mock_hits:
mock_hits.return_value = None
1 change: 1 addition & 0 deletions tests/conftest.py
@@ -80,6 +80,7 @@ def __init__(self, conf):
self.load = mock.Mock()
self.get_hashes = mock.Mock()
self.load_configuration = mock.Mock()
self.load_alerts = mock.Mock(return_value=[])


class mock_ruletype(object):
1 change: 0 additions & 1 deletion tests/docker-compose.yml
@@ -1,4 +1,3 @@
version: '2'
services:
tox:
build: