-
Notifications
You must be signed in to change notification settings - Fork 3.7k
Expand file tree
/
Copy pathtypes.ts
More file actions
248 lines (231 loc) · 7.42 KB
/
Copy pathtypes.ts
File metadata and controls
248 lines (231 loc) · 7.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
import type { Edge } from 'reactflow'
import type { AsyncExecutionCorrelation } from '@/lib/core/async-jobs/types'
import type { NodeMetadata } from '@/executor/dag/types'
import type {
BlockLog,
BlockState,
NormalizedBlockOutput,
StreamingExecution,
} from '@/executor/types'
import type { RunFromBlockContext } from '@/executor/utils/run-from-block'
import type { SubflowType } from '@/stores/workflows/workflow/types'
/**
 * Descriptive metadata for a single workflow execution.
 *
 * Required members are declared first, optional ones after. The same shape is
 * accepted by `ContextExtensions.metadata`.
 */
export interface ExecutionMetadata {
  // ---- Always present -------------------------------------------------------
  requestId: string
  executionId: string
  workflowId: string
  workspaceId: string
  userId: string
  triggerType: string
  useDraftState: boolean
  /**
   * NOTE(review): start time carried as a string; the exact format is not
   * fixed by this type — confirm with the producers.
   */
  startTime: string

  // ---- Optional user / trigger / credential details -------------------------
  sessionUserId?: string
  workflowUserId?: string
  triggerBlockId?: string
  isClientSession?: boolean
  enforceCredentialAccess?: boolean
  credentialAccountUserId?: string

  // ---- Optional pending-block and resume flags ------------------------------
  pendingBlocks?: string[]
  resumeFromSnapshot?: boolean
  resumeTerminalNoop?: boolean

  /**
   * Inline workflow definition: `blocks` and `edges` are mandatory once the
   * override is supplied; `loops`, `parallels` and `deploymentVersionId` stay
   * optional.
   * NOTE(review): presumably used in place of the stored workflow state for
   * this run — confirm against the consumer.
   */
  workflowStateOverride?: {
    blocks: Record<string, any>
    edges: Edge[]
    loops?: Record<string, any>
    parallels?: Record<string, any>
    deploymentVersionId?: string
  }

  // ---- Optional large-value settings ----------------------------------------
  largeValueExecutionIds?: string[]
  allowLargeValueWorkflowScope?: boolean

  // ---- Optional call-chain, correlation and mode ----------------------------
  /** Same shape as `ContextExtensions.callChain` (a list of strings). */
  callChain?: string[]
  correlation?: AsyncExecutionCorrelation
  /** One of the three literal modes; omitted when unspecified. */
  executionMode?: 'sync' | 'stream' | 'async'
}
/**
 * Snapshot shape of executor state.
 *
 * All collections are plain arrays or records (no `Map`/`Set`). The same
 * shape is accepted by `ContextExtensions.snapshotState`.
 */
export interface SerializableExecutionState {
  // ---- Always present -------------------------------------------------------
  /** Per-block state, keyed by string. */
  blockStates: Record<string, BlockState>
  executedBlocks: string[]
  blockLogs: BlockLog[]
  /**
   * Two string-to-string maps, one for routers and one for conditions.
   * NOTE(review): the meaning of keys and values is not established in this
   * file — presumably block id to selected target; confirm.
   */
  decisions: {
    router: Record<string, string>
    condition: Record<string, string>
  }
  completedLoops: string[]
  activeExecutionPath: string[]

  // ---- Optional subflow bookkeeping (loosely typed) -------------------------
  loopExecutions?: Record<string, any>
  parallelExecutions?: Record<string, any>
  parallelBlockMapping?: Record<string, any>

  // ---- Optional queue / edge / resume data ----------------------------------
  pendingQueue?: string[]
  /** Uses the full reactflow `Edge` type. */
  remainingEdges?: Edge[]
  resumeTerminalNoop?: boolean
  dagIncomingEdges?: Record<string, string[]>
  completedPauseContexts?: string[]
}
/**
 * Iteration state of one ancestor subflow within a nested chain.
 *
 * Entries of this type carry the parent iteration context that is propagated
 * through SSE events, covering both loop-in-loop and parallel-in-parallel
 * nesting hierarchies.
 */
export interface ParentIteration {
  /** Current iteration number of the ancestor subflow. */
  iterationCurrent: number
  /** Total number of iterations; may be omitted. */
  iterationTotal?: number
  /** Kind of subflow this ancestor is. */
  iterationType: SubflowType
  /** Container block ID; mandatory here, unlike on `IterationContext`. */
  iterationContainerId: string
}
/**
 * Iteration state handed to block callbacks (see the `iterationContext`
 * parameter of the hooks in `ExecutionCallbacks`).
 *
 * Shares its first three members with {@link ParentIteration}; differs in that
 * the container ID is optional and ancestors may be attached.
 */
export interface IterationContext {
  iterationCurrent: number
  iterationTotal?: number
  iterationType: SubflowType
  /**
   * Block ID of the loop or parallel container that owns this iteration.
   *
   * Left optional because a generic `<loop.index>` reference can be resolved
   * before the container ID is known (for example through the
   * `context.loopScope` fallback). {@link ParentIteration} entries always
   * carry it, since they are built from fully resolved ancestor loops.
   */
  iterationContainerId?: string
  /** Ancestor iteration states for nested subflows, when any are known. */
  parentIterations?: ParentIteration[]
}
/**
 * Metadata given to block handlers that run inside subflow contexts (loops,
 * parallels, child workflows).
 *
 * Reuses a fixed subset of the DAG `NodeMetadata` members and adds the runtime
 * identifiers needed for execution tracking.
 */
export interface WorkflowNodeMetadata
  extends Pick<
    NodeMetadata,
    | 'loopId'
    | 'parallelId'
    | 'branchIndex'
    | 'branchTotal'
    | 'originalBlockId'
    | 'isLoopNode'
  > {
  /** Runtime node identifier; always required. */
  nodeId: string
  /** Position in the execution order, when available. */
  executionOrder?: number
}
/**
 * Describes an execution that runs as a child of a workflow block.
 * All four members are required.
 */
export interface ChildWorkflowContext {
  /** ID of the workflow block inside the parent execution. */
  parentBlockId: string

  /** Human-readable name of the child workflow. */
  workflowName: string

  /** ID of the child workflow. */
  workflowId: string

  /** Nesting level, where 1 denotes a first-level child. */
  depth: number
}
/**
 * Optional asynchronous hooks. Every hook is optional and resolves to
 * `Promise<void>`.
 */
export interface ExecutionCallbacks {
  /** Receives a streaming execution. */
  onStream?: (streamingExec: StreamingExecution) => Promise<void>

  /**
   * Block-start hook. `executionOrder` is mandatory here; the two context
   * arguments are optional.
   */
  onBlockStart?: (
    blockId: string,
    blockName: string,
    blockType: string,
    executionOrder: number,
    iterationContext?: IterationContext,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>

  /**
   * Block-complete hook. `output` is untyped (`any`) in this interface,
   * whereas `ContextExtensions.onBlockComplete` declares a structured payload.
   */
  onBlockComplete?: (
    blockId: string,
    blockName: string,
    blockType: string,
    output: any,
    iterationContext?: IterationContext,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>

  /**
   * Called right after the instance ID has been generated and before the
   * child execution begins.
   *
   * Note the parameter order: `iterationContext` precedes `executionOrder`
   * here, the reverse of `onBlockStart`, and `executionOrder` is optional.
   */
  onChildWorkflowInstanceReady?: (
    blockId: string,
    childWorkflowInstanceId: string,
    iterationContext?: IterationContext,
    executionOrder?: number,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>
}
/**
 * Bag of optional settings, resume data and hooks. Every member is optional.
 */
export interface ContextExtensions {
  // ---- Identity and general flags -------------------------------------------
  workspaceId?: string
  executionId?: string
  largeValueExecutionIds?: string[]
  allowLargeValueWorkflowScope?: boolean
  userId?: string
  stream?: boolean
  selectedOutputs?: string[]
  /** Minimal edge list: endpoints only, no handles. */
  edges?: { source: string; target: string }[]
  isDeployedContext?: boolean
  enforceCredentialAccess?: boolean
  isChildExecution?: boolean

  // ---- Resume / snapshot data -----------------------------------------------
  resumeFromSnapshot?: boolean
  resumePendingQueue?: string[]
  /**
   * Plain structural edge shape (endpoints plus optional handles), not the
   * reactflow `Edge` type used by `SerializableExecutionState.remainingEdges`.
   */
  remainingEdges?: {
    source: string
    target: string
    sourceHandle?: string
    targetHandle?: string
  }[]
  dagIncomingEdges?: Record<string, string[]>
  snapshotState?: SerializableExecutionState
  metadata?: ExecutionMetadata

  // ---- Cancellation and file handling ---------------------------------------
  /**
   * Signal used to request cancellation. Once it is aborted the execution is
   * expected to stop gracefully.
   */
  abortSignal?: AbortSignal
  includeFileBase64?: boolean
  base64MaxBytes?: number

  // ---- Hooks ----------------------------------------------------------------
  /** Receives a streaming execution. */
  onStream?: (streamingExecution: StreamingExecution) => Promise<void>

  /** Block-start hook; `executionOrder` is mandatory. */
  onBlockStart?: (
    blockId: string,
    blockName: string,
    blockType: string,
    executionOrder: number,
    iterationContext?: IterationContext,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>

  /**
   * Block-complete hook. Unlike `ExecutionCallbacks.onBlockComplete`, the
   * `output` argument is a structured payload rather than `any`.
   */
  onBlockComplete?: (
    blockId: string,
    blockName: string,
    blockType: string,
    output: {
      input?: any
      output: NormalizedBlockOutput
      executionTime: number
      startedAt: string
      executionOrder: number
      endedAt: string
      /**
       * Unique per-invocation ID that links this workflow block execution to
       * the block events of its child.
       */
      childWorkflowInstanceId?: string
    },
    iterationContext?: IterationContext,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>

  /** Marks this execution as the child of a workflow block. */
  childWorkflowContext?: ChildWorkflowContext

  /**
   * Called right after the instance ID has been generated and before the
   * child execution begins. `iterationContext` precedes `executionOrder` in
   * this signature.
   */
  onChildWorkflowInstanceReady?: (
    blockId: string,
    childWorkflowInstanceId: string,
    iterationContext?: IterationContext,
    executionOrder?: number,
    childWorkflowContext?: ChildWorkflowContext
  ) => Promise<void>

  // ---- Partial execution and call chain -------------------------------------
  /**
   * Run-from-block configuration. Supplying it puts the executor into partial
   * execution mode, starting at the specified block.
   */
  runFromBlockContext?: RunFromBlockContext

  /**
   * Execution stops once this block has completed (the "run until block"
   * feature).
   */
  stopAfterBlockId?: string

  /**
   * Workflow IDs of the current call chain, in order; used for cycle
   * detection. Every hop appends the current workflow ID before it issues
   * outgoing requests.
   */
  callChain?: string[]
}
/**
 * Open-ended workflow input: any string key is allowed, and each value is
 * `unknown`, so consumers must narrow before use.
 */
export interface WorkflowInput {
  [field: string]: unknown
}
/**
 * Read-only half of the block-state API. Not exported on its own; it is
 * exposed through `BlockStateController`.
 */
interface BlockStateReader {
  /**
   * Looks up the output recorded for `blockId`.
   *
   * @returns The block's output, or `undefined` when none is available.
   * NOTE(review): the role of `currentNodeId` is not established by this
   * declaration — confirm with the implementation.
   */
  getBlockOutput(blockId: string, currentNodeId?: string): NormalizedBlockOutput | undefined

  /** Reports whether `blockId` has executed. */
  hasExecuted(blockId: string): boolean
}
/**
 * Mutating half of the block-state API. All four operations return `void`.
 */
export interface BlockStateWriter {
  /** Stores `output` for `blockId`; `executionTime` may be omitted. */
  setBlockOutput(blockId: string, output: NormalizedBlockOutput, executionTime?: number): void

  /** Stores a complete `BlockState` for `blockId`. */
  setBlockState(blockId: string, state: BlockState): void

  /** Removes the state held for `blockId`. */
  deleteBlockState(blockId: string): void

  /**
   * Clears the executed marker of `blockId`.
   * NOTE(review): presumably the counterpart of `hasExecuted` on the reader —
   * confirm with the implementation.
   */
  unmarkExecuted(blockId: string): void
}
/**
 * Combined read/write view of block state: the intersection of the
 * file-local `BlockStateReader` and the exported `BlockStateWriter`.
 */
export type BlockStateController = BlockStateReader & BlockStateWriter