forked from 1jehuang/jcode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtask_control.rs
More file actions
88 lines (86 loc) · 3.1 KB
/
Copy pathtask_control.rs
File metadata and controls
88 lines (86 loc) · 3.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/// Sends a `wake` task-control request from the coordinator for `active-task`,
/// targeting the worker session, and checks the single event delivered on the
/// client channel.
///
/// The event must be a `ServerEvent::CommTaskControlResponse` that echoes the
/// request id, action, task id and target session, reports status `running`,
/// and carries a plan summary with two items whose ready ids include `next`.
#[tokio::test]
async fn task_control_wake_returns_structured_response_with_plan_summary() {
    let (_env, _runtime) = RuntimeEnvGuard::new();

    let swarm_id = "swarm-task-control";
    let requester = "coord";
    let worker = "worker";
    let (client_tx, mut client_rx) = mpsc::unbounded_channel();

    // Only the worker gets an agent session; the requester has none.
    let agent = test_agent().await;
    let sessions = Arc::new(RwLock::new(HashMap::from([(worker.to_string(), agent)])));
    let soft_interrupt_queues = Arc::new(RwLock::new(HashMap::new()));
    let client_connections = Arc::new(RwLock::new(HashMap::new()));

    // Swarm membership: the requester carries the "coordinator" role, the
    // worker keeps whatever role `member` assigns.
    let mut coordinator = member(requester, swarm_id, "ready");
    coordinator.role = "coordinator".to_string();
    let swarm_members = Arc::new(RwLock::new(HashMap::from([
        (requester.to_string(), coordinator),
        (worker.to_string(), member(worker, swarm_id, "ready")),
    ])));

    // The same two session ids serve as the swarm roster and the plan participants.
    let everyone = HashSet::from([requester.to_string(), worker.to_string()]);
    let swarms_by_id = Arc::new(RwLock::new(HashMap::from([(
        swarm_id.to_string(),
        everyone.clone(),
    )])));

    // Two-item plan: "active-task" is assigned to the worker, "next" is unassigned.
    let mut assigned = plan_item("active-task", "queued", "high", &[]);
    assigned.assigned_to = Some(worker.to_string());
    let follow_up = plan_item("next", "queued", "high", &[]);
    let plan = VersionedPlan {
        items: vec![assigned, follow_up],
        version: 1,
        participants: everyone,
        task_progress: HashMap::new(),
    };
    let swarm_plans = Arc::new(RwLock::new(HashMap::from([(swarm_id.to_string(), plan)])));
    let swarm_coordinators = Arc::new(RwLock::new(HashMap::from([(
        swarm_id.to_string(),
        requester.to_string(),
    )])));

    // Event plumbing; the broadcast receiver is bound (and so kept alive)
    // until the end of the test, but never read.
    let event_history = Arc::new(RwLock::new(VecDeque::new()));
    let event_counter = Arc::new(AtomicU64::new(1));
    let (swarm_event_tx, _swarm_event_rx) = broadcast::channel(32);
    let mutation_runtime = SwarmMutationRuntime::default();

    handle_comm_task_control(
        101,
        requester.to_string(),
        "wake".to_string(),
        "active-task".to_string(),
        Some(worker.to_string()),
        Some("continue".to_string()),
        &client_tx,
        &sessions,
        &soft_interrupt_queues,
        &client_connections,
        &swarm_members,
        &swarms_by_id,
        &swarm_plans,
        &swarm_coordinators,
        &event_history,
        &event_counter,
        &swarm_event_tx,
        &mutation_runtime,
    )
    .await;

    let event = client_rx.recv().await.expect("response");
    match event {
        ServerEvent::CommTaskControlResponse {
            id,
            action,
            task_id,
            target_session,
            status,
            summary,
        } => {
            // The response echoes the request fields.
            assert_eq!(id, 101);
            assert_eq!(action, "wake");
            assert_eq!(task_id, "active-task");
            assert_eq!(target_session.as_deref(), Some(worker));
            // Status and plan summary reported with the response.
            assert_eq!(status, "running");
            assert_eq!(summary.item_count, 2);
            assert!(summary.ready_ids.contains(&"next".to_string()));
        }
        other => panic!("expected CommTaskControlResponse, got {other:?}"),
    }
}