11.2 Building Fault-Tolerant Parallel Applications

From the application's view, three steps must be performed for fault tolerance: detection, notification, and recovery.

The first step is being able to detect that something has gone wrong. Detection is typically the job of the runtime environment; when the runtime environment does not provide this capability, application developers can create their own set of monitoring tasks to oversee an application.

The PVM runtime system has a monitoring and notification capability built into it. Any or all tasks in an application can ask to be notified of specific events. These events include the failure of a task, the failure of a cluster node, and the addition of new nodes to the application's computational environment.

The second step in building fault-tolerant applications is notification. The PVM task or tasks requesting notification can specify a particular task or set of tasks to be monitored, or they can ask to be notified if any task within the application fails. The notification message contains the ID of the task that failed.

Unlike many detection systems, PVM's monitoring system is not based on the detection of a broken communication channel between the monitored and notified tasks. Thus the notified task and the failed task never need to have communicated for the failure to be detected. This approach makes the detection step more robust.

The failure or deletion of a node in the cluster is another notify event that can be requested. Again the requesting application task can specify a particular node, set of nodes, or all nodes. And, as before, the notification message returns the ID of the failed node(s).

The addition of one or more cluster nodes to the application's computational environment is also an event that PVM can notify an application about. In this case no ID can be specified, and the notification message returns the ID of the new node(s).

int info = pvm_notify( int EventType, int msgtag, int cnt, int *ids )

The EventType options are PvmTaskExit, PvmHostDelete, or PvmHostAdd. A separate notify call must be made for each event type that the application wishes to be notified about. The msgtag argument specifies the message tag on which the task will listen for events. The cnt argument is the number of task or node IDs in the ids list for which notification is requested.

Given the flexibility of the pvm_notify command, there are several options for how the application can be designed to receive notification from the PVM system. The first option is designing a separate watcher task. One or more of these watcher tasks are spawned across the cluster and often have the additional responsibility of managing the recovery phase of the application. The advantage of this approach is that the application code can remain cleaner. Note that in the manager/worker scheme the manager often assumes the additional duty as watcher.

A second option is for the application tasks to watch each other. A common method is to have each task watch its neighbor in a logical ring. Thus each task just watches one or two other tasks. Another common, but not particularly efficient, method is to have every task watch all the other tasks. Remember that the PVM system is doing the monitoring, not the application tasks. So the monitoring overhead is the same with all these options. The difference is the number of notification messages that get sent in the event of a failure.
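The ring arrangement can be sketched without any PVM calls. The helper names below (ring_watch_list, notices_ring, notices_all_pairs) are hypothetical and not part of PVM; the sketch assumes each task watches both of its ring neighbors. The array filled in by ring_watch_list is what a task would then pass as the ids argument of a PvmTaskExit notify request.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper (not a PVM routine): fill watch[] with the task IDs
 * that the task at ring position `me` should monitor, namely its right and
 * left neighbors in a logical ring of n tasks.
 * Returns the number of IDs written (0, 1, or 2). */
int ring_watch_list(const int *tids, int n, int me, int watch[2])
{
    int right, left, cnt = 0;

    assert(tids != NULL && n > 0 && me >= 0 && me < n);
    if (n == 1)
        return 0;                 /* nobody else to watch */
    right = (me + 1) % n;
    left  = (me + n - 1) % n;
    watch[cnt++] = tids[right];
    if (left != right)            /* with two tasks the neighbors coincide */
        watch[cnt++] = tids[left];
    return cnt;
}

/* Notification messages generated when one task out of n fails:
 * in the ring only the two neighbors are told. */
int notices_ring(int n)
{
    return n <= 1 ? 0 : (n == 2 ? 1 : 2);
}

/* When every task watches all the others, each survivor gets a message. */
int notices_all_pairs(int n)
{
    return n <= 1 ? 0 : n - 1;
}
```

For a 16-task application, a single failure produces 2 notification messages in the ring and 15 when every task watches every other task, which is the difference the paragraph above describes.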

Recovery is the final step in building fault-tolerant programs. Recovery depends heavily on the type of parallel algorithm used in the application. The most commonly used options are restart from the beginning, roll back to the last checkpoint, or reassign the work of a failed task.

The first option is the simplest to implement but the most expensive in the amount of calculation that must be redone. This option is used by many batch systems because it requires no knowledge of the application. It guarantees that the application will complete even if failures occur, although it does not guarantee how long this will take. On average the time is less than twice the normal run time. For short-running applications this is the best option.

For longer-running applications, checkpointing is a commonly used option. With this option you must understand the parallel application and modify it so that the application can restart from an input data file. You then have to modify the application to write out such a data file periodically. In the event of a failure, only computations from the last checkpoint are lost. The application restarts itself from the last successful data file written out. How often checkpoints are written out depends on the size of the restart file and how long the application is going to run. For large, scientific applications that run for days, checkpointing is typically done every few hours.

Note that if a failure is caused by the loss of a cluster node, then the application cannot be restarted until the node is repaired or is replaced by another node in the cluster. The restart file is almost always written out assuming that the same number of nodes is available during the restart.
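A minimal sketch of the checkpoint and restart pattern follows. It is an illustration under stated assumptions, not the method used by any particular application: the state structure, the file layout, and the function names (checkpoint_save, checkpoint_load, run) are all hypothetical, the computation is a toy sum, and the write-then-rename step assumes POSIX behavior, where rename() replaces an existing file.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical application state: loop position and a running sum. */
struct state {
    long   step;
    double sum;
};

/* Write the state to a temporary file, then rename it over the old
 * checkpoint, so a crash during the write leaves the previous restart
 * file intact (assumes POSIX rename semantics).
 * Returns 0 on success, -1 on failure. */
int checkpoint_save(const char *path, const struct state *s)
{
    char tmp[512];
    FILE *fp;
    int n;

    n = snprintf(tmp, sizeof tmp, "%s.tmp", path);
    if (n < 0 || (size_t) n >= sizeof tmp) return -1;
    if ((fp = fopen(tmp, "wb")) == NULL) return -1;
    if (fwrite(s, sizeof *s, 1, fp) != 1) {
        fclose(fp);
        remove(tmp);
        return -1;
    }
    if (fclose(fp) != 0) { remove(tmp); return -1; }
    return rename(tmp, path) == 0 ? 0 : -1;
}

/* Read the last checkpoint. Returns 0 on success, -1 if none is usable. */
int checkpoint_load(const char *path, struct state *s)
{
    FILE *fp = fopen(path, "rb");
    int ok;

    if (fp == NULL) return -1;
    ok = (fread(s, sizeof *s, 1, fp) == 1);
    fclose(fp);
    return ok ? 0 : -1;
}

/* Run (or resume) a toy computation of 1 + 2 + ... + last,
 * writing a checkpoint every `interval` steps. */
double run(const char *path, long last, long interval)
{
    struct state s;

    assert(interval > 0);
    if (checkpoint_load(path, &s) != 0) {   /* no restart file: start over */
        s.step = 0;
        s.sum = 0.0;
    }
    while (s.step < last) {
        s.sum += (double) (s.step + 1);
        s.step++;
        if (s.step % interval == 0)
            (void) checkpoint_save(path, &s);
    }
    return s.sum;
}
```

If the program is killed and run() is called again with the same path, only the steps after the last saved checkpoint are recomputed. The interval argument is where the trade-off between restart-file cost and lost work is set.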

In the special case where an application is based on a manager/worker scheme, it is often possible to reassign the job sent to the failed worker to another worker or to spawn a replacement worker to take its place. Manager/worker is a very popular parallel programming scheme for Beowulf clusters, so this special case arises often. Below is an example of a fault-tolerant manager/worker program.

/* Fault Tolerant Manager / Worker Example
 * using notification and task spawning.
 * example1.c
 */

#include <stdio.h>
#include <math.h>
#include <pvm3.h>

#define NWORK        4
#define NPROB        10000
#define MSGTAG       123

int main()
{
    double sum = 0.0, result, input = 1.0;
    int tids[NWORK], numt, probs[NPROB], sent=0, recvd=0;
    int aok=0, cc, bufid, done=0, i, j, marker, next, src;

    /* If I am a Manager Task */
    if ( (cc = pvm_parent()) == PvmNoParent || cc == PvmParentNotSet ) {

        /* Spawn NWORK Worker Tasks */
        numt = pvm_spawn( "example1", (char **) NULL, PvmTaskDefault,
                (char *) NULL, NWORK, tids );

        /* Set Up Notify for Spawned Tasks */
        pvm_notify( PvmTaskExit, MSGTAG, numt, tids );

        /* Send Problem to Spawned Workers */
        for ( i=0 ; i < NPROB ; i++ ) probs[i] = -1;
        for ( i=0 ; i < numt ; i++ ) {
            pvm_initsend( PvmDataDefault );
            pvm_pkint( &aok, 1, 1 );  /* Valid Problem Marker */
            input = (double) (i + 1);
            pvm_pkdouble( &input, 1, 1 );
            pvm_send( tids[i], MSGTAG );
            probs[i] = i; sent++;  /* Next Problem */
        }

        /* Collect Results / Handle Failures */
        do {
            /* Receive Result */
            bufid = pvm_recv( -1, MSGTAG );
            pvm_upkint( &marker, 1, 1 );

            /* Handle Notify (marker holds the failed task's ID) */
            if ( marker > 0 ) {
                /* Find Failed Task Index */
                for ( i=0, next = -1 ; i < numt ; i++ )
                    if ( tids[i] == marker )
                        /* Find Last Problem Sent to Task */
                        for ( j=(sent-1) ; j >= 0 ; j-- )
                            if ( probs[j] == i ) {
                                /* Spawn Replacement Task */
                                if ( pvm_spawn( "example1", (char **) NULL,
                                        PvmTaskDefault, (char *) NULL, 1,
                                        &(tids[i]) ) == 1 ) {
                                    pvm_notify( PvmTaskExit, MSGTAG, 1,
                                            &(tids[i]) );
                                    next = i;  sent--;
                                }
                                probs[j] = -1; /* Reinsert Prob */
                                break;
                            }
            } else {
                /* Get Source Task & Accumulate Solution */
                pvm_upkdouble( &result, 1, 1 );
                sum += result;
                recvd++;
                /* Get Task Index */
                pvm_bufinfo( bufid, (int *) NULL, (int *) NULL, &src );
                for ( i=0 ; i < numt ; i++ )
                    if ( tids[i] == src ) next = i;
            }

            /* Send Another Problem */
            if ( next >= 0 ) {
                for ( i=0, input = -1.0 ; i < NPROB ; i++ )
                    if ( probs[i] < 0 ) {
                        input = (double) (i + 1);
                        probs [i] = next; sent++;  /* Next Problem */
                        break;
                    }
                pvm_initsend( PvmDataDefault );
                pvm_pkint( &aok, 1, 1 );  /* Valid Problem Marker */
                pvm_pkdouble( &input, 1, 1 );
                pvm_send( tids[next], MSGTAG );
                /* No Problems Left: Worker Was Told to Finish */
                if ( input < 0.0 ) tids[next] = -1;
            }

        } while ( recvd < sent );

        printf( "Sum = %lf\n", sum );
    }

    /* If I am a Worker Task */
    else if ( cc > 0 ) {
        /* Notify Me If Manager Fails */
        pvm_notify( PvmTaskExit, MSGTAG, 1, &cc );
        /* Solve Problems Until Done */
        do {
            /* Get Problem from Master */
            pvm_recv( -1, MSGTAG );
            pvm_upkint( &aok, 1, 1 );
            if ( aok > 0 )  /* Master Died */
                break;
            pvm_upkdouble( &input, 1, 1 );
            if ( input > 0.0 ) {
                /* Compute Result */
                result = sqrt( ( 2.0 * input ) - 1.0 );
                /* Send Result to Master */
                pvm_initsend( PvmDataDefault );
                pvm_pkint( &aok, 1, 1 );    /* Ask for more... */
                pvm_pkdouble( &result, 1, 1 );
                pvm_send( cc, MSGTAG );
            } else
                done = 1;
        } while ( !done );
    }

    pvm_exit();

    return( 0 );
}

This example illustrates another useful function: pvm_spawn(). The ability to spawn a replacement task is a powerful capability in fault tolerance. It is also a key function in adaptive programs, as we will see in the next section.

int numt = pvm_spawn( char *task, char **argv, int flag,
                      char *where, int ntasks, int *tids )

The routine pvm_spawn() starts up ntasks copies of an executable file task on the virtual machine. The PVM virtual machine is assumed to be running on the Beowulf cluster. Here argv is a pointer to an array of arguments to task with the end of the array specified by NULL. If task takes no arguments, then argv is NULL. The flag argument is used to specify options and is a sum of the following options:

  • PvmTaskDefault: has PVM choose where to spawn processes

  • PvmTaskHost: uses a where argument to specify a particular host or cluster node to spawn on

  • PvmTaskArch: uses a where argument to specify an architecture class to spawn on

  • PvmTaskDebug: starts up these processes under a debugger

  • PvmTaskTrace: uses PVM calls to generate trace data

  • PvmMppFront: starts process on MPP front-end/service node

  • PvmHostCompl: starts process on complementary host set

For example, flag = PvmTaskHost + PvmHostCompl spawns tasks on every node but the specified node (which may be the manager, for instance).

On return, numt is set to the number of tasks successfully spawned or to an error code if no tasks could be started. If tasks were started, then pvm_spawn() returns a vector of the spawned tasks' tids. If some tasks could not be started, the corresponding error codes are placed in the last (ntasks - numt) positions of the vector.
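The layout of the returned vector can be illustrated with a small helper that contains no PVM calls. The function name count_spawn_failures is hypothetical; it only interprets a return value and a tids vector laid out as described above, and it does not read the vector at all when numt is negative, since in that case nothing was started.

```c
#include <assert.h>
#include <stdio.h>

/* Hypothetical helper (not a PVM routine): interpret the result of a spawn
 * request for ntasks tasks. The first numt entries of tids[] are task IDs;
 * the last (ntasks - numt) entries are per-task error codes.
 * Returns the number of tasks that could not be started. */
int count_spawn_failures(int numt, int ntasks, const int *tids)
{
    int i;

    assert(ntasks >= 0 && tids != NULL);
    if (numt < 0) {                       /* overall error, nothing started */
        printf("spawn failed, error code %d\n", numt);
        return ntasks;
    }
    if (numt > ntasks)                    /* defensive: should not happen */
        numt = ntasks;
    for (i = 0; i < numt; i++)
        printf("started task with tid %d\n", tids[i]);
    for (i = numt; i < ntasks; i++)
        printf("slot %d not started, error code %d\n", i, tids[i]);
    return ntasks - numt;
}
```

A manager could use the count to decide whether to continue with fewer workers or to retry the spawn on other nodes.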

In the example above, pvm_spawn() is used by the manager to start all the worker tasks and also to replace workers that fail during the computation. This type of fault-tolerant method is useful for applications that run continuously with a steady stream of new work coming in, as was the case in our two initial examples. Both used a variation on the above PVM example code for their solution.
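The reassignment bookkeeping behind this method can be isolated from the message passing. The sketch below is a simplified model and not the code of the example: it assumes a problem table in which each entry is either FREE, DONE, or the index of the worker currently holding it, and the names assign_next, complete, and release_failed are hypothetical.

```c
#include <assert.h>

#define FREE (-1)   /* problem not assigned to any worker */
#define DONE (-2)   /* result for this problem already received */

/* Give the lowest-numbered free problem to `worker`.
 * Returns the problem index, or -1 if no work is left. */
int assign_next(int *probs, int nprob, int worker)
{
    int i;

    assert(worker >= 0);
    for (i = 0; i < nprob; i++)
        if (probs[i] == FREE) {
            probs[i] = worker;
            return i;
        }
    return -1;
}

/* Record that `worker` returned a result for the problem it holds.
 * Returns the problem index, or -1 if the worker held nothing. */
int complete(int *probs, int nprob, int worker)
{
    int i;

    for (i = 0; i < nprob; i++)
        if (probs[i] == worker) {
            probs[i] = DONE;
            return i;
        }
    return -1;
}

/* After a failure notification, put every problem still held by the
 * failed worker back into the pool. Returns how many were released. */
int release_failed(int *probs, int nprob, int worker)
{
    int i, released = 0;

    for (i = 0; i < nprob; i++)
        if (probs[i] == worker) {
            probs[i] = FREE;
            released++;
        }
    return released;
}
```

Because a released problem becomes FREE again, it is handed to whichever worker asks for work next, whether that is a surviving worker or a freshly spawned replacement.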
