My first workflow

In the previous example you built a simple workflow with two independent tasks and learned how to run and unrun tasks and how to handle conflicts. Now you are ready to build a more complex workflow with dependent tasks and subworkflows.

Change to a new directory and initialize a tb repository:

$ tb init
Created repository using module "taskblaster.repository" in "/home/myuser/tmprepo".

Define three simple task functions in tasks.py:

def ok(number):
    return number


def plus_two(number):
    return number + 2


def cond_fail(number, max_num):
    if number > max_num:
        raise ValueError(f'{number} > {max_num}')
    else:
        return number
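
These task functions are ordinary Python, so you can sanity-check them directly before wiring them into a workflow:

```python
# The task functions from tasks.py, exercised as plain Python.
def ok(number):
    return number


def plus_two(number):
    return number + 2


def cond_fail(number, max_num):
    if number > max_num:
        raise ValueError(f'{number} > {max_num}')
    return number


print(ok(1))        # 1
print(plus_two(1))  # 3
try:
    cond_fail(3, 2)
except ValueError as err:
    print(err)      # 3 > 2
```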

Now write the following workflow to a file workflow.py:

import taskblaster as tb


@tb.workflow
class Workflow:
    number = tb.var()
    max_num = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def plus_two(self):
        return tb.node('plus_two', number=self.ok)

    @tb.task
    def might_fail1(self):
        return tb.node('cond_fail', number=self.ok, max_num=self.max_num)

    @tb.task
    def might_fail2(self):
        return tb.node('cond_fail', number=self.plus_two, max_num=self.max_num)


def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=2))

Notice that, for example, the task plus_two takes the output of the task ok as an input.
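
To see the dataflow this workflow encodes, here is a plain-Python equivalent of Workflow(number=1, max_num=2), the inputs passed by the workflow() function above. It also shows which task will eventually fail:

```python
# Plain-Python equivalent of the dataflow in Workflow(number=1, max_num=2).
# TaskBlaster resolves the same references between task outputs and inputs.
def ok(number):
    return number


def plus_two(number):
    return number + 2


def cond_fail(number, max_num):
    if number > max_num:
        raise ValueError(f'{number} > {max_num}')
    return number


number, max_num = 1, 2
ok_out = ok(number)                     # tree/ok          -> 1
plus_two_out = plus_two(ok_out)         # tree/plus_two    -> 3
fail1_out = cond_fail(ok_out, max_num)  # tree/might_fail1 -> 1 (1 <= 2)
# tree/might_fail2 receives plus_two_out = 3 and max_num = 2,
# so cond_fail will raise ValueError('3 > 2') when it eventually runs.
```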

Run the workflow:

$ tb workflow workflow.py
entry:                    add  new      0/0        tree/ok 
                          add  new      0/1        tree/plus_two 
                          add  new      0/1        tree/might_fail1 
                          add  new      0/1        tree/might_fail2 

Try tb ls:

$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
new      0/1                                            tree/might_fail1
new      0/1                                            tree/might_fail2
new      0/0                                            tree/ok
new      0/1                                            tree/plus_two

From the info column, which counts satisfied dependencies out of the total, we can see that the task ok has no dependencies (0/0) and is ready to run, while each of the other tasks waits on one unmet dependency (0/1).

We can now try to run only the task ok:

$ tb run tree/ok
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:38 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] name: None
    tags: —
    required_tags: —
    resources: None
    max_tasks: None
    subworker_size: None
    subworker_count: None
    wall_time: None
[rank=000 2025-03-17 10:08:38 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:38 N/A-0/1] Running ok ...
[rank=000 2025-03-17 10:08:38 N/A-0/1] Task ok finished in 0:00:00.000961
[rank=000 2025-03-17 10:08:38 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
new      1/1                                            tree/might_fail1
new      0/1                                            tree/might_fail2
done     0/0                    N/A-0/1        00:00:00 tree/ok
new      1/1                                            tree/plus_two

ok is now done, and the dependencies of might_fail1 and plus_two are fulfilled. The dependency of might_fail2 is still not met, since it depends on the task plus_two. However, since TaskBlaster keeps track of all dependencies, we can run all remaining tasks in one go and TaskBlaster will make sure that they are executed in the correct order.
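
Running everything in one go works because a task only becomes eligible once all of its dependencies are done. As a rough illustration (not TaskBlaster's actual implementation), dependency-ordered scheduling can be sketched as:

```python
# Illustrative sketch of dependency-ordered scheduling: repeatedly pick
# the tasks whose dependencies are all done and "execute" them.
deps = {
    'ok': [],
    'plus_two': ['ok'],
    'might_fail1': ['ok'],
    'might_fail2': ['plus_two'],
}

done = set()
order = []
while len(done) < len(deps):
    ready = [t for t in sorted(deps)
             if t not in done and all(d in done for d in deps[t])]
    for task in ready:
        order.append(task)  # here a real worker would run the task
        done.add(task)

print(order)  # 'ok' always precedes the tasks that depend on it
```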

$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:39 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] name: None
    tags: —
    required_tags: —
    resources: None
    max_tasks: None
    subworker_size: None
    subworker_count: None
    wall_time: None
[rank=000 2025-03-17 10:08:39 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running might_fail1 ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task might_fail1 finished in 0:00:00.000950
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running plus_two ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task plus_two finished in 0:00:00.000518
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/taskblaster/envs/latest/lib/python3.12/site-packages/taskblaster/worker.py", line 497, in process_one_task
    loaded_task.run(self)
  File "/home/docs/checkouts/readthedocs.org/user_builds/taskblaster/envs/latest/lib/python3.12/site-packages/taskblaster/worker.py", line 218, in run
    output = function(**kwargs)
             ^^^^^^^^^^^^^^^^^^
  File "/home/myuser/tmprepo/tasks.py", line 11, in cond_fail
    raise ValueError(f'{number} > {max_num}')
ValueError: 3 > 2

[rank=000 2025-03-17 10:08:39 N/A-0/1] Task might_fail2 failed: 3 > 2
[rank=000 2025-03-17 10:08:39 N/A-0/1] WARDEN: Task might_fail2 has handlers [].
[rank=000 2025-03-17 10:08:39 N/A-0/1] WARDEN: No handlers found.
[rank=000 2025-03-17 10:08:39 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1        00:00:00 tree/might_fail1
fail     1/1                    N/A-0/1        00:00:00 tree/might_fail2
^^^^  ValueError: 3 > 2
done     0/0                    N/A-0/1        00:00:00 tree/ok
done     1/1                    N/A-0/1        00:00:00 tree/plus_two

As expected, all tasks except might_fail2 succeeded. The reason for its failure is clearly shown in the output of tb ls.

We will now add a subworkflow. To keep track of what we have done, make a new file workflow2.py with the following code:

import taskblaster as tb


@tb.workflow
class SubWorkflow:
    number = tb.var()
    second_number = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def ok2(self):
        return tb.node('ok', number=self.second_number)


@tb.workflow
class Workflow:
    number = tb.var()
    max_num = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def plus_two(self):
        return tb.node('plus_two', number=self.ok)

    @tb.task
    def might_fail1(self):
        return tb.node('cond_fail', number=self.ok, max_num=self.max_num)

    @tb.task
    def might_fail2(self):
        return tb.node('cond_fail', number=self.plus_two, max_num=self.max_num)

    @tb.subworkflow
    def subwf(self):
        return SubWorkflow(number=self.ok, second_number=self.might_fail2)


def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=2))

There is now a second workflow class called SubWorkflow with two input parameters and two tasks. The main workflow class Workflow includes SubWorkflow as a subworkflow. Let's run the workflow and see what happens:

$ tb workflow workflow2.py
entry:                    add  new      1/1        tree/subwf/ok 
                          add  cancel   0/1        tree/subwf/ok2 
entry:                   have  done     0/0        tree/ok 
                         have  done     1/1        tree/plus_two 
                         have  done     1/1        tree/might_fail1 
                         have  fail     1/1        tree/might_fail2
^^^^  ValueError: 3 > 2 
$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1        00:00:00 tree/might_fail1
fail     1/1                    N/A-0/1        00:00:00 tree/might_fail2
^^^^  ValueError: 3 > 2
done     0/0                    N/A-0/1        00:00:00 tree/ok
done     1/1                    N/A-0/1        00:00:00 tree/plus_two
new      1/1                                            tree/subwf/ok
cancel   0/1                                            tree/subwf/ok2

Note that all tasks that belong to the subworkflow are located under tree/subwf. Since the task tree/subwf/ok2 depends on the failed task might_fail2, it has the state cancel and cannot be run. The task tree/subwf/ok is in the new state and can be run directly.
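
The cancel state reaches everything downstream of a failure. As a sketch (again an illustration, not TaskBlaster's implementation), finding the tasks to cancel amounts to collecting the transitive dependents of the failed task in this tutorial's dependency graph:

```python
# Sketch: every task downstream of a failed one gets cancelled. With the
# graph of this tutorial, a failure in might_fail2 reaches only subwf/ok2.
deps = {
    'ok': [],
    'plus_two': ['ok'],
    'might_fail1': ['ok'],
    'might_fail2': ['plus_two'],
    'subwf/ok': ['ok'],
    'subwf/ok2': ['might_fail2'],
}


def transitive_dependents(failed):
    cancelled = set()
    stack = [failed]
    while stack:
        current = stack.pop()
        for task, task_deps in deps.items():
            if current in task_deps and task not in cancelled:
                cancelled.add(task)
                stack.append(task)
    return cancelled


print(transitive_dependents('might_fail2'))  # {'subwf/ok2'}
```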

$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:39 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] name: None
    tags: —
    required_tags: —
    resources: None
    max_tasks: None
    subworker_size: None
    subworker_count: None
    wall_time: None
[rank=000 2025-03-17 10:08:39 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running subwf/ok ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task subwf/ok finished in 0:00:00.000958
[rank=000 2025-03-17 10:08:39 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1        00:00:00 tree/might_fail1
fail     1/1                    N/A-0/1        00:00:00 tree/might_fail2
^^^^  ValueError: 3 > 2
done     0/0                    N/A-0/1        00:00:00 tree/ok
done     1/1                    N/A-0/1        00:00:00 tree/plus_two
done     1/1                    N/A-0/1        00:00:00 tree/subwf/ok
cancel   0/1                                            tree/subwf/ok2

All tasks except the failed task and its dependents are now done. Let us change the input to the workflow so that tree/might_fail2 will pass. Copy workflow2.py to a new file workflow3.py and change the value of max_num passed to the workflow:

def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=20))

Now run the workflow again:

$ tb workflow workflow3.py
entry:                   have  done     1/1        tree/subwf/ok 
                         have  cancel   0/1        tree/subwf/ok2 
entry:                   have  done     0/0        tree/ok 
                         have  done     1/1        tree/plus_two 
                     conflict  done     1/1 ❄ C    tree/might_fail1 
                     conflict  fail     1/1 ❄ C    tree/might_fail2
^^^^  ValueError: 3 > 2 

Since we changed the input, there is a conflict for the tasks might_fail1 and might_fail2. Let us keep might_fail1 as it is and mark its conflict as resolved:

$ tb resolve tree/might_fail1

In practice this means that the conflict state of the task is changed to resolved, recording that we have seen the conflict and decided to keep the original input parameters. The failed task, on the other hand, we unrun:

$ tb unrun tree/might_fail2 --force
unrun:  cancel   subwf/ok2
unrun:  fail     might_fail2
2 tasks were unrun.
$ tb ls -cscf
state    conflict    folder
──────── ─────────── ─────────────────────────────
done     resolved    tree/might_fail1
new      none        tree/might_fail2
done     none        tree/ok
done     none        tree/plus_two
done     none        tree/subwf/ok
new      none        tree/subwf/ok2
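
As an aside, one way input conflicts like this can be detected, sketched here purely for illustration (this is not TaskBlaster's actual mechanism), is to compare a digest of the inputs a task was created with against the inputs the workflow would generate now:

```python
# Illustration only: detect an input conflict by hashing task inputs.
import hashlib
import json


def input_digest(kwargs):
    # A stable digest of the keyword arguments a task was created with.
    payload = json.dumps(kwargs, sort_keys=True).encode()
    return hashlib.sha256(payload).hexdigest()


stored = input_digest({'number': 3, 'max_num': 2})        # inputs on record
regenerated = input_digest({'number': 3, 'max_num': 20})  # from workflow3.py
print(stored != regenerated)  # True: the task is in conflict
```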

Now both the previously failed task and its dependent have the state new. We can update these tasks with the new input:

$ tb workflow workflow3.py
entry:                   have  done     1/1        tree/subwf/ok 
                         have  new      0/1        tree/subwf/ok2 
entry:                   have  done     0/0        tree/ok 
                         have  done     1/1        tree/plus_two 
                     resolved  done     1/1 R      tree/might_fail1 
                       update  new      1/1        tree/might_fail2 

and run the tasks:

$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:40 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] name: None
    tags: —
    required_tags: —
    resources: None
    max_tasks: None
    subworker_size: None
    subworker_count: None
    wall_time: None
[rank=000 2025-03-17 10:08:40 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task might_fail2 finished in 0:00:00.001095
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running subwf/ok2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task subwf/ok2 finished in 0:00:00.000548
[rank=000 2025-03-17 10:08:40 N/A-0/1] No available tasks, end worker main loop

Verify that all tasks are done:

$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1 R                  N/A-0/1        00:00:00 tree/might_fail1
done     1/1                    N/A-0/1        00:00:00 tree/might_fail2
done     0/0                    N/A-0/1        00:00:00 tree/ok
done     1/1                    N/A-0/1        00:00:00 tree/plus_two
done     1/1                    N/A-0/1        00:00:00 tree/subwf/ok
done     1/1                    N/A-0/1        00:00:00 tree/subwf/ok2

Finally, let us see what happens when you unrun a task whose descendants are done. Try to unrun the task tree/plus_two:

$ tb unrun tree/plus_two --force
unrun:  done     subwf/ok2
unrun:  done     might_fail2
unrun:  done     plus_two
3 tasks were unrun.
$ tb ls
state    info       tags        worker         time     folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1 R                  N/A-0/1        00:00:00 tree/might_fail1
new      0/1                                            tree/might_fail2
done     0/0                    N/A-0/1        00:00:00 tree/ok
new      1/1                                            tree/plus_two
done     1/1                    N/A-0/1        00:00:00 tree/subwf/ok
new      0/1                                            tree/subwf/ok2

As you can see, the task tree/plus_two, as well as all tasks that depend on it, was set to the new state.

You can run these tasks again:

$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:40 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] name: None
    tags: —
    required_tags: —
    resources: None
    max_tasks: None
    subworker_size: None
    subworker_count: None
    wall_time: None
[rank=000 2025-03-17 10:08:40 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running plus_two ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task plus_two finished in 0:00:00.001153
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task might_fail2 finished in 0:00:00.000556
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running subwf/ok2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task subwf/ok2 finished in 0:00:00.000540
[rank=000 2025-03-17 10:08:40 N/A-0/1] No available tasks, end worker main loop

Also try unrunning the same task without the --force flag; this is often the safer option.

You have now completed your first realistic workflow!