Hi,
I want to define nested objects that can be read/written by several processes defined by multiprocessing module.
I search on some networks but it seems to not be solved.
Can you help me to find a solution.
Example of structure below to be solved : the subclass seems to not be seen as a shared object and is not updated, whereas the first level object is.
=> the question is for complex object that linked strcutures as objects, dict or functions ref.
=> it's a real problem to know how to define shareable objects.
I want to define nested objects that can be read/written by several processes defined by multiprocessing module.
I search on some networks but it seems to not be solved.
Can you help me to find a solution.
Example of structure below to be solved : the subclass seems to not be seen as a shared object and is not updated, whereas the first level object is.
=> the question is for complex object that linked strcutures as objects, dict or functions ref.
=> it's a real problem to know how to define shareable objects.
from multiprocessing import Lock, Process, Queue, current_process, Pool
from multiprocessing.managers import BaseManager, NamespaceProxy
import time
import queue # imported for using queue.Empty exception
class MyManager(BaseManager): pass
def Manager():
m = MyManager()
m.start()
return m
class ConfAppSubClass():
def __init__(self,id):
# todo
self.subid = id
def run(self):
# todo
pass
def get(self):
# todo
return self.subid
def put(self,id):
# todo
self.subid = 1000 + self.subid + id
def sub(self,id):
# todo
self.subid = 1000 + self.subid + id
class ConfApp():
def __init__(self,id):
# todo
self.id = id
self.subclass = ConfAppSubClass(10*id)
def run(self):
# todo
pass
def get(self):
# todo
return self.id
def getSubClass(self):
# todo
return self.subclass
def put(self,id):
# todo
self.id = 2000 + self.id + id
def sub(self,id):
# todo
self.id = 2000 + self.id + id
def do_job_tx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_out):
while True:
try:
'''
try to get task from the queue. get_nowait() function will
raise queue.Empty exception if the queue is empty.
queue(False) function would do the same task also.
'''
task = tasks_to_accomplish.get_nowait()
except queue.Empty:
break
else:
'''
if no exception has been raised, add the task completion
message to task_that_are_done queue
'''
print(task)
tasks_that_are_done.put(task + ' is done by ' + current_process().name)
ConfObjIf = tasks_conf_object_out
print("Task no %s : out conf obj %s "% (task, str(ConfObjIf)))
print("Task no %s : out conf obj val1 %s "% (task, str(ConfObjIf.get())))
ConfObjIf.put(10000)
ConfObjIf.getSubClass().put(3000)
print("Task no %s : out conf obj val2 %s "% (task, str(ConfObjIf.get())))
print("%s object created as output" % task)
time.sleep(.5)
return True
def do_job_rx(tasks_to_accomplish, tasks_that_are_done, tasks_conf_object_in):
while True:
try:
'''
try to get task from the queue. get_nowait() function will
raise queue.Empty exception if the queue is empty.
queue(False) function would do the same task also.
'''
task = tasks_to_accomplish.get_nowait()
except queue.Empty:
break
else:
'''
if no exception has been raised, add the task completion
message to task_that_are_done queue
'''
print(task)
tasks_that_are_done.put(task + ' is done by ' + current_process().name)
ConfObjIf = tasks_conf_object_in
print("Task no %s : out conf obj %s "% (task, str(ConfObjIf)))
print("Task no %s : out conf obj val1 %s "% (task, str(ConfObjIf.get())))
ConfObjIf.sub(500)
ConfObjIf.getSubClass().sub(200)
print("Task no %s : out conf obj val2 %s "% (task, str(ConfObjIf.get())))
time.sleep(.5)
return True
def main():
number_of_task = 10
number_of_processes = 4
tasks_to_accomplish = Queue()
tasks_that_are_done = Queue()
tasks_object_out = Queue()
processes = []
MyManager.register('ConfApp', ConfApp)
MyManager.register('ConfAppSubClass', ConfAppSubClass)
manager = Manager()
MyConfObj = manager.ConfApp(5);
for i in range(number_of_task):
tasks_to_accomplish.put("Task no " + str(i))
# print tasks_to_accomplish entire queue
temp = [None,None,None,None,None,None,None,None,None,None]
for i in range(number_of_task):
temp[i] = tasks_to_accomplish.get()
for i in range(number_of_task):
tasks_to_accomplish.put(temp[i])
tasks_object_out.put(MyConfObj) ####
print(str(temp))
# creating processes
for w in range(number_of_processes):
p = Process(target=do_job_tx, args=(tasks_to_accomplish, tasks_that_are_done, MyConfObj))
processes.append(p)
p.start()
# completing process
for p in processes:
p.join()
# print the output : info on processes end
while not tasks_that_are_done.empty():
print(tasks_that_are_done.get())
print("MyConfObj: val is %s "% (str(MyConfObj.get())))
print("MyConfObj subclass: val is %s "% (str(MyConfObj.getSubClass().get())))
for i in range(number_of_task):
tasks_to_accomplish.put("Task no " + str(i))
tasks_object_out.put(MyConfObj) ####
# creating processes
for w in range(number_of_processes):
p = Process(target=do_job_rx, args=(tasks_to_accomplish, tasks_that_are_done, MyConfObj))
processes.append(p)
p.start()
# completing process
for p in processes:
p.join()
# print the output : info on processes end
while not tasks_that_are_done.empty():
print(tasks_that_are_done.get())
print("MyConfObj: val is %s "% (str(MyConfObj.get())))
print("MyConfObj subclass: val is %s "% (str(MyConfObj.getSubClass().get())))
return True
if __name__ == '__main__':
main()
