@@ -68,6 +68,95 @@ def list_subscriptions_in_project(project_id: str) -> None:
6868 # [END pubsub_list_subscriptions]
6969
7070
71+ def pubsub_subscribe_otel_tracing (
72+ subscription_project_id : str ,
73+ cloud_trace_project_id : str ,
74+ subscription_id : str ,
75+ timeout : Optional [float ] = None ,
76+ ) -> None :
77+ """
78+ Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
79+ Export the OpenTelemetry traces to Google Cloud Trace in project
80+ `trace_project_id`
81+ Args:
82+ subscription_project_id: project ID of the subscription.
83+ cloud_trace_project_id: project ID to export Cloud Trace to.
84+ subscription_id: subscription ID to subscribe from.
85+ timeout: time until which to subscribe to.
86+ Returns:
87+ None
88+ """
89+ # [START pubsub_subscribe_otel_tracing]
90+ from opentelemetry import trace
91+ from opentelemetry .sdk .trace import TracerProvider
92+ from opentelemetry .sdk .trace .export import (
93+ BatchSpanProcessor ,
94+ )
95+ from opentelemetry .exporter .cloud_trace import CloudTraceSpanExporter
96+ from opentelemetry .sdk .trace .sampling import TraceIdRatioBased , ParentBased
97+
98+ from google .cloud import pubsub_v1
99+ from google .cloud .pubsub_v1 import SubscriberClient
100+ from google .cloud .pubsub_v1 .types import SubscriberOptions
101+
102+ # TODO(developer)
103+ # subscription_project_id = "your-subscription-project-id"
104+ # subscription_id = "your-subscription-id"
105+ # cloud_trace_project_id = "your-cloud-trace-project-id"
106+ # timeout = 300.0
107+
108+ # In this sample, we use a Google Cloud Trace to export the OpenTelemetry
109+ # traces: https://cloud.google.com/trace/docs/setup/python-ot
110+ # Choose and configure the exporter for your set up accordingly.
111+
112+ # Sample 1 in every 100 traces
113+ sampler = ParentBased (root = TraceIdRatioBased (1 / 100 ))
114+ trace .set_tracer_provider (TracerProvider (sampler = sampler ))
115+
116+ # Export to Google Trace
117+ cloud_trace_exporter = CloudTraceSpanExporter (
118+ project_id = cloud_trace_project_id ,
119+ )
120+ trace .get_tracer_provider ().add_span_processor (
121+ BatchSpanProcessor (cloud_trace_exporter )
122+ )
123+ # Set the `enable_open_telemetry_tracing` option to True when creating
124+ # the subscriber client. This in itself is necessary and sufficient for
125+ # the library to export OpenTelemetry traces. However, where the traces
126+ # must be exported to needs to be configured based on your OpenTelemetry
127+ # set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
128+ subscriber = SubscriberClient (
129+ subscriber_options = SubscriberOptions (enable_open_telemetry_tracing = True )
130+ )
131+
132+ # The `subscription_path` method creates a fully qualified identifier
133+ # in the form `projects/{project_id}/subscriptions/{subscription_id}`
134+ subscription_path = subscriber .subscription_path (
135+ subscription_project_id , subscription_id
136+ )
137+
138+ # Define callback to be called when a message is received.
139+ def callback (message : pubsub_v1 .subscriber .message .Message ) -> None :
140+ # Ack message after processing it.
141+ print (message .data )
142+ message .ack ()
143+
144+ # Wrap subscriber in a 'with' block to automatically call close() when done.
145+ with subscriber :
146+ try :
147+ # Optimistically subscribe to messages on the subscription.
148+ streaming_pull_future = subscriber .subscribe (
149+ subscription_path , callback = callback
150+ )
151+ streaming_pull_future .result (timeout = timeout )
152+ except TimeoutError :
153+ print ("Successfully subscribed until the timeout passed." )
154+ streaming_pull_future .cancel () # Trigger the shutdown.
155+ streaming_pull_future .result () # Block until the shutdown is complete.
156+
157+ # [END pubsub_subscribe_otel_tracing]
158+
159+
71160def create_subscription (project_id : str , topic_id : str , subscription_id : str ) -> None :
72161 """Create a new pull subscription on the given topic."""
73162 # [START pubsub_create_pull_subscription]
0 commit comments