My first workflow
In the previous example you built a simple workflow with two independent tasks and learned how to run and unrun tasks and how to handle conflicts. Now you are ready to build a more complex workflow with dependent tasks and subworkflows.
Change to a new directory and initialize a tb repository.
$ tb init
Created repository using module "taskblaster.repository" in "/home/myuser/tmprepo".
Make three simple tasks in tasks.py:
def ok(number):
    return number


def plus_two(number):
    return number + 2


def cond_fail(number, max_num):
    if number > max_num:
        raise ValueError(f'{number} > {max_num}')
    else:
        return number
Now write the following workflow to a file workflow.py:
import taskblaster as tb


@tb.workflow
class Workflow:
    number = tb.var()
    max_num = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def plus_two(self):
        return tb.node('plus_two', number=self.ok)

    @tb.task
    def might_fail1(self):
        return tb.node('cond_fail', number=self.ok, max_num=self.max_num)

    @tb.task
    def might_fail2(self):
        return tb.node('cond_fail', number=self.plus_two, max_num=self.max_num)


def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=2))
Notice that, for example, the task plus_two takes the output of the task ok as an input.
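To make this data flow concrete, the following sketch evaluates the same chain by hand in plain Python, without TaskBlaster. The helper evaluate_by_hand is hypothetical and only mimics how each task's output feeds the next one; it is not how TaskBlaster itself executes tasks.

```python
def ok(number):
    return number


def plus_two(number):
    return number + 2


def cond_fail(number, max_num):
    if number > max_num:
        raise ValueError(f'{number} > {max_num}')
    return number


def evaluate_by_hand(number, max_num):
    """Hypothetical helper: call the task functions in dependency order."""
    results = {}
    results['ok'] = ok(number)
    # plus_two and might_fail1 both consume the output of ok.
    results['plus_two'] = plus_two(results['ok'])
    results['might_fail1'] = cond_fail(results['ok'], max_num)
    # might_fail2 consumes the output of plus_two, so it sees number + 2.
    try:
        results['might_fail2'] = cond_fail(results['plus_two'], max_num)
    except ValueError as err:
        results['might_fail2'] = f'failed: {err}'
    return results


print(evaluate_by_hand(number=1, max_num=2))
```

With number=1 and max_num=2, might_fail2 receives 3 from plus_two, so it is the only step expected to fail.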
Run the workflow
$ tb workflow workflow.py
entry:
  add      new    0/0     tree/ok
  add      new    0/1     tree/plus_two
  add      new    0/1     tree/might_fail1
  add      new    0/1     tree/might_fail2
Try ls
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
new      0/1                                            tree/might_fail1
new      0/1                                            tree/might_fail2
new      0/0                                            tree/ok
new      0/1                                            tree/plus_two
From the info column, which counts how many of a task's dependencies are met, we can see that the task ok has all its dependencies met (0/0) and is ready to run, while the other tasks depend, directly or indirectly, on ok.
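The counts in the listing above can be reproduced with a small model of the workflow. This is only a sketch: it assumes the info column means "satisfied/total dependencies", which is consistent with the listing, and the names DEPS, deps_info and ready_tasks are made up for illustration rather than part of TaskBlaster.

```python
# Dependencies of each task in this tutorial's workflow (task -> inputs).
DEPS = {
    'ok': [],
    'plus_two': ['ok'],
    'might_fail1': ['ok'],
    'might_fail2': ['plus_two'],
}


def deps_info(states, deps=DEPS):
    """Return 'done/total' dependency counts per task (illustrative only)."""
    info = {}
    for task, parents in deps.items():
        n_done = sum(states[p] == 'done' for p in parents)
        info[task] = f'{n_done}/{len(parents)}'
    return info


def ready_tasks(states, deps=DEPS):
    """Tasks in state 'new' whose dependencies are all done."""
    return sorted(
        task for task, parents in deps.items()
        if states[task] == 'new' and all(states[p] == 'done' for p in parents)
    )


all_new = {task: 'new' for task in DEPS}
print(deps_info(all_new))
print(ready_tasks(all_new))
```

In this model only ok is ready while every task is new; marking ok as done makes plus_two and might_fail1 ready, but not might_fail2.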
We can now try to run only the task ok:
$ tb run tree/ok
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:38 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:38 N/A-0/1] name: None
tags: —
required_tags: —
resources: None
max_tasks: None
subworker_size: None
subworker_count: None
wall_time: None
[rank=000 2025-03-17 10:08:38 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:38 N/A-0/1] Running ok ...
[rank=000 2025-03-17 10:08:38 N/A-0/1] Task ok finished in 0:00:00.000961
[rank=000 2025-03-17 10:08:38 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
new      1/1                                            tree/might_fail1
new      0/1                                            tree/might_fail2
done     0/0                    N/A-0/1     00:00:00    tree/ok
new      1/1                                            tree/plus_two
The task ok is now done and the dependencies of might_fail1 and plus_two are fulfilled. The dependencies of might_fail2 are still not met since it depends on the task plus_two. However, since TaskBlaster keeps track of all dependencies, we can run all tasks in one go, and TaskBlaster will make sure that they are executed in the correct order.
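A dependency-respecting order of this kind can be illustrated with a topological sort. The sketch below uses graphlib from the Python standard library (Python 3.9 or newer) on a hand-written copy of the workflow's dependencies. It shows the general idea only and makes no claim about how TaskBlaster schedules tasks internally; DEPENDS_ON and execution_order are illustrative names.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# task -> set of tasks it depends on, mirroring the workflow above.
DEPENDS_ON = {
    'ok': set(),
    'plus_two': {'ok'},
    'might_fail1': {'ok'},
    'might_fail2': {'plus_two'},
}


def execution_order(depends_on):
    """Return one valid order in which every task follows its dependencies."""
    return list(TopologicalSorter(depends_on).static_order())


order = execution_order(DEPENDS_ON)
print(order)
```

Several orders are valid here: might_fail1 and plus_two may come in either order, as long as both follow ok and might_fail2 follows plus_two.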
$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:39 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] name: None
tags: —
required_tags: —
resources: None
max_tasks: None
subworker_size: None
subworker_count: None
wall_time: None
[rank=000 2025-03-17 10:08:39 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running might_fail1 ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task might_fail1 finished in 0:00:00.000950
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running plus_two ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task plus_two finished in 0:00:00.000518
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Traceback (most recent call last):
  File "/home/docs/checkouts/readthedocs.org/user_builds/taskblaster/envs/latest/lib/python3.12/site-packages/taskblaster/worker.py", line 497, in process_one_task
    loaded_task.run(self)
  File "/home/docs/checkouts/readthedocs.org/user_builds/taskblaster/envs/latest/lib/python3.12/site-packages/taskblaster/worker.py", line 218, in run
    output = function(**kwargs)
             ^^^^^^^^^^^^^^^^^^
  File "/home/myuser/tmprepo/tasks.py", line 11, in cond_fail
    raise ValueError(f'{number} > {max_num}')
ValueError: 3 > 2
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task might_fail2 failed: 3 > 2
[rank=000 2025-03-17 10:08:39 N/A-0/1] WARDEN: Task might_fail2 has handlers [].
[rank=000 2025-03-17 10:08:39 N/A-0/1] WARDEN: No handlers found.
[rank=000 2025-03-17 10:08:39 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1     00:00:00    tree/might_fail1
fail     1/1                    N/A-0/1     00:00:00    tree/might_fail2
^^^^     ValueError: 3 > 2
done     0/0                    N/A-0/1     00:00:00    tree/ok
done     1/1                    N/A-0/1     00:00:00    tree/plus_two
As expected, all tasks except might_fail2 succeeded. The reason for the failure of might_fail2 is clearly shown in the output of tb ls.
We will now add a subworkflow. To keep track of what we have done, make a new file workflow2.py with the following code:
import taskblaster as tb


@tb.workflow
class SubWorkflow:
    number = tb.var()
    second_number = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def ok2(self):
        return tb.node('ok', number=self.second_number)


@tb.workflow
class Workflow:
    number = tb.var()
    max_num = tb.var()

    @tb.task
    def ok(self):
        return tb.node('ok', number=self.number)

    @tb.task
    def plus_two(self):
        return tb.node('plus_two', number=self.ok)

    @tb.task
    def might_fail1(self):
        return tb.node('cond_fail', number=self.ok, max_num=self.max_num)

    @tb.task
    def might_fail2(self):
        return tb.node('cond_fail', number=self.plus_two, max_num=self.max_num)

    @tb.subworkflow
    def subwf(self):
        return SubWorkflow(number=self.ok, second_number=self.might_fail2)


def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=2))
There is now a new workflow called SubWorkflow with two input parameters and two tasks. The main workflow class Workflow includes SubWorkflow as a subworkflow. Let's run the workflow and see what happens:
$ tb workflow workflow2.py
entry:
  add      new    1/1     tree/subwf/ok
  add      cancel 0/1     tree/subwf/ok2
entry:
  have     done   0/0     tree/ok
  have     done   1/1     tree/plus_two
  have     done   1/1     tree/might_fail1
  have     fail   1/1     tree/might_fail2
           ^^^^ ValueError: 3 > 2
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1     00:00:00    tree/might_fail1
fail     1/1                    N/A-0/1     00:00:00    tree/might_fail2
^^^^     ValueError: 3 > 2
done     0/0                    N/A-0/1     00:00:00    tree/ok
done     1/1                    N/A-0/1     00:00:00    tree/plus_two
new      1/1                                            tree/subwf/ok
cancel   0/1                                            tree/subwf/ok2
Note that all tasks that belong to the subworkflow are located in the folder subwf. Since the task tree/subwf/ok2 depends on the failed task, it has the state cancel and cannot be run. The other task is in the new state and can be run directly.
$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:39 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:39 N/A-0/1] name: None
tags: —
required_tags: —
resources: None
max_tasks: None
subworker_size: None
subworker_count: None
wall_time: None
[rank=000 2025-03-17 10:08:39 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:39 N/A-0/1] Running subwf/ok ...
[rank=000 2025-03-17 10:08:39 N/A-0/1] Task subwf/ok finished in 0:00:00.000958
[rank=000 2025-03-17 10:08:39 N/A-0/1] No available tasks, end worker main loop
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1                    N/A-0/1     00:00:00    tree/might_fail1
fail     1/1                    N/A-0/1     00:00:00    tree/might_fail2
^^^^     ValueError: 3 > 2
done     0/0                    N/A-0/1     00:00:00    tree/ok
done     1/1                    N/A-0/1     00:00:00    tree/plus_two
done     1/1                    N/A-0/1     00:00:00    tree/subwf/ok
cancel   0/1                                            tree/subwf/ok2
Now all tasks are done except the failed task and the task that depends on it. Let us change the input to the workflow so that tree/might_fail2 will pass. Copy workflow2.py to a new file workflow3.py and change the workflow input max_num:
def workflow(runner):
    runner.run_workflow(Workflow(number=1, max_num=20))
Now run the workflow again
$ tb workflow workflow3.py
entry:
  have     done   1/1     tree/subwf/ok
  have     cancel 0/1     tree/subwf/ok2
entry:
  have     done   0/0     tree/ok
  have     done   1/1     tree/plus_two
  conflict done   1/1 ❄ C tree/might_fail1
  conflict fail   1/1 ❄ C tree/might_fail2
           ^^^^ ValueError: 3 > 2
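One way to picture the conflict entries in the output above is as a comparison between the inputs stored for each task and the inputs the edited workflow would now generate. The sketch below is purely illustrative: the helpers task_inputs and find_conflicts and the 'ref:' notation are invented for this example, and how TaskBlaster actually stores and compares inputs is not described in this tutorial.

```python
def task_inputs(number, max_num):
    """Hypothetical record of the literal inputs each task receives.

    References to other tasks are stored by name, not by value.
    """
    return {
        'ok': {'number': number},
        'plus_two': {'number': 'ref:ok'},
        'might_fail1': {'number': 'ref:ok', 'max_num': max_num},
        'might_fail2': {'number': 'ref:plus_two', 'max_num': max_num},
        'subwf/ok': {'number': 'ref:ok'},
        'subwf/ok2': {'number': 'ref:might_fail2'},
    }


def find_conflicts(stored, new):
    """Names of tasks whose new inputs differ from the stored ones."""
    return sorted(name for name in stored if stored[name] != new[name])


stored = task_inputs(number=1, max_num=2)   # what workflow2.py generated
new = task_inputs(number=1, max_num=20)     # what workflow3.py generates
print(find_conflicts(stored, new))
```

In this model only the two tasks that take max_num directly differ, which matches the two conflict lines in the output.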
Since we changed the input, there is a conflict for the tasks might_fail1 and might_fail2. Let us keep the task might_fail1 as it is and mark the conflict as resolved:
$ tb resolve tree/might_fail1
In practice this means that the conflict state of this task is changed to resolved, so that we remember that we have seen the conflict and decided to keep the task's original input parameters. The failed task, however, we unrun:
$ tb unrun tree/might_fail2 --force
unrun: cancel   subwf/ok2
unrun: fail     might_fail2
2 tasks were unrun.
$ tb ls -cscf
state    conflict    folder
──────── ─────────── ─────────────────────────────
done     resolved    tree/might_fail1
new      none        tree/might_fail2
done     none        tree/ok
done     none        tree/plus_two
done     none        tree/subwf/ok
new      none        tree/subwf/ok2
Now both the previously failed task and the task that depends on it have the state new. We can now apply the new input to these tasks
$ tb workflow workflow3.py
entry:
  have     done   1/1     tree/subwf/ok
  have     new    0/1     tree/subwf/ok2
entry:
  have     done   0/0     tree/ok
  have     done   1/1     tree/plus_two
  resolved done   1/1 R   tree/might_fail1
  update   new    1/1     tree/might_fail2
and run the tasks
$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:40 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] name: None
tags: —
required_tags: —
resources: None
max_tasks: None
subworker_size: None
subworker_count: None
wall_time: None
[rank=000 2025-03-17 10:08:40 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task might_fail2 finished in 0:00:00.001095
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running subwf/ok2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task subwf/ok2 finished in 0:00:00.000548
[rank=000 2025-03-17 10:08:40 N/A-0/1] No available tasks, end worker main loop
Verify that all tasks are done
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1 R                  N/A-0/1     00:00:00    tree/might_fail1
done     1/1                    N/A-0/1     00:00:00    tree/might_fail2
done     0/0                    N/A-0/1     00:00:00    tree/ok
done     1/1                    N/A-0/1     00:00:00    tree/plus_two
done     1/1                    N/A-0/1     00:00:00    tree/subwf/ok
done     1/1                    N/A-0/1     00:00:00    tree/subwf/ok2
Finally, we will see what happens when you unrun a task with descendants that are done. Try to unrun the task tree/plus_two:
$ tb unrun tree/plus_two --force
unrun: done     subwf/ok2
unrun: done     might_fail2
unrun: done     plus_two
3 tasks were unrun.
$ tb ls
state    info       tags        worker      time        folder
──────── ────────── ─────────── ─────────── ─────────── ─────────────────────────────
done     1/1 R                  N/A-0/1     00:00:00    tree/might_fail1
new      0/1                                            tree/might_fail2
done     0/0                    N/A-0/1     00:00:00    tree/ok
new      1/1                                            tree/plus_two
done     1/1                    N/A-0/1     00:00:00    tree/subwf/ok
new      0/1                                            tree/subwf/ok2
As you can see, the task tree/plus_two as well as all tasks that depend on it were set to the new state.
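Which tasks are affected can be worked out by following the dependency graph downstream from the unrun task. The sketch below does this with a breadth-first traversal over a hand-written copy of the full workflow, including the subworkflow. CHILDREN and tasks_to_unrun are illustrative names, not TaskBlaster API, and the traversal is only assumed to resemble what tb unrun does.

```python
from collections import deque

# task -> tasks that directly consume its output (including the subworkflow).
CHILDREN = {
    'ok': ['plus_two', 'might_fail1', 'subwf/ok'],
    'plus_two': ['might_fail2'],
    'might_fail1': [],
    'might_fail2': ['subwf/ok2'],
    'subwf/ok': [],
    'subwf/ok2': [],
}


def tasks_to_unrun(task, children=CHILDREN):
    """Return the task plus all of its transitive descendants (breadth-first)."""
    seen = {task}
    queue = deque([task])
    while queue:
        current = queue.popleft()
        for child in children[current]:
            if child not in seen:
                seen.add(child)
                queue.append(child)
    return sorted(seen)


print(tasks_to_unrun('plus_two'))
```

For plus_two this yields three tasks, and for might_fail2 two, in line with the counts reported by the tb unrun commands in this tutorial.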
You can run these tasks again
$ tb run .
Starting worker rank=000 size=001
[rank=000 2025-03-17 10:08:40 N/A-0/1] Worker class: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Required tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] Supported tags: —
[rank=000 2025-03-17 10:08:40 N/A-0/1] name: None
tags: —
required_tags: —
resources: None
max_tasks: None
subworker_size: None
subworker_count: None
wall_time: None
[rank=000 2025-03-17 10:08:40 N/A-0/1] Main loop
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running plus_two ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task plus_two finished in 0:00:00.001153
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running might_fail2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task might_fail2 finished in 0:00:00.000556
[rank=000 2025-03-17 10:08:40 N/A-0/1] Running subwf/ok2 ...
[rank=000 2025-03-17 10:08:40 N/A-0/1] Task subwf/ok2 finished in 0:00:00.000540
[rank=000 2025-03-17 10:08:40 N/A-0/1] No available tasks, end worker main loop
Try also to unrun the same task without using the flag --force. This is often the safer option to use.
You have now completed your first realistic workflow!