Skip to content
This repository was archived by the owner on Mar 9, 2026. It is now read-only.

Commit d808b42

Browse files
Add Subscribe Side OpenTelemetry sample
1 parent 4be04ac commit d808b42

1 file changed

Lines changed: 89 additions & 0 deletions

File tree

samples/snippets/subscriber.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,95 @@ def list_subscriptions_in_project(project_id: str) -> None:
6868
# [END pubsub_list_subscriptions]
6969

7070

71+
def pubsub_subscribe_otel_tracing(
72+
subscription_project_id: str,
73+
cloud_trace_project_id: str,
74+
subscription_id: str,
75+
timeout: Optional[float] = None,
76+
) -> None:
77+
"""
78+
Subscribe to `subscription_id` in `subscription_project_id` with OpenTelemetry enabled.
79+
Export the OpenTelemetry traces to Google Cloud Trace in project
80+
`trace_project_id`
81+
Args:
82+
subscription_project_id: project ID of the subscription.
83+
cloud_trace_project_id: project ID to export Cloud Trace to.
84+
subscription_id: subscription ID to subscribe from.
85+
timeout: time until which to subscribe to.
86+
Returns:
87+
None
88+
"""
89+
# [START pubsub_subscribe_otel_tracing]
90+
from opentelemetry import trace
91+
from opentelemetry.sdk.trace import TracerProvider
92+
from opentelemetry.sdk.trace.export import (
93+
BatchSpanProcessor,
94+
)
95+
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter
96+
from opentelemetry.sdk.trace.sampling import TraceIdRatioBased, ParentBased
97+
98+
from google.cloud import pubsub_v1
99+
from google.cloud.pubsub_v1 import SubscriberClient
100+
from google.cloud.pubsub_v1.types import SubscriberOptions
101+
102+
# TODO(developer)
103+
# subscription_project_id = "your-subscription-project-id"
104+
# subscription_id = "your-subscription-id"
105+
# cloud_trace_project_id = "your-cloud-trace-project-id"
106+
# timeout = 300.0
107+
108+
# In this sample, we use a Google Cloud Trace to export the OpenTelemetry
109+
# traces: https://cloud.google.com/trace/docs/setup/python-ot
110+
# Choose and configure the exporter for your set up accordingly.
111+
112+
# Sample 1 in every 100 traces
113+
sampler = ParentBased(root=TraceIdRatioBased(1 / 100))
114+
trace.set_tracer_provider(TracerProvider(sampler=sampler))
115+
116+
# Export to Google Trace
117+
cloud_trace_exporter = CloudTraceSpanExporter(
118+
project_id=cloud_trace_project_id,
119+
)
120+
trace.get_tracer_provider().add_span_processor(
121+
BatchSpanProcessor(cloud_trace_exporter)
122+
)
123+
# Set the `enable_open_telemetry_tracing` option to True when creating
124+
# the subscriber client. This in itself is necessary and sufficient for
125+
# the library to export OpenTelemetry traces. However, where the traces
126+
# must be exported to needs to be configured based on your OpenTelemetry
127+
# set up. Refer: https://opentelemetry.io/docs/languages/python/exporters/
128+
subscriber = SubscriberClient(
129+
subscriber_options=SubscriberOptions(enable_open_telemetry_tracing=True)
130+
)
131+
132+
# The `subscription_path` method creates a fully qualified identifier
133+
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
134+
subscription_path = subscriber.subscription_path(
135+
subscription_project_id, subscription_id
136+
)
137+
138+
# Define callback to be called when a message is received.
139+
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
140+
# Ack message after processing it.
141+
print(message.data)
142+
message.ack()
143+
144+
# Wrap subscriber in a 'with' block to automatically call close() when done.
145+
with subscriber:
146+
try:
147+
# Optimistically subscribe to messages on the subscription.
148+
streaming_pull_future = subscriber.subscribe(
149+
subscription_path, callback=callback
150+
)
151+
streaming_pull_future.result(timeout=timeout)
152+
except TimeoutError:
153+
print("Successfully subscribed until the timeout passed.")
154+
streaming_pull_future.cancel() # Trigger the shutdown.
155+
streaming_pull_future.result() # Block until the shutdown is complete.
156+
157+
# [END pubsub_subscribe_otel_tracing]
158+
159+
71160
def create_subscription(project_id: str, topic_id: str, subscription_id: str) -> None:
72161
"""Create a new pull subscription on the given topic."""
73162
# [START pubsub_create_pull_subscription]

0 commit comments

Comments
 (0)