Skip to content

Commit

Permalink
Apply black, isort, and flake8 formatting
Browse files Browse the repository at this point in the history
- run black and isort, manually handle overlong doc comment
- add a setup.cfg with black-compatible flake8 and isort conf
- add a makefile which can be used to `make autoformat` or `make lint`
  (assume py3.6+ for black compat)
  • Loading branch information
sirosen committed Jul 9, 2019
1 parent f19a67c commit 6a26319
Show file tree
Hide file tree
Showing 11 changed files with 162 additions and 89 deletions.
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
.PHONY: help
help:
@echo "These are our make targets and what they do."
@echo ""
@echo " help: Show this helptext"
@echo ""
@echo " autoformat:"
@echo " run autoformatting tools"
@echo ""
@echo " lint: run autoformatting tools as linters + flake8"
@echo ""
@echo " clean: remove tooling virtualenv"

.venv:
python3 -m venv .venv
.venv/bin/pip install black isort flake8
touch .venv

.PHONY: autoformat
autoformat: .venv
.venv/bin/isort --recursive .
.venv/bin/black .

.PHONY: lint
lint: .venv
.venv/bin/isort --check-only --recursive .
.venv/bin/black --check .
.venv/bin/flake8

.PHONY: clean
clean:
rm -rf .venv
35 changes: 20 additions & 15 deletions client/globus_cw_client/client.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
"""
Python client API for cwlogs daemon
"""
import time
import socket
import json
import socket
import time

try:
# Python 2
Expand All @@ -29,11 +29,11 @@ def log_event(message, retries=10, wait=0.1):
message = message.decode("utf-8")
assert isinstance(message, UNICODE_TYPE)

assert(type(retries) == int)
assert(retries >= 0)
assert type(retries) == int
assert retries >= 0

assert(type(wait) == int or type(wait) == float)
assert(wait >= 0)
assert type(wait) == int or type(wait) == float
assert wait >= 0

req = dict()
req["message"] = message
Expand All @@ -48,7 +48,7 @@ def _connect(retries, wait):
Raise: Exception if max attempts exceeded
"""
addr = "\0org.globus.cwlogs"
for i in range(retries+1):
for i in range(retries + 1):
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
try:
sock.connect(addr)
Expand Down Expand Up @@ -90,14 +90,19 @@ def _request(req, retries, wait):
raise CWLoggerDaemonError("unknown response type", d)


# Ignore (swallow) these exceptions at your own risk.
# CWLoggerDaemonError can be caused by many things, including but not limited to:
# bad IAM policy, a killed / failed daemon background thread, AWS throttling,
# invalid length/encoding.
# Ignore only if you have some other mechanism (e.g. a lambda / cloudwatch / heartbeat monitor)
# to ensure logs are properly configured and working, and/or write logs to disk manually.
# Note that even in the absence of exceptions, messages may still be lost - the daemon has a
# very large memory queue and works asynchronously.
"""
Ignore (swallow) these exceptions at your own risk.
CWLoggerDaemonError can be caused by many things, including but not limited to:
bad IAM policy, a killed / failed daemon background thread, AWS throttling,
invalid length/encoding.
Ignore only if you have some other mechanism
(e.g. a lambda / cloudwatch / heartbeat monitor) to ensure logs are properly configured
and working, and/or write logs to disk manually.
Note that even in the absence of exceptions, messages may still be lost - the daemon
has a very large memory queue and works asynchronously.
"""


class CWLoggerError(Exception):
Expand Down
3 changes: 1 addition & 2 deletions client/setup.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from setuptools import setup, find_packages
from setuptools import find_packages, setup

setup(
name="globus_cw_client",
version=1.0,
packages=find_packages(),

# descriptive info, non-critical
description="Client for Globus CloudWatch Logger",
url="https://github.com/globus/globus-cwlogger",
Expand Down
43 changes: 24 additions & 19 deletions daemon/globus_cw_daemon/cwlogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@
- log records too old are discarded by AWS (tooOldLogEventEndIndex)
- log records in the future are discarded by AWS (tooNewLogEventStartIndex)
"""
import time
import logging
import time

import boto3


MAX_EVENT_BYTES = (256 * 1024)
MAX_EVENT_BYTES = 256 * 1024

# These make our batches conform to the AWS API
MAX_BATCH_BYTES = 800000 # Officially 1MB
MAX_BATCH_RECORDS = 5000 # Officially 10,000
MAX_BATCH_RANGE_HOURS = 6 # Officially 24 hours
MAX_BATCH_BYTES = 800000 # Officially 1MB
MAX_BATCH_RECORDS = 5000 # Officially 10,000
MAX_BATCH_RANGE_HOURS = 6 # Officially 24 hours

_log = logging.getLogger(__name__)

Expand Down Expand Up @@ -80,7 +79,7 @@ def get_records_for_boto(self):
def time_diff_exceeded(a, b):
diff_ms = b.timestamp - a.timestamp
assert diff_ms >= 0
return (diff_ms >= (3600 * MAX_BATCH_RANGE_HOURS * 1000))
return diff_ms >= (3600 * MAX_BATCH_RANGE_HOURS * 1000)


class LogWriter(object):
Expand All @@ -93,17 +92,16 @@ def __init__(self, group_name, stream_name, aws_region=None):

# Keep a connection around for performance. boto is smart enough
# to refresh role creds right before they expire (see provider.py).
self.client = boto3.client('logs', region_name=(aws_region or "us-east-1"))
self.client = boto3.client("logs", region_name=(aws_region or "us-east-1"))

self.group_name = group_name
self.stream_name = stream_name
# will be passed on the first call and caught as InvalidSequenceTokenError
self.sequence_token = 'invalid'
self.sequence_token = "invalid"

try:
self.client.create_log_stream(
logGroupName=self.group_name,
logStreamName=self.stream_name
logGroupName=self.group_name, logStreamName=self.stream_name
)
except self.client.exceptions.ResourceAlreadyExistsException:
pass
Expand All @@ -119,8 +117,9 @@ def upload_events(self, events):
break
events.pop()

_log.debug("flushing batch, bytes=%d, recs=%d",
batch.nr_bytes, len(batch.records))
_log.debug(
"flushing batch, bytes=%d, recs=%d", batch.nr_bytes, len(batch.records)
)
self._flush_events(batch.get_records_for_boto())

def _flush_events(self, events):
Expand All @@ -135,16 +134,21 @@ def _flush_events(self, events):
logGroupName=self.group_name,
logStreamName=self.stream_name,
logEvents=events,
sequenceToken=self.sequence_token
sequenceToken=self.sequence_token,
)
_log.debug("flush ok")
self.sequence_token = ret["nextSequenceToken"]
return
except (self.client.exceptions.DataAlreadyAcceptedException,
self.client.exceptions.InvalidSequenceTokenException) as e:
except (
self.client.exceptions.DataAlreadyAcceptedException,
self.client.exceptions.InvalidSequenceTokenException,
) as e:
self.sequence_token = e.response["Error"]["Message"].split()[-1]
_log.info("{}, sequence_token={}".format(
e.response["Error"]["Code"], self.sequence_token))
_log.info(
"{}, sequence_token={}".format(
e.response["Error"]["Code"], self.sequence_token
)
)
return
except Exception as e:
_log.error("error: %r", e)
Expand All @@ -156,11 +160,12 @@ def test():

def now_ms():
import time

time.sleep(0.1)
return int(time.time() * 1000)

def hours(n):
return 3600*n*1000
return 3600 * n * 1000

writer = LogWriter("cwlogger-test", "test-stream")

Expand Down
41 changes: 25 additions & 16 deletions daemon/globus_cw_daemon/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@
"""
Upload messages to cloud watch logs
"""
import errno
import json
import logging
import os
import sys
import socket
import logging
import time
import errno
import sys
import threading
import json
import time

import globus_cw_daemon.cwlogs as cwlogs
import globus_cw_daemon.config as config
import globus_cw_daemon.cwlogs as cwlogs
import globus_cw_daemon.local_logging as local_logging


# Note that the total memory limit is double this:
# * the flush thread can be flushing MAX_EVENT_QUEUE_LEN
# * the front end thread can be receiving MAX_EVENT_QUEUE_LEN
Expand All @@ -32,7 +31,7 @@

# Data shared with flush thread
_g_lock = threading.Lock()
_g_queue = [] # List of Events
_g_queue = [] # List of Events
_g_nr_dropped = 0

# get constant instance_id on start
Expand Down Expand Up @@ -96,8 +95,12 @@ def _flush_thread_main(writer):


def _get_drop_event(nr_dropped):
data = dict(type="audit", subtype="cwlogs.dropped",
dropped=nr_dropped, instance_id=INSTANCE_ID)
data = dict(
type="audit",
subtype="cwlogs.dropped",
dropped=nr_dropped,
instance_id=INSTANCE_ID,
)
ret = cwlogs.Event(timestamp=None, message=json.dumps(data))
return ret

Expand All @@ -117,9 +120,12 @@ def _health_info(q_len=None):


def _get_heartbeat_event(nr_found):
data = dict(type="audit", subtype="cwlogs.heartbeat",
instance_id=INSTANCE_ID,
health=_health_info(nr_found))
data = dict(
type="audit",
subtype="cwlogs.heartbeat",
instance_id=INSTANCE_ID,
health=_health_info(nr_found),
)
ret = cwlogs.Event(timestamp=None, message=json.dumps(data))
return ret

Expand Down Expand Up @@ -220,7 +226,8 @@ def main():
if not INSTANCE_ID:
raise Exception(
"no stream_name found in /etc/cwlogd.ini, and "
"no ec2 instance_id found in /var/lib/cloud/instance")
"no ec2 instance_id found in /var/lib/cloud/instance"
)
else:
stream_name = INSTANCE_ID

Expand All @@ -232,8 +239,10 @@ def main():
try:
group_name = config.get_string("group_name")
except KeyError:
raise Exception("no group_name found in /etc/cwlogd.ini, have you "
"run globus_cw_daemon_install?")
raise Exception(
"no group_name found in /etc/cwlogd.ini, have you "
"run globus_cw_daemon_install?"
)

writer = cwlogs.LogWriter(group_name, stream_name, aws_region=aws_region)

Expand Down
2 changes: 1 addition & 1 deletion daemon/globus_cw_daemon/local_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def configure():
Set up a stderr StreamHandler, PrintKFormatter, and log level pulled from
config
"""
logger = logging.getLogger('globus_cw_daemon')
logger = logging.getLogger("globus_cw_daemon")

handler = logging.StreamHandler()
formatter = PrintKFormatter()
Expand Down
40 changes: 23 additions & 17 deletions daemon/globus_cw_daemon_install/install.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
- Creates config file at at /etc/cwlogd.ini
- Copies globus_cw_daemon.service to /etc/systemd/system/
"""
import os
import shutil
import argparse
import configparser
import os
import shutil


def main():
Expand All @@ -16,17 +16,22 @@ def main():

# get group name argument
parser = argparse.ArgumentParser()
parser.add_argument("group_name",
help="Name of the existing CloudWatch log group "
"to log to.")
parser.add_argument("--stream-name",
help="Specify a stream name. Default is the current "
"ec2 instance id.")
parser.add_argument("--heartbeat-interval", type=int,
help="Specify the time in seconds between heartbeats. "
"Default is 60 seconds.")
parser.add_argument("--no-heartbeats", action="store_true",
help="Turn off heartbeats.")
parser.add_argument(
"group_name", help="Name of the existing CloudWatch log group " "to log to."
)
parser.add_argument(
"--stream-name",
help="Specify a stream name. Default is the current " "ec2 instance id.",
)
parser.add_argument(
"--heartbeat-interval",
type=int,
help="Specify the time in seconds between heartbeats. "
"Default is 60 seconds.",
)
parser.add_argument(
"--no-heartbeats", action="store_true", help="Turn off heartbeats."
)

args = parser.parse_args()
group_name = args.group_name
Expand All @@ -38,8 +43,7 @@ def main():
raise ValueError("heartbeat interval must be > 0")

if no_heartbeats and heartbeat_interval:
raise ValueError(
"Attempting to set heartbeat interval and turn off heartbeats")
raise ValueError("Attempting to set heartbeat interval and turn off heartbeats")

# read default-config.ini
config = configparser.ConfigParser()
Expand All @@ -58,8 +62,10 @@ def main():
config.write(open("/etc/cwlogd.ini", "w"))

# globus_cw_daemon.service to /etc/systemd/system/
shutil.copy(install_dir_path + "/globus_cw_daemon.service",
"/etc/systemd/system/globus_cw_daemon.service")
shutil.copy(
install_dir_path + "/globus_cw_daemon.service",
"/etc/systemd/system/globus_cw_daemon.service",
)


if __name__ == "__main__":
Expand Down
Loading

0 comments on commit 6a26319

Please sign in to comment.