Skip to content

Commit 9be1bae

Browse files
Kafka support (#1555)
* kafka instrumentation for apm-python agent. * Adapt #964 for Kafka/messaging support Changes include: * automatically create transactions when looping over results * instrument KadkaConsumer.poll * add binary TraceParent handling * added support for span links * add tests * create/delete topics explicitly in a pytest fixture * add changelog entry Co-authored-by: Rohit-dot-KumarAM <rohit.kumaram@maplelabs.com>
1 parent cca0f70 commit 9be1bae

18 files changed

+569
-1
lines changed

‎.ci/.jenkins_framework.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,4 @@ FRAMEWORK:
5353
- sanic-newest
5454
- aiomysql-newest
5555
- aiobotocore-newest
56+
- kafka-python-newest

‎.ci/.jenkins_framework_full.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,3 +85,4 @@ FRAMEWORK:
8585
- sanic-20.12
8686
- sanic-newest
8787
- aiobotocore-newest
88+
- kafka-python-newest

‎CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ endif::[]
3737
===== Features
3838
3939
* Add instrumentation for https://github.com/aio-libs/aiobotocore[`aiobotocore`] {pull}1520[#1520]
40+
* Add instrumentation for https://kafka-python.readthedocs.io/en/master/[`kafka-python`] {pull}1555[#1555]
4041
4142
[float]
4243
===== Bug fixes

‎docs/supported-technologies.asciidoc

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -466,6 +466,23 @@ Collected trace data:
466466

467467
* Destination (address and port)
468468

469+
[float]
470+
[[automatic-instrumentation-db-kafka-python]]
471+
==== kafka-python
472+
473+
Library: `kafka-python` (`>=2.0`)
474+
475+
Instrumented methods:
476+
477+
* `kafka.KafkaProducer.send`,
478+
* `kafka.KafkaConsumer.poll`,
479+
* `kafka.KafkaConsumer.\\__next__`
480+
481+
Collected trace data:
482+
483+
* Destination (address and port)
484+
* topic (if applicable)
485+
469486

470487
[float]
471488
[[automatic-instrumentation-http]]

‎elasticapm/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,12 +632,21 @@ def load_processors(self):
632632
return [seen.setdefault(path, import_string(path)) for path in processors if path not in seen]
633633

634634
def should_ignore_url(self, url):
635+
"""Checks if URL should be ignored based on the transaction_ignore_urls setting"""
635636
if self.config.transaction_ignore_urls:
636637
for pattern in self.config.transaction_ignore_urls:
637638
if pattern.match(url):
638639
return True
639640
return False
640641

642+
def should_ignore_topic(self, topic: str) -> bool:
643+
"""Checks if messaging topic should be ignored based on the ignore_message_queues setting"""
644+
if self.config.ignore_message_queues:
645+
for pattern in self.config.ignore_message_queues:
646+
if pattern.match(topic):
647+
return True
648+
return False
649+
641650
def check_python_version(self):
642651
v = tuple(map(int, platform.python_version_tuple()[:2]))
643652
if v == (2, 7):

‎elasticapm/conf/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ class Config(_ConfigBase):
642642
autoinsert_django_middleware = _BoolConfigValue("AUTOINSERT_DJANGO_MIDDLEWARE", default=True)
643643
transactions_ignore_patterns = _ListConfigValue("TRANSACTIONS_IGNORE_PATTERNS", default=[])
644644
transaction_ignore_urls = _ListConfigValue("TRANSACTION_IGNORE_URLS", type=starmatch_to_regex, default=[])
645+
ignore_message_queues = _ListConfigValue("IGNORE_MESSAGE_QUEUES", type=starmatch_to_regex, default=[])
645646
service_version = _ConfigValue("SERVICE_VERSION")
646647
framework_name = _ConfigValue("FRAMEWORK_NAME")
647648
framework_version = _ConfigValue("FRAMEWORK_VERSION")

‎elasticapm/conf/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def _starmatch_to_regex(pattern):
6464
TRACE_CONTEXT_VERSION = 0
6565
TRACEPARENT_HEADER_NAME = "traceparent"
6666
TRACEPARENT_LEGACY_HEADER_NAME = "elastic-apm-traceparent"
67+
TRACEPARENT_BINARY_HEADER_NAME = "elasticapmtraceparent"
6768
TRACESTATE_HEADER_NAME = "tracestate"
6869

6970
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# BSD 3-Clause License
2+
#
3+
# Copyright (c) 2019, Elasticsearch BV
4+
# All rights reserved.
5+
#
6+
# Redistribution and use in source and binary forms, with or without
7+
# modification, are permitted provided that the following conditions are met:
8+
#
9+
# * Redistributions of source code must retain the above copyright notice, this
10+
# list of conditions and the following disclaimer.
11+
#
12+
# * Redistributions in binary form must reproduce the above copyright notice,
13+
# this list of conditions and the following disclaimer in the documentation
14+
# and/or other materials provided with the distribution.
15+
#
16+
# * Neither the name of the copyright holder nor the names of its
17+
# contributors may be used to endorse or promote products derived from
18+
# this software without specific prior written permission.
19+
#
20+
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21+
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22+
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23+
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24+
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25+
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26+
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27+
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28+
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29+
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30+
31+
import time
32+
from typing import Optional
33+
34+
import elasticapm
35+
from elasticapm import get_client
36+
from elasticapm.conf import constants
37+
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
38+
from elasticapm.traces import DroppedSpan, capture_span, execution_context
39+
from elasticapm.utils.disttracing import TraceParent
40+
41+
42+
class KafkaInstrumentation(AbstractInstrumentedModule):
43+
44+
instrument_list = [
45+
("kafka", "KafkaProducer.send"),
46+
("kafka", "KafkaConsumer.poll"),
47+
("kafka", "KafkaConsumer.__next__"),
48+
]
49+
provider_name = "kafka"
50+
name = "kafka"
51+
52+
def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
53+
topic = args[0] if args else kwargs["topic"]
54+
headers = args[4] if len(args) > 4 else kwargs.get("headers", None)
55+
56+
span_name = f"Kafka SEND to {topic}"
57+
destination_info["service"]["resource"] += topic
58+
with capture_span(
59+
name=span_name,
60+
span_type="messaging",
61+
span_subtype=self.provider_name,
62+
span_action="send",
63+
leaf=True,
64+
extra={
65+
"message": {"queue": {"name": topic}},
66+
"destination": destination_info,
67+
},
68+
) as span:
69+
transaction = execution_context.get_transaction()
70+
if transaction:
71+
tp = transaction.trace_parent.copy_from(span_id=span.id)
72+
if headers:
73+
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
74+
else:
75+
headers = [(constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())]
76+
if len(args) > 4:
77+
args = list(args)
78+
args[4] = headers
79+
else:
80+
kwargs["headers"] = headers
81+
result = wrapped(*args, **kwargs)
82+
if instance and instance._metadata.controller and not isinstance(span, DroppedSpan):
83+
address = instance._metadata.controller[1]
84+
port = instance._metadata.controller[2]
85+
span.context["destination"]["address"] = address
86+
span.context["destination"]["port"] = port
87+
return result
88+
89+
def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
90+
# Contrasting to the superclass implementation, we *always* want to
91+
# return a proxied connection, even if there is no ongoing elasticapm
92+
# transaction yet. This ensures that we instrument the cursor once
93+
# the transaction started.
94+
return self.call(module, method, wrapped, instance, args, kwargs)
95+
96+
def call(self, module, method, wrapped, instance, args, kwargs):
97+
client = get_client()
98+
destination_info = {
99+
"service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
100+
}
101+
102+
if method == "KafkaProducer.send":
103+
topic = args[0] if args else kwargs["topic"]
104+
if client.should_ignore_topic(topic) or not execution_context.get_transaction():
105+
return wrapped(*args, **kwargs)
106+
return self._trace_send(instance, wrapped, destination_info=destination_info, *args, **kwargs)
107+
108+
elif method == "KafkaConsumer.poll":
109+
transaction = execution_context.get_transaction()
110+
if transaction:
111+
with capture_span(
112+
name="Kafka POLL",
113+
span_type="messaging",
114+
span_subtype=self.provider_name,
115+
span_action="poll",
116+
leaf=True,
117+
extra={
118+
"destination": destination_info,
119+
},
120+
) as span:
121+
if not isinstance(span, DroppedSpan) and instance._subscription.subscription:
122+
span.name += " from " + ", ".join(sorted(instance._subscription.subscription))
123+
results = wrapped(*args, **kwargs)
124+
return results
125+
else:
126+
return wrapped(*args, **kwargs)
127+
128+
elif method == "KafkaConsumer.__next__":
129+
transaction = execution_context.get_transaction()
130+
if transaction and transaction.transaction_type != "messaging":
131+
# somebody started a transaction outside of the consumer,
132+
# so we capture it as a span, and record the causal trace as a link
133+
with capture_span(
134+
name="consumer",
135+
span_type="messaging",
136+
span_subtype=self.provider_name,
137+
span_action="receive",
138+
leaf=True,
139+
extra={
140+
"message": {"queue": {"name": ""}},
141+
"destination": destination_info,
142+
},
143+
) as span:
144+
try:
145+
result = wrapped(*args, **kwargs)
146+
except StopIteration:
147+
span.cancel()
148+
raise
149+
if not isinstance(span, DroppedSpan):
150+
topic = result[0]
151+
if client.should_ignore_topic(topic):
152+
span.cancel()
153+
return result
154+
trace_parent = self.get_traceparent_from_result(result)
155+
if trace_parent:
156+
span.add_link(trace_parent)
157+
destination_info["service"]["resource"] += topic
158+
span.context["message"]["queue"]["name"] = topic
159+
span.name = "Kafka RECEIVE from " + topic
160+
return result
161+
else:
162+
# No transaction running, or this is a transaction started by us,
163+
# so let's end it and start the next,
164+
# unless a StopIteration is raised, at which point we do nothing.
165+
if transaction:
166+
client.end_transaction()
167+
result = wrapped(*args, **kwargs)
168+
topic = result[0]
169+
if client.should_ignore_topic(topic):
170+
return result
171+
trace_parent = self.get_traceparent_from_result(result)
172+
transaction = client.begin_transaction("messaging", trace_parent=trace_parent)
173+
if result.timestamp_type == 0:
174+
current_time_millis = int(round(time.time() * 1000))
175+
age = current_time_millis - result.timestamp
176+
transaction.context = {
177+
"message": {"age": {"ms": age}, "queue": {"name": topic}},
178+
"service": {"framework": {"name": "Kafka"}},
179+
}
180+
transaction_name = "Kafka RECEIVE from " + topic
181+
elasticapm.set_transaction_name(transaction_name, override=True)
182+
res = constants.OUTCOME.SUCCESS
183+
elasticapm.set_transaction_result(res, override=False)
184+
return result
185+
186+
def get_traceparent_from_result(self, result) -> Optional[TraceParent]:
187+
for k, v in result.headers:
188+
if k == constants.TRACEPARENT_BINARY_HEADER_NAME:
189+
return TraceParent.from_binary(v)

‎elasticapm/instrumentation/register.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
"elasticapm.instrumentation.packages.httpx.sync.httpcore.HTTPCoreInstrumentation",
6767
"elasticapm.instrumentation.packages.httplib2.Httplib2Instrumentation",
6868
"elasticapm.instrumentation.packages.azure.AzureInstrumentation",
69+
"elasticapm.instrumentation.packages.kafka.KafkaInstrumentation",
6970
}
7071

7172
if sys.version_info >= (3, 7):

‎elasticapm/traces.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from collections import defaultdict
3939
from datetime import timedelta
4040
from types import TracebackType
41-
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union
41+
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
4242

4343
import elasticapm
4444
from elasticapm.conf import constants
@@ -100,6 +100,7 @@ def __init__(self, labels=None, start=None):
100100
self.start_time: float = time_to_perf_counter(start) if start is not None else _time_func()
101101
self.ended_time: Optional[float] = None
102102
self.duration: Optional[timedelta] = None
103+
self.links: List[Dict[str, str]] = []
103104
if labels:
104105
self.label(**labels)
105106

@@ -144,6 +145,12 @@ def label(self, **labels):
144145
labels = encoding.enforce_label_format(labels)
145146
self.labels.update(labels)
146147

148+
def add_link(self, trace_parent: TraceParent) -> None:
149+
"""
150+
Causally link this span/transaction to another span/transaction
151+
"""
152+
self.links.append({"trace_id": trace_parent.trace_id, "span_id": trace_parent.span_id})
153+
147154
def set_success(self):
148155
self.outcome = constants.OUTCOME.SUCCESS
149156

@@ -394,6 +401,8 @@ def to_dict(self) -> dict:
394401
# only set parent_id if this transaction isn't the root
395402
if self.trace_parent.span_id and self.trace_parent.span_id != self.id:
396403
result["parent_id"] = self.trace_parent.span_id
404+
if self.links:
405+
result["links"] = self.links
397406
# faas context belongs top-level on the transaction
398407
if "faas" in self.context:
399408
result["faas"] = self.context.pop("faas")
@@ -477,6 +486,7 @@ class Span(BaseSpan):
477486
"sync",
478487
"outcome",
479488
"_child_durations",
489+
"_cancelled",
480490
)
481491

482492
def __init__(
@@ -527,6 +537,7 @@ def __init__(
527537
self.action = span_action
528538
self.dist_tracing_propagated = False
529539
self.composite: Dict[str, Any] = {}
540+
self._cancelled: bool = False
530541
super(Span, self).__init__(labels=labels, start=start)
531542
self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
532543
if self.transaction._breakdown:
@@ -564,6 +575,8 @@ def to_dict(self) -> dict:
564575
if self.context is None:
565576
self.context = {}
566577
self.context["tags"] = self.labels
578+
if self.links:
579+
result["links"] = self.links
567580
if self.context:
568581
self.autofill_resource_context()
569582
# otel attributes and spankind need to be top-level
@@ -666,6 +679,8 @@ def report(self) -> None:
666679
if self.discardable and self.duration < self.transaction.config_exit_span_min_duration:
667680
self.transaction.track_dropped_span(self)
668681
self.transaction.dropped_spans += 1
682+
elif self._cancelled:
683+
self.transaction._span_counter -= 1
669684
else:
670685
self.tracer.queue_func(SPAN, self.to_dict())
671686

@@ -757,6 +772,15 @@ def autofill_resource_context(self):
757772
if "type" not in self.context["destination"]["service"]:
758773
self.context["destination"]["service"]["type"] = ""
759774

775+
def cancel(self) -> None:
776+
"""
777+
Mark span as cancelled. Cancelled spans don't count towards started spans nor dropped spans.
778+
779+
No checks are made to ensure that spans which already propagated distributed context are not
780+
cancelled.
781+
"""
782+
self._cancelled = True
783+
760784
def __str__(self):
761785
return "{}/{}/{}".format(self.name, self.type, self.subtype)
762786

0 commit comments

Comments
 (0)