-
Notifications
You must be signed in to change notification settings - Fork 16
Expand file tree
/
Copy pathStackFlow.h
More file actions
444 lines (409 loc) · 16.5 KB
/
Copy pathStackFlow.h
File metadata and controls
444 lines (409 loc) · 16.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
/*
* SPDX-FileCopyrightText: 2024 M5Stack Technology CO LTD
*
* SPDX-License-Identifier: MIT
*/
#pragma once
// #define __cplusplus 1
#include <semaphore.h>
#include <unistd.h>
// #define CONFIG_SUPPORTTHREADSAFE 0
#include <string>
#include <list>
#include <functional>
#include <unordered_map>
#include <mutex>
#include <eventpp/eventqueue.h>
#include <thread>
#include <memory>
#include "json.hpp"
#include <regex>
#include "pzmq.hpp"
#include "StackFlowUtil.h"
namespace StackFlows {
/**
 * @brief Holds a heap-allocated T and guards every access to it with a mutex.
 *
 * The wrapped value is reachable only through access() and the two assignment
 * operators; each of them holds the internal mutex while it touches the value.
 *
 * @tparam T Type of the guarded value.
 */
template <typename T>
class ThreadSafeWrapper {
public:
    /// Creates the guarded value by forwarding @p args to T's constructor.
    template <typename... Args>
    ThreadSafeWrapper(Args &&...args) : value_{std::make_unique<T>(std::forward<Args>(args)...)}
    {
    }

    /**
     * @brief Runs @p func on the guarded value while the mutex is held.
     * @param func Callable invoked as func(T&).
     * @return Whatever @p func returns (may be void).
     */
    template <typename Func>
    auto access(Func func) -> decltype(func(std::declval<T &>()))
    {
        std::lock_guard<std::mutex> guard(lock_);
        return func(*value_);
    }

    /// Copy-assigns @p newValue into the guarded value under the lock.
    ThreadSafeWrapper &operator=(const T &newValue)
    {
        access([&newValue](T &held) { held = newValue; });
        return *this;
    }

    /// Move-assigns @p newValue into the guarded value under the lock.
    ThreadSafeWrapper &operator=(T &&newValue)
    {
        access([&newValue](T &held) { held = std::move(newValue); });
        return *this;
    }

private:
    std::unique_ptr<T> value_;  ///< The guarded value.
    mutable std::mutex lock_;   ///< Held for the duration of every access/assignment.
};
// Error argument meaning "no error": the output helpers in this file test
// error_msg.empty() and, when it holds, emit {"code": 0, "message": ""}.
#define LLM_NO_ERROR std::string("")
// A std::string holding the literal text "None".
#define LLM_NONE std::string("None")
class llm_channel_obj {
private:
std::unordered_map<int, std::shared_ptr<pzmq>> zmq_;
std::atomic<int> zmq_url_index_;
std::unordered_map<std::string, int> zmq_url_map_;
public:
std::string unit_name_;
bool enoutput_;
bool enstream_;
std::string request_id_;
std::string work_id_;
std::string inference_url_;
std::string output_url_;
static std::string uart_push_url;
std::string publisher_url;
llm_channel_obj(const std::string &_publisher_url, const std::string &inference_url, const std::string &unit_name);
~llm_channel_obj();
inline void set_output(bool flage)
{
enoutput_ = flage;
}
inline bool get_output()
{
return enoutput_;
}
inline void set_stream(bool flage)
{
enstream_ = flage;
}
inline bool get_stream()
{
return enstream_;
}
void subscriber_event_call(const std::function<void(const std::string &, const std::string &)> &call, pzmq *_pzmq,
const std::shared_ptr<pzmq_data> &raw);
int subscriber_work_id(const std::string &work_id,
const std::function<void(const std::string &, const std::string &)> &call);
void stop_subscriber_work_id(const std::string &work_id);
void subscriber(const std::string &zmq_url, const pzmq::msg_callback_fun &call);
void stop_subscriber(const std::string &zmq_url);
int check_zmq_errno(void *ctx, void *com, int code);
int send_raw_to_pub(const std::string &raw);
int send_raw_to_usr(const std::string &raw);
template <typename T, typename U>
int output_data(const std::string &object, const T &data, const U &error_msg)
{
return output_data(request_id_, work_id_, object, data, error_msg);
}
void set_push_url(const std::string &url);
void cear_push_url();
static int output_to_uart(const std::string &data);
static int send_raw_for_url(const std::string &zmq_url, const std::string &raw);
template <typename T, typename U>
static int output_data_for_url(const std::string &zmq_url, const std::string &request_id,
const std::string &work_id, const std::string &object, const U &data,
const T &error_msg, bool outuart = false)
{
nlohmann::json out_body;
out_body["request_id"] = request_id;
out_body["work_id"] = work_id;
out_body["created"] = time(NULL);
out_body["object"] = object;
out_body["data"] = data;
if (error_msg.empty()) {
out_body["error"]["code"] = 0;
out_body["error"]["message"] = "";
} else
out_body["error"] = error_msg;
std::string out = out_body.dump();
out += "\n";
if (outuart)
return output_to_uart(out);
else
return send_raw_for_url(zmq_url, out);
}
template <typename T, typename U>
int send(const std::string &object, const U &data, const T &error_msg, const std::string &work_id = "",
bool outuart = false)
{
nlohmann::json out_body;
out_body["request_id"] = request_id_;
out_body["work_id"] = work_id.empty() ? work_id_ : work_id;
out_body["created"] = time(NULL);
out_body["object"] = object;
out_body["data"] = data;
if (error_msg.empty()) {
out_body["error"]["code"] = 0;
out_body["error"]["message"] = "";
} else
out_body["error"] = error_msg;
std::string out = out_body.dump();
out += "\n";
send_raw_to_pub(out);
if (enoutput_) return send_raw_to_usr(out);
return 0;
}
template <typename T, typename U>
int output_data(const std::string &request_id, const std::string &work_id, const std::string &object, const T &data,
const U &error_msg)
{
nlohmann::json out_body;
out_body["request_id"] = request_id;
out_body["work_id"] = work_id;
out_body["created"] = time(NULL);
out_body["object"] = object;
out_body["data"] = data;
if (error_msg.empty()) {
out_body["error"]["code"] = 0;
out_body["error"]["message"] = "";
} else
out_body["error"] = error_msg;
std::string out = out_body.dump();
out += "\n";
send_raw_to_pub(out);
if (enoutput_) return send_raw_to_usr(out);
return 0;
}
};
/**
 * @brief Small fixed-size payload: two string slots and two integer slots.
 *
 * The StackFlow event handlers in this file receive one of these behind a
 * std::shared_ptr<void> and read slot 0 and slot 1 of the strings.
 * Unused string slots are empty and unused integer slots are 0.
 */
class stackflow_data {
private:
    static constexpr int kSlotCount = 2;  ///< Number of slots in each array below.

    /// @return true when @p index addresses an existing slot.
    static bool valid_slot(int index)
    {
        return index >= 0 && index < kSlotCount;
    }

public:
    /// Creates a payload with empty strings and zeroed integers.
    stackflow_data()
    {
    }
    /// Stores @p _data1 in string slot 0.
    stackflow_data(const std::string &_data1)
    {
        str_data[0] = _data1;
    }
    /// Stores @p _data1 and @p _data2 in string slots 0 and 1.
    stackflow_data(const std::string &_data1, const std::string &_data2)
    {
        str_data[0] = _data1;
        str_data[1] = _data2;
    }
    /// Stores @p _data1 in string slot 0 and @p _data2 in integer slot 0.
    stackflow_data(const std::string &_data1, int _data2)
    {
        str_data[0] = _data1;
        int_data[0] = _data2;
    }

    /**
     * @brief Returns a copy of string slot @p index.
     * @return The stored string, or an empty string when @p index is not 0 or 1
     *         (instead of reading outside the array).
     */
    std::string string(int index = 0) const
    {
        if (!valid_slot(index)) return std::string();
        return str_data[index];
    }

    /**
     * @brief Returns integer slot @p index.
     * @return The stored value, or 0 when @p index is not 0 or 1.
     */
    int integer(int index = 0) const
    {
        if (!valid_slot(index)) return 0;
        return int_data[index];
    }

    std::string str_data[2];
    // Zero-initialised: no constructor fills both slots, so without this integer()
    // would read an indeterminate value.
    int int_data[2] = {0, 0};
};
/**
 * @brief Base class of a unit: owns an event queue, an RPC context and the per-task
 *        channels, and exposes overridable setup/link/unlink/exit/work/pause/taskinfo actions.
 *
 * Each action comes as a trio:
 *  - `_rpc_<action>(pzmq*, data)` - declared here, implemented out of line;
 *  - `_<action>(arg)`             - queued-event handler defined below; unpacks the
 *                                   payload and calls the virtual two-string overload;
 *  - `<action>(...)`              - two virtual overloads that subclasses may override.
 *
 * Members that are only declared here are implemented out of line.
 */
class StackFlow {
private:
    std::atomic_int work_id_num_cout_;

    /**
     * @brief Common prologue of the `_<action>` event handlers.
     *
     * Treats @p arg as a stackflow_data, copies its string slots 0 and 1 into
     * @p zmq_url and @p data, records the "request_id" extracted from @p data in
     * request_id_ and the URL in out_zmq_url_.
     *
     * @return true when status_ is non-zero, i.e. the handler should dispatch.
     */
    bool _unpack_event(const std::shared_ptr<void> &arg, std::string &zmq_url, std::string &data)
    {
        const std::shared_ptr<stackflow_data> event = std::static_pointer_cast<stackflow_data>(arg);
        zmq_url      = event->string(0);
        data         = event->string(1);
        request_id_  = sample_json_str_get(data, "request_id");
        out_zmq_url_ = zmq_url;
        return status_.load() != 0;
    }

protected:
    std::string unit_name_;
    /// Event identifiers; the queue below is keyed by int.
    typedef enum {
        EVENT_NONE = 0,
        EVENT_SETUP,
        EVENT_EXIT,
        EVENT_PAUSE,
        EVENT_WORK,
        EVENT_STOP,
        EVENT_LINK,
        EVENT_UNLINK,
        EVENT_TASKINFO,
        EVENT_SWITCH_INPUT,
        EVENT_SYS_INIT,
        EVENT_CREAT_CHANNEL,
        EVENT_GET_CHANNL,
        EVENT_REPEAT_EVENT,
        EVENT_EXPORT,
    } local_event_t;
    eventpp::EventQueue<int, void(const std::shared_ptr<void> &)> event_queue_;
    std::unique_ptr<std::thread> even_loop_thread_;
    std::unique_ptr<pzmq> rpc_ctx_;
    std::atomic<int> status_;  ///< The `_<action>` handlers dispatch only while this is non-zero.
    std::unordered_map<int, std::shared_ptr<llm_channel_obj>> llm_task_channel_;  ///< Channels keyed by work-id number.
    std::unordered_map<std::string, std::function<int(void)>> repeat_callback_fun_;
    std::mutex repeat_callback_fun_mutex_;
    void _repeat_loop(const std::shared_ptr<void> &arg);

public:
    std::string request_id_;   ///< "request_id" of the most recently unpacked event; used by send().
    std::string out_zmq_url_;  ///< URL of the most recently unpacked event; send()'s default destination.
    // Optional callbacks. They are not invoked anywhere in this header.
    // NOTE(review): presumably used by the out-of-line default action implementations - confirm.
    std::function<void(const std::string &, const std::string &, const std::string &)> _link_;
    std::function<void(const std::string &, const std::string &, const std::string &)> _unlink_;
    std::function<int(const std::string &, const std::string &, const std::string &)> _setup_;
    std::function<int(const std::string &, const std::string &, const std::string &)> _exit_;
    std::function<void(const std::string &, const std::string &, const std::string &)> _pause_;
    std::function<void(const std::string &, const std::string &, const std::string &)> _work_;
    std::function<void(const std::string &, const std::string &, const std::string &)> _taskinfo_;
    std::function<void(const std::string &, const std::string &, const std::string &)> _get_channl_;
    std::atomic<bool> exit_flage_;

    StackFlow(const std::string &unit_name);
    void even_loop();
    void _none_event(const std::shared_ptr<void> &arg);

    /**
     * @brief Looks up the channel registered for a work id.
     *
     * @tparam T Exactly `int` (work-id number) or exactly `std::string` (converted with
     *           sample_get_work_id_num()). Any other type - including `const char *` -
     *           makes the function return nullptr without looking anything up.
     * @return The stored channel pointer.
     * @throws std::out_of_range if no entry exists for the resolved number.
     */
    template <typename T>
    std::shared_ptr<llm_channel_obj> get_channel(T workid)
    {
        int _work_id_num = 0;
        if constexpr (std::is_same<T, int>::value) {
            _work_id_num = workid;
        } else if constexpr (std::is_same<T, std::string>::value) {
            _work_id_num = sample_get_work_id_num(workid);
        } else {
            return nullptr;
        }
        return llm_task_channel_.at(_work_id_num);
    }

    std::string _rpc_setup(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls setup(zmq_url, raw) while status_ is non-zero.
    void _setup(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) setup(zmq_url, data);
    }
    virtual int setup(const std::string &zmq_url, const std::string &raw);
    virtual int setup(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_link(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls link(zmq_url, raw) while status_ is non-zero.
    void _link(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) link(zmq_url, data);
    }
    virtual void link(const std::string &zmq_url, const std::string &raw);
    virtual void link(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_unlink(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls unlink(zmq_url, raw) while status_ is non-zero.
    void _unlink(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) unlink(zmq_url, data);
    }
    virtual void unlink(const std::string &zmq_url, const std::string &raw);
    virtual void unlink(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_exit(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls exit(zmq_url, raw) while status_ is non-zero.
    void _exit(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) exit(zmq_url, data);
    }
    virtual int exit(const std::string &zmq_url, const std::string &raw);
    virtual int exit(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_work(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls work(zmq_url, raw) while status_ is non-zero.
    void _work(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) work(zmq_url, data);
    }
    virtual void work(const std::string &zmq_url, const std::string &raw);
    virtual void work(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_pause(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls pause(zmq_url, raw) while status_ is non-zero.
    void _pause(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) pause(zmq_url, data);
    }
    virtual void pause(const std::string &zmq_url, const std::string &raw);
    virtual void pause(const std::string &work_id, const std::string &object, const std::string &data);

    std::string _rpc_taskinfo(pzmq *_pzmq, const std::shared_ptr<pzmq_data> &data);
    /// Event handler: unpacks @p arg and calls taskinfo(zmq_url, raw) while status_ is non-zero.
    void _taskinfo(const std::shared_ptr<void> &arg)
    {
        std::string zmq_url;
        std::string data;
        if (_unpack_event(arg, zmq_url, data)) taskinfo(zmq_url, data);
    }
    virtual void taskinfo(const std::string &zmq_url, const std::string &raw);
    virtual void taskinfo(const std::string &work_id, const std::string &object, const std::string &data);

    void _sys_init(const std::shared_ptr<void> &arg);
    void user_output(const std::string &zmq_url, const std::string &request_id, const std::string &data);

    /**
     * @brief Sends one JSON envelope line over a short-lived ZMQ_PUSH connection.
     *
     * The envelope carries request_id_, @p work_id, the current time, @p object and
     * @p data. If error_msg.empty(), "error" is {"code": 0, "message": ""}; otherwise
     * "error" is @p error_msg itself.
     *
     * @param zmq_url Destination; when empty, out_zmq_url_ is used.
     * @return The result of pzmq::send_data().
     */
    template <typename T, typename U>
    int send(const std::string &object, const U &data, const T &error_msg, const std::string &work_id,
             const std::string &zmq_url = "")
    {
        nlohmann::json out_body;
        out_body["request_id"] = request_id_;
        out_body["work_id"]    = work_id;
        out_body["created"]    = time(nullptr);
        out_body["object"]     = object;
        out_body["data"]       = data;
        if (error_msg.empty()) {
            out_body["error"]["code"]    = 0;
            out_body["error"]["message"] = "";
        } else {
            out_body["error"] = error_msg;
        }
        const std::string &target_url = zmq_url.empty() ? out_zmq_url_ : zmq_url;
        pzmq _zmq(target_url, ZMQ_PUSH);
        std::string out = out_body.dump();
        out += "\n";
        return _zmq.send_data(out);
    }

    /// Intentionally empty.
    void llm_firework_exit()
    {
    }

    std::string sys_sql_select(const std::string &key);
    void sys_sql_set(const std::string &key, const std::string &val);
    void sys_sql_unset(const std::string &key);
    int sys_register_unit(const std::string &unit_name);

    /**
     * @brief Issues the "release_unit" RPC to "sys" for a work id and drops its channel.
     *
     * @tparam T Exactly `int` (number; the id string is derived with sample_get_work_id())
     *           or exactly `std::string` (id string; the number is derived with
     *           sample_get_work_id_num()).
     * @return true once the RPC has been issued and the channel entry removed;
     *         false when T is an unsupported type (nothing is done in that case).
     *         The RPC reply is discarded, so true does not confirm the remote side's result.
     *         NOTE(review): the success path used to return false as well; confirm no caller
     *         relied on that and that the non-template overload reports success the same way.
     */
    template <typename T>
    bool sys_release_unit(T workid)
    {
        std::string _work_id;
        int _work_id_num = 0;
        if constexpr (std::is_same<T, int>::value) {
            _work_id     = sample_get_work_id(workid, unit_name_);
            _work_id_num = workid;
        } else if constexpr (std::is_same<T, std::string>::value) {
            _work_id     = workid;
            _work_id_num = sample_get_work_id_num(workid);
        } else {
            return false;
        }
        pzmq _call("sys");
        _call.call_rpc_action("release_unit", _work_id, [](pzmq *, const std::shared_ptr<pzmq_data> &) {});
        // Erasing the entry destroys the stored shared_ptr, which releases the channel.
        llm_task_channel_.erase(_work_id_num);
        // SLOGI("release work_id %s success", _work_id.c_str());
        return true;
    }
    bool sys_release_unit(int work_id_num, const std::string &work_id);
    void repeat_event(int ms, std::function<int(void)> repeat_fun, bool now = true);

    // Virtual because this class is a polymorphic base: deleting a derived unit through
    // a StackFlow pointer would otherwise be undefined behaviour.
    virtual ~StackFlow();
};
}; // namespace StackFlows