-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathaxon.py
More file actions
91 lines (70 loc) · 3.33 KB
/
Copy pathaxon.py
File metadata and controls
91 lines (70 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
"""Axon resource class for synchronous operations."""
from __future__ import annotations
from typing_extensions import Unpack, override
from ._types import (
BaseRequestOptions,
SDKAxonPublishParams,
SDKAxonSqlBatchParams,
SDKAxonSqlQueryParams,
)
from .._client import Runloop
from .._streaming import Stream
from ..types.axon_view import AxonView
from ..types.axon_event_view import AxonEventView
from ..types.publish_result_view import PublishResultView
from ..types.axons.sql_batch_result_view import SqlBatchResultView
from ..types.axons.sql_query_result_view import SqlQueryResultView
class AxonSqlOps:
    """[Beta] Run SQL against the SQLite database that belongs to one axon.

    Instances are normally reached through ``axon.sql``. Each method forwards
    its keyword arguments to the client's ``axons.sql`` resource, passing the
    bound axon id as the first positional argument.

    Example:
        >>> axon = runloop.axon.create()
        >>> axon.sql.query(sql="CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT)")
        >>> result = axon.sql.query(sql="SELECT * FROM tasks WHERE id = ?", params=[1])
    """

    def __init__(self, client: Runloop, axon_id: str) -> None:
        """Bind this helper to a client and to the axon it targets.

        Args:
            client: Client whose ``axons.sql`` resource performs the calls.
            axon_id: Identifier of the axon whose database is addressed.
        """
        self._axon_id = axon_id
        self._client = client

    def query(self, **params: Unpack[SDKAxonSqlQueryParams]) -> SqlQueryResultView:
        """[Beta] Run one parameterized SQL statement on this axon's SQLite database.

        Args:
            **params: Keyword arguments passed through unchanged to
                ``client.axons.sql.query``.

        Returns:
            Whatever ``client.axons.sql.query`` returns for this axon.
        """
        sql_api = self._client.axons.sql
        return sql_api.query(self._axon_id, **params)

    def batch(self, **params: Unpack[SDKAxonSqlBatchParams]) -> SqlBatchResultView:
        """[Beta] Run several SQL statements atomically in a single transaction.

        Args:
            **params: Keyword arguments passed through unchanged to
                ``client.axons.sql.batch``.

        Returns:
            Whatever ``client.axons.sql.batch`` returns for this axon.
        """
        sql_api = self._client.axons.sql
        return sql_api.batch(self._axon_id, **params)
class Axon:
    """[Beta] Synchronous handle for a single axon.

    An axon is an event communication channel: events can be published to it,
    its event stream can be consumed via server-sent events (SSE), and SQL can
    be run against its embedded SQLite database.

    Instances come from ``runloop.axon.create()`` or ``runloop.axon.from_id()``.

    Example:
        >>> runloop = RunloopSDK()
        >>> axon = runloop.axon.create()
        >>> axon.publish(event_type="task_done", origin="AGENT_EVENT", payload="{}", source="my-agent")
        >>> with axon.subscribe_sse() as stream:
        ...     for event in stream:
        ...         print(event.event_type, event.payload)
        >>> axon.sql.query(sql="CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT)")
    """

    def __init__(self, client: Runloop, axon_id: str) -> None:
        """Store the client and axon id, and build the SQL helper for this axon.

        Args:
            client: Client whose ``axons`` resource performs the calls.
            axon_id: Identifier of the axon this object represents.
        """
        self._id = axon_id
        self._client = client
        self._sql = AxonSqlOps(client, axon_id)

    @override
    def __repr__(self) -> str:
        return f"<Axon id={self._id!r}>"

    @property
    def id(self) -> str:
        """Identifier this object was constructed with."""
        return self._id

    @property
    def sql(self) -> AxonSqlOps:
        """SQL helper created for this axon at construction time."""
        return self._sql

    def get_info(self, **options: Unpack[BaseRequestOptions]) -> AxonView:
        """[Beta] Fetch the latest information about this axon.

        Args:
            **options: Request options passed through unchanged to
                ``client.axons.retrieve``.

        Returns:
            Whatever ``client.axons.retrieve`` returns for this axon's id.
        """
        axons_api = self._client.axons
        return axons_api.retrieve(self._id, **options)

    def publish(self, **params: Unpack[SDKAxonPublishParams]) -> PublishResultView:
        """[Beta] Publish an event to this axon.

        Args:
            **params: Event fields passed through unchanged to
                ``client.axons.publish``.

        Returns:
            Whatever ``client.axons.publish`` returns for this axon's id.
        """
        axons_api = self._client.axons
        return axons_api.publish(self._id, **params)

    def subscribe_sse(self, **options: Unpack[BaseRequestOptions]) -> Stream[AxonEventView]:
        """[Beta] Open this axon's event stream over SSE.

        Args:
            **options: Request options passed through unchanged to
                ``client.axons.subscribe_sse``.

        Returns:
            Whatever ``client.axons.subscribe_sse`` returns for this axon's id.
        """
        axons_api = self._client.axons
        return axons_api.subscribe_sse(self._id, **options)