Work chains

The main disadvantage of work functions is that they cannot be interrupted during their execution. If at any point during runtime the Python process is killed, the workflow is not able to terminate correctly. This is not a significant issue when running simple scripts, but when you start running workflows with steps that take longer to complete, this can become a real problem.

In order to overcome this limitation, AiiDA allows you to insert checkpoints, where the main code defining a workflow can be interrupted and you can even shut down the machine on which AiiDA is running. We call these workflows with checkpoints work chains because, as you will see, they basically amount to splitting a work function into a chain of steps.
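
To make the idea of checkpoints more concrete, here is a minimal sketch in plain Python. It is only a toy model, not how AiiDA implements checkpoints: the names `STEPS`, `new_checkpoint` and `run_from` are hypothetical, and the state is serialised to a JSON string rather than to the AiiDA database. It shows that once a workflow is split into steps, the state between two steps can be saved and picked up again later.

```python
import json


def multiply(state):
    """First step: store the product of x and y in the shared state."""
    state["product"] = state["x"] * state["y"]


def add(state):
    """Second step: add z to the product computed by the previous step."""
    state["total"] = state["product"] + state["z"]


# The "chain" of steps (hypothetical name, for illustration only)
STEPS = [multiply, add]


def new_checkpoint(**inputs):
    """Create the initial checkpoint: no step has run yet."""
    return json.dumps({"next_step": 0, "state": inputs})


def run_from(checkpoint, max_steps=None):
    """Run steps starting from a checkpoint and return the new checkpoint.

    `max_steps` lets us simulate an interruption after a number of steps.
    """
    data = json.loads(checkpoint)
    index, state = data["next_step"], data["state"]
    executed = 0
    while index < len(STEPS) and (max_steps is None or executed < max_steps):
        STEPS[index](state)
        index += 1
        executed += 1
    return json.dumps({"next_step": index, "state": state})


# "Interrupt" the workflow after the first step ...
first = run_from(new_checkpoint(x=2, y=3, z=4), max_steps=1)
# ... and resume it later from the saved string, possibly in another process
final = run_from(first)
final_state = json.loads(final)["state"]
print(final_state["total"])  # 2 * 3 + 4 = 10
```

Because the checkpoint is just a string, it does not matter whether the Python process that resumes the chain is the same one that started it.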

In this module, you will learn step-by-step how to write work chains in AiiDA.

Note

To focus on the AiiDA concepts, the examples in this module are toy work chains that are purposefully kept very simple. In a later module on writing workflows, you will see a real-world example of a work chain that calculates the equation of state of a material.

Constructing our first work chain

We will start with a very simple work chain which we then modify step by step to introduce new features. A very basic example to start with is a work chain that receives a single input and passes it as the output. To get started, create a Python file for the work chain (e.g. my_first_workchain.py), and add the following piece of code:

from aiida.orm import Int
from aiida.engine import WorkChain


class OutputInputWorkChain(WorkChain):
    """Toy WorkChain that simply passes the input as an output."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.outline(cls.result)
        spec.output("workchain_result", valid_type=Int)

Writing a work chain in AiiDA requires creating a class that inherits from the WorkChain class, as shown in the code snippet above. You can give the work chain any valid Python class name, but the convention is to have it end in WorkChain so that it is always immediately clear what it references. For this basic example, we chose OutputInputWorkChain, since it simply passes the input Int node as an output. We will now explain the basic components of this toy example in detail.

Define method

The most important method to implement for every work chain is the define() class method:

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

This class method must always start by calling the define() method of its parent class using the super() function. The define() method is used to define the specifications of the work chain, which are contained in the work chain spec. In the define() method, we can see three aspects of the work chain are specified:

  • The inputs are specified using the spec.input() method:

            spec.input("x", valid_type=Int)
    

    The first argument of the input() method is a string that specifies the label of the input, in this case 'x'. The valid_type keyword argument allows you to specify the required node type of the input. For the 'x' input of this work chain, only Int nodes are accepted.

  • The outline is specified using the spec.outline() method:

            spec.outline(cls.result)
    

    The outline of the workflow is constructed from the methods of the work chain class. For the OutputInputWorkChain, the outline is a single step: result. Later in this module we’ll be adding more steps to the outline.

  • The outputs are specified using the spec.output() method:

            spec.output("workchain_result", valid_type=Int)
    

    This method is very similar in its usage to the input() method, and just like the inputs you can have several outputs. For now, you can see that our work chain will output a single Int node with the label 'workchain_result'.

Note

All inputs and outputs of a work chain must be AiiDA data types so they can be stored as a Node in the AiiDA database.

Adding the steps in the outline

Now that we’ve seen how to define the spec of the work chain using the define() method, let’s instruct the work chain on what to actually do for the result step in the outline. This is done by adding each step as a method to the work chain class. Let’s do this for our single result step:

from aiida.orm import Int
from aiida.engine import WorkChain


class OutputInputWorkChain(WorkChain):
    """Toy WorkChain that simply passes the input as an output."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.outline(cls.result)
        spec.output("workchain_result", valid_type=Int)

    def result(self):
        """Pass the input as an output."""

        # Declaring the output
        self.out("workchain_result", self.inputs.x)

As you can see, we defined result() as a method of the OutputInputWorkChain class. In this step, we are simply passing the input, stored in self.inputs.x, to the output labeled workchain_result:

        self.out("workchain_result", self.inputs.x)

Two things are happening in this line:

  • The x input is obtained from the inputs using self.inputs.x, i.e. as an attribute from self.inputs.

  • Using the out() method, this input is attached as an output of the work chain with link label workchain_result. As the x work chain input is already an Int node, the valid_type condition is immediately satisfied.

Add the result method to the OutputInputWorkChain class in your my_first_workchain.py file. Now you should be ready to run your first work chain!

Run the work chain

In the terminal, navigate to the folder where you saved the my_first_workchain.py file with the OutputInputWorkChain work chain and open a verdi shell. Then run the work chain as you have seen in the AiiDA basics section on running calculation jobs:

In [1]: from aiida.engine import run
In [2]: from my_first_workchain import OutputInputWorkChain
In [3]: result = run(OutputInputWorkChain, x=Int(4))

This should complete almost instantaneously. We’re just passing an input, after all! Let’s see what’s stored in the result variable:

In [4]: result
Out[4]:
{'workchain_result': <Int: uuid: ed5106ef-8eff-4e87-b2e9-ce6770a6b9a3 (pk: 4665) value: 4>}

You can see that the run() function has returned a dictionary whose key corresponds to the output link label ('workchain_result') and whose value is the Int node that has been passed as an output in the work chain.

Exit the verdi shell and check the list of processes with verdi process list, using -a/--all to also see all terminated processes and -p/--past-days 1 to only see processes created in the past day:

$ verdi process list -a -p 1
...
1982  2m ago     OutputInputWorkChain  ⏹ Finished [0]

Grab the PK of the OutputInputWorkChain and show some details about the inputs and outputs using verdi process show:

$ verdi process show <PK>

This results in the following output:

Property     Value
-----------  ------------------------------------
type         OutputInputWorkChain
state        Finished [0]
pk           1982
uuid         da86a26e-9b8b-4ab2-94ae-84016a17152a
label
description
ctime        2021-06-19 23:13:59.238930+00:00
mtime        2021-06-19 23:13:59.431924+00:00

Inputs      PK  Type
--------  ----  ------
x         1981  Int

Outputs             PK  Type
----------------  ----  ------
workchain_result  1981  Int

Observe that the PK of the input is the same as that of the output. That is because our first work chain did not create any data, but just passed the input as the output.

Exercises

(1) Generate the provenance graph of the OutputInputWorkChain. Is it what you would expect?

(2) Try to pass a plain Python integer for the x input when running the OutputInputWorkChain, instead of an Int node. What happens?

How not to create data

Our next goal is to write a work chain that receives two inputs, adds them together, and outputs the sum. In a first attempt to do this, open the my_first_workchain.py file and make the following changes (highlighted):

from aiida.orm import Int
from aiida.engine import WorkChain


class AddWorkChain(WorkChain):
    """WorkChain to add two integers."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.input("y", valid_type=Int)
        spec.outline(cls.result)
        spec.output("workchain_result", valid_type=Int)

    def result(self):
        """Parse the result."""

        summation = self.inputs.x + self.inputs.y
        # Declaring the output
        self.out("workchain_result", summation)

As you can see, the first change is to update the name of the work chain to AddWorkChain to better represent its new functionality. Next, we declared a new input in the define() method:

        spec.input("y", valid_type=Int)

The y input is simply a second Int node that we will add to x. This is now done in the result() method, where we added the two inputs and attached the sum (summation) as the new output of the work chain:

        summation = self.inputs.x + self.inputs.y
        # Declaring the output
        self.out("workchain_result", summation)

Note that the summation of two Int nodes results in a new (and unstored) Int node (try it in the verdi shell!).

Run the work chain

Let’s see what happens when we try to run the work chain as we have done before for the OutputInputWorkChain. Navigate to the folder where you have the work chain Python file (my_first_workchain.py) and open a verdi shell session to execute:

In [1]: from aiida.engine import run
In [2]: from my_first_workchain import AddWorkChain
In [3]: result = run(AddWorkChain, x=Int(4), y=Int(3))

Unfortunately, the command fails! The ValueError at the end of the stack trace explains what went wrong:

...
----> 3 result = run(AddWorkChain, x=Int(4), y=Int(3) )
...
ValueError: Workflow<AddWorkChain> tried returning an unstored `Data` node.
This likely means new `Data` is being created inside the workflow.
In order to preserve data provenance, use a `calcfunction` to create this node and return its output from the workflow

As the error message explains, the work chain is trying to create new Data. However, in order to preserve the data provenance, data can only be created by calculation functions or calculation jobs. So, to correctly create the new data inside the work chain, we’ll have to add a calculation function to our script.
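
The rule behind this error can be sketched with a small toy model in plain Python. None of the names below (`ToyNode`, `toy_calcfunction`, `toy_out`) exist in AiiDA; they are hypothetical stand-ins, and the real engine does considerably more. The sketch only illustrates the logic: adding two nodes yields a new, unstored node, the workflow-side check refuses unstored nodes, and a calculation-like function is the place where the new node is stored together with a record of what created it.

```python
import functools


class ToyNode:
    """Stand-in for a data node: a value plus minimal provenance information."""

    def __init__(self, value):
        self.value = value
        self.is_stored = False
        self.creator = None  # name of the function that created this node

    def store(self):
        self.is_stored = True
        return self

    def __add__(self, other):
        # As in the text: the sum is a *new* node that is not stored yet
        return ToyNode(self.value + other.value)


def toy_calcfunction(func):
    """Run `func`, record it as the creator of the result, and store the result."""

    @functools.wraps(func)
    def wrapper(*args):
        result = func(*args)
        result.creator = func.__name__
        return result.store()

    return wrapper


def toy_out(node):
    """Mimic the workflow-side check: only stored nodes may be returned."""
    if not node.is_stored:
        raise ValueError("workflow tried returning an unstored node")
    return node


@toy_calcfunction
def addition(x, y):
    return x + y


x, y = ToyNode(4).store(), ToyNode(3).store()

# Returning the bare sum is rejected, because nothing recorded how it was made
try:
    toy_out(x + y)
    rejected = False
except ValueError:
    rejected = True

# Going through the calculation-like function stores the node and its creator
accepted = toy_out(addition(x, y))
print(rejected, accepted.value, accepted.creator)  # True 7 addition
```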

Creating data with a calculation function

Let’s fix the issue with our work chain by creating the new data using a calculation function. To do this, define a calculation function that adds the two numbers together and call this function inside a work chain step. You can see the highlighted changes below:

from aiida.orm import Int
from aiida.engine import WorkChain, calcfunction


@calcfunction
def addition(x, y):
    return x + y


class AddWorkChain(WorkChain):
    """WorkChain to add two integers."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.input("y", valid_type=Int)
        spec.outline(cls.result)
        spec.output("workchain_result", valid_type=Int)

    def result(self):
        """Sum the inputs and parse the result."""

        # Call `addition` using the two inputs
        addition_result = addition(self.inputs.x, self.inputs.y)

        # Declaring the output
        self.out("workchain_result", addition_result)

We first imported the calcfunction decorator from aiida.engine. Then we defined the addition() function outside the work chain class and decorated it with @calcfunction:

@calcfunction
def addition(x, y):
    return x + y

And finally, we added the two inputs using the addition() calculation function that we defined above:

        addition_result = addition(self.inputs.x, self.inputs.y)

This will ensure that the Int node created by the addition is stored as a part of the data provenance.

Run the work chain

Let’s run the work chain that uses the addition() calculation function. Once again make sure you are in the folder where you have the work chain Python script, open the verdi shell and execute:

In [1]: from aiida.engine import run
In [2]: from my_first_workchain import AddWorkChain
In [3]: result = run(AddWorkChain, x=Int(4), y=Int(3))

This time the run should have completed without issue! Let’s see what result was returned by the run() call:

In [4]: result
Out[4]: {'workchain_result': <Int: uuid: 21cf16e9-58dc-4566-bbd7-b170fcd628ee (pk: 1990) value: 7>}

Similar to the first work chain you ran, the result is a dictionary that contains the output label and output Int node as a key/value pair. However, now the Int node is a new node that was created by the addition() calculation function.

Close the verdi shell session and look for the work chain you just ran:

$ verdi process list -a -p 1
...
1988  49s ago    AddWorkChain     ⏹ Finished [0]
1989  49s ago    addition         ⏹ Finished [0]

Next, check the status of the process that corresponds to the AddWorkChain:

$ verdi process status <PK>
AddWorkChain<1988> Finished [0] [None]
    └── addition<1989> Finished [0]

Notice how there is a branch in the work chain tree, which shows that a process (the addition() calculation function) was called by the AddWorkChain. Finally, you can obtain some details about the inputs and outputs with:

$ verdi process show <PK>
Property     Value
-----------  ------------------------------------
type         AddWorkChain
state        Finished [0]
pk           1988
uuid         18ffdcfa-395c-4579-be82-a038ee0bbc22
label
description
ctime        2021-06-22 14:57:21.074444+00:00
mtime        2021-06-22 14:57:21.444217+00:00

Inputs      PK  Type
--------  ----  ------
x         1986  Int
y         1987  Int

Outputs             PK  Type
----------------  ----  ------
workchain_result  1990  Int

Called      PK  Type
--------  ----  ------
CALL      1989  addition

Note that the output has its own PK, which is different from those of both inputs. That is because it is a new data node that was created by the calculation function called by the work chain.

Exercises

(1) Go back and check the status of the process that corresponds to the work chain OutputInputWorkChain. How is it different from the AddWorkChain?

(2) Generate the provenance graph of the AddWorkChain, and compare it to that of the OutputInputWorkChain.

Multiple work-chain steps - Context

So far, we have only had a single step in the outline of our work chain. When writing work chains with multiple steps, you may need to pass data between them. This can be achieved using the context.

Our new work chain will have the same goal as before, simply adding two inputs. But this time, we will create two steps in the outline() call: one to actually add the inputs and thus create new data, and another just to pass the result as an output. The code looks like this:

from aiida.orm import Int
from aiida.engine import WorkChain, calcfunction


@calcfunction
def addition(x, y):
    return x + y


class AddWorkChain(WorkChain):
    """WorkChain to add two integers."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.input("y", valid_type=Int)
        spec.outline(cls.add, cls.result)
        spec.output("workchain_result", valid_type=Int)

    def add(self):
        """Sum the inputs."""

        # Call `addition` using the two inputs
        addition_result = addition(self.inputs.x, self.inputs.y)

        # Passing to context to be used by other functions
        self.ctx.summation = addition_result

    def result(self):
        """Parse the result."""

        # Declaring the output
        self.out("workchain_result", self.ctx.summation)

We added an extra step called add in the outline to be executed before result:

        spec.outline(cls.add, cls.result)

Which means we also have to define the add() method for the work chain class:

    def add(self):
        """Sum the inputs."""

        # Call `addition` using the two inputs
        addition_result = addition(self.inputs.x, self.inputs.y)

        # Passing to context to be used by other functions
        self.ctx.summation = addition_result

This method is essentially the same as the result() method from the previous version, but instead of passing the result of the addition (addition_result) directly as an output, we added it to the work chain context using self.ctx:

        self.ctx.summation = addition_result

By doing so, the information stored in the context can now be used by another step of the outline. In our example, the self.ctx.summation is passed as the workchain_result output in the result() step:

    def result(self):
        """Parse the result."""

        # Declaring the output
        self.out("workchain_result", self.ctx.summation)
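
If it helps to picture what happens behind the scenes, the following plain-Python sketch mimics the pattern. It is a toy model with hypothetical names (`ToyChain`, `run`), not AiiDA's engine, and it uses plain integers instead of nodes. The steps of the outline are called one after the other, and anything a later step needs has to live on the shared `ctx` object, because local variables disappear when a step returns.

```python
from types import SimpleNamespace


class ToyChain:
    """Toy stand-in for a work chain with an outline and a context."""

    def __init__(self, **inputs):
        self.inputs = SimpleNamespace(**inputs)
        self.ctx = SimpleNamespace()  # shared between all steps
        self.outputs = {}

    def outline(self):
        """Return the steps in the order in which they should run."""
        return (self.add, self.result)

    def add(self):
        # A local variable would be gone once this method returns,
        # so the sum is kept on the context instead
        self.ctx.summation = self.inputs.x + self.inputs.y

    def result(self):
        # A later step reads what an earlier step left in the context
        self.outputs["workchain_result"] = self.ctx.summation

    def run(self):
        for step in self.outline():
            step()
        return self.outputs


outputs = ToyChain(x=4, y=3).run()
print(outputs)  # {'workchain_result': 7}
```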

Exercise: Adding multiplication

Alright, now it’s your turn! Based on the concepts you’ve learned so far, add an extra multiplication step by doing the following:

  • Rename the work chain to MultiplyAddWorkChain, since we’ll be adding an extra multiplication step.

  • Write a calculation function called multiplication, that takes two Int nodes and returns their product.

  • Add a new Int input to the MultiplyAddWorkChain spec, labeled 'z'.

  • Add a new step to the outline of the work chain called multiply, making sure it is the first step of the outline. When defining the method, use the multiplication calculation function to multiply the x and y inputs. Then pass the results to the add step using the context.

  • In the add() method, sum the result of the multiplication with the third input z and pass the result to the context.

  • In the result() method, output the result of the add step as the 'workchain_result'. Also, attach the result of the multiplication as an output. Note that you need to declare another output for this in the define method. You can use product as the label for the output link, for example.

Try to adapt the AddWorkChain into the MultiplyAddWorkChain yourself, and run the final work chain to see if it works. Once you have managed to run the work chain, its status should show both the multiplication and addition calculation functions in the hierarchy:

$ verdi process status <PK>
MultiplyAddWorkChain<203> Finished [0] [2:result]
    ├── multiplication<204> Finished [0]
    └── addition<206> Finished [0]

You can then also generate the provenance graph and once again compare it with the details shown by verdi process show.

If you get stuck, we’ve added our solution to the exercise in the dropdown below. As always, try to solve the exercise yourself before looking at the solution!

Submitting calculation jobs

All work chains we have seen up to this point rely on calculation functions to create data. When running the work chain, these processes are executed by the same Python process that is executing the code in the work-chain methods. All the functionality of the work chains above could have been implemented in a work function. In fact, this would be much simpler, similar to the add_multiply work function shown in the work function module.

Of course, the power of a work chain lies in its ability to submit other processes that can run independently while the work chain waits for them to complete. Doing so also releases the daemon to do other tasks, which is vital when running many workflows in high-throughput. These processes often don’t run Python code - think for example of a remote code installed on a supercomputer.

Although a work chain can also submit other work chains, in this section we’ll see how to submit a calculation job (CalcJob) inside a work chain. Starting from the AddWorkChain in the section on work chain context, the code below replaces the addition calculation function with the ArithmeticAdd calculation job, which ships with aiida-core:

from aiida.orm import Int, Code
from aiida.engine import WorkChain, ToContext
from aiida.plugins.factories import CalculationFactory

ArithmeticAddCalculation = CalculationFactory("core.arithmetic.add")


class AddCalcjobWorkChain(WorkChain):
    """WorkChain to add two integers."""

    @classmethod
    def define(cls, spec):
        """Specify inputs, outputs, and the workchain outline."""
        super().define(spec)

        spec.input("x", valid_type=Int)
        spec.input("y", valid_type=Int)
        spec.input("code", valid_type=Code)
        spec.outline(cls.add, cls.result)
        spec.output("workchain_result", valid_type=Int)

    def add(self):
        """Sum the inputs."""

        # Submitting the calculation job `ArithmeticAddCalculation`
        calc_job_node = self.submit(
            ArithmeticAddCalculation,
            x=self.inputs.x,
            y=self.inputs.y,
            code=self.inputs.code,
        )

        return ToContext(add_node=calc_job_node)

    def result(self):
        """Parse the result."""

        # Declaring the output
        self.out("workchain_result", self.ctx.add_node.outputs.sum)

Let’s have a closer look at each change in the code. First, we used the CalculationFactory to load the ArithmeticAddCalculation calculation job that we plan on using:

from aiida.plugins.factories import CalculationFactory

ArithmeticAddCalculation = CalculationFactory("core.arithmetic.add")

It is possible to set up multiple codes in the AiiDA database that run the same calculation job (a local and a remote one, for example). Hence, the user must be able to specify which code the work chain should run. To allow this, we have added a code input to the work chain spec, which must be of type Code:

        spec.input("code", valid_type=Code)

In the add() method, we now submit the ArithmeticAddCalculation calculation job instead of running the addition() calculation function:

    def add(self):
        """Sum the inputs."""

        # Submitting the calculation job `ArithmeticAddCalculation`
        calc_job_node = self.submit(
            ArithmeticAddCalculation,
            x=self.inputs.x,
            y=self.inputs.y,
            code=self.inputs.code,
        )

        return ToContext(add_node=calc_job_node)

Important

When submitting a calculation job or work chain inside a work chain, it is essential to use the submit method of the work chain via self.submit().

Since the result of the addition is only available once the calculation job is finished, the submit() method returns the CalcJobNode of the ArithmeticAddCalculation process. To make sure the work chain waits for this process to finish before continuing, we return the ToContext container. Here, we have specified that the calculation job node stored in calc_job_node should be assigned to the 'add_node' context key.
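
The "submit, hand something back to the engine, continue once it is done" pattern can be illustrated with a toy model built on Python's standard `concurrent.futures` module. This is only an analogy under simplifying assumptions: `ToyChain`, `slow_add` and `run_chain` are hypothetical names, the toy runner simply blocks while waiting (whereas, as described above, a waiting work chain leaves the daemon free for other tasks), and the toy context receives the finished result itself rather than a process node.

```python
from concurrent.futures import ThreadPoolExecutor
from types import SimpleNamespace


def slow_add(x, y):
    """Stand-in for an external job whose result is only available later."""
    return x + y


class ToyChain:
    """Toy stand-in for a work chain that submits a job in one of its steps."""

    def __init__(self, executor, **inputs):
        self.executor = executor
        self.inputs = SimpleNamespace(**inputs)
        self.ctx = SimpleNamespace()
        self.outputs = {}

    def add(self):
        # Submitting returns immediately with a handle, not with the result
        future = self.executor.submit(slow_add, self.inputs.x, self.inputs.y)
        # Ask the runner to wait for the job and file it under `add_result`
        return {"add_result": future}

    def result(self):
        # By the time this step runs, the context key has been filled in
        self.outputs["workchain_result"] = self.ctx.add_result


def run_chain(chain, steps):
    """Run the steps in order, resolving anything a step asked us to await."""
    for step in steps:
        awaited = step() or {}
        for key, future in awaited.items():
            setattr(chain.ctx, key, future.result())  # waits until finished
    return chain.outputs


with ThreadPoolExecutor(max_workers=1) as executor:
    chain = ToyChain(executor, x=1, y=2)
    outputs = run_chain(chain, [chain.add, chain.result])

print(outputs)  # {'workchain_result': 3}
```

The key point carried over from the real work chain is that the step itself never waits: it only tells the runner which context key should hold the finished process before the next step starts.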

Once the ArithmeticAddCalculation calculation job is finished, the work chain will execute the result step. The outputs of the calculation job are stored in the outputs attribute of the calculation job node. In case of the ArithmeticAddCalculation, the result of the addition is attached as an output using the sum link label, and hence can be accessed using <calculation_job_node>.outputs.sum. Since we have added the calculation job node to the context under the add_node key, we obtain the sum using self.ctx.add_node.outputs.sum. This result is then attached as the workchain_result output:

        self.out("workchain_result", self.ctx.add_node.outputs.sum)

And that’s all! Copy the full code snippet for the work chain to a Python file, for example addcalcjobworkchain.py. Now we’re ready to launch our work chain.

Submit the work chain

When submitting work chains to the AiiDA daemon, it’s important that it knows where to find the work chain module, i.e. the .py file that contains the work chain code. To do this, we need to add the directory that contains this file to the PYTHONPATH. Make sure you are in the directory that contains the addcalcjobworkchain.py file and execute:

$ echo "export PYTHONPATH=\$PYTHONPATH:$PWD" >> $HOME/.bashrc
$ source $HOME/.bashrc
$ verdi daemon restart --reset
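
To see why this step matters, the short sketch below reproduces the situation with the standard library only. It makes no claims about the daemon's internals. The module name is hypothetical and is written to a temporary directory: importing it fails while its directory is unknown to Python, and succeeds once the directory is on the module search path, which is what the PYTHONPATH entry achieves when a Python process starts.

```python
import importlib
import sys
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as folder:
    # Hypothetical module name, chosen so that it cannot clash with a real one
    module_name = "toy_workchain_module_for_path_demo"
    Path(folder, f"{module_name}.py").write_text("ANSWER = 42\n")

    # The directory is not on the search path yet, so the import fails
    try:
        importlib.import_module(module_name)
        found_before = True
    except ModuleNotFoundError:
        found_before = False

    # Adding the directory at runtime mimics what PYTHONPATH does at start-up
    sys.path.append(folder)
    importlib.invalidate_caches()
    module = importlib.import_module(module_name)
    found_after = module.ANSWER == 42

    # Clean up so the temporary directory does not linger on the search path
    sys.path.remove(folder)

print(found_before, found_after)  # False True
```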

Also double check that you have set up the add code used in the AiiDA basics module:

$ verdi code list
...
* pk 1912 - add@localhost

If not, you can set it up with the instructions in the dropdown below.

Then open a verdi shell and import the submit() function and work chain:

In [1]: from aiida.engine import submit
   ...: from addcalcjobworkchain import AddCalcjobWorkChain

We’ll also need the add code set up on the localhost. Load it using its label:

In [2]: add_code = load_code(label='add@localhost')

Then we can submit the work chain:

In [3]: workchain_node = submit(AddCalcjobWorkChain, x=Int(1), y=Int(2), code=add_code)

Note that just like for the self.submit() method executed inside the work chain, the bare submit() function also returns the process node:

In [4]: workchain_node
Out[4]: <WorkChainNode: uuid: 0936f2b4-01af-46bb-98f8-da828ce706eb (pk: 324) (addcalcjobworkchain.AddCalcjobWorkChain)>

In this case, it is the node of the AddCalcjobWorkChain we have just submitted.

Exercises

(1) As before, generate the provenance graph of the AddCalcjobWorkChain and check the details of verdi process show. Do you see any differences with the AddWorkChain?

(2) Try loading the ArithmeticAddCalculation node in the verdi shell, and browse its outputs via the outputs attribute and tab-completion. Basically, after loading the node in e.g. add_node, type add_node.outputs. and then press Tab. Do you find the sum output?

(3) To practice the concepts of this final section, adapt the MultiplyAddWorkChain so it uses the ArithmeticAddCalculation calculation job instead of the addition() calculation function.