@@ -170,6 +170,83 @@ def delete_topic(project_id: str, topic_id: str) -> None:
170170 # [END pubsub_delete_topic]
171171
172172
173+ def pubsub_publish_otel_tracing (
174+ topic_project_id : str , trace_project_id : str , topic_id : str
175+ ) -> None :
176+ """
177+ Publish to `topic_id` in `topic_project_id` with OpenTelemetry enabled.
178+ Export the OpenTelemetry traces to Google Cloud Trace in project
179+ `trace_project_id`
180+
181+ Args:
182+ topic_project_id: project ID of the topic to publish to.
183+ trace_project_id: project ID to export Cloud Trace to.
184+ topic_id: topic ID to publish to.
185+
186+ Returns:
187+ None
188+ """
189+ # [START pubsub_publish_otel_tracing]
190+
191+ from opentelemetry import trace
192+ from opentelemetry .sdk .trace import TracerProvider
193+ from opentelemetry .sdk .trace .export import (
194+ BatchSpanProcessor ,
195+ )
196+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
197+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
198+
199+ from google .cloud .pubsub_v1 import PublisherClient
200+ from google .cloud .pubsub_v1 .types import PublisherOptions
201+
202+ # TODO(developer)
203+ # topic_project_id = "your-topic-project-id"
204+ # trace_project_id = "your-trace-project-id"
205+ # topic_id = "your-topic-id"
206+
207+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
208+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
209+ # Choose and configure the exporter for your set up accordingly.
210+
211+ sampler = ParentBased (root = TraceIdRatioBased (1 ))
212+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
213+
214+ # Export to Google Trace.
215+ cloud_trace_exporter = CloudTraceSpanExporter (
216+ project_id = trace_project_id ,
217+ )
218+ trace .get_tracer_provider ().add_span_processor (
219+ BatchSpanProcessor (cloud_trace_exporter )
220+ )
221+
222+ # Set the `enable_open_telemetry_tracing` option to True when creating
223+ # the publisher client. This in itself is necessary and sufficient for
224+ # the library to export OpenTelemetry traces. However, where the traces
225+ # must be exported to needs to be configured based on your OpenTelemetry
226+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
227+ publisher = PublisherClient (
228+ publisher_options = PublisherOptions (
229+ enable_open_telemetry_tracing = True ,
230+ ),
231+ )
232+
233+ # The `topic_path` method creates a fully qualified identifier
234+ # in the form `projects/{project_id}/topics/{topic_id}`
235+ topic_path = publisher .topic_path (topic_project_id , topic_id )
236+ # Publish messages.
237+ for n in range (1 , 10 ):
238+ data_str = f"Message number { n } "
239+ # Data must be a bytestring
240+ data = data_str .encode ("utf-8" )
241+ # When you publish a message, the client returns a future.
242+ future = publisher .publish (topic_path , data )
243+ print (future .result ())
244+
245+ print (f"Published messages to { topic_path } ." )
246+
247+ # [END pubsub_publish_otel_tracing]
248+
249+
173250def publish_messages (project_id : str , topic_id : str ) -> None :
174251 """Publishes multiple messages to a Pub/Sub topic."""
175252 # [START pubsub_quickstart_publisher]
@@ -522,6 +599,13 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
522599 create_parser = subparsers .add_parser ("create" , help = create_topic .__doc__ )
523600 create_parser .add_argument ("topic_id" )
524601
602+ pubsub_publish_otel_tracing_parser = subparsers .add_parser (
603+ "pubsub-publish-otel-tracing" , help = pubsub_publish_otel_tracing .__doc__
604+ )
605+ pubsub_publish_otel_tracing_parser .add_argument ("topic_project_id" )
606+ pubsub_publish_otel_tracing_parser .add_argument ("trace_project_id" )
607+ pubsub_publish_otel_tracing_parser .add_argument ("topic_id" )
608+
525609 create_topic_with_kinesis_ingestion_parser = subparsers .add_parser (
526610 "create_kinesis_ingestion" , help = create_topic_with_kinesis_ingestion .__doc__
527611 )
@@ -638,3 +722,7 @@ def detach_subscription(project_id: str, subscription_id: str) -> None:
638722 resume_publish_with_ordering_keys (args .project_id , args .topic_id )
639723 elif args .command == "detach-subscription" :
640724 detach_subscription (args .project_id , args .subscription_id )
725+ elif args .command == "pubsub-publish-otel-tracing" :
726+ pubsub_publish_otel_tracing (
727+ args .topic_project_id , args .trace_project_id , args .topic_id
728+ )
0 commit comments