Programming interface reference

taskblaster module

class taskblaster.InputVariable(default)

Declaration of a workflow input variable.

Use the function var() to get an instance of InputVariable. When an InputVariable is assigned as a class attribute of the workflow class, the workflow gains an input parameter named after that attribute.

import taskblaster as tb

@tb.workflow
class MyWorkflow:
    # The following two declarations are equivalent
    my_input_parameter = tb.var(default='mydefault')
    my_input_parameter2 = tb.InputVariable(default='mydefault')

class taskblaster.JSONCodec

Abstract encoder/decoder class for custom types.

Taskblaster can encode and decode only specific types. A plugin can provide a custom implementation of this class to support additional types.

abstractmethod decode(dct: Dict[str, Any]) → Any

Decode a dictionary generated by encode() back into an object.

abstractmethod encode(obj: Any) → Dict[str, Any]

Encode an object as a dictionary.

This should raise TypeError for types that cannot be encoded.
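Here is a minimal sketch of a custom codec. It assumes a plugin that adds support for Python complex numbers. To let the example run without taskblaster installed, it defines a local stand-in base class with the same two abstract methods. A real plugin would subclass taskblaster.JSONCodec instead, and how such a plugin is registered with taskblaster is not covered here. The class name ComplexCodec, the '__complex__' marker key, and the wiring through the standard json module's default= and object_hook= hooks are all illustrative assumptions, not taskblaster's own mechanism.

```python
import json
from abc import ABC, abstractmethod
from typing import Any, Dict


class JSONCodec(ABC):
    """Local stand-in mirroring the documented taskblaster.JSONCodec interface."""

    @abstractmethod
    def decode(self, dct: Dict[str, Any]) -> Any:
        """Decode a dictionary generated by encode() back into an object."""

    @abstractmethod
    def encode(self, obj: Any) -> Dict[str, Any]:
        """Encode an object as a dictionary; raise TypeError if unsupported."""


class ComplexCodec(JSONCodec):
    """Hypothetical codec adding support for complex numbers."""

    def encode(self, obj: Any) -> Dict[str, Any]:
        if isinstance(obj, complex):
            return {'__complex__': [obj.real, obj.imag]}
        # Unsupported types must raise TypeError, as the interface requires.
        raise TypeError(f'cannot encode {type(obj).__name__}')

    def decode(self, dct: Dict[str, Any]) -> Any:
        if '__complex__' in dct:
            real, imag = dct['__complex__']
            return complex(real, imag)
        # Not produced by encode(): leave the dictionary as it is.
        return dct


codec = ComplexCodec()
# json calls default= only for objects it cannot serialize itself.
text = json.dumps({'z': 1 + 2j}, default=codec.encode)
# object_hook= is applied to every decoded JSON object, innermost first.
value = json.loads(text, object_hook=codec.decode)
print(text)
print(value)
```

In this sketch, decode returns unrecognized dictionaries unchanged so that ordinary dictionaries pass through json.loads untouched.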

class taskblaster.Phi(index=(), _rn=None, resolved='<unresolved Phi operator>', debug=False, **kwargs)

Operator that determines a task input depending on the branch from which control flow arrived.

For example, when branches diverge, the Phi operator can be used to converge them again and gather the output of multiple tasks. In effect, it replaces the use of variables in the context of workflows.

As a practical example, consider a while loop that relaxes an object until convergence. The first time the relaxation loop is entered, the input object must come from the actual input of the workflow, but on subsequent iterations it must come from the result of the previous step. This would be described as:

@tb.branch('relaxbranch')
@tb.task
def relax(self):
    return tb.node('postprocess', object=tb.Phi(relaxbranch=self.relax,
                                                entry=self.initial_object))

Note that the name of the default starting branch is always entry.

Phi takes keyword arguments whose names are the branches that control flow may be jumping from, and whose values are the corresponding inputs. A complete example (see also the If, while and branching tutorial and the Dynamical workflows explanation):

import taskblaster as tb

@tb.workflow
class Workflow:
    inputA = tb.var()

    @tb.branch('entry')
    @tb._if(true='truebranch', false='falsebranch')
    @tb.task
    def evaluate(self):
        return tb.node('decide_branch', input=self.inputA)

    @tb.branch('truebranch')
    @tb.jump('finalbranch')
    @tb.task
    def calc_true(self):
        return tb.node('calculateA', input=self.inputA)

    @tb.branch('falsebranch')
    @tb.jump('finalbranch')
    @tb.task
    def calc_false(self):
        return tb.node('calculateB', input=self.inputA)

    @tb.branch('finalbranch')
    @tb.task
    def final_result(self):
        return tb.node('postprocess',
                       output=tb.Phi(truebranch=self.calc_true,
                                     falsebranch=self.calc_false))

In the code above, the postprocess task takes an input argument named output. Its value comes either from calc_true or from calc_false, depending on whether control flow reached finalbranch from truebranch or from falsebranch. This pattern is equivalent to the following pseudocode:

if decide_branch(inputA):
    tmp = calculateA(input=inputA)
else:
    tmp = calculateB(input=inputA)
final_result = postprocess(output=tmp)
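As a rough mental model (not taskblaster's implementation), Phi can be thought of as a lookup table that maps the branch control flow arrived from to the task whose result should be used. The following plain-Python sketch plays out the Workflow example above. The functions resolve_phi and run_workflow, the constant FINAL_PHI, and the arithmetic inside the task stand-ins are hypothetical and serve only as illustration.

```python
from typing import Any, Dict


def resolve_phi(phi: Dict[str, str], came_from: str,
                results: Dict[str, Any]) -> Any:
    """Return the result of the task that `phi` assigns to `came_from`."""
    if came_from not in phi:
        raise ValueError(f'Phi has no entry for branch {came_from!r}')
    return results[phi[came_from]]


# Hypothetical stand-ins for the decide_branch, calculateA, calculateB
# and postprocess task implementations.
def decide_branch(x: int) -> bool:
    return x > 0


def calculate_a(x: int) -> int:
    return 10 * x


def calculate_b(x: int) -> int:
    return -x


def postprocess(output: int) -> int:
    return output + 1


# Plays the role of tb.Phi(truebranch=self.calc_true,
#                          falsebranch=self.calc_false):
# branch jumped from -> task whose result becomes the input.
FINAL_PHI = {'truebranch': 'calc_true', 'falsebranch': 'calc_false'}


def run_workflow(input_a: int) -> int:
    results: Dict[str, Any] = {}
    # 'entry' branch: evaluate decides which branch runs next.
    branch = 'truebranch' if decide_branch(input_a) else 'falsebranch'
    if branch == 'truebranch':
        results['calc_true'] = calculate_a(input_a)
    else:
        results['calc_false'] = calculate_b(input_a)
    # Both branches jump to 'finalbranch'; Phi looks at where we came from.
    output = resolve_phi(FINAL_PHI, came_from=branch, results=results)
    return postprocess(output=output)


print(run_workflow(3))   # truebranch: postprocess(10 * 3) -> 31
print(run_workflow(-4))  # falsebranch: postprocess(4) -> 5
```

Only the task on the branch actually taken has a result, which is why the selection must depend on where control flow came from.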

exception taskblaster.TBUserError

Errors in workflows or other components controlled by the user.

A design goal of TaskBlaster is that any error caused by the user, for example an invalid workflow, is raised inside the TaskBlaster source code only as TBUserError. Otherwise, it would be unclear whether an error is internal or caused by the user's actions.

class taskblaster.TaskView(indexnode, future)

View of a task object for use with actions.

Actions associated with a task will receive this object as input.

property directory

The directory of the task.

Use this to access files generated by the task.

property input

Dictionary of task inputs, in which references are left unresolved.

property output

Output of task, i.e., the return value of the task function.

If the task is not done, the output is None.

property realized_input

Dictionary of task inputs with references resolved to objects.

exception taskblaster.UserWorkflowError

taskblaster.actions(**actions)

Return a decorator that provides custom actions for a task function.

taskblaster.branch(name, loop=False)

Specify that a task or subworkflow belongs to a particular branch.

taskblaster.dynamical_workflow_generator(result_tasks=None)

Decorator to specify dynamical subworkflow generator.

Takes a keyword argument called result_tasks.

taskblaster.fixedpoint(_property)

Specify that this task is an output of the workflow, on which other workflows can depend.

The fixedpoint decorator is required when a subworkflow is dynamic and the master workflow needs to depend on one of its tasks that is not in the entry branch.

The fixedpoint decorator creates a placeholder for the master workflow to depend on until the control flow of the subworkflow reaches this output fixed point:

import taskblaster as tb

@tb.workflow
class DynamicSubWorkflow:
    ...

    @tb.branch('loop')
    @tb.task
    def relax(self):
        return tb.node('relax', input=tb.Phi(...))

    ...

    @tb.branch('final')
    @tb.fixedpoint  # <-- this must be declared as a fixed point
    @tb.task
    def converged_result(self):
        return tb.node('postprocess', input=self.relax)

@tb.workflow
class MyWorkflow:
    @tb.subworkflow
    def dynamic_subworkflow(self):
        return DynamicSubWorkflow()

    @tb.task  # <-- can depend on a non-entry-branch task via the fixed point
    def depend_on_dynamic_result(self):
        return tb.node('post_process',
                       inp=self.dynamic_subworkflow.converged_result)

The fixed point must be predeclared because, in MyWorkflow, the subworkflow and depend_on_dynamic_result are on the same branch and are therefore made into tasks at the same time (in topological order). If the converged_result task of the subworkflow did not exist yet, depend_on_dynamic_result would not be able to depend on it.
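The ordering argument can be illustrated with Python's standard graphlib module (Python 3.9+). This is a simplified model, not taskblaster's code. The hypothetical order_branch_tasks orders the tasks of one branch and refuses dependencies on tasks that do not exist yet. The task names are made up for illustration, including dynamic_subworkflow/start, which stands for the subworkflow's elided entry-branch task. The placeholder is modelled simply as a node with no dependencies.

```python
from graphlib import TopologicalSorter
from typing import Dict, List


def order_branch_tasks(deps: Dict[str, List[str]]) -> List[str]:
    """Order one branch's tasks so that dependencies come first.

    `deps` maps each task that currently exists to the tasks it depends on.
    """
    for task, parents in deps.items():
        missing = [p for p in parents if p not in deps]
        if missing:
            raise LookupError(f'{task} depends on nonexistent {missing}')
    return list(TopologicalSorter(deps).static_order())


# Without a fixed point, only the subworkflow's entry-branch tasks exist
# when MyWorkflow's branch is made into tasks.
without_fixedpoint = {
    'dynamic_subworkflow/start': [],
    'depend_on_dynamic_result': ['dynamic_subworkflow/converged_result'],
}

# With @tb.fixedpoint, a placeholder for converged_result exists up front.
with_fixedpoint = {
    'dynamic_subworkflow/start': [],
    'dynamic_subworkflow/converged_result': [],  # placeholder
    'depend_on_dynamic_result': ['dynamic_subworkflow/converged_result'],
}

try:
    order_branch_tasks(without_fixedpoint)
except LookupError as err:
    print('without fixed point:', err)

print('with fixed point:', order_branch_tasks(with_fixedpoint))
```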

taskblaster.jump(branch_name)

Specify that control flow jumps from this branch to another branch.

taskblaster.mpi(_meth=None, min_cores=None, max_cores=None)

Decorator for task implementations that require MPI.

With multiple MPI subworkers, tasks cannot use the MPI world communicator, as that would cause deadlocks among subworkers.

Such tasks should instead use this decorator, which makes the taskblaster worker pass an MPI context to the task:

import taskblaster as tb

@tb.mpi
def mytask(mpi, a):
    print(mpi.comm.rank)

Tasks that need a communicator other than the MPI world communicator may need to access the taskblaster worker's communicator.

Additionally, the min_cores and max_cores parameters can guard code against accidental parallel or accidental serial execution:

@tb.mpi(max_cores=1)
def my_serial_task(mpi, a):
    pass  # Always serial; causes an error if run in parallel

@tb.mpi(min_cores=64)
def my_parallel_task(mpi, a):
    pass  # Always at least 64 cores; causes an error otherwise

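The kind of check that min_cores and max_cores imply can be sketched in plain Python. This is not taskblaster's implementation. The sketch uses hypothetical stand-ins: FakeComm and FakeContext replace a real MPI communicator and the context the worker passes in, and the decorator guarded plays the role of the core-count guard. The choice of RuntimeError is an assumption, because the error type taskblaster actually raises is not documented here.

```python
import functools
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class FakeComm:
    """Stand-in for an MPI communicator; only size and rank are modelled."""
    size: int
    rank: int = 0


@dataclass
class FakeContext:
    """Stand-in for the MPI context passed to decorated tasks."""
    comm: FakeComm


def guarded(min_cores: Optional[int] = None,
            max_cores: Optional[int] = None) -> Callable:
    """Hypothetical decorator that checks the communicator size first."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(mpi: FakeContext, *args, **kwargs):
            size = mpi.comm.size
            if min_cores is not None and size < min_cores:
                raise RuntimeError(f'{func.__name__} needs at least '
                                   f'{min_cores} cores, got {size}')
            if max_cores is not None and size > max_cores:
                raise RuntimeError(f'{func.__name__} allows at most '
                                   f'{max_cores} cores, got {size}')
            return func(mpi, *args, **kwargs)
        return wrapper
    return decorator


@guarded(max_cores=1)
def my_serial_task(mpi: FakeContext, a: int) -> int:
    return 2 * a


@guarded(min_cores=64)
def my_parallel_task(mpi: FakeContext, a: int) -> int:
    return mpi.comm.size


print(my_serial_task(FakeContext(FakeComm(size=1)), 21))
try:
    my_serial_task(FakeContext(FakeComm(size=4)), 21)
except RuntimeError as err:
    print(err)
```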
taskblaster.subworkflow(_meth)

Decorator to specify subworkflows within the workflow class.

Returns a property of type WorkflowSpecificationProperty.

taskblaster.task(_meth=None, **kwargs)

Decorator to specify tasks within the workflow class.

Returns a property of type TaskSpecificationProperty.

taskblaster.var(default=<object object>) → InputVariable

Define an input variable for a workflow.

If default is specified, the input variable becomes optional.

Example:

import taskblaster as tb

@tb.workflow
class MyWorkflow:
    x = tb.var()
    y = tb.var(default=42)

wf = MyWorkflow(x=17)

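The signature above shows default=<object object>. This suggests that a unique sentinel object stands for "no default given", so that None can still be a legitimate default. The following sketch shows that pattern in plain Python. InputVariableSketch and bind_inputs are hypothetical names, not taskblaster's implementation, and the TypeError they raise is an assumption.

```python
from typing import Any, Dict

_NO_DEFAULT = object()  # unique sentinel meaning "no default given"


class InputVariableSketch:
    """Hypothetical stand-in illustrating a sentinel default."""

    def __init__(self, default: Any = _NO_DEFAULT):
        self.default = default

    @property
    def optional(self) -> bool:
        # None is a valid default, so compare identity with the sentinel.
        return self.default is not _NO_DEFAULT


def bind_inputs(declared: Dict[str, InputVariableSketch],
                given: Dict[str, Any]) -> Dict[str, Any]:
    """Fill in defaults and reject missing or unexpected inputs."""
    unknown = set(given) - set(declared)
    if unknown:
        raise TypeError(f'unexpected input(s): {sorted(unknown)}')
    values = {}
    for name, var in declared.items():
        if name in given:
            values[name] = given[name]
        elif var.optional:
            values[name] = var.default
        else:
            raise TypeError(f'missing required input {name!r}')
    return values


declared = {'x': InputVariableSketch(), 'y': InputVariableSketch(default=42)}
print(bind_inputs(declared, {'x': 17}))  # {'x': 17, 'y': 42}
```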
taskblaster.workflow(cls)

Class decorator for workflows.

Example:

import taskblaster as tb

@tb.workflow
class MyClass:
    a = tb.var()

    ...

class taskblaster.worker.TaskContext(comm, tb_comm, workername)

Object that allows tasks to access information about worker.