Hi,
I have developed this semi-scientific code that acts as a TCP server. It listen for incoming messages, processes those and stores it in the computer RAM. Once the client disconnects, the PC attempts to write all the gathered data to a text file. The complete provided code is really girthy, but you can only concentrate on 2 functions:
print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file
So the PC gathers data, an processes it to its final form like this:
2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289
This is an example of one completely processed data packet. Depending on the environment and my application use case, I can either expect only a few of those lines I will be writing to a file, but it is also possible I will have to write up to half a million of such lines to a text file.
Problem is that when I get more and more lines to write, it takes exponentially longer to complete the text file. For example: couple of thousand lines takes around few seconds, but 50k+ lines may take up to 15 minutes. 129k lines took half a day... I guess it should not be like that. I do not have any hardware limitations and I am ready to optimize much of the usual OS working conditions, but I want to believe its possible to write 500k lines in under 5-10 minutes somehow...
I did my initial research, but I am already using the memory mapped files, I do not use many write. commands, I try to do it all at once. Not much of quick fixes are left for me... So I am asking for help and observations on where are the bottlenecks in my code. I do not have much experience with python, but this time I can not fundamentally change the program architecture - I can not write data in small chunks as soon as it is received, I still have to write all buffer at once. Thank you for any help.
I have developed this semi-scientific code that acts as a TCP server. It listen for incoming messages, processes those and stores it in the computer RAM. Once the client disconnects, the PC attempts to write all the gathered data to a text file. The complete provided code is really girthy, but you can only concentrate on 2 functions:
print_decimal_chunks_to_file_and_terminal
write_accumulated_data_to_file
So the PC gathers data, an processes it to its final form like this:
2024-12-18 14:11:03.031651 - 0 CPS - 485 702 925 1152 1319 1440 1511 1568 1602 1624 1631 1631 1599 1534 1441 1344 1245 1124 994 896 774 645 513 390 283 189 5 0 5 5 34 133 289
This is an example of one completely processed data packet. Depending on the environment and my application use case, I can either expect only a few of those lines I will be writing to a file, but it is also possible I will have to write up to half a million of such lines to a text file.
Problem is that when I get more and more lines to write, it takes exponentially longer to complete the text file. For example: couple of thousand lines takes around few seconds, but 50k+ lines may take up to 15 minutes. 129k lines took half a day... I guess it should not be like that. I do not have any hardware limitations and I am ready to optimize much of the usual OS working conditions, but I want to believe its possible to write 500k lines in under 5-10 minutes somehow...
I did my initial research, but I am already using the memory mapped files, I do not use many write. commands, I try to do it all at once. Not much of quick fixes are left for me... So I am asking for help and observations on where are the bottlenecks in my code. I do not have much experience with python, but this time I can not fundamentally change the program architecture - I can not write data in small chunks as soon as it is received, I still have to write all buffer at once. Thank you for any help.
import socket
import struct
import numpy as np
import time
from datetime import datetime
import os
import threading
from collections import deque
import sys
from threading import Lock
import mmap
TARGET_IP = None
TARGET_PORT = 8000
UDP_MESSAGE = "ESP32AGAIN"
UDP_BROADCAST_IP = "255.255.255.255"
UDP_BROADCAST_PORT = 55555
BUFFER_SIZE = 128
NUM_TO_PRINT = 32
BUFFER_LIMIT = 500 * 1024 * 1024
write_lock = Lock()
buffer_queue = deque()
timestamps = deque()
cps_values = []
active_connection = False
total_logs_received = 0
client_connection = None
last_packet = None
start_time = None
end_time = None
filename = None
def process_data_chunk(data):
result_array = (
(((data >> 14) & 1) << 0) |
(((data >> 13) & 1) << 1) |
(((data >> 15) & 1) << 2) |
(((data >> 4) & 1) << 3) |
(((data >> 9) & 1) << 4) |
(((data >> 17) & 1) << 5) |
(((data >> 1) & 1) << 6) |
(((data >> 5) & 1) << 7) |
(((data >> 10) & 1) << 8) |
(((data >> 3) & 1) << 9) |
(((data >> 12) & 1) << 10) |
(((data >> 2) & 1) << 11)
)
return result_array
def process_data(data):
result_array = np.array([process_data_chunk(struct.unpack('I', data[i:i+4])[0]) for i in range(0, len(data), 4)])
max_index = np.argmax(result_array)
higher_indices = np.arange(0, max_index + 1)
lower_indices = np.arange(max_index, len(result_array))
higher_elements = result_array[higher_indices]
lower_elements = result_array[lower_indices]
rearranged_buffer = np.concatenate([higher_elements, lower_elements])
max_index_rearranged = np.argmax(rearranged_buffer)
rearranged_buffer = np.roll(rearranged_buffer, 10 - max_index_rearranged)
return rearranged_buffer
def print_decimal_chunks_to_file_and_terminal(data, file_path, timestamps, cps_values):
global total_logs_received
global filename
try:
data = list(data)
timestamps = list(map(float, timestamps))
cps_values = list(cps_values)
lines_to_write = []
for i in range(len(data)):
packet_data = data[i]
timestamp = timestamps[i]
cps = cps_values[i]
formatted_timestamp = time.strftime('%Y-%m-%d %H:%M:%S.', time.localtime(timestamp)) + f"{timestamp:.6f}"[11:]
formatted_cps = f"{max(0, int(cps) - 1):d}" if cps is not None else "N/A"
line = f"{formatted_timestamp} - {formatted_cps} CPS - "
formatted_line = " ".join(f"{max(num - 2048, 0):4d}" for num in packet_data)
line += formatted_line
lines_to_write.append(line + "\n")
total_logs_received += 1
data_to_write = ''.join(lines_to_write)
if os.path.getsize(filename) == 0:
first_line = lines_to_write[0]
with open(filename, "a+") as file:
file.write(first_line)
lines_to_write = lines_to_write[1:]
remaining_data = ''.join(lines_to_write).encode()
data_length = len(remaining_data)
with open(filename, "r+b") as file:
file.seek(0, os.SEEK_END)
current_size = file.tell()
file.write(b'\0' * data_length)
mmapped_file = mmap.mmap(file.fileno(), current_size + data_length)
mmapped_file.seek(current_size)
mmapped_file.write(remaining_data)
mmapped_file.close()
except Exception as write_error:
print(f"Error writing to file: {write_error}")
def count_formatted_logs_in_last_second(timestamps, current_timestamp):
count = 0
for ts in timestamps:
if current_timestamp - ts < 1 and current_timestamp >= ts:
count += 1
return count
def calculate_cps(timestamps):
cps_values = []
for i in range(len(timestamps)):
cps = count_formatted_logs_in_last_second(timestamps, timestamps[i])
cps_values.append(cps)
return cps_values
def broadcast_message():
udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
time.sleep(0.5)
udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
time.sleep(0.5)
udp_socket.sendto(UDP_MESSAGE.encode(), (UDP_BROADCAST_IP, UDP_BROADCAST_PORT))
udp_socket.close()
def write_accumulated_data_to_file():
global buffer_queue
global timestamps
global cps_values
global filename
with write_lock:
if buffer_queue and timestamps:
try:
cps_values = calculate_cps(timestamps)
with open(filename, "a+") as file:
print_decimal_chunks_to_file_and_terminal(buffer_queue, file, timestamps, cps_values)
file.flush()
buffer_queue.clear()
timestamps.clear()
cps_values = []
except Exception as write_error:
print(f"Error writing to file: {write_error}")
def format_time(seconds):
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds = int(seconds % 60)
return f"{hours}h {minutes}m {seconds}s"
def handle_client(connection, address):
global active_connection
global buffer_queue
global timestamps
global cps_values
global last_packet
global end_time
global start_time
global filename
try:
buffer = bytearray(BUFFER_SIZE)
reference_time = time.time()
start_perf_counter = time.perf_counter_ns()
start_time = reference_time
while active_connection:
try:
total_received = 0
while total_received < BUFFER_SIZE:
additional_data = connection.recv_into(buffer)
if additional_data == 0:
print(f"Connection closed by the client ({address}) at {time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
active_connection = False
break
total_received += additional_data
if not active_connection:
break
elapsed_ns = time.perf_counter_ns() - start_perf_counter
timestamp = reference_time + elapsed_ns / 1e9
if total_received == BUFFER_SIZE:
data = process_data(buffer)
buffer_queue.append(data)
timestamps.append(timestamp)
if len(buffer_queue) > BUFFER_LIMIT // (BUFFER_SIZE // 4):
print("Buffer size limit reached. Closing connection and writing data to file.")
threading.Thread(target=write_accumulated_data_to_file).start()
active_connection = False
break
cps = count_formatted_logs_in_last_second(timestamps, timestamp)
cps_values.append(cps)
except Exception as receive_error:
break
if active_connection:
end_perf_counter = time.perf_counter_ns()
elapsed_ns = end_perf_counter - start_perf_counter
end_time = reference_time + elapsed_ns / 1e9
last_packet = buffer[:BUFFER_SIZE]
buffer_queue.pop()
last_packet_ints = struct.unpack("II", last_packet[:8])
total_measurement_seconds = max(0, end_time - start_time - 5)
total_measurement_time = format_time(total_measurement_seconds)
network_congestion_time = format_time(last_packet_ints[0])
network_congestion_events = last_packet_ints[1]
summary_line = (
f"Total measurement time: {total_measurement_time}. "
f"Network congestion time: {network_congestion_time}. "
f"Network congestion events: {network_congestion_events}"
)
print(f"Impulse count: {len(buffer_queue)}")
print("Disconnection detected. Closing socket and writing data to file. Please wait... ")
writer_thread = threading.Thread(target=write_accumulated_data_to_file)
writer_thread.start()
writer_thread.join()
active_connection = False
finally:
connection.close()
with open(filename, "a+") as file:
file.write("\n" + summary_line + "\n")
print("Finished writing")
print("Connection closed.")
def get_local_ip():
try:
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
return ip_address
except Exception as e:
print(f"Error getting local IP address: {e}")
def timer_callback():
if not buffer_queue:
print("No pulses detected in 10 seconds, check your hardware setup")
def receive_data():
global active_connection
global client_connection
global TARGET_IP
global TARGET_PORT
broadcast_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
broadcast_socket.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
broadcast_socket.bind(('0.0.0.0', 8000))
try:
print("__________________________________________________________________________________________________________________")
print("Listening for broadcast messages...")
while True:
data, address = broadcast_socket.recvfrom(1024)
message = data.decode("utf-8")
if message.strip() == "Scintilator":
TARGET_IP = address[0]
TARGET_PORT = address[1]
print(f"Subnet broadcast message received from {address}. IP and Port obtained: {TARGET_IP}:{TARGET_PORT}")
time.sleep(0.1)
broadcast_message = "Scintilator"
broadcast_socket.sendto(broadcast_message.encode("utf-8"), (TARGET_IP,8000))
print("Recognition message sent.")
active_connection = True
break
except Exception as e:
print(f"Error receiving broadcast message: {e}")
finally:
broadcast_socket.close()
if TARGET_IP and TARGET_PORT and active_connection:
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
print(f"Listening for client connections on {TARGET_IP}:{TARGET_PORT}...")
server_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
server_socket.bind((get_local_ip(), 8000))
server_socket.listen()
while True:
if not active_connection:
break
client_connection, client_address = server_socket.accept()
print(f"Connected by {client_address}")
while True:
input_bit = input("Enter 0 if you want to specify the measurement time, 1 if you want to specify the number of impulses: ")
if input_bit == 1:
timer = threading.Timer(10, timer_callback)
timer.start()
if input_bit in ['0', '1']:
break
else:
print("Invalid choice. Please enter 0 or 1.")
while True:
user_input = float(input("Enter the parameter (in seconds or quantity of pulses accordingly) : "))
if ((input_bit == 0) and (user_input == 0)) or ((input_bit == 1) and (user_input<1)):
print("Invalid parameters entered")
else:
break
print("Measurement started at", datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
integer_part = int(user_input)
fractional_part = user_input - integer_part
if input_bit == '0':
final_float = 2 * integer_part + fractional_part
else:
final_float = 2 * integer_part + 1 + fractional_part
user_input_packed = struct.pack("!f", final_float)
client_connection.sendall(user_input_packed)
handle_client(client_connection, client_address)
except Exception as bind_error:
print(f"Error during socket bind: {bind_error}")
finally:
threading.Thread(target=write_accumulated_data_to_file).start()
server_socket.close()
if __name__ == "__main__":
timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
receive_data()
while True:
input_bit = input("Enter 0 if you want repeat, enter any other character if you want to terminate the program: ")
timestamp_str = time.strftime("%Y-%m-%d_%H-%M-%S", time.localtime())
filename = os.path.splitext("Rezultatai.txt")[0] + " " + timestamp_str + os.path.splitext("Rezultatai.txt")[1]
if input_bit == '0':
broadcast_message()
sys.stdin.flush()
sys.stdout.flush()
receive_data()
else:
print("Program terminated")
sys.exit(0)
