Dec-17-2024, 04:27 PM
The main function simulates receiving data as fast as possible and forwarding it to subprocesses for processing. In the real application there will be multiple subprocesses; only one is tested here. The computation in the subprocess must not slow down data reception, and its duration is unpredictable and may be long. Below is my test code. I would like to know how to optimize it to make it faster.
import multiprocessing
import threading
import collections
import time
def doCount(data, num):
    """Simulate processing one message and return the updated counter.

    Args:
        data: The message to process. It is not inspected here; the work
            is simulated with a very short sleep.
        num: Number of messages counted so far.

    Returns:
        ``num + 1``. Integers are immutable, so the incremented value has
        to be returned for the caller to observe it; callers that ignore
        the return value behave exactly as before.
    """
    num = num + 1
    # Stand-in for the real (variable-length) computation.
    time.sleep(0.000001)
    return num
def threadWorker(d, event):
    """Drain messages from a shared deque until a ``None`` sentinel arrives.

    Args:
        d: Deque the producer appends messages to. Items are consumed from
            the left; ``None`` means "no more data".
        event: Event-like object (``clear``/``wait``) that the producer sets
            after appending, used to sleep while the deque is empty.

    Prints the total time spent in the loop, in milliseconds, on exit.
    """
    tstart = time.time_ns()
    num = 0  # passed to doCount by value; not updated in this loop
    while True:
        if not d:
            event.clear()
            # Re-check after clearing: the producer may have appended and
            # called set() between the emptiness test above and clear().
            # Waiting unconditionally would then discard that wake-up and
            # block with an item (possibly the final sentinel) still queued.
            if not d:
                event.wait()
            continue
        data = d.popleft()
        if data is None:
            break
        doCount(data, num)
    tend = time.time_ns()
    print('thread-recv:', (tend - tstart) / 1000000)
def processWorker(conn):
    """Receive messages from ``conn`` and hand them to a worker thread.

    Receiving and processing are decoupled: this function only moves each
    message from the pipe into a deque and sets an event, while a thread
    running ``threadWorker`` consumes the deque.

    Args:
        conn: Readable connection end. A received ``None`` marks the end of
            the stream; a closed connection (``EOFError``) is treated the
            same way.

    Prints the receive-loop duration in milliseconds and the timestamps
    taken just before and just after joining the worker thread.
    """
    d = collections.deque()
    event = threading.Event()
    t = threading.Thread(target=threadWorker, args=(d, event), daemon=True)
    t.start()
    cstart = time.time_ns()
    while True:
        try:
            data = conn.recv()
        except EOFError:
            # The sender went away without sending the sentinel. Queue one
            # ourselves, otherwise the worker never sees None and the
            # join() below would block forever.
            data = None
        d.append(data)
        event.set()
        if data is None:
            break
    cend = time.time_ns()
    print('recv:', (cend - cstart) / 1000000)
    print('thread-wait:', time.time_ns())
    t.join()
    print('thread-end:', time.time_ns())
def main():
    """Benchmark sending 100000 small dict messages to one child process.

    Creates a one-way pipe, starts a child running ``processWorker`` on the
    read end, sends the messages followed by a ``None`` sentinel, prints the
    send duration in milliseconds, and waits for the child to finish.
    """
    read_conn, write_conn = multiprocessing.Pipe(duplex=False)
    p = multiprocessing.Process(target=processWorker, args=(read_conn,))
    p.start()
    # The child has its own handle to the read end now. Dropping the
    # parent's copy means that if the child dies, send() should fail with
    # an error instead of blocking once the pipe buffer fills up.
    read_conn.close()
    pstart = time.time_ns()
    try:
        for i in range(100000):
            write_conn.send(
                {"code": f"code {i}", "time": time.time_ns(), "type": 1, "num": 100000, "aaa": "ddd", "bbbb": "dddd",
                 "ccc": "dddd", "fff": "dddd", "ggg": "dddd", "hhh": "dddd", "iii": "dddd"})
        write_conn.send(None)
    finally:
        # Always release the write end, even if a send fails part-way.
        write_conn.close()
    pend = time.time_ns()
    print('send:', (pend - pstart) / 1000000)
    p.join()
# Entry-point guard: with the "spawn" start method the child process imports
# this module again, and without the guard it would re-run main() itself.
if __name__ == '__main__':
    main()
