-
Notifications
You must be signed in to change notification settings - Fork 202
Expand file tree
/
Copy pathworker.py
More file actions
387 lines (305 loc) · 12.6 KB
/
Copy pathworker.py
File metadata and controls
387 lines (305 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
"""Worker using SDK Core. (unstable)
Nothing in this module should be considered stable. The API may change.
"""
from __future__ import annotations
from collections.abc import Awaitable, Callable, Sequence
from dataclasses import dataclass
from typing import (
TypeAlias,
)
import temporalio.bridge.client
import temporalio.bridge.proto
import temporalio.bridge.proto.activity_task
import temporalio.bridge.proto.nexus
import temporalio.bridge.proto.workflow_activation
import temporalio.bridge.proto.workflow_completion
import temporalio.bridge.runtime
import temporalio.bridge.temporal_sdk_bridge
import temporalio.converter
import temporalio.converter._extstore
from temporalio.api.common.v1.message_pb2 import Payload
from temporalio.bridge._visitor_functions import PayloadSequence, VisitorFunctions
from temporalio.bridge.temporal_sdk_bridge import (
CustomSlotSupplier as BridgeCustomSlotSupplier,
)
from temporalio.bridge.temporal_sdk_bridge import (
PollShutdownError, # type: ignore # noqa: F401
)
from temporalio.worker._command_aware_visitor import CommandAwarePayloadVisitor
@dataclass
class WorkerConfig:
    """Python-side mirror of the Rust worker configuration struct.

    An instance is forwarded unchanged to the bridge when a worker is built
    (see ``Worker.create`` and ``Worker.for_replay``).

    The declaration order below defines the positional order of the
    dataclass-generated ``__init__``, so it must stay stable.
    NOTE(review): field names presumably have to match what the Rust side
    reads -- confirm before renaming anything.
    """

    namespace: str
    task_queue: str
    versioning_strategy: WorkerVersioningStrategy
    identity_override: str | None
    max_cached_workflows: int
    tuner: TunerHolder
    workflow_task_poller_behavior: PollerBehavior
    nonsticky_to_sticky_poll_ratio: float
    activity_task_poller_behavior: PollerBehavior
    no_remote_activities: bool
    task_types: WorkerTaskTypes
    sticky_queue_schedule_to_start_timeout_millis: int
    max_heartbeat_throttle_interval_millis: int
    default_heartbeat_throttle_interval_millis: int
    max_activities_per_second: float | None
    max_task_queue_activities_per_second: float | None
    graceful_shutdown_period_millis: int
    nondeterminism_as_workflow_fail: bool
    nondeterminism_as_workflow_fail_for_types: set[str]
    nexus_task_poller_behavior: PollerBehavior
    plugins: Sequence[str]
    storage_drivers: set[str]
@dataclass
class PollerBehaviorSimpleMaximum:
    """Python-side mirror of the Rust struct for the simple poller behavior.

    Carries a single integer, ``simple_maximum``.
    """

    simple_maximum: int
@dataclass
class PollerBehaviorAutoscaling:
    """Python-side mirror of the Rust struct for the autoscaling poller behavior.

    Holds three integers, declared (and accepted positionally) in the order
    ``minimum``, ``maximum``, ``initial``.
    """

    minimum: int
    maximum: int
    initial: int
# Alias covering both poller behavior dataclasses defined above; used as the
# type of the ``*_poller_behavior`` fields on ``WorkerConfig``.
PollerBehavior: TypeAlias = PollerBehaviorSimpleMaximum | PollerBehaviorAutoscaling
@dataclass
class WorkerDeploymentVersion:
    """Python-side mirror of the Rust struct describing a worker deployment version.

    Pairs a ``deployment_name`` with a ``build_id``, both strings.
    """

    deployment_name: str
    build_id: str
@dataclass
class WorkerDeploymentOptions:
    """Python-side mirror of the Rust struct holding worker deployment options.

    This is one of the members of the ``WorkerVersioningStrategy`` alias
    declared in this module.
    """

    version: WorkerDeploymentVersion
    use_worker_versioning: bool
    default_versioning_behavior: int
    """An enums.v1.VersioningBehavior as an int"""
@dataclass
class WorkerVersioningStrategyNone:
    """Python-side mirror of the Rust struct for the ``None`` worker versioning strategy.

    Carries only ``build_id_no_versioning``, a string.
    """

    build_id_no_versioning: str
@dataclass
class WorkerVersioningStrategyLegacyBuildIdBased:
    """Python-side mirror of the Rust struct for the legacy Build ID-based versioning strategy.

    Carries only ``build_id_with_versioning``, a string.
    """

    build_id_with_versioning: str
# Alias for the three versioning strategy shapes declared above; it is the
# type of ``WorkerConfig.versioning_strategy``.
WorkerVersioningStrategy: TypeAlias = (
WorkerVersioningStrategyNone
| WorkerDeploymentOptions
| WorkerVersioningStrategyLegacyBuildIdBased
)
@dataclass
class ResourceBasedTunerConfig:
    """Python-side mirror of the Rust struct configuring a resource-based tuner.

    Holds two floats: ``target_memory_usage`` and ``target_cpu_usage``.
    NOTE(review): the expected range/units of these targets are not visible
    in this module -- confirm against the Rust definition.
    """

    target_memory_usage: float
    target_cpu_usage: float
@dataclass
class ResourceBasedSlotSupplier:
    """Python-side mirror of the Rust struct for a resource-based slot supplier.

    Combines slot bounds and a ramp throttle (in milliseconds, per the field
    name) with the ``ResourceBasedTunerConfig`` it uses.
    """

    minimum_slots: int
    maximum_slots: int
    ramp_throttle_ms: int
    tuner_config: ResourceBasedTunerConfig
@dataclass(frozen=True)
class FixedSizeSlotSupplier:
    """Python-side mirror of the Rust struct for a fixed-size slot supplier.

    Declared frozen, so instances are immutable and hashable.
    """

    num_slots: int
# Alias for every slot supplier kind accepted by ``TunerHolder``: the two
# dataclasses above plus the bridge's ``CustomSlotSupplier`` (imported here as
# ``BridgeCustomSlotSupplier``).
SlotSupplier: TypeAlias = (
FixedSizeSlotSupplier | ResourceBasedSlotSupplier | BridgeCustomSlotSupplier
)
@dataclass
class TunerHolder:
    """Python-side mirror of the Rust struct for a tuner holder.

    Groups one ``SlotSupplier`` for each of four task kinds: workflow,
    activity, local activity and nexus.
    """

    workflow_slot_supplier: SlotSupplier
    activity_slot_supplier: SlotSupplier
    local_activity_slot_supplier: SlotSupplier
    nexus_slot_supplier: SlotSupplier
@dataclass
class WorkerTaskTypes:
    """Python-side mirror of the Rust struct for worker task types.

    Four boolean ``enable_*`` flags, one each for workflows, local
    activities, remote activities and nexus.
    """

    enable_workflows: bool
    enable_local_activities: bool
    enable_remote_activities: bool
    enable_nexus: bool
class Worker:
    """SDK Core worker.

    Thin wrapper around a bridge ``WorkerRef``. Poll methods parse the
    serialized protobuf returned by the ref into message objects; completion
    and heartbeat methods serialize the given message before handing it to
    the ref.
    """

    @staticmethod
    def create(client: temporalio.bridge.client.Client, config: WorkerConfig) -> Worker:
        """Create a bridge worker from a bridge client.

        Args:
            client: Bridge client; both its own ref and its runtime's ref are
                passed to the bridge.
            config: Worker configuration, forwarded unchanged.

        Returns:
            A worker wrapping the newly created bridge ref.
        """
        worker_ref = temporalio.bridge.temporal_sdk_bridge.new_worker(
            client._runtime._ref, client._ref, config
        )
        return Worker(worker_ref)

    @staticmethod
    def for_replay(
        runtime: temporalio.bridge.runtime.Runtime,
        config: WorkerConfig,
    ) -> tuple[Worker, temporalio.bridge.temporal_sdk_bridge.HistoryPusher]:
        """Create a bridge replay worker.

        Args:
            runtime: Bridge runtime whose ref is passed to the bridge.
            config: Worker configuration, forwarded unchanged.

        Returns:
            The replay worker together with the history pusher the bridge
            returned alongside it.
        """
        worker_ref, history_pusher = (
            temporalio.bridge.temporal_sdk_bridge.new_replay_worker(
                runtime._ref, config
            )
        )
        return Worker(worker_ref), history_pusher

    def __init__(self, ref: temporalio.bridge.temporal_sdk_bridge.WorkerRef) -> None:
        """Create SDK core worker from a bridge worker."""
        # Reset to None by finalize_shutdown(), hence the type-ignore markers
        # on every member access below.
        self._ref = ref

    async def validate(
        self,
    ) -> temporalio.bridge.proto.NamespaceInfo:
        """Validate the bridge worker and return the parsed namespace info."""
        raw = await self._ref.validate()  # type: ignore[reportOptionalMemberAccess]
        return temporalio.bridge.proto.NamespaceInfo.FromString(raw)

    async def poll_workflow_activation(
        self,
    ) -> temporalio.bridge.proto.workflow_activation.WorkflowActivation:
        """Poll for a workflow activation and return it parsed."""
        raw = await self._ref.poll_workflow_activation()  # type: ignore[reportOptionalMemberAccess]
        activation_cls = temporalio.bridge.proto.workflow_activation.WorkflowActivation
        return activation_cls.FromString(raw)

    async def poll_activity_task(
        self,
    ) -> temporalio.bridge.proto.activity_task.ActivityTask:
        """Poll for an activity task and return it parsed."""
        raw = await self._ref.poll_activity_task()  # type: ignore[reportOptionalMemberAccess]
        return temporalio.bridge.proto.activity_task.ActivityTask.FromString(raw)

    async def poll_nexus_task(
        self,
    ) -> temporalio.bridge.proto.nexus.NexusTask:
        """Poll for a nexus task and return it parsed."""
        raw = await self._ref.poll_nexus_task()  # type: ignore[reportOptionalMemberAccess]
        return temporalio.bridge.proto.nexus.NexusTask.FromString(raw)

    async def complete_workflow_activation(
        self,
        comp: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
    ) -> None:
        """Complete a workflow activation with the serialized completion."""
        serialized = comp.SerializeToString()
        await self._ref.complete_workflow_activation(serialized)  # type: ignore[reportOptionalMemberAccess]

    async def complete_activity_task(
        self, comp: temporalio.bridge.proto.ActivityTaskCompletion
    ) -> None:
        """Complete an activity task with the serialized completion."""
        serialized = comp.SerializeToString()
        await self._ref.complete_activity_task(serialized)  # type: ignore[reportOptionalMemberAccess]

    async def complete_nexus_task(
        self, comp: temporalio.bridge.proto.nexus.NexusTaskCompletion
    ) -> None:
        """Complete a nexus task with the serialized completion."""
        serialized = comp.SerializeToString()
        await self._ref.complete_nexus_task(serialized)  # type: ignore[reportOptionalMemberAccess]

    def record_activity_heartbeat(
        self, comp: temporalio.bridge.proto.ActivityHeartbeat
    ) -> None:
        """Record an activity heartbeat from the serialized message."""
        serialized = comp.SerializeToString()
        self._ref.record_activity_heartbeat(serialized)  # type: ignore[reportOptionalMemberAccess]

    def request_workflow_eviction(self, run_id: str) -> None:
        """Request that the workflow with the given run ID be evicted."""
        self._ref.request_workflow_eviction(run_id)  # type: ignore[reportOptionalMemberAccess]

    def replace_client(self, client: temporalio.bridge.client.Client) -> None:
        """Replace the worker client with the given client's bridge ref."""
        self._ref.replace_client(client._ref)  # type: ignore[reportOptionalMemberAccess]

    def initiate_shutdown(self) -> None:
        """Start shutdown of the worker."""
        self._ref.initiate_shutdown()  # type: ignore[reportOptionalMemberAccess]

    async def finalize_shutdown(self) -> None:
        """Finalize the worker.

        This will fail if shutdown hasn't completed fully due to internal
        reference count checks.
        """
        # Drop this wrapper's reference before awaiting so that only the
        # local variable still points at the bridge ref during finalization.
        ref, self._ref = self._ref, None
        await ref.finalize_shutdown()  # type: ignore[reportOptionalMemberAccess]
class _Visitor(VisitorFunctions):
    """Visitor callbacks that rewrite payloads in place via an async transform.

    Args:
        f: Awaitable transform taking a sequence of payloads and returning a
            list of payloads.
        visit_system_nexus_envelope: Optional awaitable hook invoked for
            system nexus envelope payloads; when omitted those payloads are
            left untouched.
    """

    def __init__(
        self,
        f: Callable[[Sequence[Payload]], Awaitable[list[Payload]]],
        visit_system_nexus_envelope: Callable[[Payload], Awaitable[None]] | None = None,
    ):
        self._visit_system_nexus_envelope = visit_system_nexus_envelope
        self._f = f

    async def visit_payload(self, payload: Payload) -> None:
        """Transform a single payload, copying the result back into it."""
        transformed = await self._f([payload])
        replacement = transformed[0]
        # Same object back means there is nothing to copy.
        if replacement is payload:
            return
        payload.CopyFrom(replacement)

    async def visit_payloads(self, payloads: PayloadSequence) -> None:
        """Transform a payload sequence, replacing its contents in place."""
        # The transform is never invoked for an empty sequence.
        if len(payloads) == 0:
            return
        replacements = await self._f(payloads)
        if replacements is not payloads:
            # Mutate the caller's container rather than rebinding it.
            del payloads[:]
            payloads.extend(replacements)

    async def visit_system_nexus_envelope(self, payload: Payload) -> None:
        """Run the optional system nexus envelope hook, if one was supplied."""
        hook = self._visit_system_nexus_envelope
        if hook is None:
            return
        await hook(payload)
async def decode_activation(
    activation: temporalio.bridge.proto.workflow_activation.WorkflowActivation,
    data_converter: temporalio.converter.DataConverter,
    decode_headers: bool,
    storage_concurrency_limit: int,
) -> temporalio.converter._extstore.StorageOperationMetrics:
    """Decode all payloads in the activation.

    Two visitor passes run over the activation, both with search attributes
    skipped and with headers skipped unless ``decode_headers`` is set:

    1. External-storage retrieval, executed inside the metrics tracking
       context and given ``storage_concurrency_limit``.
    2. The data converter's payload decoding, outside the tracking context.

    Args:
        activation: Activation whose payloads are rewritten in place.
        data_converter: Supplies the retrieval and decode callables.
        decode_headers: Whether header payloads are visited as well.
        storage_concurrency_limit: Concurrency limit given to the retrieval pass.

    Returns:
        Metrics from any external storage retrieval operations that occurred.
    """
    skip_headers = not decode_headers
    metrics = temporalio.converter._extstore.StorageOperationMetrics()

    # Pass 1: external retrieval; only this pass is tracked by the metrics.
    with metrics.track():
        retrieval_walker = CommandAwarePayloadVisitor(
            skip_search_attributes=True,
            skip_headers=skip_headers,
            concurrency_limit=storage_concurrency_limit,
        )
        await retrieval_walker.visit(
            _Visitor(data_converter._external_retrieve_payload_sequence),
            activation,
        )

    # Pass 2: decode the now-retrieved payloads.
    decode_walker = CommandAwarePayloadVisitor(
        skip_search_attributes=True,
        skip_headers=skip_headers,
    )
    await decode_walker.visit(
        _Visitor(data_converter._decode_payload_sequence), activation
    )
    return metrics
async def encode_completion(
    completion: temporalio.bridge.proto.workflow_completion.WorkflowActivationCompletion,
    data_converter: temporalio.converter.DataConverter,
    encode_headers: bool,
    storage_concurrency_limit: int,
) -> temporalio.converter._extstore.StorageOperationMetrics:
    """Encode all payloads in the completion.

    Two visitor passes run over the completion, both with search attributes
    skipped and with headers skipped unless ``encode_headers`` is set:

    1. The data converter's payload encoding.
    2. External-storage store followed by payload-limit validation of the
       stored payloads, executed inside the metrics tracking context and
       given ``storage_concurrency_limit``.

    In both passes, system nexus envelope payloads are only validated against
    the payload limits.

    Args:
        completion: Completion whose payloads are rewritten in place.
        data_converter: Supplies the encode, store and validation callables.
        encode_headers: Whether header payloads are visited as well.
        storage_concurrency_limit: Concurrency limit given to the store pass.

    Returns:
        Metrics from any external storage store operations that occurred.
    """
    skip_headers = not encode_headers

    async def _check_envelope_limits(envelope: Payload) -> None:
        # Envelopes are checked against the limits, not transformed.
        data_converter._validate_payload_limits([envelope])

    async def _store_then_check(
        batch: Sequence[Payload],
    ) -> list[Payload]:
        # Limits are validated on what comes back from the store step.
        stored_batch = await data_converter._external_store_payload_sequence(batch)
        data_converter._validate_payload_limits(stored_batch)
        return stored_batch

    # Pass 1: encode payloads through the data converter.
    encode_walker = CommandAwarePayloadVisitor(
        skip_search_attributes=True,
        skip_headers=skip_headers,
    )
    await encode_walker.visit(
        _Visitor(
            data_converter._encode_payload_sequence,
            visit_system_nexus_envelope=_check_envelope_limits,
        ),
        completion,
    )

    # Pass 2: external store + validation; only this pass is tracked.
    metrics = temporalio.converter._extstore.StorageOperationMetrics()
    with metrics.track():
        store_walker = CommandAwarePayloadVisitor(
            skip_search_attributes=True,
            skip_headers=skip_headers,
            concurrency_limit=storage_concurrency_limit,
        )
        await store_walker.visit(
            _Visitor(
                _store_then_check,
                visit_system_nexus_envelope=_check_envelope_limits,
            ),
            completion,
        )
    return metrics