Sep-04-2024, 07:26 AM
I have a number of tasks that I want to chain into a pipeline. It consists of two main tasks: a fetch task (multi_fetch) and a post-processing task (multi_post_process). The fetch task is recursive: each call queues another fetch with a smaller count. I want all of the fetch tasks to complete before post-processing runs.
However, the post-processing step seems to run as soon as the first fetch task completes. How do I make it run only after all of the fetch tasks have completed?
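import random
import time
import uuid

from celery import chain, shared_task

@shared_task
def multi_fetch(count):
    fetch_id = uuid.uuid4()
    if count > 0:
        # Queues the next fetch as a separate task; this call does not wait for it
        multi_fetch.delay(count - 1)
    print(f"Fetching data....[{fetch_id}]")
    time.sleep(random.randint(5, 15))
    return fetch_id

@shared_task
def multi_post_process(fetch_id):
    print(f"Post processing...[{fetch_id}]")

# Chain the fetch with the post-processing step and send it to the workers
workflow = chain(multi_fetch.s(5), multi_post_process.s())
workflow.apply_async()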
