Jul-02-2024, 11:14 PM
This is my main code:
I have:
The thing is, it just terminates everything, and I need certain events to happen during the termination sequence. In get_market_data at the bottom you see
if __name__ == "__main__":
queue_dict = {item[0]: mp.Queue() for item in symlist}
# Start market data process
market_data_proc = mp.Process(target=gogo_market_data, args=(queue_dict,), daemon=True)
market_data_proc.start()
# Start symbol-specific processes
symbol_procs = []
for item in symlist:
p = mp.Process(target=start_symbol_process, args=(item, queue_dict[item[0]]))
symbol_procs.append(p)
p.start()
# Start the termination listener in a separate thread
termination_listener = threading.Thread(target=listen_for_termination, daemon=True)
termination_listener.start()
# Join processes
try:
while True:
if termination_event.is_set():
break
for p in symbol_procs:
p.join(1)
market_data_proc.join(1)
except KeyboardInterrupt:
termination_event.set()
print("Terminating all processes...")
for p in symbol_procs:
if p.is_alive():
p.terminate()
#if market_data_proc.is_alive():
# market_data_proc.terminate()
print("All processes terminated.")It calls a starter function to run asyncio on a particular multiprocessing thread:# Starter function to start asyncio symbol processing def start_symbol_process(symbol_list, data_queue): asyncio.run(process_symbol(symbol_list, data_queue))That in turn calls the asyncio function that does the processing:
# This function streams market data from Polygon
async def get_market_data(queue_dict):
msg_delays = {'A': [], 'Q': [], 'T': []}
async with websockets.connect(POLY_WEBSOCKETS_URL) as websocket:
await websocket.send(json.dumps({"action": "auth", "params": POLY_API_KEY}))
# Build the string of the services that we are going to subscribe to - need to sub to second aggs and quotes for each
param_string = ''
#for item in symlist: param_string += 'A.' + item[0] + ','
#for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ','
for item in symlist: param_string += 'A.' + item[0] + ',Q.' + item[0] + ',T.' + item[0] + ','
param_string = param_string[:-1]
await websocket.send(json.dumps({"action": "subscribe", "params": param_string}))
# Receive messages
while not termination_event.is_set():
try:
message = await websocket.recv()
data = json.loads(message)
# Iterate over each individual json message in a data packet
for json_msg in data:
# Process aggregate message
if json_msg['ev'] == 'A':
msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['e'] / 1000.0)).astimezone(ny_tz)
if log_pg_latency:
msg_delays['A'].append(datetime.datetime.now(ny_tz) - msg_time)
#print('Aggregates time delay:', msg_delays['A'][-1])
queue_dict[json_msg['sym']].put(('A', (json_msg['v'], json_msg['vw'], json_msg['o'], json_msg['c'], json_msg['h'], json_msg['l'], msg_time)))
# Process quote message
elif json_msg['ev'] == 'Q':
msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz)
if log_pg_latency:
msg_delays['Q'].append(datetime.datetime.now(ny_tz) - msg_time)
#print('Quotes time delay:', msg_delays['Q'][-1])
queue_dict[json_msg['sym']].put(('Q', json_msg['bp'], json_msg['ap']))
elif json_msg['ev'] == 'T':
msg_time = pytz.utc.localize(datetime.datetime.utcfromtimestamp(json_msg['t'] / 1000.0)).astimezone(ny_tz)
if log_pg_latency:
msg_delays['T'].append(datetime.datetime.now(ny_tz) - msg_time)
#print('Trades time delay:', msg_delays['T'][-1])
#print(json.dumps(data, indent=4))
except websockets.ConnectionClosed: break
await websocket.close()
print("Termination event set, processing msg_delays...")
print(msg_delays) get_market_data(queue_dict) pulls live stock market data and queue_dict is a mp.Queue() that I use to get this data out to other multiprocessing threads for analysis. I have:
# Function to listen for keyboard input and set the termination event
def listen_for_termination():
print("Press 'x' to terminate the script.")
while True:
if sys.stdin.read(1).strip().lower() == 'x':
termination_event.set()
breakSet as a function to listen for a termination signal. It works. The thing is, it just terminates everything, and I need certain events to happen during the termination sequence. In get_market_data at the bottom you see
print("Termination event set, processing msg_delays...")
print(msg_delays)I need those things, and additional code I have yet to write, to run when terminating. But that code never runs it just terminates... I cannot figure out why any help would be appreciated.
