In this section, we use some more of the PVM virtual machine functions to illustrate how cluster programs can be extended to adapt not only to faults but also to many other metrics and circumstances. The first example demonstrates a parallel application that dynamically adapts the size of the virtual machine through adding and releasing nodes based on the computational needs of the application. Such a feature is used every day on a 128-processor Beowulf cluster at Oak Ridge National Laboratory that is shared by three research groups.
int numh = pvm_addhosts( char **hosts, int nhost, int *infos) int numh = pvm_delhosts( char **hosts, int nhost, int *infos)
The PVM addhosts and delhosts routines add or delete a set of hosts in the virtual machine. In a Beowulf cluster this corresponds to adding or deleting nodes from the computation; numh is returned as the number of nodes successfully added or deleted. The argument infos is an array of length nhost that contains the status code for each individual node being added or deleted. This allows you to check whether only one of a set of hosts caused a problem, rather than trying to add or delete the entire set of hosts again.
/* * Adaptive Host Allocation Example adds and removes cluster nodes * from computation on the fly for different computational phases */ #include <stdio.h> #include <pvm3.h> static char *host_set_A[] = { "node1", "node2", "node3" }; static int nhosts_A = sizeof( host_set_A ) / sizeof( char ** ); static char *host_set_B[] = { "node10", "node12" }; static int nhosts_B = sizeof( host_set_B ) / sizeof( char ** ); #define MAX_HOSTS 255 #define MSGTAG 123 double phase1( int prob ) { return( (prob == 1) ? 1 : ((double) prob * phase1( prob - 1 )) ); } double phase2( int prob ) { int main( int argc, char **argv ) { double sum1 = 0.0, sum2 = 0.0, result; int status[MAX_HOSTS], prob, cc, i; char *args[3], input[16]; /* If I am the Manager Task */ if ( (cc = pvm_parent()) == PvmNoParent || cc == PvmParentNotSet ) { /* Phase #1 of computation - Use Host Set A */ pvm_addhosts( host_set_A, nhosts_A, status ); /* Spawn Worker Tasks - One Per Host */ args[0] = "phase1"; args[1] = input; args[2] = (char *) NULL; for ( i=0, prob=0 ; i < nhosts_A ; i++ ) if ( status[i] > 0 ) { /* Successful Host Add */ sprintf( input, "%d", prob++ ); pvm_spawn( "example2", args, PvmTaskDefault | PvmTaskHost, host_set_A[i], 1, (int *) NULL ); } /* Collect Results */ for ( i=0 ; i < prob ; i++ ) { pvm_recv( -1, MSGTAG ); pvm_upkdouble( &result, 1, 1 ); sum1 += result; } /* Remove Host Set A after Phase #1 */ for ( i=0 ; i < nhosts_A ; i++ ) if ( status[i] > 0 ) /* Only Delete Successful Hosts */ pvm_delhosts( &(host_set_A[i]), 1, (int *) NULL ); /* Phase #2 of Computation - Use Host Set B */ pvm_addhosts( host_set_B, nhosts_B, status ); /* Spawn Worker Tasks - One Per Host (None Locally) */ args[0] = "phase2"; for ( i=0, prob=0 ; i < nhosts_B ; i++ ) if ( status[i] > 0 ) { /* Successful Host Add */ sprintf( input, "%d", prob++ ); pvm_spawn( "example2", args, PvmTaskDefault | PvmTaskHost, host_set_B[i], 1, (int *) NULL ); } /* Collect Results */ for ( i=0 ; i < prob ; i++ ) { pvm_recv( -1, MSGTAG ); pvm_upkdouble( &result, 1, 1 ); sum2 += result; } /* Remove Host Set B from Phase #2 */ for ( i=0 ; i < nhosts_B ; i++ ) if ( status[i] > 0 ) /* Only Delete Successful Hosts */ pvm_delhosts( &(host_set_B[i]), 1, (int *) NULL ); /* Done */ printf( "sum1 (%lf) / sum2 (%lf) = %lf\n", sum1, sum2, sum1/sum2); } /* If I am a Worker Task */ else if ( cc > 0 ) { /* Compute Result */ prob = atoi( argv[2] ); if ( !strcmp( argv[1], "phase1" ) ) result = phase1( prob + 1 ); else if ( !strcmp( argv[1], "phase2" ) ) result = phase2( 100 * ( prob + 1 ) ); /* Send Result to Master */ pvm_initsend( PvmDataDefault ); pvm_pkdouble( &result, 1, 1 ); pvm_send( cc, MSGTAG ); } pvm_exit(); return( 0 ); }
One of the main difficulties of writing libraries for message-passing applications is that messages sent inside the application may get intercepted by the message-passing calls inside the library. The same problem occurs when two applications want to cooperate, for example, a performance monitor and a scientific application or an airframe stress application coupled with an aerodynamic flow application. Whenever two or more programmers are writing different parts of the overall message-passing application, there is the potential that a message will be inadvertently received by the wrong part of the application. The solution to this problem is communication context. As described earlier in the MPI chapters, communication context in MPI is handled cleanly through the MPI communicator.
In PVM 3.4, pvm_recv() requests a message from a particular source with a user-chosen message tag (either or both of these fields can be set to accept anything). In addition, communication context is a third field that a receive must match on before accepting a message; the context cannot be specified by a wild card. By default a base context is predefined, which is similar to the default MPI_COMM_WORLD communicator in MPI.
PVM has four routines to manage communication contexts.
new_context = pvm_newcontext() old_context = pvm_setcontext( new_context ) info = pvm_freecontext( context ) context = pvm_getcontext()
Pvm_newcontext() returns a systemwide unique context tag generated by the local daemon (in a way similar to the way the local daemon generates systemwide unique task IDs). Since it is a local operation, pvm_newcontext is very fast. The returned context can then be broadcast to all the tasks that are cooperating on this part of the application. Each of the tasks calls pvm_setcontext, which switches the active context and returns the old context tag so that it can be restored at the end of the module by another call to pvm_setcontext. Pvm_freecontext and pvm_getcontext are used to free memory associated with a context tag and to get the value of the active context tag, respectively.
Spawned tasks inherit the context of their parent. Thus, if you wish to add context to an existing parallel routine already written in PVM, you need to add only four lines to the source:
int mycxt, oldcxt; /* near the beginning of the routine set a new context */ mycxt = pvm_newcontext(); oldcxt = pvm_setcontext( mycxt ); /* spawn slave tasks to help */ /* slave tasks require no source code change */ /* leave all the PVM calls in master unchanged */ /* just before exiting the routine restore previous context */ mycxt = pvm_setcontext( oldcxt ); pvm_freecontext( mycxt ); return;
PVM has always had message handlers internally, which were used for controlling the virtual machine. In PVM 3.4 the ability to define and delete message handlers was raised to the user level so that parallel programs can be written that can add new features while the program is running.
The two new message handler functions are
mhid = pvm_addmhf( src, tag, context, *function ); pvm_delmhf( mhid );
Once a message handler has been added by a task, whenever a message arrives at this task with the specified source, message tag, and communication context, the specified function is executed. The function is passed the message so that it may unpack the message if desired. PVM places no restrictions on the complexity of the function, which is free to make system calls or other PVM calls. A message handler ID is returned by the add routine, which is used in the delete message handler routine.
There is no limit on the number of handlers you can set up, and handlers can be added and deleted dynamically by each application task independently.
By setting up message handlers, you can now write programs that can dynamically change the features of the underlying virtual machine. For example, message handlers can be added that implement active messages; the application then can use this form of communication rather than the typical send/receive. Similar opportunities exist for almost every feature of the virtual machine.
The ability of the application to adapt features of the virtual machine to meet its present needs is a powerful capability that has yet to be fully exploited in Beowulf clusters.
/* Adapting available Virtual Machine features with * user redefined message handlers. */ #include <stdio.h> #include <pvm3.h> #define NWORK 4 #define MAIN_MSGTAG 123 #define CNTR_MSGTAG 124 int counter = 0; int handler( int mid ) { int ack, incr, src; /* Increment Counter */ pvm_upkint( &incr, 1, 1 ); counter += incr; printf( "counter = %d\n", counter ); /* Acknowledge Counter Task */ pvm_bufinfo( mid, (int *) NULL, (int *) NULL, &src ); pvm_initsend( PvmDataDefault ); ack = ( counter > 1000 ) ? -1 : 1; pvm_pkint( &ack, 1, 1 ); pvm_send( src, CNTR_MSGTAG ); return( 0 ); } int main( int argc, char **argv ) { int ack, cc, ctx, bufid, incr=1, iter=1, max, numt, old, value=1, src; char *args[2]; /* If I am a Manager Task */ if ( (cc = pvm_parent()) == PvmNoParent || cc == PvmParentNotSet ) { /* Generate New Message Context for Counter Task messages */ ctx = pvm_newcontext(); /* Register Message Handler Function for Independent Counter */ pvm_addmhf( -1, CNTR_MSGTAG, ctx, handler ); /* Spawn 1 Counter Task */ args[0] = "counter"; args[1] = (char *) NULL; old = pvm_setcontext( ctx ); /* Set Message Context for Task */ if ( pvm_spawn( "example3", args, PvmTaskDefault, (char *) NULL, 1, (int *) NULL ) != 1 ) counter = 1001; /* Counter Failed to Spawn, Trigger Exit */ pvm_setcontext( old ); /* Reset to Base Message Context */ /* Spawn NWORK Worker Tasks */ args[0] = "worker"; numt = pvm_spawn( "example3", args, PvmTaskDefault, (char *) NULL, NWORK, (int *) NULL ); /* Increment & Return Worker Values */ do { /* Get Value */ bufid = pvm_recv( -1, MAIN_MSGTAG ); pvm_upkint( &value, 1, 1 ); max = ( value > max ) ? value : max; printf( "recvd value = %d\n", value ); /* Send Reply */ pvm_bufinfo( bufid, (int *) NULL, (int *) NULL, &src ); if ( counter <= 1000 ) value += iter++; else { value = -1; numt--; } /* Tell Workers to Exit */ pvm_initsend( PvmDataDefault ); pvm_pkint( &value, 1, 1 ); pvm_send( src, MAIN_MSGTAG ); } while ( numt > 0 ); printf( "Max Value = %d\n", max ); } /* If I am a Worker Task */ else if ( cc > 0 && !strcmp( argv[1], "worker" ) ) { /* Grow Values Until Done */ do { /* Send Value to Master */ value *= 2; pvm_initsend( PvmDataDefault ); pvm_pkint( &value, 1, 1 ); pvm_send( cc, MAIN_MSGTAG ); /* Get Incremented Value from Master */ pvm_recv( cc, MAIN_MSGTAG ); pvm_upkint( &value, 1, 1 ); } while ( value > 0 ); } /* If I am a Counter Task */ else if ( cc > 0 && !strcmp( argv[1], "counter" ) ) { /* Grow Values Until Done */ do { /* Send Counter Increment to Master */ pvm_initsend( PvmDataDefault ); pvm_pkint( &incr, 1, 1 ); pvm_send( cc, CNTR_MSGTAG ); incr *= 2; /* Check Ack from Master */ pvm_recv( cc, CNTR_MSGTAG ); pvm_upkint( &ack, 1, 1 ); } while ( ack > 0 ); } pvm_exit(); return( 0 ); }
In a typical message-passing system, messages are transient, and the focus is on making their existence as brief as possible by decreasing latency and increasing bandwidth. But in a growing number of situations in the parallel applications seen today, programming would be much easier if one could have persistent messages. This is the purpose of the Message Box feature in PVM.
The Message Box is an simple key/value database in the virtual machine. The key is a user-specified name, and the value is any valid PVM message. Given that there are no restrictions on the complexity or size of a PVM message, the database is simple, but remarkably flexible.
Four functions make up the Message Box:
index = pvm_putinfo( name, msgbuf, flag ) pvm_recvinfo( name, index, flag ) pvm_delinfo( name, index, flag ) pvm_getmboxinfo( pattern, matching_names, info )
Tasks can use regular PVM pack routines to create an arbitrary message and then use pvm_putinfo() to place this message into the Message Box with an associated name. Copies of this message can be retrieved by any PVM task that knows the name. If the name is unknown or is changing dynamically, then pvm_getmboxinfo () can be used to find the list of names active in the Message Box. The flag defines the properties of the stored message, such as who is allowed to delete this message, whether this name allows multiple instances of messages, and whether a put to the same name can overwrite the message.
The Message Box has been used for many other purposes. For example, the dynamic group functionality in PVM is implemented in the new Message Box functions; the Cumulvs computational steering tool uses the Message Box to query for the instructions on how to attach to a remote distributed simulation; and performance monitors leave their findings in the Message Box for other tools to use.
The capability to have persistent messages in parallel computing opens up many new application possibilities not only in high-performance computing but also in collaborative technologies.
/* Example using persistent messages to adapt to change * Monitor tasks are created and killed as needed * Information is exchanged between these tasks using persistent messages */ #include <stdio.h> #include <sys/time.h> #include <pvm3.h> #define MSGBOX "load_stats" int main() { int cc, elapsed, i, index, load, num; struct timeval start, end; double value; /* If I am a Manager Task */ if ( (cc = pvm_parent()) == PvmNoParent || cc == PvmParentNotSet ) { /* Periodically Spawn Load Monitor, Check Current System Load */ do { /* Spawn Load Monitor Task */ if ( pvm_spawn( "example4", (char **) NULL, PvmTaskDefault, (char *) NULL, 1, (int *) NULL ) != 1 ) { perror( "spawning load monitor" ); break; } sleep( 1 ); /* Check System Load (Microseconds Per Megaflop) */ for ( i=0, load=0.0, num=0 ; i < 11 ; i++ ) if ( pvm_recvinfo( MSGBOX, i, PvmMboxDefault ) >= 0 ) { pvm_upkint( &elapsed, 1, 1 ); load += elapsed; num++; } if ( num ) printf( "Load Avg = %lf usec/Mflop\n", (double) load / (double) num ); sleep( 5 ); } while ( 1 ); } /* If I am a Load Monitor Task */ else if ( cc > 0 ) { /* Time Simple Computation */ gettimeofday( &start, (struct timezone *) NULL ); for ( i=0, value=1.0 ; i < 1000000 ; i++ ) value *= 1.2345678; gettimeofday( &end, (struct timezone *) NULL ); elapsed = (end.tv_usec - start.tv_usec) + 1000000 * (end.tv_sec - start.tv_sec); /* Dump Into Next Available Message Mbox */ pvm_initsend( PvmDataDefault ); pvm_pkint( &elapsed, 1, 1 ); index = pvm_putinfo( MSGBOX, pvm_getsbuf(), PvmMboxDefault | PvmMboxPersistent | PvmMboxMultiInstance | PvmMboxOverWritable ); /* Free Next Mbox Index for Next Instance (Only Save 10) */ pvm_delinfo( MSGBOX, (index + 1) % 11, PvmMboxDefault ); } pvm_exit(); return( 0 ); }