Skip to content

Commit 483790b

Browse files
committed
Adapt elastic#964 for Kafka/messaging support
Changes include: * automatically create transactions when looping over results * instrument KadkaConsumer.poll * add binary TraceParent handling * added support for span links * add tests
1 parent d55ad15 commit 483790b

File tree

12 files changed

+381
-104
lines changed

12 files changed

+381
-104
lines changed

‎elasticapm/base.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -632,12 +632,21 @@ def load_processors(self):
632632
return [seen.setdefault(path, import_string(path)) for path in processors if path not in seen]
633633

634634
def should_ignore_url(self, url):
635+
"""Checks if URL should be ignored based on the transaction_ignore_urls setting"""
635636
if self.config.transaction_ignore_urls:
636637
for pattern in self.config.transaction_ignore_urls:
637638
if pattern.match(url):
638639
return True
639640
return False
640641

642+
def should_ignore_topic(self, topic: str) -> bool:
643+
"""Checks if messaging topic should be ignored based on the ignore_message_queues setting"""
644+
if self.config.ignore_message_queues:
645+
for pattern in self.config.ignore_message_queues:
646+
if pattern.match(topic):
647+
return True
648+
return False
649+
641650
def check_python_version(self):
642651
v = tuple(map(int, platform.python_version_tuple()[:2]))
643652
if v == (2, 7):

‎elasticapm/conf/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,7 @@ class Config(_ConfigBase):
642642
autoinsert_django_middleware = _BoolConfigValue("AUTOINSERT_DJANGO_MIDDLEWARE", default=True)
643643
transactions_ignore_patterns = _ListConfigValue("TRANSACTIONS_IGNORE_PATTERNS", default=[])
644644
transaction_ignore_urls = _ListConfigValue("TRANSACTION_IGNORE_URLS", type=starmatch_to_regex, default=[])
645+
ignore_message_queues = _ListConfigValue("IGNORE_MESSAGE_QUEUES", type=starmatch_to_regex, default=[])
645646
service_version = _ConfigValue("SERVICE_VERSION")
646647
framework_name = _ConfigValue("FRAMEWORK_NAME")
647648
framework_version = _ConfigValue("FRAMEWORK_VERSION")

‎elasticapm/conf/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ def _starmatch_to_regex(pattern):
6464
TRACE_CONTEXT_VERSION = 0
6565
TRACEPARENT_HEADER_NAME = "traceparent"
6666
TRACEPARENT_LEGACY_HEADER_NAME = "elastic-apm-traceparent"
67+
TRACEPARENT_BINARY_HEADER_NAME = "elasticapmtraceparent"
6768
TRACESTATE_HEADER_NAME = "tracestate"
6869

6970
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"

‎elasticapm/instrumentation/packages/kafka.py

Lines changed: 77 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -29,96 +29,60 @@
2929
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
3030

3131
import time
32+
from typing import Optional
3233

3334
import elasticapm
35+
from elasticapm import get_client
3436
from elasticapm.conf import constants
35-
from elasticapm.contrib.django.client import get_client as d_client
36-
from elasticapm.contrib.flask import get_client as f_client
3737
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
3838
from elasticapm.traces import capture_span, execution_context
3939
from elasticapm.utils.disttracing import TraceParent
4040

4141

42-
def django_client():
43-
_client = None
44-
try:
45-
_client = d_client()
46-
except Exception: # in some cases we get a different exception
47-
return None
48-
return _client
49-
50-
51-
def get_client():
52-
if django_client():
53-
return django_client()
54-
elif f_client():
55-
return f_client()
56-
57-
58-
def get_trace_id(result):
59-
for i in result:
60-
if isinstance(i, list):
61-
if isinstance(i[0], tuple) and len(i[0]) == 2:
62-
for k, v in i:
63-
k_str = str(k)
64-
if k_str == "trace":
65-
return v
66-
return None
67-
68-
6942
class KafkaInstrumentation(AbstractInstrumentedModule):
7043

7144
instrument_list = [
7245
("kafka", "KafkaProducer.send"),
73-
("kafka", "KafkaConsumer.next"),
46+
("kafka", "KafkaConsumer.poll"),
47+
("kafka", "KafkaConsumer.__next__"),
7448
]
7549
provider_name = "kafka"
7650
name = "kafka"
7751

78-
def _trace_send(
79-
self,
80-
method,
81-
topic,
82-
value=None,
83-
key=None,
84-
headers=None,
85-
partition=None,
86-
timestamp_ms=None,
87-
action="send",
88-
):
89-
span_name = "KafkaProducer#send to " + topic
90-
service = self.destination_info["service"]
91-
service["resource"] = service["resource"] + topic
92-
self.destination_info["service"] = service
52+
def _trace_send(self, instance, wrapped, *args, destination_info=None, **kwargs):
53+
topic = args[0] if args else kwargs["topic"]
54+
headers = args[4] if len(args) > 4 else kwargs.get("headers", None)
55+
56+
span_name = f"Kafka SEND to {topic}"
57+
destination_info["service"]["resource"] += topic
9358
with capture_span(
9459
name=span_name,
9560
span_type="messaging",
9661
span_subtype=self.provider_name,
97-
span_action=action,
62+
span_action="send",
9863
extra={
9964
"message": {"queue": {"name": topic}},
100-
"destination": self.destination_info,
65+
"destination": destination_info,
10166
},
10267
) as span:
10368
transaction = execution_context.get_transaction()
10469
if transaction:
105-
tp = transaction.trace_parent
106-
tp_string = tp.to_string()
107-
# ID REPLACE SECTION START
108-
new_tp_string = tp_string.replace(transaction.id, span.id)
70+
tp = transaction.trace_parent.copy_from(span_id=span.id)
10971
if headers:
110-
headers.append(("trace", bytes(new_tp_string)))
72+
headers.append((constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary()))
11173
else:
112-
headers = [("trace", bytes(new_tp_string))]
113-
# ID REPLACE SECTION STOP
114-
result = method(
115-
topic,
116-
value=value,
117-
key=key,
118-
headers=headers,
119-
partition=partition,
120-
timestamp_ms=timestamp_ms,
121-
)
74+
headers = [(constants.TRACEPARENT_BINARY_HEADER_NAME, tp.to_binary())]
75+
if len(args) > 4:
76+
args = list(args)
77+
args[4] = headers
78+
else:
79+
kwargs["headers"] = headers
80+
result = wrapped(*args, **kwargs)
81+
if instance and instance._metadata.controller:
82+
address = instance._metadata.controller[1]
83+
port = instance._metadata.controller[2]
84+
span.context["destination"]["address"] = address
85+
span.context["destination"]["port"] = port
12286
return result
12387

12488
def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
@@ -129,76 +93,86 @@ def call_if_sampling(self, module, method, wrapped, instance, args, kwargs):
12993
return self.call(module, method, wrapped, instance, args, kwargs)
13094

13195
def call(self, module, method, wrapped, instance, args, kwargs):
132-
topic = None
96+
client = get_client()
13397
destination_info = {
13498
"service": {"name": "kafka", "resource": "kafka/", "type": "messaging"},
13599
}
136-
self.destination_info = destination_info
100+
137101
if method == "KafkaProducer.send":
138-
address = None
139-
port = None
140-
time_start = time.time()
141-
while not instance._metadata.controller:
142-
if time.time() - time_start > 1:
143-
break
144-
continue
145-
if instance:
146-
if instance._metadata.controller:
147-
address = instance._metadata.controller[1]
148-
port = instance._metadata.controller[2]
149-
self.destination_info["port"] = port
150-
self.destination_info["address"] = address
151-
topic = args[0].encode("utf-8")
102+
topic = args[0] if args else kwargs["topic"]
103+
if client.should_ignore_topic(topic) or not execution_context.get_transaction():
104+
return wrapped(*args, **kwargs)
105+
return self._trace_send(instance, wrapped, destination_info=destination_info, *args, **kwargs)
106+
107+
elif method == "KafkaConsumer.poll":
152108
transaction = execution_context.get_transaction()
153109
if transaction:
154-
return self._trace_send(wrapped, topic, **kwargs)
110+
with capture_span(
111+
name="Kafka POLL",
112+
span_type="messaging",
113+
span_subtype=self.provider_name,
114+
span_action="poll",
115+
extra={
116+
"destination": destination_info,
117+
},
118+
) as span:
119+
if instance._subscription.subscription:
120+
span.name += " from " + ", ".join(sorted(instance._subscription.subscription))
121+
results = wrapped(*args, **kwargs)
122+
return results
155123

156-
if method == "KafkaConsumer.next":
124+
elif method == "KafkaConsumer.__next__":
157125
transaction = execution_context.get_transaction()
158126
if transaction and transaction.transaction_type != "messaging":
159-
action = "consume"
127+
# somebody started a transaction outside of the consumer,
128+
# so we capture it as a span, and record the causal trace as a link
160129
with capture_span(
161130
name="consumer",
162131
span_type="messaging",
163132
span_subtype=self.provider_name,
164-
span_action=action,
133+
span_action="receive",
165134
extra={
166135
"message": {"queue": {"name": ""}},
167-
"destination": self.destination_info,
136+
"destination": destination_info,
168137
},
169138
) as span:
170-
result = wrapped(*args, **kwargs)
139+
try:
140+
result = wrapped(*args, **kwargs)
141+
except StopIteration:
142+
span.cancel()
143+
raise
171144
topic = result[0]
172-
new_trace_id = get_trace_id(result)
173-
service = self.destination_info["service"]
174-
service["resource"] = service["resource"] + topic
145+
trace_parent = self.get_traceparent_from_result(result)
146+
if trace_parent:
147+
span.add_link(trace_parent)
148+
destination_info["service"]["resource"] += topic
175149
span.context["message"]["queue"]["name"] = topic
176-
span.context["destination"]["service"] = service
177-
span.name = "KafkaConsumer#receive from " + topic
178-
transaction.trace_parent = TraceParent.from_string(new_trace_id)
150+
span.name = "Kafka RECEIVE from " + topic
179151
return result
180152
else:
181-
client = get_client()
182-
if transaction and transaction.transaction_type == "messaging":
153+
# No transaction running, or this is a transaction started by us,
154+
# so let's end it and start the next,
155+
# unless a StopIteration is raised, at which point we do nothing.
156+
if transaction:
183157
client.end_transaction()
184-
185158
result = wrapped(*args, **kwargs)
186159
topic = result[0]
187-
new_trace_id = None
188-
new_trace_id = get_trace_id(result)
189-
190-
client.begin_transaction("messaging", trace_parent=None)
191-
transaction = execution_context.get_transaction()
160+
trace_parent = self.get_traceparent_from_result(result)
161+
transaction = client.begin_transaction("messaging", trace_parent=trace_parent)
192162
if result.timestamp_type == 0:
193163
current_time_millis = int(round(time.time() * 1000))
194164
age = current_time_millis - result.timestamp
195165
transaction.context = {
196-
"message": {"age": {"ms": age}, "queue": {"name": topic}}
166+
"message": {"age": {"ms": age}, "queue": {"name": topic}},
167+
"service": {"framework": {"name": "Kafka"}},
197168
}
198-
if new_trace_id:
199-
transaction.trace_parent = TraceParent.from_string(new_trace_id)
200-
t_name = "Kafka record from " + topic
201-
elasticapm.set_transaction_name(t_name, override=True)
169+
transaction_name = "Kafka RECEIVE from " + topic
170+
elasticapm.set_transaction_name(transaction_name, override=True)
202171
res = constants.OUTCOME.SUCCESS
203172
elasticapm.set_transaction_result(res, override=False)
204173
return result
174+
175+
def get_traceparent_from_result(self, result) -> Optional[TraceParent]:
176+
for k, v in result.headers:
177+
if k == constants.TRACEPARENT_BINARY_HEADER_NAME:
178+
return TraceParent.from_binary(v)

‎elasticapm/traces.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
from collections import defaultdict
3939
from datetime import timedelta
4040
from types import TracebackType
41-
from typing import Any, Callable, Dict, Optional, Tuple, Type, Union
41+
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
4242

4343
import elasticapm
4444
from elasticapm.conf import constants
@@ -100,6 +100,7 @@ def __init__(self, labels=None, start=None):
100100
self.start_time: float = time_to_perf_counter(start) if start is not None else _time_func()
101101
self.ended_time: Optional[float] = None
102102
self.duration: Optional[timedelta] = None
103+
self.links: List[Dict[str, str]] = []
103104
if labels:
104105
self.label(**labels)
105106

@@ -144,6 +145,12 @@ def label(self, **labels):
144145
labels = encoding.enforce_label_format(labels)
145146
self.labels.update(labels)
146147

148+
def add_link(self, trace_parent: TraceParent) -> None:
149+
"""
150+
Causally link this span/transaction to another span/transaction
151+
"""
152+
self.links.append({"trace_id": trace_parent.trace_id, "span_id": trace_parent.span_id})
153+
147154
def set_success(self):
148155
self.outcome = constants.OUTCOME.SUCCESS
149156

@@ -394,6 +401,8 @@ def to_dict(self) -> dict:
394401
# only set parent_id if this transaction isn't the root
395402
if self.trace_parent.span_id and self.trace_parent.span_id != self.id:
396403
result["parent_id"] = self.trace_parent.span_id
404+
if self.links:
405+
result["links"] = self.links
397406
# faas context belongs top-level on the transaction
398407
if "faas" in self.context:
399408
result["faas"] = self.context.pop("faas")
@@ -477,6 +486,7 @@ class Span(BaseSpan):
477486
"sync",
478487
"outcome",
479488
"_child_durations",
489+
"_cancelled",
480490
)
481491

482492
def __init__(
@@ -527,6 +537,7 @@ def __init__(
527537
self.action = span_action
528538
self.dist_tracing_propagated = False
529539
self.composite: Dict[str, Any] = {}
540+
self._cancelled: bool = False
530541
super(Span, self).__init__(labels=labels, start=start)
531542
self.timestamp = transaction.timestamp + (self.start_time - transaction.start_time)
532543
if self.transaction._breakdown:
@@ -564,6 +575,8 @@ def to_dict(self) -> dict:
564575
if self.context is None:
565576
self.context = {}
566577
self.context["tags"] = self.labels
578+
if self.links:
579+
result["links"] = self.links
567580
if self.context:
568581
self.autofill_resource_context()
569582
# otel attributes and spankind need to be top-level
@@ -666,6 +679,8 @@ def report(self) -> None:
666679
if self.discardable and self.duration < self.transaction.config_exit_span_min_duration:
667680
self.transaction.track_dropped_span(self)
668681
self.transaction.dropped_spans += 1
682+
elif self._cancelled:
683+
self.transaction._span_counter -= 1
669684
else:
670685
self.tracer.queue_func(SPAN, self.to_dict())
671686

@@ -757,6 +772,15 @@ def autofill_resource_context(self):
757772
if "type" not in self.context["destination"]["service"]:
758773
self.context["destination"]["service"]["type"] = ""
759774

775+
def cancel(self) -> None:
776+
"""
777+
Mark span as cancelled. Cancelled spans don't count towards started spans nor dropped spans.
778+
779+
No checks are made to ensure that spans which already propagated distributed context are not
780+
cancelled.
781+
"""
782+
self._cancelled = True
783+
760784
def __str__(self):
761785
return "{}/{}/{}".format(self.name, self.type, self.subtype)
762786

0 commit comments

Comments
 (0)