1 year ago

#367033

test-img

Florent

How to execute nested tasks with Celery?

I need to run multiple tasks (task3) for a list of users and the execution time should be as short as possible. So I want to group the tasks and run them in parallel. The problem is that two other tasks (task1 and task2) must be executed before.

This is how the tasks structure could be schematized :

                            /task3('user1')
                task2('i3')/_task3('user2')
               /           \ task3('user3')
              /
             /              task3('user4')
task1('BMW')/__task2('i4')/_task3('user5')
            \             \ task3('user6')
             \
              \              task3('user7')
               \task2('i8')/_task3('user8')
                           \ task3('user9')


                             /task3('user10')
                 task2('Ka')/_task3('user11')
                /           \ task3('user12')
               /
              /                  task3('user13')
task1('Ford')/__task2('Fiesta')/_task3('user14')
             \                 \ task3('user15')
              \
               \                 task3('user16')
                \task2('Focus')/_task3('user17')
                               \ task3('user18')

In this example tasks1 for BMW and Ford should be grouped and executed in parallel. Then, when one of the two task is complete the next task (task2) should start. And so on until all task3 are executed.

What is the proper way to do that ?

This is my code but the problem is that Celery throws an error:

TypeError: group2() takes 1 positional argument but 2 were given

Code

@shared_task(name='Run tasks')
def group1():
    chain1 = [chain(task1.s(i), group2.s(i)) for i in ['BMW', 'Ford']]
    res = group(*chain1)()

@shared_task()
def group2(car):
    models = get_models(car)
    chain2 = [chain(task2.s(m), group3.s(m)) for m in models]
    res = group(*chain2)()

@shared_task()
def group3(model):
    users = get_users(model)
    res = group(task3.s(u) for u in users)()

@shared_task()
def task1(car):
    do_stuff(car)

@shared_task()
def task2(model):
    do_stuff(model)

@shared_task()
def task3(user):
    do_stuff(user)

python

celery

celery-task

0 Answers

Your Answer

Accepted video resources