Although parallel programs can be quite complex, many applications can be made parallel in a simple way to take advantage of the power of Beowulf clusters. In this section we describe how to write simple programs using features of the Linux operating system that you are probably already familiar with. We begin with a discussion of processes themselves (the primary unit of parallelism) and the ways they can be created in Unix environments such as Linux. A good reference on this material is [111].
First we review terminology. A program is a set of computer instructions. A computer fetches from its memory the instruction at the address contained in its program counter and executes that instruction. Execution of the instruction sets the program counter for the next instruction. This is the basic von Neumann model of computation. A process consists of a program, an area of computer memory called an address space, and a program counter. (If there are multiple program counters for a single address space, the process is called a multithreaded process.) Processes are isolated from one another in the sense that no single instruction from the program associated with one process can access the address space of another process. Data can be moved from the address space of one process to that of another process by methods that we will describe in this and succeeding chapters. For the sake of simplicity, we will discuss single-threaded processes here, so we may think of a process as an (address space, program, program counter) triple.
Where do processes come from? In Unix-based operating systems such as Linux, new processes are created by the fork system call. This is an efficient and lightweight mechanism that duplicates the process by copying the address space and creating a new process with the same program. The only difference between the process that executed the fork (called the parent process) and the new process (called the child process) is that the fork call returns 0 in the child and the process id of the child in the parent. Based on this different return code from fork, the parent and child processes, now executing independently, can do different things.
One thing the child process often does is an exec system call. This call changes the program for the process, sets the program counter to the beginning of the program, and reinitializes the address space. The fork-exec combination, therefore, is the mechanism by which a process creates a new, completely different one. The new process executes on the same machine and competes for CPU cycles with the original process through the process scheduler in the machine's operating system.
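The following is a minimal sketch of the fork-exec sequence in C (our own illustration, not code from the text): the child replaces its program with /bin/ls via execl, while the parent waits for the child to finish. Error handling is kept to a minimum.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>

int main(void)
{
    pid_t pid = fork();                  /* duplicate the calling process */

    if (pid == 0) {                      /* child: fork returned 0 */
        execl("/bin/ls", "ls", "-l", "/tmp", (char *) NULL);
        perror("execl");                 /* reached only if exec failed */
        exit(1);
    } else if (pid > 0) {                /* parent: fork returned the child's pid */
        waitpid(pid, NULL, 0);           /* wait for the child to finish */
        printf("child %d finished\n", (int) pid);
    } else {
        perror("fork");                  /* fork itself failed */
    }
    return 0;
}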
You have experienced this mechanism many times. When you are logged into a Unix system, you are interacting with a shell, which is just a normal Unix process that prompts you, reads your input commands, and processes them. The default program for this process is /bin/bash; but depending on the shell specified for your user name in '/etc/passwd', you may be using another shell. Whenever you run a Unix command, such as grep, the shell forks and execs the program associated with the command. The command ps shows you all the processes you are running under the current shell, including the ps process itself (strictly speaking, the process executing the ps program).
Normally, when you execute a command from the shell, the shell process waits for the child process to complete before prompting you for another command, so that only one process of yours at a time is actually executing. By "executing" we mean that it is in the list of processes that the operating system will schedule for execution according to its time-slicing algorithm. If your machine has only one CPU, of course only one instruction from one process can be executing at a time. By time-slicing the CPU among processes, however, the illusion of simultaneously executing processes is presented, even on a single machine with a single CPU.
The easiest way to cause multiple processes to be scheduled for execution at the same time is to append the '&' character to a command that you execute in the shell. When you do this, the shell starts the new process (using the fork-exec mechanism) but then immediately prompts for another command without waiting for the new one to complete. This is called "running a process in the background." Multiple background processes can be executing at the same time. This situation provides us with our first example of parallel processes.
To determine whether a file contains a specific string, you can use the Unix command grep. To look in a directory containing mail files in order to find a message about the Boyer-Moore string-matching algorithm, you can cd to that directory and do
grep Boyer *
If your mail is divided into directories by year, you can consider searching all those directories in parallel. You can use background processes to do this search in a shell script:
#!/bin/bash
echo searching for $1
for i in 20* ; do
    ( cd $i; grep $1 * > $1.out ) &
done
wait
cat 20*/$1.out > $1.all
and invoke this with Boyer as an argument.
This simple parallel program matches our definition of a manager/worker algorithm, in which the manager process executes this script and the worker processes execute grep. We can compare its properties with the list in Section 7.1:
The subtasks, each of which is to run grep over all the files in one directory, are independent.
The workers are started by this shell script, which acts as the manager.
The subtask specifications (arguments to grep) are communicated to the workers on their respective command lines.
The results are written to the file system, one result file in each directory.
The wait causes the shell script to wait for all background processes to finish, so that the results can be collected by the manager (using cat) into one place.
One can make a few further observations about this example:
The first line of the script tells the system which program to use to interpret the script. Here we have used the default shell for Linux systems, called bash. Other shells may be installed on your system, such as csh, tcsh, or zsh. Each of these has a slightly different syntax and different advanced features, but for the most part they provide the same basic functionality.
We could have made the size of the subtask smaller by running each invocation of grep on a single file. This would have led to more parallelism, but it is of dubious value on a single machine, and we would have been creating potentially thousands of processes at once.
We could time this script by putting date commands at the beginning and end, or by running it under the shell's time command:
time grepmail boyer
where grepmail is the name of this script and boyer is the argument.
Recall that the way a process is created on a Unix system is with the fork mechanism. Only one process is not forked by another process, namely the single init process that is the root of the tree of all processes running at any one time.
Thus, if we want to create a new process on another machine, we must contact some existing process and cause it to fork our new process for us. There are many ways to do this, but all of them use this same basic mechanism. They differ only in which program they contact to make a fork request to. The contact is usually made over a TCP socket. We describe sockets in detail in Section 7.2.5.
The rsh command contacts the rshd process if it is running on the remote machine and asks it to execute a program or script. To see the contents of the '/tmp' directory on the machine foo.bar.edu, you would do
rsh foo.bar.edu ls /tmp
The standard input and output of the remote command are routed through the standard input and output of the rsh command, so that the output of the ls comes back to the user on the local machine. Chapter 5 describes how to set up rsh on your cluster.
The ssh (secure shell) program behaves much like rsh but has a more secure authentication mechanism based on public key encryption and encrypts all traffic between the local and remote host. It is now the most commonly used mechanism for starting remote processes. Nevertheless, rsh is substantially faster than ssh, and is used when security is not a critical issue. A common example of this situation occurs when the cluster is behind a firewall and rsh is enabled just within the cluster. Setting up ssh is described in Chapter 5, and a book on ssh has recently appeared [11].
Here is a simple example. Suppose that we have a file called 'hosts' with the names of all the hosts in our cluster. We want to run a command (in parallel) on all those hosts. We can do so with a simple shell script as follows:
#!/bin/bash
for i in `cat hosts` ; do
    ( ssh -x $i hostname & ) ;
done
If everything is working correctly and ssh has been configured so that it does not require a password on every invocation, then we should get back the names of the hosts in our cluster, although not necessarily in the same order as they appear in the file.
(What is that -x doing there? In this example, since the remotely executed program (hostname) does not use any X windowing facilities, we turn off X forwarding with the -x option. To run a program that does use X, X forwarding must be enabled in the sshd server at each remote machine, and the user should set the DISPLAY environment variable. Then the connection to the X display is automatically forwarded in such a way that any X programs started from the shell go through the encrypted channel, and the connection to the real X server is made from the local machine. We note that if you run several X programs at several different hosts, each will create a file named '.Xauthority' in your home directory on that machine. If the machines all share the same home directory, for example mounted via NFS, the '.Xauthority' files will conflict with one another.)
Programs such as the ones rsh and ssh contact to fork processes on their behalf are often called daemons. These processes are started when the system is booted and run forever, waiting for connections. You can see whether the ssh daemon is installed and running on a particular host by logging into that host and doing ps auxw | grep sshd. Other daemons, either run as root by the system or run by a particular user, can be used to start processes. Two examples are the daemons used to start processes in resource managers and the mpd's that can be used to start MPI jobs quickly (see Chapter 8).
Having discussed how processes are started, we next turn to the topic of remote files, files that are local to a remote machine. Often we need to move files from one host to another, to prepare for remote execution, to communicate results, or even to notify remote processes of events.
Moving files is not always necessary, of course. On some clusters, certain file systems are accessible on all the hosts through a system like NFS (Network File System) or PVFS (Parallel Virtual File System). (Chapter 19 describes PVFS in detail.) However, direct remote access can sometimes be slower than local access. In this section we discuss some mechanisms for moving files from one host to another, on the assumption that the programs, and at least some of the files they use, are to be staged to a local file system on each host, such as '/tmp'.
The simplest mechanism is the remote copy command rcp. It has the same syntax as the standard local file copy command cp but can accept user name and host information from the file name arguments. For example,
rcp thisfile jeeves.uw.edu:/home/jones/thatfile
copies a local file to a specific location on the host specified by the prefix before the ':'. A remote user can also be added:
rcp smith@jeeves.uw.edu:/home/jones/thatfile .
The rcp command uses the same authentication mechanism as rsh does, so it will either ask for a password or not depending on how rsh has been set up. Indeed, rcp can be thought of as a companion program to rsh. The rcp command can handle "third party" transfers, in which neither the source nor destination file is on the local machine.
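For example, a third-party transfer between two remote hosts (the user and host names here are hypothetical) might look like

rcp smith@oak.uw.edu:/home/smith/results.dat jones@elm.uw.edu:/home/jones/results.dat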
Just as ssh is replacing rsh for security reasons, scp is replacing rcp. The scp command is the ssh version of rcp and has a number of other convenient features, such as a progress indicator, which is handy when large files are being transferred.
The syntax of scp is similar to that for rcp. For example,
scp jones@fronk.cs.jx.edu:bazz .
will log in to machine fronk.cs.jx.edu as user jones (prompting for a password for jones if necessary) and then copy the file 'bazz' in user jones's home directory to the file 'bazz' in the current directory on the local machine.
Both ftp and sftp are interactive programs, usually used to browse directories and transfer files from "very" remote hosts rather than within a cluster. If you are not already familiar with ftp, the man page will teach you how to work this basic program. The sftp program is the more secure, ssh-based version of ftp.
One can use rdist to maintain identical copies of a set of files across a set of hosts. A flexible 'distfile' controls exactly what files are updated. This is a useful utility when one wants to update a master copy and then have the changes reflected in local copies on other hosts in a cluster. Either rsh-style (the default) or ssh-style security can be specified.
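As a rough sketch (the host names and path are hypothetical; see the rdist man page for the full syntax), a minimal 'distfile' that pushes one directory to three nodes might look like

HOSTS = ( node01 node02 node03 )
FILES = ( /home/jones/bin )

${FILES} -> ${HOSTS}
        install ;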
An efficient replacement for rcp is rsync, particularly when an earlier version of a file or directory to be copied already exists on the remote machine. The idea is to detect the differences between the files and then just transfer the differences over the network. This is especially effective for backing up large directory trees; the whole directory is specified in the command, but only (portions of) the changed files are actually copied.
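For example, to mirror a local directory tree onto a compute node (the directory and host names here are hypothetical), one might run

rsync -az ~/project/ node07:/tmp/project/

where -a copies recursively while preserving permissions and times, and -z compresses data during the transfer.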
The most common and flexible way for two processes on different hosts in a cluster to communicate is through sockets. A socket between two processes is a bidirectional channel that is accessed by the processes using the same read and write functions that processes use for file I/O. In this section we show how a process connects to another process, establishing a socket, and then uses it for communication. An excellent reference for the deep topic of sockets and TCP/IP in general is [111]. Here we just scratch the surface, but the examples we present here should enable you to write some useful programs using sockets. Since sockets are typically accessed from programming and scripting languages, we give examples in C, Perl, and Python, all of which are common languages for programming clusters.
Although a socket, once established, is symmetric in the sense that communication is bidirectional, the initial setup process is asymmetric: one process connects, while the other "listens" for a connection and then accepts it. Because this situation occurs in many client/server applications, we call the process that waits for a connection the server and the process that connects to it the client, even though they may play different roles after the socket has been established.
We present essentially the same example in three languages. In the example, the server runs forever in the background, waiting for a socket connection. It advertises its location by announcing its host and "port" (more on ports below), on which it can be contacted. Then any client program that knows the host and port can set up a connection with the server. In our simple example, when the server gets the connection request, it accepts the request, reads and processes the message that the client sends it, and then sends a reply.
The server is shown in Figure 7.2. Let us walk through this example, which may appear more complex than it really is. Most of the complexity surrounds the sockaddr_in data structure, which is used for two-way communication with the kernel.
#include <stdio.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

int main(int argc, char *argv[])
{
    int rc, n, listen_socket, talk_socket;
    socklen_t len;
    char buf[1024];
    struct sockaddr_in sin, from;

    listen_socket = socket(AF_INET, SOCK_STREAM, 0);

    bzero(&sin, sizeof(sin));
    sin.sin_family      = AF_INET;
    sin.sin_addr.s_addr = INADDR_ANY;
    sin.sin_port        = htons(0);      /* let the system choose a port */

    bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin));
    listen(listen_socket, 5);

    len = sizeof(sin);
    getsockname(listen_socket, (struct sockaddr *) &sin, &len);
    printf("listening on port = %d\n", ntohs(sin.sin_port));

    while (1) {
        len = sizeof(from);
        talk_socket = accept(listen_socket, (struct sockaddr *) &from, &len);
        n = read(talk_socket, buf, 1024);
        write(talk_socket, buf, n);      /* echo */
        close(talk_socket);
    }
}
First, we acquire a socket with the socket system call. Note that we use the word "socket" both for the connection between the two processes, as we have used it up to now, and for a single "end" of the socket as it appears inside a program, as here. Here a socket is a small integer, a file descriptor just like the ones used to represent open files. Our call creates an Internet (AF_INET) stream (SOCK_STREAM) socket, which is how one specifies a TCP socket. (The third argument is relevant only to "raw" sockets, which we are not interested in here. It is usually set to zero.) This is our "listening socket," on which we will receive connection requests. We then initialize the sockaddr_in data structure, setting its field sin_port to 0 to indicate that we want the system to select a port for us. A port is an operating system resource that can be made visible to other hosts on the network. We bind our listening socket to this port with the bind system call and notify the kernel that we wish it to accept incoming connections from clients on this port with the listen call. (The second argument to listen is the number of queued connection requests we want the kernel to maintain for us. In most Unix systems this will be 5.) At this point clients can connect to this port but not yet to our actual server process. Also, at this point no one knows what port we have been assigned.
We now publish the address of the port on which we can be contacted. Many standard daemons listen on "well known" ports, but we have not asked for a specific port, so our listening socket has been assigned a port number that no one yet knows. We ourselves find out what it is with the getsockname system call and, in this case, just print it on stdout.
At this point we enter an infinite loop, waiting for connections. The accept system call blocks until there is a connection request from a client. It then returns a new socket on which the server is connected to the client, so that the server can continue listening for further connections on the original socket. Our server simply reads some data from the client on the new socket (talk_socket), echoes it back to the client, closes the new socket, and goes back to listening for another connection.
This example is extremely simple. We have not checked for failures of any kind (by checking the return codes from our system calls), and of course our server does not provide much service. However, this example does illustrate how to code a common sequence of system calls (the socket-bind-listen sequence) that is used in nearly all socket setup code.
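For instance, a more careful version of the setup calls in Figure 7.2 might check each return code and report failures with perror; the fragment below is a sketch of that style (it assumes the same variables as the figure).

    if ((listen_socket = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        perror("socket");
        exit(1);
    }
    if (bind(listen_socket, (struct sockaddr *) &sin, sizeof(sin)) < 0) {
        perror("bind");
        exit(1);
    }
    if (listen(listen_socket, 5) < 0) {
        perror("listen");
        exit(1);
    }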
The corresponding client is shown in Figure 7.3. In order to connect to the server, it must know the name of the host where the server is running and the number of the port on which it is listening. We supply these here as command-line arguments.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <netinet/in.h>

int main(int argc, char *argv[])
{
    int rc, n, talk_socket;
    char buf[1024] = "test msg";
    struct sockaddr_in sin;
    struct hostent *hp;

    talk_socket = socket(AF_INET, SOCK_STREAM, 0);

    hp = gethostbyname(argv[1]);
    bzero((void *) &sin, sizeof(sin));
    bcopy((void *) hp->h_addr, (void *) &sin.sin_addr, hp->h_length);
    sin.sin_family = hp->h_addrtype;
    sin.sin_port   = htons(atoi(argv[2]));

    connect(talk_socket, (struct sockaddr *) &sin, sizeof(sin));

    n = write(talk_socket, buf, strlen(buf) + 1);
    buf[0] = '\0';                       /* empty the buffer */
    n = read(talk_socket, buf, 1024);
    printf("received from server: %s\n", buf);

    return 0;
}
Again we acquire a socket with the socket system call. We then fill in the sockaddr_in structure with the host and port (first calling gethostbyname to obtain the hostent structure whose address information is copied into sin). Next we call connect to establish the connection to the server. When connect returns, the accept has taken place in the server, and we can write to and read from the socket as a way of communicating with the server. Here we send a message and read a response, which we print.
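As a usage sketch (the file names are our own), the server and client of Figures 7.2 and 7.3 can be compiled and run as follows; the port number printed by the server is the one to pass to the client.

cc -o server server.c
cc -o client client.c
./server &                     # note the port number it prints
./client localhost <port>      # substitute the printed port number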
Python is an object-oriented scripting language. Implementations exist for Unix and Windows; see www.python.org for details. It provides an extensive set of modules for interfacing with the operating system. One interesting feature of Python is that the block structure of the code is given by the indentation of the code, rather than explicit "begin"/ "end" or enclosing braces.
Much of the complexity of dealing with sockets has to do with properly managing the sockaddr data structure. Higher-level languages like Python and Perl make socket programming more convenient by hiding this data structure. A number of good books on Python exist that include details of the socket module; see, for example, [14] and [70]. Python uses an exception handling model (not illustrated here) for error conditions, leading to very clean code that does not ignore errors. The Python version of the server code is shown in Figure 7.4. Here we use the "well-known port" approach: rather than ask for a port, we specify the one we want to use. One can see the same socket-bind-listen sequence as in the C example, where now a socket object (s) is returned by the socket call and bind, listen, and accept are methods belonging to the socket object. The accept method returns two objects, a socket (conn) and information (addr) on the host and port on the other (connecting) end of the socket. The methods send and recv are methods on the socket object conn, and so this server accomplishes the same thing as the one in Figure 7.2.
#!/usr/bin/env python
# echo server program
from socket import *

HOST = ''                      # symbolic name for local host
PORT = 50007                   # arbitrary port
s = socket(AF_INET, SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print 'connected to by', addr
while 1:
    data = conn.recv(1024)
    if not data:
        break
    conn.send(data)
conn.close()
The Python code for the corresponding client is shown in Figure 7.5. It has simply hard-coded the well-known location of the server.
#!/usr/bin/env python
# Echo client program
from socket import *

HOST = 'donner.mcs.anl.gov'    # the remote host
PORT = 50007                   # the same port used by the server
s = socket(AF_INET, SOCK_STREAM)
s.connect((HOST, PORT))
s.send('Hello, world')
data = s.recv(1024)
s.close()
print 'Received', data
Perl [124] is a powerful and popular scripting language. Versions exist for Unix and for Windows; see www.perl.com for more information. Perl provides a powerful set of string matching and manipulation operations, combined with access to many of the fundamental system calls. The man page perlipc has samples of clients and servers that use sockets for communication.
The code for a "time server" in Perl is shown in Figure 7.6. It follows the same pattern as our other servers. The code for the corresponding client is shown in Figure 7.7.
#!/usr/bin/perl
use strict;
use Socket;
use FileHandle;

my $port  = shift || 12345;
my $proto = getprotobyname('tcp');

socket(SOCK, PF_INET, SOCK_STREAM, $proto)               || die "socket: $!";
SOCK->autoflush();
setsockopt(SOCK, SOL_SOCKET, SO_REUSEADDR, pack("l", 1)) || die "setsockopt: $!";
bind(SOCK, sockaddr_in($port, INADDR_ANY))               || die "bind: $!";
listen(SOCK, SOMAXCONN)                                  || die "listen: $!";
print "server started on port $port\n";

while (1) {
    my $paddr = accept(CLIENT, SOCK);
    CLIENT->autoflush();
    my $msg = <CLIENT>;
    print "server: recvd from client: $msg\n";
    print CLIENT "Hello there, it's now ", scalar localtime, "\n";
    close(CLIENT);
}
#!/usr/bin/perl -w
use strict;
use Socket;
use FileHandle;

my ($host, $port, $iaddr, $paddr, $proto, $line);

$host  = shift || 'localhost';
$port  = shift || 12345;
$iaddr = inet_aton($host) || die "no valid host specified: $host";
$paddr = sockaddr_in($port, $iaddr);        # packed address
$proto = getprotobyname('tcp');

socket(SOCK, PF_INET, SOCK_STREAM, $proto) || die "socket failed: $!";
SOCK->autoflush();                          # from FileHandle
connect(SOCK, $paddr) || die "connect failed: $!";

print SOCK "hello from client\n";
$line = <SOCK>;
print "client: recvd from server: $line\n";
So far our example socket code has involved only one socket open by the server at a time (not counting the listening socket). Further, the connections have been short lived: after accepting a connection request, the server handled that request and then terminated the connection. This is a typical pattern for a classical server but may not be efficient for manager/worker algorithms in which we might want to keep the connections to the workers open rather than reestablish them each time. Unlike the clients in the examples above, the workers are persistent, so it makes sense to make their connections persistent as well.
What is needed by the manager in this case is a mechanism to wait for communication from any of a set of workers simultaneously. Unix provides this capability with the select system call. The use of select allows a process to block, waiting for a change of state on any of a set of sockets. It then "wakes up" the process and presents it with a list of sockets on which there is activity, such as a connection request or a message to be read. We will not cover all of the many aspects of select here, but the code in Figure 7.8 illustrates the features most needed for manager/worker algorithms. For compactness, we show this in Python. A C version would have the same logic. See the select man page or [111] for the details of how to use select in C. It is also available, of course, in Perl.
#!/usr/bin/env python
from socket import socket, AF_INET, SOCK_STREAM
from select import select

lsock = socket(AF_INET, SOCK_STREAM)
lsock.bind(('', 0))                 # this host, anonymous port
lsock.listen(5)
lport = lsock.getsockname()[1]
print 'listening on port =', lport

sockets = [lsock]
while 1:
    (inReadySockets, outReady, excReady) = select(sockets, [], [])
    for sock in inReadySockets:
        if sock == lsock:
            (tsock, taddr) = lsock.accept()
            sockets.append(tsock)
        else:
            msg = sock.recv(1024)
            if msg:
                print 'recvd msg=', msg
            else:
                sockets.remove(sock)
                sock.close()
The first part of the code in Figure 7.8 is familiar. We acquire a socket, bind it to a port, and listen on it. Then, instead of doing an accept on this socket directly, we put it into a list (sockets). Initially it is the only member of this list, but eventually the list will grow. Then we call select. The arguments to select are three lists of sockets we are interested in for reading, writing, or other events. The select call blocks until activity occurs on one of the sockets we have given to it. When select returns, it returns three lists, each a sublist of the corresponding input list. Each of the returned sockets has changed state, and one can take some action on it with the knowledge that the action will not block.
In our case, we loop through the returned sockets, which are now active. We process activity on the listening socket by accepting the connection request and then adding the new connection to the list of sockets we are interested in. Otherwise we read and print the message that the client has sent us. If our read attempt yields an empty message, we interpret this as meaning that the worker has closed its end of the socket (or exited, which will close the socket), and we remove this socket from the list.
We can test this server with the client in Figure 7.9.
#!/usr/bin/env python
from sys import argv, stdin
from socket import socket, AF_INET, SOCK_STREAM

sock = socket(AF_INET, SOCK_STREAM)
sock.connect((argv[1], int(argv[2])))
print 'sock=', sock
while 1:
    print 'enter something:'
    msg = stdin.readline()
    if msg:
        sock.sendall(msg.strip())   # strip newline
    else:
        break
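As a usage sketch (the script names are our own), one might save the server of Figure 7.8 as 'mserver.py' and this client as 'mclient.py', start the server, and then connect several clients from different terminals using the port the server prints; each client's input lines then appear in the server's output.

python mserver.py &
python mclient.py localhost <port>     # in one terminal
python mclient.py localhost <port>     # in another terminal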