Skip to content

Kafka support #1555

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
May 24, 2022
Prev Previous commit
Next Next commit
Adapt #964 for Kafka/messaging support
Changes include:
 * automatically create transactions when looping over results
 * instrument KafkaConsumer.poll
 * add binary TraceParent handling
 * add support for span links
 * add tests
  • Loading branch information
beniwohli committed May 23, 2022
commit 034694cce1561a3370b527126d59ac069266d406
17 changes: 17 additions & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,23 @@ Collected trace data:

* Destination (address and port)

[float]
[[automatic-instrumentation-db-kafka-python]]
==== kafka-python

Library: `kafka-python` (`>=2.0`)

Instrumented methods:

* `kafka.KafkaProducer.send`,
* `kafka.KafkaConsumer.poll`,
* `kafka.KafkaConsumer.\\__next__`

Collected trace data:

* Destination (address and port)
* topic (if applicable)


[float]
[[automatic-instrumentation-http]]
Expand Down
9 changes: 9 additions & 0 deletions elasticapm/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -632,12 +632,21 @@ def load_processors(self):
return [seen.setdefault(path, import_string(path)) for path in processors if path not in seen]

def should_ignore_url(self, url):
    """Check whether a URL should be ignored based on the transaction_ignore_urls setting.

    :param url: URL string to test against the configured patterns
    :return: True if any configured pattern matches ``url``, otherwise False
    """
    # An unset or empty setting means nothing is ignored.
    patterns = self.config.transaction_ignore_urls or ()
    return any(pattern.match(url) for pattern in patterns)

def should_ignore_topic(self, topic: str) -> bool:
    """Check whether a messaging topic should be ignored based on the ignore_message_queues setting.

    :param topic: topic name to test against the configured patterns
    :return: True if any configured pattern matches ``topic``, otherwise False
    """
    # An unset or empty setting means no topic is ignored.
    patterns = self.config.ignore_message_queues or ()
    return any(pattern.match(topic) for pattern in patterns)

def check_python_version(self):
v = tuple(map(int, platform.python_version_tuple()[:2]))
if v == (2, 7):
Expand Down
1 change: 1 addition & 0 deletions elasticapm/conf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,7 @@ class Config(_ConfigBase):
autoinsert_django_middleware = _BoolConfigValue("AUTOINSERT_DJANGO_MIDDLEWARE", default=True)
transactions_ignore_patterns = _ListConfigValue("TRANSACTIONS_IGNORE_PATTERNS", default=[])
transaction_ignore_urls = _ListConfigValue("TRANSACTION_IGNORE_URLS", type=starmatch_to_regex, default=[])
ignore_message_queues = _ListConfigValue("IGNORE_MESSAGE_QUEUES", type=starmatch_to_regex, default=[])
service_version = _ConfigValue("SERVICE_VERSION")
framework_name = _ConfigValue("FRAMEWORK_NAME")
framework_version = _ConfigValue("FRAMEWORK_VERSION")
Expand Down
1 change: 1 addition & 0 deletions elasticapm/conf/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ def _starmatch_to_regex(pattern):
TRACE_CONTEXT_VERSION = 0
TRACEPARENT_HEADER_NAME = "traceparent"
TRACEPARENT_LEGACY_HEADER_NAME = "elastic-apm-traceparent"
TRACEPARENT_BINARY_HEADER_NAME = "elasticapmtraceparent"
TRACESTATE_HEADER_NAME = "tracestate"

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
Expand Down
197 changes: 91 additions & 106 deletions elasticapm/instrumentation/packages/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,96 +29,61 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import time
from typing import Optional

import elasticapm
from elasticapm import get_client
from elasticapm.conf import constants
from elasticapm.contrib.django.client import get_client as d_client
from elasticapm.contrib.flask import get_client as f_client
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
from elasticapm.traces import capture_span, execution_context
from elasticapm.traces import DroppedSpan, capture_span, execution_context
from elasticapm.utils.disttracing import TraceParent


def django_client():
    """Return the client from the Django integration, or None if it cannot be obtained."""
    try:
        return d_client()
    except Exception:  # in some cases we get a different exception
        # Best effort: any failure to obtain the client is treated as "no client".
        return None


def get_client():
    """Return the first available client: the Django one, else the Flask one, else None.

    Each factory is called at most once; the original called each factory a
    second time to produce the return value.
    """
    # ``or None`` keeps the original result of None when neither factory
    # yields a truthy client.
    return django_client() or f_client() or None


def get_trace_id(result):
    """Return the value of the "trace" entry found in a list of pairs inside ``result``.

    ``result`` is iterated; every element that is a non-empty list whose first
    item is a 2-tuple is treated as a list of (key, value) pairs.

    :param result: iterable whose elements may include such lists of pairs
    :return: the value paired with the key whose ``str()`` is "trace", or None
        if no such entry exists
    """
    for item in result:
        # Skip non-lists and empty lists; indexing an empty list would raise IndexError.
        if not isinstance(item, list) or not item:
            continue
        first = item[0]
        if isinstance(first, tuple) and len(first) == 2:
            for key, value in item:
                if str(key) == "trace":
                    return value
    return None


class KafkaInstrumentation(AbstractInstrumentedModule):

instrument_list = [
("kafka", "KafkaProducer.send"),
("kafka", "KafkaConsumer.next"),
("kafka", "KafkaConsumer.poll"),
("kafka", "KafkaConsumer.__next__"),
]
provider_name = "kafka"
name = "kafka"

# NOTE(review): this span looks like a rendered diff with indentation stripped --
# two `_trace_send` signatures and both a `method(...)` and a `wrapped(...)` call
# appear below. Confirm against the repository which lines are current before
# relying on statement order here.
def _trace_send(
self,
method,
topic,
value=None,
key=None,
headers=None,
partition=None,
timestamp_ms=None,
action="send",
):
span_name = "KafkaProducer#send to " + topic
service = self.destination_info["service"]
service["resource"] = service["resource"] + topic
self.destination_info["service"] = service
def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
# The topic is the first positional argument, otherwise the "topic" keyword.
topic = args[0] if args else kwargs["topic"]
# Headers are read from the fifth positional slot (args[4]) when present,
# otherwise from the "headers" keyword.
headers = args[4] if len(args) > 4 else kwargs.get("headers", None)

span_name = f"Kafka SEND to {topic}"
# Append the topic to the destination service resource.
destination_info["service"]["resource"] += topic
with capture_span(
name=span_name,
span_type="messaging",
span_subtype=self.provider_name,
span_action=action,
span_action="send",
leaf=True,
extra={
"message": {"queue": {"name": topic}},
"destination": self.destination_info,
"destination": destination_info,
},
) as span:
transaction = execution_context.get_transaction()
if transaction:
tp = transaction.trace_parent
tp_string = tp.to_string()
# ID REPLACE SECTION START
new_tp_string = tp_string.replace(transaction.id, span.id)
# presumably a copy of the transaction's trace parent carrying this span's id -- verify copy_from
tp = transaction.trace_parent.copy_from(span_id=span.id)
if headers:
headers.append(("trace", bytes(new_tp_string)))
# Attach the binary trace parent under TRACEPARENT_BINARY_HEADER_NAME.
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
else:
headers = [("trace", bytes(new_tp_string))]
# ID REPLACE SECTION STOP
result = method(
topic,
value=value,
key=key,
headers=headers,
partition=partition,
timestamp_ms=timestamp_ms,
)
headers = [(constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())]
# Write the new headers list back into whichever slot (positional or keyword) carries headers.
if len(args) > 4:
args = list(args)
args[4] = headers
else:
kwargs["headers"] = headers
result = wrapped(*args, **kwargs)
# Record the controller's address and port on the span destination, unless the span was dropped.
if instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
address = instance._metadata.controller[1]
port = instance._metadata.controller[2]
span.context["destination"]["address"] = address
span.context["destination"]["port"] = port
return result

def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
Expand All @@ -129,76 +94,96 @@ def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
return self.call(module, method, wrapped, instance, args, kwargs)

# NOTE(review): this span looks like a rendered diff with indentation stripped --
# e.g. `topic` is assigned twice in a row and both "KafkaConsumer.next" and
# "KafkaConsumer.__next__" branches appear. Confirm against the repository which
# lines are current before relying on statement order here.
def call(self, module, method, wrapped, instance, args, kwargs):
topic = None
client = get_client()
# Base destination metadata for Kafka spans; the topic is appended to "resource" further down.
destination_info = {
"service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
}
self.destination_info = destination_info

if method == "KafkaProducer.send":
address = None
port = None
time_start = time.time()
while not instance._metadata.controller:
if time.time() - time_start > 1:
break
continue
if instance:
if instance._metadata.controller:
address = instance._metadata.controller[1]
port = instance._metadata.controller[2]
self.destination_info["port"] = port
self.destination_info["address"] = address
topic = args[0].encode("utf-8")
# The topic is the first positional argument, otherwise the "topic" keyword.
topic = args[0] if args else kwargs["topic"]
# Call through untraced when the topic is ignored or there is no active transaction.
if client.should_ignore_topic(topic) or not execution_context.get_transaction():
return wrapped(*args, **kwargs)
return self._trace_send(instance, wrapped, destination_info=destination_info, *args, **kwargs)

elif method == "KafkaConsumer.poll":
transaction = execution_context.get_transaction()
if transaction:
return self._trace_send(wrapped, topic, **kwargs)
with capture_span(
name="Kafka POLL",
span_type="messaging",
span_subtype=self.provider_name,
span_action="poll",
leaf=True,
extra={
"destination": destination_info,
},
) as span:
# Extend the span name with the sorted subscription names, when there are any
# and the span was not dropped.
if not isinstance(span, DroppedSpan) and instance._subscription.subscription:
span.name += " from " + ", ".join(sorted(instance._subscription.subscription))
results = wrapped(*args, **kwargs)
return results
else:
return wrapped(*args, **kwargs)

if method == "KafkaConsumer.next":
elif method == "KafkaConsumer.__next__":
transaction = execution_context.get_transaction()
if transaction and transaction.transaction_type != "messaging":
action = "consume"
# somebody started a transaction outside of the consumer,
# so we capture it as a span, and record the causal trace as a link
with capture_span(
name="consumer",
span_type="messaging",
span_subtype=self.provider_name,
span_action=action,
span_action="receive",
leaf=True,
extra={
"message": {"queue": {"name": ""}},
"destination": self.destination_info,
"destination": destination_info,
},
) as span:
result = wrapped(*args, **kwargs)
topic = result[0]
new_trace_id = get_trace_id(result)
service = self.destination_info["service"]
service["resource"] = service["resource"] + topic
span.context["message"]["queue"]["name"] = topic
span.context["destination"]["service"] = service
span.name = "KafkaConsumer#receive from " + topic
transaction.trace_parent = TraceParent.from_string(new_trace_id)
# On StopIteration, cancel the span and let the exception propagate.
try:
result = wrapped(*args, **kwargs)
except StopIteration:
span.cancel()
raise
if not isinstance(span, DroppedSpan):
# The topic is read from the first field of the returned record.
topic = result[0]
if client.should_ignore_topic(topic):
span.cancel()
return result
trace_parent = self.get_traceparent_from_result(result)
if trace_parent:
span.add_link(trace_parent)
destination_info["service"]["resource"] += topic
span.context["message"]["queue"]["name"] = topic
span.name = "Kafka RECEIVE from " + topic
return result
else:
client = get_client()
if transaction and transaction.transaction_type == "messaging":
# No transaction running, or this is a transaction started by us,
# so let's end it and start the next,
# unless a StopIteration is raised, at which point we do nothing.
if transaction:
client.end_transaction()

result = wrapped(*args, **kwargs)
topic = result[0]
new_trace_id = None
new_trace_id = get_trace_id(result)

client.begin_transaction("messaging", trace_parent=None)
transaction = execution_context.get_transaction()
if client.should_ignore_topic(topic):
return result
# Begin the new transaction with the trace parent taken from the record's headers (None if absent).
trace_parent = self.get_traceparent_from_result(result)
transaction = client.begin_transaction("messaging", trace_parent=trace_parent)
# NOTE(review): timestamp_type == 0 presumably marks a producer create-time timestamp -- confirm
# against kafka-python. Age is the current time in ms minus the record's timestamp.
if result.timestamp_type == 0:
current_time_millis = int(round(time.time() * 1000))
age = current_time_millis - result.timestamp
transaction.context = {
"message": {"age": {"ms": age}, "queue": {"name": topic}}
"message": {"age": {"ms": age}, "queue": {"name": topic}},
"service": {"framework": {"name": "Kafka"}},
}
if new_trace_id:
transaction.trace_parent = TraceParent.from_string(new_trace_id)
t_name = "Kafka record from " + topic
elasticapm.set_transaction_name(t_name, override=True)
transaction_name = "Kafka RECEIVE from " + topic
elasticapm.set_transaction_name(transaction_name, override=True)
res = constants.OUTCOME.SUCCESS
elasticapm.set_transaction_result(res, override=False)
return result

def get_traceparent_from_result(self, result) -> Optional[TraceParent]:
    """Extract the trace parent carried in a consumed record's headers.

    :param result: record whose ``headers`` attribute is iterated as (key, value) pairs
    :return: the ``TraceParent`` decoded from the first header named
        ``constants.TRACEPARENT_BINARY_HEADER_NAME``, or None if no such header exists
    """
    for key, value in result.headers:
        if key == constants.TRACEPARENT_BINARY_HEADER_NAME:
            return TraceParent.from_binary(value)
    # Explicit to match the Optional return annotation.
    return None
Loading