Skip to content

Commit

Permalink
feat: record existing Prometheus metrics into OpenTelemetry Histograms (
Browse files Browse the repository at this point in the history
  • Loading branch information
girishc13 authored Oct 18, 2022
1 parent 62c5fe9 commit 71e4222
Show file tree
Hide file tree
Showing 19 changed files with 967 additions and 112 deletions.
1 change: 1 addition & 0 deletions extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ jsonschema: cicd
portforward>=0.2.4: cicd
tensorflow>=2.0: cicd
opentelemetry-test-utils>=0.33b0: test
prometheus-api-client>=0.5.1: test
1 change: 1 addition & 0 deletions jina/resources/extra-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,3 +87,4 @@ jsonschema: cicd
portforward>=0.2.4: cicd
tensorflow>=2.0: cicd
opentelemetry-test-utils>=0.33b0: test
prometheus-api-client>=0.5.1: test
91 changes: 69 additions & 22 deletions jina/serve/executors/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@
from jina.serve.executors.decorators import avoid_concurrent_lock_cls
from jina.serve.executors.metas import get_executor_taboo
from jina.serve.helper import store_init_kwargs, wrap_func
from jina.serve.instrumentation import MetricsTimer

if TYPE_CHECKING:
from opentelemetry.context.context import Context
from prometheus_client import Summary

__dry_run_endpoint__ = '_jina_dry_run_'

Expand Down Expand Up @@ -133,8 +133,8 @@ def __init__(
self._add_metas(metas)
self._add_requests(requests)
self._add_runtime_args(runtime_args)
self._init_monitoring()
self._init_instrumentation(runtime_args)
self._init_monitoring()
self._init_workspace = workspace
self.logger = JinaLogger(self.__class__.__name__)
if __dry_run_endpoint__ not in self.requests:
Expand Down Expand Up @@ -180,6 +180,18 @@ def _init_monitoring(self):
self._summary_method = None
self._metrics_buffer = None

if self.meter:
self._process_request_histogram = self.meter.create_histogram(
name='jina_process_request_seconds',
description='Time spent when calling the executor request method',
)
self._histogram_buffer = {
'jina_process_request_seconds': self._process_request_histogram
}
else:
self._process_request_histogram = None
self._histogram_buffer = None

def _init_instrumentation(self, _runtime_args: Optional[Dict] = None):
if not _runtime_args:
_runtime_args = {}
Expand Down Expand Up @@ -319,8 +331,10 @@ async def __acall__(self, req_endpoint: str, **kwargs):
async def __acall_endpoint__(
self, req_endpoint, tracing_context: Optional['Context'], **kwargs
):
async def exec_func(summary, tracing_context):
with summary:
async def exec_func(
summary, histogram, histogram_metric_labels, tracing_context
):
with MetricsTimer(summary, histogram, histogram_metric_labels):
if iscoroutinefunction(func):
return await func(self, tracing_context=tracing_context, **kwargs)
else:
Expand All @@ -335,10 +349,15 @@ async def exec_func(summary, tracing_context):
_summary = (
self._summary_method.labels(
self.__class__.__name__, req_endpoint, runtime_name
).time()
)
if self._summary_method
else contextlib.nullcontext()
else None
)
_histogram_metric_labels = {
'executor': self.__class__.__name__,
'executor_endpoint': req_endpoint,
'runtime_name': runtime_name,
}

if self.tracer:
with self.tracer.start_span(req_endpoint, context=tracing_context) as _:
Expand All @@ -349,9 +368,19 @@ async def exec_func(summary, tracing_context):

tracing_carrier_context = {}
TraceContextTextMapPropagator().inject(tracing_carrier_context)
return await exec_func(_summary, extract(tracing_carrier_context))
return await exec_func(
_summary,
self._process_request_histogram,
_histogram_metric_labels,
extract(tracing_carrier_context),
)
else:
return await exec_func(_summary, None)
return await exec_func(
_summary,
self._process_request_histogram,
_histogram_metric_labels,
None,
)

@property
def workspace(self) -> Optional[str]:
Expand Down Expand Up @@ -566,26 +595,44 @@ def to_docker_compose_yaml(

def monitor(
self, name: Optional[str] = None, documentation: Optional[str] = None
) -> Optional['Summary']:
) -> Optional[MetricsTimer]:
"""
Get a given prometheus metric, if it does not exist yet, it will create it and store it in a buffer.
:param name: the name of the metrics
:param documentation: the description of the metrics
:return: the given prometheus metrics or None if monitoring is not enable.
"""
_summary = (
self._metrics_buffer.get(name, None) if self._metrics_buffer else None
)
_histogram = (
self._histogram_buffer.get(name, None) if self._histogram_buffer else None
)

if self._metrics_buffer:
if name not in self._metrics_buffer:
from prometheus_client import Summary
if self._metrics_buffer and not _summary:
from prometheus_client import Summary

self._metrics_buffer[name] = Summary(
name,
documentation,
registry=self.runtime_args.metrics_registry,
namespace='jina',
labelnames=('runtime_name',),
).labels(self.runtime_args.name)
return self._metrics_buffer[name].time()
else:
return contextlib.nullcontext()
_summary = Summary(
name,
documentation,
registry=self.runtime_args.metrics_registry,
namespace='jina',
labelnames=('runtime_name',),
).labels(self.runtime_args.name)
self._metrics_buffer[name] = _summary

if self._histogram_buffer and not _histogram:
_histogram = self.meter.create_histogram(
name=f'jina_{name}', description=documentation
)
self._histogram_buffer[name] = _histogram

if _summary or _histogram:
return MetricsTimer(
_summary,
_histogram,
histogram_metric_labels={'runtime_name': self.runtime_args.name},
)

return contextlib.nullcontext()
4 changes: 4 additions & 0 deletions jina/serve/gateway.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
)
from opentelemetry.metrics import Meter
from prometheus_client import CollectorRegistry


Expand Down Expand Up @@ -79,6 +80,7 @@ def inject_dependencies(
args: 'argparse.Namespace' = None,
timeout_send: Optional[float] = None,
metrics_registry: Optional['CollectorRegistry'] = None,
meter: Optional['Meter'] = None,
runtime_name: Optional[str] = None,
tracing: Optional[bool] = False,
tracer_provider: Optional['trace.TracerProvider'] = None,
Expand All @@ -93,6 +95,7 @@ def inject_dependencies(
:param args: runtime args
:param timeout_send: grpc connection timeout
:param metrics_registry: metric registry when monitoring is enabled
:param meter: optional OpenTelemetry meter that can provide instruments for collecting metrics
:param runtime_name: name of the runtime providing the streamer
:param tracing: Enables tracing if set to True.
:param tracer_provider: If tracing is enabled the tracer_provider will be used to instrument the code.
Expand Down Expand Up @@ -124,6 +127,7 @@ def inject_dependencies(
prefetch=args.prefetch,
logger=self.logger,
metrics_registry=metrics_registry,
meter=meter,
aio_tracing_client_interceptors=aio_tracing_client_interceptors,
tracing_client_interceptor=tracing_client_interceptor,
)
Expand Down
51 changes: 50 additions & 1 deletion jina/serve/instrumentation/__init__.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
from typing import TYPE_CHECKING, Optional, Sequence
import functools
from timeit import default_timer
from typing import TYPE_CHECKING, Dict, Optional, Sequence

if TYPE_CHECKING:
from grpc.aio._interceptor import ClientInterceptor, ServerInterceptor
from opentelemetry.instrumentation.grpc._client import (
OpenTelemetryClientInterceptor,
)
from opentelemetry.metrics import Histogram
from prometheus_client import Summary


class InstrumentationMixin:
Expand Down Expand Up @@ -123,3 +127,48 @@ def tracing_client_interceptor(self) -> Optional['OpenTelemetryClientInterceptor
return grpc_client_interceptor(self.tracer_provider)
else:
return None


class MetricsTimer:
    """Time a block of code and report the duration to optional metric recorders.

    The timer can be used either as a context manager (``with MetricsTimer(...):``)
    or as a decorator (``@MetricsTimer(...)``). On exit, the elapsed wall-clock
    time in seconds is passed to ``summary_metric.observe(duration)`` and/or
    ``histogram.record(duration, attributes=histogram_metric_labels)`` for
    whichever recorders were provided.
    """

    def __init__(
        self,
        summary_metric: Optional['Summary'],
        histogram: Optional['Histogram'],
        histogram_metric_labels: Optional[Dict[str, str]] = None,
    ) -> None:
        """
        :param summary_metric: optional recorder exposing ``observe(duration)``
        :param histogram: optional recorder exposing ``record(duration, attributes=...)``
        :param histogram_metric_labels: attributes passed along with every histogram
            record; defaults to an empty dict
        """
        self._summary_metric = summary_metric
        self._histogram = histogram
        # A fresh dict per instance avoids sharing one mutable default between
        # all timers created without explicit labels.
        self._histogram_metric_labels = (
            histogram_metric_labels if histogram_metric_labels is not None else {}
        )

    def _new_timer(self):
        """Return a new timer bound to the same recorders and labels.

        :return: an independent timer instance with its own start time
        """
        # The labels must be forwarded, otherwise decorated functions would
        # record histogram values without their attributes.
        return self.__class__(
            self._summary_metric, self._histogram, self._histogram_metric_labels
        )

    def __enter__(self):
        self._start = default_timer()
        return self

    def __exit__(self, *exc):
        # Clamp to zero so that a clock adjustment can never yield a negative duration.
        duration = max(default_timer() - self._start, 0)
        if self._summary_metric is not None:
            self._summary_metric.observe(duration)
        if self._histogram is not None:
            self._histogram.record(duration, attributes=self._histogram_metric_labels)
        # Implicitly returns None: exceptions raised inside the block propagate.

    def __call__(self, f):
        """Function that gets called when this class is used as a decorator.

        :param f: function that is decorated
        :return: wrapped function
        """

        @functools.wraps(f)
        def wrapped(*args, **kwargs):
            # A new timer per call keeps the start time private to that call,
            # so overlapping or recursive invocations do not overwrite it.
            with self._new_timer():
                return f(*args, **kwargs)

        return wrapped
Loading

0 comments on commit 71e4222

Please sign in to comment.