PVM (Parallel Virtual Machine) provides a high-level, but not transparent, system for a user to coordinate tasks spread across workstations on a network. This project describes a threaded implementation of the Not Too Parallel Virtual Machine (NTPVM) dispatcher, a simplified PVM system. The multithreaded implementation illustrates the interaction between threads and fork, providing a semirealistic application in which to explore complex thread interactions.
Grace Murray Hopper, a vocal early advocate of parallel computing, was fond of reminding her audiences that the way to pull a heavier load was not to grow a bigger ox but to hitch more oxen to the load. Seymour Cray, a pioneer in computer architecture, is reported to have later countered, “If you were plowing a field, which would you rather use, two strong oxen or 1024 chickens?” The chickens versus oxen debate continues to rage. IBM’s Blue Gene Project involves the building of a 64,000-processor machine with petaflop capabilities (a thousand trillion operations per second) based on relatively low-powered, embedded PowerPC chips [14]. On the other hand, the NEC Earth-Simulator, which was rated as the world’s fastest computer in 2002, uses only 640 nodes. Each “NEC oxen node” consists of 8 tightly coupled vector processors [135].
Another important development in the parallel/distributed computing arena is the move to harness cheap workstations to solve large problems. Programming libraries such as PVM (Parallel Virtual Machine) [118] and MPI (Message Passing Interface) [43] allow groups of heterogeneous, interconnected machines to act as a transparent parallel-computing environment by offering a cross-platform message-passing facility with higher-level services built on top. These systems allow users to solve large problems on networks of workstations by providing the illusion of a single parallel machine. PVM operates at the task level and presents a message-passing abstraction that hides the details of the network and of the individual machines that make up the virtual machine. PVM/MPI libraries have become the mainstay of distributed scientific computing because they allow researchers to develop platform-independent software. However, programs based on this paradigm are hard for nonexperts to debug and optimize.
A new notion of “computing as a utility” has recently emerged in the form of grid computing [38]. The Open Grid Services Architecture provides a higher-level layer of services built over message-passing libraries and native host runtime systems. These higher-level abstractions are quickly bringing distributed computing into the mainstream.
The project in this chapter develops a PVM-like library for managing tasks. We begin by introducing PVM terminology and providing an overview of the PVM architecture.
The basic unit of computation in PVM is called a task and is analogous to a UNIX process. A PVM program calls PVM library functions to create and coordinate tasks. The tasks can communicate by passing messages to other tasks through calls to PVM library functions. Tasks that cooperate, either through communication or synchronization, are organized into groups called computations. PVM supports direct communication, broadcast and barriers within a computation.
Figure 17.1 shows a logical view of a typical PVM system. A PVM application generally starts with an input and partitioning task that controls the problem solution. The user specifies in this task how other tasks cooperate to solve the problem. The input and partitioning task creates several computations. Tasks within each computation share data and communicate with each other. The PVM application also has a dedicated task to handle output and user display. The other tasks in the PVM application forward their output to this task for display on the application’s console.
To run a PVM application, a user first designates the pool of machines or hosts that make up the virtual machine and then starts the PVM control daemon, pvmd, on each of these hosts. The control daemon communicates with the user’s console, handles communication, and controls tasks on its machine. To send input to a particular task, PVM sends the data to the pvmd daemon on the destination host, which then forwards it to the appropriate task. Similarly, a task produces output by sending a message to its pvmd, which in turn forwards it to the console’s pvmd and on to the application’s output task. The underlying message passing is transparent, so the user sees only that a particular task has sent a message to the console.
Figure 17.2 shows how an application might be mapped onto the virtual machine. The tasks that make up a logical computation are not necessarily mapped to the same host but might be spread across all the hosts of the virtual machine. Host 1 of Figure 17.2 runs tasks from three computations: one computation with a single task, one with two tasks, and one that also has tasks on host 2.
The Not Too Parallel Virtual Machine (NTPVM) is a dispatcher that shares many characteristics of a PVM control daemon, pvmd. The NTPVM dispatcher is responsible for creating and managing tasks on a single host, as shown schematically in Figure 17.3. The dispatcher receives requests through its standard input and responds through its standard output. (Later, standard input and standard output can be redirected to network communication ports.) The dispatcher might receive a request to create a task or to forward data to a task under its control.
A task is just a process that executes a specified program. Each task is identified by a computation ID and a task ID. When the dispatcher receives a request to create a task with a particular computation ID and task ID, it creates a pair of pipes and forks a child to execute the task. Figure 17.4 shows the communication layout between a task and its dispatcher. The pipe that carries communication from the dispatcher to the child task is labeled writefd on the dispatcher end. The child redirects its standard input to this pipe. Similarly, the pipe that carries communication from the child to the dispatcher is labeled readfd on the dispatcher end. The child redirects its standard output to this pipe.
The dispatcher supports delivery of input data to the tasks, delivery of output from the tasks and broadcast of data to tasks that have the same computation ID. The dispatcher also supports numbered barriers and cancellation for tasks with the same computation ID. NTPVM is simpler than the real PVM in several respects. PVM has in-order message delivery and allows any task to communicate with other tasks in its computation. It has a buffering mechanism for holding messages. PVM also provides sophisticated computation monitoring tools. NTPVM delivers messages whenever it gets them, does not support point-to-point task communication, and has primitive monitoring capabilities.
The tasks in NTPVM are independent processes grouped into units called computations. The dispatcher is responsible for creating and managing tasks. In general, the tasks of a computation do not have to reside on the same machine, and the specification of the project is designed with this extension in mind. However, a single dispatcher controls all the computations for the project described in this chapter.
The dispatcher communicates with the outside world by reading packets from its standard input and writing packets to its standard output. The dispatcher might receive a packet requesting that it create a new task, or it might receive a data packet intended for a task under its control. The dispatcher forwards output generated by the tasks under its control to its own standard output in the form of packets. For the first four parts of the project, the tasks send ASCII data and the dispatcher wraps the data in a packet. Later, the tasks generate the packets themselves.
Program 17.1 shows the ntpvm.h header file that contains the relevant type definitions for the dispatcher. Include this file in all the programs in this project.
The dispatcher packets include a computation ID, a task ID, a packet type, a packet length and the packet information. The first four items make up a fixed-length packet header that is stored in a structure of type taskpacket_t. Assume that the information portion of the packet contains no more than MAX_PACK_SIZE bytes.
The dispatcher keeps information about each active task in a global tasks array whose elements have type ntpvm_task_t. Implement this array as an object with appropriate functions for accessing and modifying it. When the description refers to “modifying” or “accessing” information in the tasks object, it means calling a public function of the object to perform the action. Do not allow the dispatcher to execute more than MAX_TASKS simultaneous tasks. Initially, set the compid member of each element of the tasks array to -1 to indicate that the slot is empty.
Program 17.1. ntpvm.h
The ntpvm.h header file.

```c
#ifndef NTPVM_H
#define NTPVM_H

#include <pthread.h>
#include <sys/types.h>

#define MAX_PACK_SIZE 1024   /* maximum bytes in the information portion */
#define MAX_TASKS 10         /* maximum number of simultaneous tasks */
#define NUMTYPES 6           /* number of packet types */

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE, TERMINATE, BARRIER} packet_t;

typedef struct {             /* fixed-length packet header */
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

typedef struct {
   int compid;               /* computation ID for task */
   int taskid;               /* task ID for the task */
   int writefd;              /* holds dispatcher->child fd */
   int readfd;               /* holds child->dispatcher fd */
   int recvbytes;
   int recvpackets;
   int sentbytes;
   int sentpackets;
   pid_t taskpid;            /* process ID of the forked task */
   pthread_t tasktid;        /* thread ID of task output thread */
   int barrier;              /* -1 if not at barrier, else barrier number */
   pthread_mutex_t mlock;    /* mutex lock for element */
   int endinput;             /* true if no more input for task */
} ntpvm_task_t;

#endif
```
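Because a header arrives as raw bytes from a pipe, code that reads one should check the type and length before using them as an array index or a buffer size. The following is a minimal sketch, not part of the project specification: it repeats the relevant ntpvm.h definitions so that it compiles on its own, and valid_header is a hypothetical helper name.

```c
#include <stdbool.h>

#define MAX_PACK_SIZE 1024
#define NUMTYPES 6

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE, TERMINATE, BARRIER} packet_t;

typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Hypothetical helper: true if the header names one of the six packet
   types and its length fits in a MAX_PACK_SIZE data buffer. */
bool valid_header(const taskpacket_t *hdr) {
   int type = (int)hdr->type;   /* compare as int: the enum may be unsigned */

   if (type < 0 || type >= NUMTYPES)
      return false;
   return hdr->length >= 0 && hdr->length <= MAX_PACK_SIZE;
}
```

The check only catches garbage. Headers travel in the host's native structure layout and byte order, so this format works between processes on one machine but would need an explicit encoding if packets ever crossed heterogeneous hosts.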
There are six types of dispatcher packets in all: NEWTASK, DATA, BROADCAST, DONE, TERMINATE and BARRIER. A packet consists of a header structure of type taskpacket_t followed by a data field that is an array whose size is specified by the length field of the header. The maximum value of length is MAX_PACK_SIZE. The dispatcher interprets the packet types as follows.
When the dispatcher receives a NEWTASK packet on standard input, it initiates a new task. The information portion of this packet gives the command line to be executed by the forked child task. The dispatcher creates two pipes and forks a child that calls execvp for the specified command.
The dispatcher treats the DATA packets that it receives on standard input as input data for the task identified by the computation ID and task ID members of the packet header. For the first four parts of the project, the dispatcher strips off the packet header and writes the actual packet data to writefd of the appropriate task.
When a task writes data to its standard output, the dispatcher forwards the data to its own standard output. The first four parts of this project run standard UNIX utilities as the tasks. Since these commands produce just ASCII text as output, the dispatcher packages the data into DATA packets before sending them to standard output. Starting with Part V, the tasks send DATA packets themselves.
When the dispatcher receives a DONE packet on standard input, it closes the writefd file descriptor for the task identified by the computation ID and task ID members of the packet header. The corresponding task then detects end-of-file on its standard input.
When the dispatcher detects end-of-file on the readfd descriptor of a task, it performs the appropriate cleanup and sends a DONE packet on standard output to signify that the task has completed.
The dispatcher forwards any BROADCAST packets from standard input to all tasks in the specified computation.
If a task sends a BROADCAST packet to the dispatcher, the dispatcher forwards the request to all tasks in the same computation and also forwards the request on its standard output. In this way, all the tasks within a computation receive the message.
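Forwarding a broadcast amounts to a loop over the tasks array that writes the same bytes to the writefd of every task in the computation. In the minimal sketch below, bslot_t is a hypothetical trimmed-down stand-in for ntpvm_task_t, and broadcast_to_computation and write_all are hypothetical names. Skipping tasks whose endinput is set is an assumption carried over from the DONE rules; the text does not state it for broadcasts.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

typedef struct {          /* hypothetical trimmed-down tasks entry */
   int compid;            /* -1 marks an empty slot */
   int taskid;
   int writefd;           /* dispatcher -> task */
   int endinput;          /* nonzero once the task's input is closed */
} bslot_t;

/* Write all len bytes to fd, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len) {
   const char *p = buf;

   while (len > 0) {
      ssize_t n = write(fd, p, len);
      if (n == -1) {
         if (errno == EINTR)
            continue;
         return -1;
      }
      p += n;
      len -= (size_t)n;
   }
   return 0;
}

/* Send buf to every task in computation compid that still accepts input.
   Returns the number of tasks that received the data. */
int broadcast_to_computation(bslot_t *tasks, int ntasks, int compid,
                             const void *buf, size_t len) {
   int count = 0;

   for (int i = 0; i < ntasks; i++) {
      if (tasks[i].compid == -1 || tasks[i].compid != compid ||
          tasks[i].endinput)
         continue;
      if (write_all(tasks[i].writefd, buf, len) == 0)
         count++;
   }
   return count;
}
```

A task that has already exited leaves a pipe with no reader, and writing to such a pipe raises SIGPIPE, which terminates the process by default. A dispatcher built this way would likely want to ignore SIGPIPE so that those writes fail with EPIPE instead.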
If the dispatcher receives a TERMINATE packet on its standard input, it kills the task identified by the packet’s computation ID and task ID. If the task ID is -1, the dispatcher kills all tasks in the specified computation. The dispatcher handles a TERMINATE packet received from readfd in a similar way. However, if no task ID matches the packet or if the task ID is -1, the dispatcher also writes the TERMINATE packet to standard output.
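The TERMINATE rules reduce to a matching loop in which a task ID of -1 acts as a wildcard, plus a flag that tells the caller whether to pass the packet on to standard output. This sketch assumes a hypothetical trimmed-down slot type (tslot_t) and a hypothetical function name, terminate_tasks. It leaves the choice of signal to the caller, since the text says only that the dispatcher "kills" the task; SIGTERM and SIGKILL are both plausible choices.

```c
#define _POSIX_C_SOURCE 200809L   /* for kill() under strict compiler modes */
#include <signal.h>
#include <stddef.h>
#include <sys/types.h>

typedef struct {          /* hypothetical trimmed-down tasks entry */
   int compid;            /* -1 marks an empty slot */
   int taskid;
   pid_t taskpid;
} tslot_t;

/* Send sig to task (compid, taskid), or to every task in compid when taskid
   is -1. Returns the number of tasks successfully signaled. If forwardp is
   not NULL, sets *forwardp to 1 when the packet should also be written to
   standard output (no task matched, or taskid was -1), which the text
   requires for TERMINATE packets that arrive on a task's readfd. */
int terminate_tasks(tslot_t *tasks, int ntasks, int compid, int taskid,
                    int sig, int *forwardp) {
   int matched = 0;
   int signaled = 0;

   for (int i = 0; i < ntasks; i++) {
      if (tasks[i].compid == -1 || tasks[i].compid != compid)
         continue;
      if (taskid != -1 && tasks[i].taskid != taskid)
         continue;
      matched++;
      if (kill(tasks[i].taskpid, sig) == 0)
         signaled++;
   }
   if (forwardp != NULL)
      *forwardp = (matched == 0 || taskid == -1);
   return signaled;
}
```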
The BARRIER packets synchronize tasks of a computation at a particular point in their execution.
The NTPVM project has the following parts:
Part I: Setup of I/O and testing [Section 17.4].
Part II: Single task with no input (handle …)
Part III: One task at a time (handle …)
Part IV: Multiple tasks and computations (handle …)
Part V: Task synchronization (handle …)
Part VI: Cleanup (handle …)
Part VII: Ordered message delivery [Section 17.10].
In the first four parts of the project, the child tasks do not communicate by using packets, and the dispatcher strips off the packet headers before writing to writefd. This format allows the dispatcher to run ordinary UNIX utilities such as cat or ls as tasks. In Part V, the tasks communicate with the dispatcher by using packets. At that point, the project requires specific task programs for NTPVM testing. The remainder of this section gives examples of different types of packets and methods the dispatcher uses to handle them.
The dispatcher waits for a NEWTASK packet from standard input. Such a packet includes a computation ID, a task ID and a command-line string.
Example 17.1.
The following NEWTASK packet requests that task 2 in computation 3 be created to execute ls -l.

Computation ID: 3
Task ID: 2
Packet Type: NEWTASK
Packet Data Length: 5
Packet Information: ls -l
The data in the packet of Example 17.1 is not null-terminated. The dispatcher must copy the data into a null-terminated string before handing it to makeargv or execvp.
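One way to do the conversion is to copy the data into a buffer that is one byte larger than the longest possible packet and append the terminator. A minimal sketch, with packet_to_cstring as a hypothetical helper name; a caller would typically pass a buffer of MAX_PACK_SIZE + 1 bytes.

```c
#include <string.h>

/* Copy the len-byte, non-null-terminated information portion of a packet
   into out as a C string. Returns 0 on success, -1 if it would not fit. */
int packet_to_cstring(const unsigned char *data, int len,
                      char *out, size_t outsize) {
   if (len < 0 || (size_t)len >= outsize)
      return -1;                   /* need room for the terminating '\0' */
   memcpy(out, data, (size_t)len);
   out[len] = '\0';
   return 0;
}
```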
The dispatcher asks the tasks object to find a free entry and to store the information about the new task. The dispatcher discards the packet and reports an error if it detects that a task with the same computation and task IDs is already in the tasks array. In the new entry, the sentpackets, sentbytes, recvpackets, recvbytes and endinput members are set to 0, and the barrier member is set to -1 to signify that the task is not waiting at a barrier.
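A minimal sketch of the tasks object described above, assuming a trimmed-down entry type (task_entry_t) that omits the thread and mutex members of ntpvm_task_t. The function names tasks_init, tasks_find and tasks_add are hypothetical, as are the errno values chosen for a duplicate entry and a full table. Declaring the array static keeps it private to its file, so the rest of the dispatcher can reach it only through these functions.

```c
#include <errno.h>
#include <sys/types.h>

#define MAX_TASKS 10

typedef struct {              /* hypothetical subset of ntpvm_task_t */
   int compid;                /* -1 marks an empty slot */
   int taskid;
   int writefd;
   int readfd;
   int recvbytes;
   int recvpackets;
   int sentbytes;
   int sentpackets;
   pid_t taskpid;
   int barrier;               /* -1 if not at a barrier */
   int endinput;
} task_entry_t;

static task_entry_t tasks[MAX_TASKS];    /* private to this file */

void tasks_init(void) {
   for (int i = 0; i < MAX_TASKS; i++)
      tasks[i].compid = -1;
}

/* Return the index of the entry for (compid, taskid), or -1 if none. */
int tasks_find(int compid, int taskid) {
   for (int i = 0; i < MAX_TASKS; i++)
      if (tasks[i].compid != -1 && tasks[i].compid == compid &&
          tasks[i].taskid == taskid)
         return i;
   return -1;
}

/* Claim a free slot for a new task. Returns its index, or -1 with errno set
   to EINVAL (compid -1), EEXIST (duplicate IDs) or EAGAIN (table full). */
int tasks_add(int compid, int taskid) {
   if (compid == -1) {
      errno = EINVAL;
      return -1;
   }
   if (tasks_find(compid, taskid) != -1) {
      errno = EEXIST;
      return -1;
   }
   for (int i = 0; i < MAX_TASKS; i++) {
      if (tasks[i].compid == -1) {
         tasks[i].compid = compid;
         tasks[i].taskid = taskid;
         tasks[i].writefd = tasks[i].readfd = -1;
         tasks[i].recvbytes = tasks[i].recvpackets = 0;
         tasks[i].sentbytes = tasks[i].sentpackets = 0;
         tasks[i].endinput = 0;
         tasks[i].barrier = -1;    /* not waiting at a barrier */
         tasks[i].taskpid = -1;
         return i;
      }
   }
   errno = EAGAIN;
   return -1;
}
```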
The dispatcher then creates two pipes and uses two of the four resulting pipe file descriptors for communication with the child task. These descriptors are stored in the readfd and writefd members of the tasks array entry. The dispatcher forks a child and stores the child process ID in the taskpid member of the tasks entry. The dispatcher closes the unused pipe file descriptors and then waits for I/O either from its standard input or from the readfd descriptors of its tasks.
The child task forked by the dispatcher redirects its standard input and output to the pipes and closes the unused file descriptors. The child then calls execvp to execute the command string. Use the makeargv function of Program 2.2 on page 37 to create an argument array for input to execvp.
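The pipe and fork steps above might look like the following sketch. It takes an argument array directly rather than calling makeargv (Program 2.2, not reproduced here), and spawn_task is a hypothetical name. The child uses _exit rather than exit on failure so that it does not flush stdio buffers inherited from the dispatcher.

```c
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Fork a child running argv[0] with its standard input reading from one pipe
   and its standard output writing to another. On success, returns the
   child's pid and stores the dispatcher ends in *writefdp (dispatcher ->
   child) and *readfdp (child -> dispatcher). Returns -1 on failure. */
pid_t spawn_task(char *const argv[], int *writefdp, int *readfdp) {
   int tochild[2];      /* [0] becomes the child's stdin, [1] is writefd */
   int fromchild[2];    /* [0] is readfd, [1] becomes the child's stdout */
   pid_t pid;

   if (pipe(tochild) == -1)
      return -1;
   if (pipe(fromchild) == -1) {
      close(tochild[0]);
      close(tochild[1]);
      return -1;
   }
   pid = fork();
   if (pid == -1) {
      close(tochild[0]);
      close(tochild[1]);
      close(fromchild[0]);
      close(fromchild[1]);
      return -1;
   }
   if (pid == 0) {                         /* child task */
      if (dup2(tochild[0], STDIN_FILENO) == -1 ||
          dup2(fromchild[1], STDOUT_FILENO) == -1) {
         perror("dup2");
         _exit(1);
      }
      /* all four originals must be closed, or the task never sees EOF */
      close(tochild[0]);
      close(tochild[1]);
      close(fromchild[0]);
      close(fromchild[1]);
      execvp(argv[0], argv);
      perror("execvp");                   /* reached only if exec failed */
      _exit(1);
   }
   /* dispatcher: close the child's ends so EOF on readfd can be detected */
   close(tochild[0]);
   close(fromchild[1]);
   *writefdp = tochild[1];
   *readfdp = fromchild[0];
   return pid;
}
```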
When the dispatcher reads a DATA packet from standard input, it asks the tasks object to determine whether the packet’s task ID and computation ID match those of any entry in the tasks array. The dispatcher discards the packet if no entry matches. Otherwise, the dispatcher updates the recvpackets and recvbytes members of the task’s entry in the tasks array.
For the first four parts of the project, the tasks are standard UNIX utilities that accept ASCII input. The dispatcher forwards the information portion of the packet to the task on the task’s writefd descriptor. In Parts V, VI and VII, the tasks receive the full data packets directly.
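Delivering the information portion of a DATA packet means writing every byte to writefd, even when a single write transfers only part of the buffer. The sketch below assumes a hypothetical trimmed-down slot type (dslot_t) and a hypothetical function name, deliver_data. It updates the receive counters before writing, as described above, and refuses packets for a task whose input has already been closed.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

typedef struct {          /* hypothetical subset of ntpvm_task_t */
   int writefd;           /* dispatcher -> task */
   int recvbytes;
   int recvpackets;
   int endinput;          /* set once a DONE packet has been handled */
} dslot_t;

/* Write len bytes of packet data to the task, retrying partial writes.
   Returns 0 on success, -1 on error or if the task's input is closed
   (in which case the caller discards the packet). */
int deliver_data(dslot_t *task, const unsigned char *buf, int len) {
   size_t left;

   if (task->endinput || len < 0) {
      errno = EINVAL;
      return -1;
   }
   task->recvpackets++;
   task->recvbytes += len;
   left = (size_t)len;
   while (left > 0) {
      ssize_t n = write(task->writefd, buf, left);
      if (n == -1) {
         if (errno == EINTR)
            continue;
         return -1;
      }
      buf += n;
      left -= (size_t)n;
   }
   return 0;
}
```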
Example 17.2.
After receiving the following DATA packet, the dispatcher sends the words This is my data to task 2 in computation 3.

Computation ID: 3
Task ID: 2
Packet Type: DATA
Packet Data Length: 15
Packet Data: This is my data
The dispatcher also forwards data received from individual tasks to its standard output in the form of DATA packets. For the first four parts of the project, the dispatcher interprets input from readfd as raw output from the task. It creates a DATA packet with the task’s computation ID and task ID and uses the information read from readfd as the information portion of the packet. The dispatcher then writes the DATA packet to its standard output. Starting with Section 17.8, each task reads and writes its data in packet format. From that section on, the dispatcher copies the DATA packets to its standard output.
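For Parts I through IV, wrapping a task's raw output might look like the sketch below. To stand alone it repeats the packet definitions from ntpvm.h and writes the header and data directly instead of calling putpacket; forward_task_output and write_all are hypothetical names. Each read asks for at most MAX_PACK_SIZE bytes, so one chunk of task output always fits in one DATA packet.

```c
#include <errno.h>
#include <stddef.h>
#include <unistd.h>

#define MAX_PACK_SIZE 1024

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE, TERMINATE, BARRIER} packet_t;

typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Write all len bytes to fd, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len) {
   const char *p = buf;

   while (len > 0) {
      ssize_t n = write(fd, p, len);
      if (n == -1) {
         if (errno == EINTR)
            continue;
         return -1;
      }
      p += n;
      len -= (size_t)n;
   }
   return 0;
}

/* Read whatever raw output is available on readfd (at most MAX_PACK_SIZE
   bytes) and write it to outfd as a DATA packet for (compid, taskid).
   Returns the number of bytes forwarded, 0 at end-of-file, -1 on error. */
int forward_task_output(int readfd, int outfd, int compid, int taskid) {
   unsigned char buf[MAX_PACK_SIZE];
   taskpacket_t hdr;
   ssize_t n;

   do {
      n = read(readfd, buf, sizeof buf);
   } while (n == -1 && errno == EINTR);
   if (n <= 0)
      return (int)n;                 /* 0 = end-of-file, -1 = error */
   hdr.compid = compid;
   hdr.taskid = taskid;
   hdr.type = DATA;
   hdr.length = (int)n;
   if (write_all(outfd, &hdr, sizeof hdr) == -1 ||
       write_all(outfd, buf, (size_t)n) == -1)
      return -1;
   return (int)n;
}
```

Nothing here keeps a packet's header and data together on standard output if two writers run at once, so a multithreaded dispatcher would need to serialize these writes, for example with a mutex.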
When the dispatcher receives a DONE packet on standard input, it sets the corresponding task’s endinput member in the tasks array and closes the writefd descriptor for the task. The dispatcher discards any subsequent DONE or DATA packets that arrive for the task.
Example 17.3.
The following DONE packet specifies that there is no more input data for task 2 in computation 3.

Computation ID: 3
Task ID: 2
Packet Type: DONE
Packet Data Length: 0
Packet Data: (none)
When the dispatcher receives an end-of-file indication on a readfd descriptor, it closes that descriptor and forwards a DONE packet on its standard output. If the writefd descriptor for the task is still open, the dispatcher closes it. The dispatcher must eventually call wait on the child task process and set the compid member of the tasks array entry to -1 so that the array entry can be reused.
If the dispatcher receives an end-of-file indication on its own standard input, it closes the writefd descriptors of all active tasks and sets the endinput member of the tasks array entry for each active task to 1. When it has received an end-of-file indication on the readfd descriptors of all active tasks, the dispatcher waits for each task and exits. The dispatcher should also periodically wait for all its completed children.
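Periodic cleanup of finished children can use waitpid with the WNOHANG option, which returns immediately instead of blocking. A minimal sketch; reap_children is a hypothetical name. Reaping with a pid of -1 also collects children that a per-task waitpid elsewhere might be expecting, so a dispatcher that mixes the two approaches must record which pids it has already collected.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Reap every child that has already terminated, without blocking.
   Returns the number of children reaped. */
int reap_children(void) {
   int count = 0;
   pid_t r;

   for ( ; ; ) {
      r = waitpid(-1, NULL, WNOHANG);
      if (r > 0) {
         count++;
         continue;
      }
      if (r == -1 && errno == EINTR)
         continue;
      break;   /* 0: children remain but none has finished; -1: none left */
   }
   return count;
}
```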
This section develops the dispatcher I/O functions and a debugging setup. The dispatcher receives input data from standard input by calling getpacket and sends output data on standard output by calling putpacket, as shown in Figure 17.5. The data is always transferred in two parts. First, the dispatcher reads or writes a header of type taskpacket_t. It then uses the length member of the header to determine how many bytes of packet data follow, and reads or writes that data portion of the packet. Assume that the packet data field contains no more than MAX_PACK_SIZE bytes so that the dispatcher can use a fixed-length buffer of MAX_PACK_SIZE bytes to hold the packet data during input and output.
The getpacket function has the following prototype.

```c
int getpacket(int fd, int *compidp, int *taskidp, packet_t *typep, int *lenp, unsigned char *buf);
```
The getpacket function reads a taskpacket_t header from fd and then reads into buf the number of bytes specified by the length member. If successful, getpacket returns 0. If unsuccessful, getpacket returns -1 and sets errno. The getpacket function sets *compidp, *taskidp, *typep and *lenp from the compid, taskid, type and length members of the packet header, respectively. If getpacket encounters end-of-file while trying to read a packet, it returns -1 and sets errno. Since read reports end-of-file by returning 0 without setting errno, you must pick an appropriate value yourself. There is no standard error number that represents end-of-file; one possibility is EINVAL.
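A minimal sketch of getpacket that follows this description, using EINVAL for end-of-file as suggested above. It repeats the ntpvm.h definitions so that it compiles alone, and read_exact is a hypothetical local helper standing in for the readblock function of the restart library. Rejecting headers whose type or length is out of range is an extra safeguard that the specification does not require.

```c
#include <errno.h>
#include <unistd.h>

#define MAX_PACK_SIZE 1024
#define NUMTYPES 6

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE, TERMINATE, BARRIER} packet_t;

typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Read exactly size bytes unless end-of-file intervenes. Returns the number
   of bytes read (fewer than size only at end-of-file) or -1 on error. */
static ssize_t read_exact(int fd, void *buf, size_t size) {
   char *p = buf;
   size_t total = 0;

   while (total < size) {
      ssize_t n = read(fd, p + total, size - total);
      if (n == -1) {
         if (errno == EINTR)
            continue;
         return -1;
      }
      if (n == 0)
         break;                        /* end-of-file */
      total += (size_t)n;
   }
   return (ssize_t)total;
}

int getpacket(int fd, int *compidp, int *taskidp, packet_t *typep,
              int *lenp, unsigned char *buf) {
   taskpacket_t hdr;
   ssize_t n;

   n = read_exact(fd, &hdr, sizeof hdr);
   if (n == -1)
      return -1;
   if (n != (ssize_t)sizeof hdr) {     /* end-of-file, possibly mid-header */
      errno = EINVAL;
      return -1;
   }
   if ((int)hdr.type < 0 || (int)hdr.type >= NUMTYPES ||
       hdr.length < 0 || hdr.length > MAX_PACK_SIZE) {
      errno = EINVAL;                  /* corrupt header */
      return -1;
   }
   n = read_exact(fd, buf, (size_t)hdr.length);
   if (n == -1)
      return -1;
   if (n != hdr.length) {              /* end-of-file inside the data */
      errno = EINVAL;
      return -1;
   }
   *compidp = hdr.compid;
   *taskidp = hdr.taskid;
   *typep = hdr.type;
   *lenp = hdr.length;
   return 0;
}
```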
The putpacket function has the following prototype.

```c
int putpacket(int fd, int compid, int taskid, packet_t type, int len, unsigned char *buf);
```
The putpacket function assembles a taskpacket_t header from compid, taskid, type and len. It then writes the packet header to fd followed by len bytes from buf. If successful, putpacket returns 0. If unsuccessful, putpacket returns -1 and sets errno.
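A matching sketch of putpacket, again repeating the ntpvm.h definitions so that it compiles alone; write_all is a hypothetical helper that retries partial writes. Rejecting a len larger than MAX_PACK_SIZE is an added check, not part of the stated specification.

```c
#include <errno.h>
#include <unistd.h>

#define MAX_PACK_SIZE 1024

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE, TERMINATE, BARRIER} packet_t;

typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Write all size bytes to fd, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t size) {
   const char *p = buf;

   while (size > 0) {
      ssize_t n = write(fd, p, size);
      if (n == -1) {
         if (errno == EINTR)
            continue;
         return -1;
      }
      p += n;
      size -= (size_t)n;
   }
   return 0;
}

int putpacket(int fd, int compid, int taskid, packet_t type, int len,
              unsigned char *buf) {
   taskpacket_t hdr;

   if (len < 0 || len > MAX_PACK_SIZE) {
      errno = EINVAL;
      return -1;
   }
   hdr.compid = compid;
   hdr.taskid = taskid;
   hdr.type = type;
   hdr.length = len;
   if (write_all(fd, &hdr, sizeof hdr) == -1)     /* fixed-length header */
      return -1;
   if (write_all(fd, buf, (size_t)len) == -1)     /* variable-length data */
      return -1;
   return 0;
}
```

The header and data go out in two separate write calls, so if several threads share one output descriptor, as in the threaded dispatcher this chapter builds toward, each putpacket call would need to be protected by a lock to keep packets from interleaving.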
Example 17.4.
The following program uses getpacket and putpacket to copy packets from standard input to standard output.

```c
#include <unistd.h>
#include "ntpvm.h"

int getpacket(int, int *, int *, packet_t *, int *, unsigned char *);
int putpacket(int, int, int, packet_t, int, unsigned char *);

int main(void) {
   unsigned char buf[MAX_PACK_SIZE];
   int compid;
   int taskid;
   int tdatalen;
   int tin, tout;
   packet_t type;

   tin = STDIN_FILENO;
   tout = STDOUT_FILENO;
   /* copy packets until getpacket reports end-of-file or an error */
   while (getpacket(tin, &compid, &taskid, &type, &tdatalen, buf) != -1) {
      if (putpacket(tout, compid, taskid, type, tdatalen, buf) == -1)
         break;
   }
   return 0;
}
```
The specification for Part I of the project is as follows.
1. Write the getpacket and putpacket functions.
2. Compile the program and run lint on it to make sure that there are no syntax errors.
3. Test the program, using one of the methods described below.
4. Add debugging messages to the loop of the main program to show what values are being read and written. All debugging messages should go to standard error.
The hardest part of the NTPVM project is testing the dispatcher. The dispatcher communicates through standard input and standard output, using packets that have non-ASCII components. During debugging, the dispatcher should produce messages on standard error reporting its progress. A small amount of work is needed to separate the dispatcher's input and output from these informative messages by directing the three types of I/O to appear in ASCII format on different screens.
Program 17.2 shows the a2ts filter, which reads ASCII characters from standard input, constructs a task packet, and writes it to standard output. Because a2ts sends all of its prompts to standard error, it can be run either interactively or with standard input redirected from a file.
Program 17.2. a2ts.c
The filter a2ts prompts for information and writes a task packet to standard output. Some error checking is omitted.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include "ntpvm.h"
#define MAX_LINE_SIZE 100
#define TERMINATE_STRING "!!!!!\n"

static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                           "Terminate", "Barrier"};

/* Discard the rest of the current input line (left behind by scanf). */
static void skipline(void) {
   int c;

   while ((c = getchar()) != '\n' && c != EOF)
      ;
}

int main(void) {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   char *bufptr;
   int i;
   int linelen;
   taskpacket_t pack;
   int tasktype;
   int wsize;

   wsize = sizeof(taskpacket_t);
   fprintf(stderr, "Ready for first packet\n");
   for ( ; ; ) {                     /* loop with menu for interactive input */
      fprintf(stderr, "Enter compid: ");
      if (scanf("%d", &pack.compid) == EOF)
         break;
      fprintf(stderr, "Enter taskid: ");
      scanf("%d", &pack.taskid);
      fprintf(stderr, "Enter task type:\n");
      for (i = 0; i < NUMTYPES; i++)
         fprintf(stderr, "   %d = %s\n", i, typename[i]);
      scanf("%d", &tasktype);
      skipline();                    /* data lines start on a fresh line */
      pack.type = tasktype;
      pack.length = 0;
      bufptr = buf;
      *bufptr = 0;
      fprintf(stderr, "Enter first line of data (%.*s to end):\n",
              (int)strlen(TERMINATE_STRING) - 1, TERMINATE_STRING);
      /* read through stdio, not read(), so input already buffered by
         scanf is not skipped when standard input is a file */
      while (fgets(bufptr, MAX_LINE_SIZE, stdin) != NULL) {
         if (strcmp(TERMINATE_STRING, bufptr) == 0)
            break;
         linelen = (int)strlen(bufptr);
         bufptr = bufptr + linelen;
         pack.length = pack.length + linelen;
         if (pack.length >= MAX_PACK_SIZE) {
            fprintf(stderr, "**** Maximum packet size exceeded\n");
            return 1;
         }
         fprintf(stderr, "Received %d, total=%d, Enter line (%.*s to end):\n",
                 linelen, pack.length, (int)strlen(TERMINATE_STRING) - 1,
                 TERMINATE_STRING);
      }
      fprintf(stderr, "Writing packet header: %d %d %d %d\n",
              pack.compid, pack.taskid, (int)pack.type, pack.length);
      if (write(STDOUT_FILENO, &pack, wsize) != wsize) {
         fprintf(stderr, "Error writing packet\n");
         return 1;
      }
      fprintf(stderr, "Writing %d bytes\n", pack.length);
      if (write(STDOUT_FILENO, buf, pack.length) != pack.length) {
         fprintf(stderr, "Error writing packet\n");
         return 1;
      }
      fprintf(stderr, "Ready for next packet\n");
   }
   fprintf(stderr, "a2ts exiting normally\n");
   return 0;
}
```
The ts2a filter of Program 17.3 reads a task packet from standard input and writes the contents of the packet to standard output in ASCII format. For this project, assume that the data portion of a task packet always contains ASCII information.
Example 17.5.
The ts2a program assumes that the header and the data will each be read with a single call to read. How would you make this more robust?
Answer:
Use the readblock function from the restart library described in Appendix B.
Program 17.3. ts2a.c
The ts2a filter reads a packet from standard input and writes the header and data to standard output in ASCII format. Some error checking is omitted.

```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ntpvm.h"
#define MAX_LINE_SIZE 100

static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                           "Terminate", "Barrier"};

int main(void) {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   int bytesread;
   taskpacket_t pack;
   int wsize;

   wsize = sizeof(taskpacket_t);
   fprintf(stderr, "***** Waiting for first packet\n");
   for ( ; ; ) {
      bytesread = read(STDIN_FILENO, &pack, wsize);
      if (bytesread == 0) {
         fprintf(stderr, "End-of-file received\n");
         break;
      }
      if (bytesread != wsize) {
         fprintf(stderr, "Error reading packet header\n");
         return 1;
      }
      if (((int)pack.type < 0) || (pack.type >= NUMTYPES)) {
         fprintf(stderr, "Got invalid packet\n");
         return 1;
      }
      printf("Received packet header of type %s\n", typename[pack.type]);
      printf("   compid = %d, taskid = %d, length = %d\n",
             pack.compid, pack.taskid, pack.length);
      fflush(stdout);              /* flush stdio before the raw write below */
      if ((pack.length < 0) || (pack.length > MAX_PACK_SIZE)) {
         fprintf(stderr, "Task data length is invalid\n");
         return 1;
      }
      if (read(STDIN_FILENO, buf, pack.length) != pack.length) {
         fprintf(stderr, "Error reading packet data\n");
         return 1;
      }
      write(STDOUT_FILENO, buf, pack.length);
      fprintf(stderr, "***** Waiting for next packet\n");
   }
   return 0;
}
```
Example 17.6.
The following command prompts for the fields of a packet. It then echoes the packet to standard output in ASCII format.
a2ts | ts2a
The a2ts program of Example 17.6 interactively prompts for packet information and writes the information as a binary packet to its standard output. The standard output of a2ts is piped into the standard input of ts2a. The ts2a program reads binary packets from its standard input and outputs them in ASCII format to its standard output. Input entered to a2ts will be interleaved with output from ts2a, but this should not be a problem since ts2a will not produce any output until a2ts has received an entire packet.
Example 17.7.
The following command shows a possible method of testing the dispatcher interactively. For now, use the testpacket program of Example 17.4 instead of the dispatcher.
a2ts | dispatcher | ts2a
Example 17.7 pipes the standard output of a2ts into the standard input of the dispatcher and the standard output of the dispatcher into ts2a. The command line of Example 17.7 allows a user to enter ASCII data and to display the task packet output in ASCII. Unfortunately, real tests produce too much data from different sources, making it difficult to tell which program produced which information. Input to a2ts and output from ts2a will be interleaved with error messages sent to standard error. The next two subsections propose two different methods for handling this problem.
The first strategy for improving the usability of a2ts and ts2a in testing the dispatcher is to use separate windows, as shown in Figure 17.6. The dispatcher, which runs in the dispatcher window, redirects its standard input to the named pipe inpipe and its standard output to the named pipe outpipe. The output from the dispatcher’s standard error still appears in the dispatcher window. The a2ts program reads from standard input in the input window and writes to its standard output, which is redirected to the named pipe inpipe. Enter packets in ASCII format in this window. The ts2a program redirects its standard input to the named pipe outpipe. As the dispatcher runs, ts2a displays dispatcher output in the output window.
Figure 17.6 shows the setup for the three windows. Be sure to use the same working directory for all three windows. The procedure for running the dispatcher is as follows.
Create two named pipes in the dispatcher window by executing the following commands.
mkfifo outpipe
mkfifo inpipe
Start the dispatcher in the dispatcher window by executing the following command.
dispatcher < inpipe > outpipe
This window displays only the messages that the dispatcher sends to standard error, since both standard input and standard output are redirected.
In the output window, execute the following command.
ts2a < outpipe
This window displays the packets coming from the standard output of the dispatcher.
In the input window, execute the following command.
a2ts > inpipe
This window displays the prompts for the user to enter packets. The a2ts program converts the entered information from ASCII to packet format and writes it to the standard input of the dispatcher.
Figure 17.7 shows the layout of the windows for debugging. If you do not have a workstation that supports multiple windows, try to persuade your system administrator to install a program such as screen, which supports multiple screens on an ASCII terminal.
The second strategy for testing the dispatcher uses the remote logging facility discussed in Section 10.3.4 and in Appendix D. Replace the ts2a program with the ts2log program of Program 17.4. The ts2log program uses the readblock function of the restart library described in Appendix B.
Example 17.8.
The following command shows how to test the dispatcher by using remote logging.
a2ts | dispatcher | ts2log
The dispatcher should also log events. It could send the packets to standard output and have the ts2log program receive them through redirection. Alternatively, the dispatcher could log them directly.
Program 17.4. ts2log.c
A program that logs packets using the remote logging utilities. Some error checking is omitted.

```c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ntpvm.h"
#include "restart.h"
#include "rlogging.h"
#define MAX_LINE_SIZE 100

static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                           "Terminate", "Barrier"};

int main(void) {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   int bytesread;
   LFILE *lf;
   taskpacket_t pack;
   int wsize;

   wsize = sizeof(taskpacket_t);
   lf = lopen(NULL, 0);
   if (lf == NULL) {            /* lprintf cannot be used without a logger */
      fprintf(stderr, "Failed to open remote logger.\n");
      return 1;
   }
   for ( ; ; ) {
      bytesread = readblock(STDIN_FILENO, &pack, wsize);
      if (bytesread == 0) {
         lprintf(lf, "End-of-file received\n");
         break;
      }
      if (bytesread != wsize) {
         lprintf(lf, "Error reading packet header\n");
         return 1;
      }
      if (((int)pack.type < 0) || (pack.type >= NUMTYPES)) {
         lprintf(lf, "Got invalid packet\n");
         return 1;
      }
      lprintf(lf, "%s %s compid = %d taskid = %d length = %d\n",
              "Received packet header of type", typename[pack.type],
              pack.compid, pack.taskid, pack.length);
      if ((pack.length < 0) || (pack.length > MAX_PACK_SIZE)) {
         lprintf(lf, "Task data length is invalid\n");
         return 1;
      }
      if (readblock(STDIN_FILENO, buf, pack.length) != pack.length) {
         lprintf(lf, "Error reading packet data\n");
         return 1;
      }
      /* the data is not null-terminated and must not be used as a format */
      lprintf(lf, "%.*s", pack.length, buf);
   }
   return 0;
}
```
This part of the project uses a single task that has no input to allow testing of the code to create the task and the pipes for communication without the added complication of monitoring multiple file descriptors for input. The task outputs ASCII text rather than packets.
The dispatcher reads a single NEWTASK packet from standard input, creates the appropriate pipes, and forks the child that executes the task. The dispatcher then monitors the readfd pipe file descriptor for output from the task and forwards what it reads as DATA packets on standard output. When the dispatcher encounters an end-of-file on readfd, it waits for the child task to exit and then exits.
Implement the NTPVM dispatcher as described above. The dispatcher does the following.

1. Read a packet from standard input, using getpacket. If the packet is not a NEWTASK packet, output an error message and exit.
2. Create a pipe for communication with a child task.
3. Fork a child to execute the command given in the NEWTASK packet of step 1. The child should redirect standard input and output to the pipe and close all pipe file descriptors before executing the command. Use the makeargv function of Program 2.2 on page 37 to construct the argument array in the child. If an error occurs, the child just exits after printing an informative message.
4. Have the parent close all unneeded pipe descriptors so that the parent can detect end-of-file on readfd.
5. Wait for output from the child on readfd. For this part of the assignment, the child executes standard UNIX commands, and you may assume that it outputs only text. The dispatcher reads the child task’s output from readfd, wraps this output in a DATA packet, and sends the packet to standard output by calling putpacket.
6. If the read from readfd returns an error, treat it as an end-of-file. On end-of-file, close the readfd and writefd descriptors for the task, wait for the child task to exit, send a DONE packet identifying the task to standard output, and exit.
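Steps 2 through 4 might be sketched as follows. spawn_task is a hypothetical helper; it simplifies the text's setup by redirecting only the child's standard output (this part's task reads no input) and by taking a ready-made argument array instead of calling makeargv.

```c
#define _POSIX_C_SOURCE 200809L
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

/* Start argv[0] with its standard output redirected into a new pipe.
   Returns the parent's read end (the dispatcher's readfd) and stores the
   child's process ID in *pidp, or returns -1 on error. */
int spawn_task(char *const argv[], pid_t *pidp) {
   int fd[2];
   pid_t pid;

   if (pipe(fd) == -1)
      return -1;
   if ((pid = fork()) == -1) {
      close(fd[0]);
      close(fd[1]);
      return -1;
   }
   if (pid == 0) {                            /* child */
      if (dup2(fd[1], STDOUT_FILENO) == -1) {
         perror("Child failed to redirect standard output");
         _exit(1);
      }
      close(fd[0]);                           /* close all pipe descriptors before exec */
      close(fd[1]);
      execvp(argv[0], argv);
      perror("Child failed to execute command");
      _exit(1);
   }
   close(fd[1]);      /* without this the parent would never see end-of-file */
   *pidp = pid;
   return fd[0];
}
```

The dispatcher would then call read on the returned descriptor in a loop, wrap each chunk in a DATA packet with putpacket, and call waitpid on the saved process ID once read returns 0.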
The dispatcher should liberally use standard error or the remote logging facility to display informative messages about what it is doing. For example, when it receives something from readfd, the dispatcher should display information about the source task, the number of bytes read and the message read. It is worthwhile to invest time in designing a readable layout for the informative messages so that all the relevant information is available at a glance.

Test the program by using ls -l as the command to be executed.
This section describes the behavior of the dispatcher when the child task has both input and output. Although the dispatcher handles only one task at a time, it must monitor two input file descriptors. Complete Section 17.5 before starting this part.
The dispatcher keeps information about the child task in the tasks array. For simplicity, the discussion refers to members of an ntpvm_task_t entry, such as readfd, without their qualifying structure. Implement the tasks array as an object with appropriate access functions, placed in a file separate from the dispatcher main program. The array and its access functions are referred to as the tasks object, and an individual element of the tasks array is referred to as an entry of the tasks object. For this part, only one task runs at a time, so the tasks object does not yet need an array of tasks.
Figure 17.8 suggests the structure of the threaded NTPVM dispatcher. An input thread monitors standard input and processes the incoming packets. An output thread monitors the readfd descriptor for input from the child task and writes this information to standard output.

The input and output threads share the tasks object and must synchronize their access to it. One possible approach is to protect the entire tasks object with a single mutex lock. This choice cuts down on the potential parallelism, because only one thread at a time can access any part of the tasks object. Since mutex locks are inexpensive, we instead use a separate mutex lock for each element of the tasks array, so that threads working on different tasks do not block each other.
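A minimal sketch of a tasks object with one mutex per entry, assuming a layout built from the members the text names. The taskpid member, the TASK_RECV/TASK_SENT selector and the function names tasks_init, tasks_activate, tasks_count and tasks_get are hypothetical. The array is static, so other files can reach entries only through these functions, and each function takes only the lock of the entry it touches.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stddef.h>
#include <sys/types.h>

#define MAX_TASKS 10   /* one entry is enough for this part; Part 3 uses 10 */

/* Hypothetical entry layout built from the members the text names. */
typedef struct {
   int compid;                 /* -1 marks an inactive entry */
   int taskid;
   int readfd, writefd;
   pid_t taskpid;              /* assumed extra member */
   int endinput;
   int barrier;
   long recvpackets, recvbytes;
   long sentpackets, sentbytes;
} ntpvm_task_t;

enum { TASK_RECV, TASK_SENT };

/* Private to this file: other files use only the access functions below. */
static struct {
   pthread_mutex_t lock;       /* protects this entry only */
   ntpvm_task_t t;
} tasks[MAX_TASKS];

static int badkey(int key) { return key < 0 || key >= MAX_TASKS; }

void tasks_init(void) {
   for (int k = 0; k < MAX_TASKS; k++) {
      pthread_mutex_init(&tasks[k].lock, NULL);
      tasks[k].t.compid = -1;
      tasks[k].t.barrier = -1;
   }
}

int tasks_activate(int key, int compid, int taskid, int readfd, int writefd, pid_t pid) {
   if (badkey(key))
      return -1;
   pthread_mutex_lock(&tasks[key].lock);
   tasks[key].t = (ntpvm_task_t){ .compid = compid, .taskid = taskid, .readfd = readfd,
                                  .writefd = writefd, .taskpid = pid, .barrier = -1 };
   pthread_mutex_unlock(&tasks[key].lock);
   return 0;
}

/* Count one packet of nbytes received from stdin (TASK_RECV) or sent to stdout (TASK_SENT). */
int tasks_count(int key, int dir, long nbytes) {
   if (badkey(key))
      return -1;
   pthread_mutex_lock(&tasks[key].lock);
   if (dir == TASK_SENT) {
      tasks[key].t.sentpackets++;
      tasks[key].t.sentbytes += nbytes;
   } else {
      tasks[key].t.recvpackets++;
      tasks[key].t.recvbytes += nbytes;
   }
   pthread_mutex_unlock(&tasks[key].lock);
   return 0;
}

/* Copy an entry out under its lock so callers never see a half-updated entry. */
int tasks_get(int key, ntpvm_task_t *copy) {
   if (badkey(key))
      return -1;
   pthread_mutex_lock(&tasks[key].lock);
   *copy = tasks[key].t;
   pthread_mutex_unlock(&tasks[key].lock);
   return 0;
}
```

Keeping each mutex beside, rather than inside, the data it protects lets tasks_get hand out a plain copy without ever copying a mutex that some thread might later try to lock.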
The input thread monitors standard input and takes action according to the input it receives. Write an input function that executes the following steps in a loop until it encounters an end-of-file on standard input.

1. Read a packet from standard input by calling getpacket.
2. Process the packet.

After falling through the loop, close writefd and call pthread_exit.
Processing a packet depends on the packet type.

NEWTASK
1. If a child task is already executing, discard the packet and output an error message.
2. Otherwise, if no child task exists, create two pipes to handle the task’s input and output.
3. Update the tasks object, and fork a child. The child should redirect its standard input and output to the pipes and use the makeargv function of Program 2.2 to construct the argument array before calling execvp to execute the command given in the packet.
4. Create a detached output thread by calling pthread_create. Pass a key for the tasks entry of this task as an argument to the output thread. The key is just the index of the appropriate tasks array entry.
DATA
1. If the packet’s computation and task IDs don’t match those of the executing task, or if the task’s endinput is true, output an error message and discard the packet.
2. Otherwise, copy the data portion to writefd.
3. Update the recvpackets and recvbytes members of the appropriate task entry of the tasks object.
DONE
1. If the packet’s computation and task IDs do not match those of the executing task, output an error message and discard the packet.
2. Otherwise, close the writefd descriptor if it is still open.
3. Set the endinput member for this task entry.
BROADCAST, BARRIER or TERMINATE
This part does not handle these packet types; output an error message and discard the packet.
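The DATA case could look like the following sketch, which uses a hypothetical data_entry_t holding only the members the check needs. It releases the entry's lock before writing, because the write can block until the child reads its input, and the output thread must still be able to lock the same entry to update its own counters in the meantime.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

/* Hypothetical minimal entry; the real tasks object has more members. */
typedef struct {
   pthread_mutex_t lock;
   int compid, taskid, writefd, endinput;
   long recvpackets, recvbytes;
} data_entry_t;

/* Handle a DATA packet read from standard input.
   Returns 0 if the data was forwarded to the task, -1 if the packet was discarded. */
int handle_data(data_entry_t *e, int compid, int taskid, const char *data, int length) {
   int fd, ok, done = 0;

   pthread_mutex_lock(&e->lock);
   ok = e->compid == compid && e->taskid == taskid && !e->endinput && e->writefd != -1;
   fd = e->writefd;
   pthread_mutex_unlock(&e->lock);
   if (!ok) {
      fprintf(stderr, "Discarding DATA packet for computation %d task %d\n", compid, taskid);
      return -1;
   }
   while (done < length) {            /* lock not held: this write may block */
      ssize_t w = write(fd, data + done, (size_t)(length - done));
      if (w == -1) {
         if (errno == EINTR)
            continue;
         perror("Failed to write task input");
         return -1;
      }
      done += (int)w;
   }
   pthread_mutex_lock(&e->lock);
   e->recvpackets++;
   e->recvbytes += length;
   pthread_mutex_unlock(&e->lock);
   return 0;
}
```

Holding the lock across the write could deadlock: a child that is blocked writing its output would wait on an output thread that is itself waiting for the lock.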
Example 17.9
When a process that contains multiple threads creates a child by calling fork, how many threads exist in the child?

Answer:
Although fork creates a copy of the process, the child does not inherit the threads of the parent. POSIX specifies that the child has only one thread of execution: the thread that called fork.
The output thread handles input from the readfd descriptor of a particular task. It receives as its parameter a tasks object key for the task it monitors. Write an output function that executes the following steps in a loop until it encounters an end-of-file on readfd.

1. Read data from readfd.
2. Call putpacket to construct a DATA packet and send it to standard output.
3. Update the sentpackets and sentbytes members of the appropriate task entry in the tasks object.
After falling through the loop because of an end-of-file or an error on readfd, the output thread does the following.

1. Close the readfd and writefd descriptors for the task.
2. Execute wait for the child task.
3. Send a DONE packet with the appropriate computation and task IDs to standard output.
4. Output information about the finished task to standard error or to the remote logger. Include the computation ID, the task ID, the total bytes sent by the task, the total packets sent by the task, the total bytes received by the task and the total packets received by the task.
5. Deactivate the task entry by setting the computation ID to –1.
6. Call pthread_exit.
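A sketch of the output thread under several assumptions: out_entry_t is a hypothetical subset of a tasks entry, the thread receives a pointer to the entry rather than the index key the text specifies, the DATA and DONE codes follow the logging program's typename order, and put_packet stands in for putpacket. The output mutex is unnecessary while only one task runs, but it becomes the extra lock required once several output threads share standard output. The sketch calls waitpid on the task's own process ID rather than wait, which keeps it correct once several children exist.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

enum { DATA = 1, DONE = 3 };        /* assumed codes, in the logging program's order */
typedef struct { int compid, taskid, type, length; } pkthdr_t;   /* hypothetical */

typedef struct {                    /* hypothetical subset of a tasks entry */
   pthread_mutex_t lock;
   int compid, taskid, readfd, writefd;
   pid_t taskpid;
   long sentpackets, sentbytes, recvpackets, recvbytes;
} out_entry_t;

static pthread_mutex_t outlock = PTHREAD_MUTEX_INITIALIZER;  /* one writer at a time */
static int outfd = STDOUT_FILENO;

/* Header and data go out under one lock so packets never interleave.
   Short writes are reported rather than retried, to keep the sketch brief. */
static void put_packet(int compid, int taskid, int type, const char *data, int len) {
   pkthdr_t h = { compid, taskid, type, len };
   pthread_mutex_lock(&outlock);
   if (write(outfd, &h, sizeof h) != (ssize_t)sizeof h ||
       (len > 0 && write(outfd, data, (size_t)len) != len))
      perror("Dispatcher failed to write packet");
   pthread_mutex_unlock(&outlock);
}

void *output_thread(void *arg) {
   out_entry_t *e = arg;            /* the text passes an index key instead */
   char buf[512];
   ssize_t n;
   int compid, taskid, readfd;
   pid_t pid;

   pthread_mutex_lock(&e->lock);
   compid = e->compid;
   taskid = e->taskid;
   readfd = e->readfd;
   pid = e->taskpid;
   pthread_mutex_unlock(&e->lock);

   while ((n = read(readfd, buf, sizeof buf)) > 0) {   /* stops on end-of-file or error */
      put_packet(compid, taskid, DATA, buf, (int)n);
      pthread_mutex_lock(&e->lock);
      e->sentpackets++;
      e->sentbytes += n;
      pthread_mutex_unlock(&e->lock);
   }
   close(readfd);
   pthread_mutex_lock(&e->lock);
   if (e->writefd != -1) {
      close(e->writefd);
      e->writefd = -1;
   }
   pthread_mutex_unlock(&e->lock);
   waitpid(pid, NULL, 0);
   put_packet(compid, taskid, DONE, NULL, 0);
   pthread_mutex_lock(&e->lock);
   fprintf(stderr, "comp %d task %d: sent %ld bytes in %ld packets, "
           "received %ld bytes in %ld packets\n", compid, taskid,
           e->sentbytes, e->sentpackets, e->recvbytes, e->recvpackets);
   e->compid = -1;                  /* deactivate the entry */
   pthread_mutex_unlock(&e->lock);
   pthread_exit(NULL);
}
```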
Test the program by starting tasks to execute various cat and ls -l commands. Try other filters such as sort to test the command-line parsing. For this part, do not enter a new command until the previous command has completed.
Modify the program to allow multiple computations and tasks. Use a MAX_TASKS value of 10 for this part. A new NEWTASK packet may arrive before the data from previous tasks has been completely transmitted.

When a new NEWTASK packet comes in, find an available slot in the tasks object, create a new set of pipes, and fork a new child to execute the command. Don’t enter any duplicates in the tasks array.
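Finding a slot for a new task while rejecting duplicates can be done in a single scan. In this sketch slot_t and find_slot are hypothetical, a computation ID of -1 marks a free entry, and locking is left out; in the dispatcher, each entry's mutex would be held while that entry is examined.

```c
#define MAX_TASKS 10

typedef struct { int compid, taskid; } slot_t;   /* hypothetical minimal entry */

/* Returns the index of the first free slot, -2 if (compid, taskid) is already
   active, or -1 if the table is full. */
int find_slot(const slot_t tasks[], int compid, int taskid) {
   int freeslot = -1;

   for (int k = 0; k < MAX_TASKS; k++) {
      if (tasks[k].compid == -1) {
         if (freeslot == -1)
            freeslot = k;              /* remember it, but keep checking for duplicates */
      } else if (tasks[k].compid == compid && tasks[k].taskid == taskid) {
         return -2;
      }
   }
   return freeslot;
}
```

Scanning the whole table before accepting a free slot matters: a duplicate may sit in a later entry than the first free one.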
Figure 17.9 shows a schematic of a threaded NTPVM dispatcher that supports multiple simultaneous tasks. When another request comes in, the input thread creates a new output thread. Since multiple output threads write to standard output, define an additional mutex lock to synchronize output on the dispatcher’s standard output.
Once the dispatcher handles multiple simultaneous tasks, implement the handling of the BROADCAST and BARRIER packets. The child tasks now have to communicate with the dispatcher in packet format so that the dispatcher and its tasks can distinguish control information (broadcast or barrier) from data.
When the dispatcher receives a BROADCAST packet, either from standard input or from one of the readfd descriptors, it forwards the packet on the writefd descriptor of each task whose computation ID matches that of the BROADCAST packet. A BROADCAST packet that arrives from a readfd descriptor is also forwarded on the dispatcher’s standard output, since in a future extension tasks from the computation may reside on other hosts.
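A sketch of the forwarding rule, with hypothetical names (pkthdr_t, bentry_t, forward_broadcast, MAX_DATA) and an assumed BROADCAST code. Each copy goes out in a single write call: POSIX makes writes of at most PIPE_BUF bytes (never less than 512) to a pipe atomic, so as long as header plus data stays below that limit, packets written to the same pipe by the input thread and by an output thread cannot interleave. That guarantee applies only when the destination is a pipe.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <string.h>
#include <unistd.h>

#define MAX_TASKS 10
#define MAX_DATA 256        /* hypothetical limit: header + data stays under 512 bytes */
enum { BROADCAST = 2 };     /* assumed type code */

typedef struct { int compid, taskid, type, length; } pkthdr_t;    /* hypothetical */
typedef struct { int compid, taskid, writefd; } bentry_t;         /* hypothetical */

/* Send header and data in one write() so the packet reaches a pipe atomically. */
static int send_whole(int fd, const pkthdr_t *h, const char *data) {
   char buf[sizeof(pkthdr_t) + MAX_DATA];
   size_t n = sizeof *h + (size_t)h->length;
   ssize_t w;

   memcpy(buf, h, sizeof *h);
   if (h->length > 0)
      memcpy(buf + sizeof *h, data, (size_t)h->length);
   do {
      w = write(fd, buf, n);
   } while (w == -1 && errno == EINTR);
   return (w == (ssize_t)n) ? 0 : -1;
}

/* Forward a BROADCAST to every local task of its computation. Pass outfd = -1 for a
   packet read from standard input, or STDOUT_FILENO for one read from a task.
   Returns the number of tasks reached, or -1 if the data is too long. */
int forward_broadcast(const bentry_t tasks[], const pkthdr_t *h, const char *data, int outfd) {
   int sent = 0;

   if (h->length < 0 || h->length > MAX_DATA)
      return -1;
   for (int k = 0; k < MAX_TASKS; k++)
      if (tasks[k].writefd != -1 && tasks[k].compid == h->compid &&
          send_whole(tasks[k].writefd, h, data) == 0)
         sent++;
   if (outfd != -1)
      send_whole(outfd, h, data);
   return sent;
}
```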
When the dispatcher receives a BARRIER packet from a task, it sets the barrier member for that task to the barrier number specified by the packet data. When all the tasks in a computation have reported that they are waiting for the barrier, the dispatcher sends a BARRIER message on standard output.

When the dispatcher reads a BARRIER packet for that barrier number from standard input, it resets the barrier member to –1 and sends a SIGUSR1 signal to all the tasks in the computation. The BARRIER packet from standard input signifies that all tasks in the computation are waiting at the designated barrier and that they can be released. Assume that the dispatcher never receives a second BARRIER packet from standard input before it has forwarded a corresponding BARRIER packet on standard output.
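The dispatcher's barrier bookkeeping reduces to two scans over the tasks entries. In this sketch brentry_t, barrier_arrive and barrier_release are hypothetical, and per-entry locking is omitted for brevity.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <sys/types.h>

#define MAX_TASKS 10

typedef struct {          /* hypothetical subset of a tasks entry */
   int compid;            /* -1: inactive entry */
   int barrier;           /* -1: not waiting at a barrier */
   pid_t taskpid;
} brentry_t;

/* Task key sent a BARRIER packet for barrier number b. Returns 1 when every active
   task of the same computation is waiting at b, so the dispatcher should now send a
   BARRIER packet on standard output; returns 0 otherwise. */
int barrier_arrive(brentry_t tasks[], int key, int b) {
   int comp = tasks[key].compid;

   tasks[key].barrier = b;
   for (int k = 0; k < MAX_TASKS; k++)
      if (tasks[k].compid == comp && tasks[k].barrier != b)
         return 0;
   return 1;
}

/* A BARRIER packet for barrier b arrived on standard input: reset the waiting tasks
   of computation comp and release each with SIGUSR1. Returns the number released. */
int barrier_release(brentry_t tasks[], int comp, int b) {
   int released = 0;

   for (int k = 0; k < MAX_TASKS; k++) {
      if (tasks[k].compid != comp || tasks[k].barrier != b)
         continue;
      tasks[k].barrier = -1;
      if (tasks[k].taskpid > 0)      /* kill with 0 or -1 would signal whole groups */
         kill(tasks[k].taskpid, SIGUSR1);
      released++;
   }
   return released;
}
```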
Implement the barrier on the task side by blocking the SIGUSR1 signal, writing a BARRIER packet to standard output, and then executing sigsuspend in a loop until the SIGUSR1 signal arrives. Example 8.26 shows how this is done.
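A task-side sketch of this pattern, assuming that the BARRIER data is the barrier number stored as a binary int, that BARRIER is type code 5 (its position in the logging program's table), and that pkthdr_t and barrier_wait are hypothetical names. Blocking SIGUSR1 before sending the packet closes the window in which the release could arrive before the task starts waiting; sigsuspend then unblocks the signal and sleeps in one atomic step.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <string.h>
#include <unistd.h>

enum { BARRIER = 5 };                                             /* assumed type code */
typedef struct { int compid, taskid, type, length; } pkthdr_t;    /* hypothetical header */

static volatile sig_atomic_t released = 0;

static void release_handler(int signo) {
   (void)signo;
   released = 1;
}

/* Announce barrier b on outfd, then wait for SIGUSR1. Returns 0 once released. */
int barrier_wait(int outfd, int compid, int taskid, int b) {
   struct sigaction act;
   sigset_t usr1, oldmask, waitmask;
   pkthdr_t h = { compid, taskid, BARRIER, (int)sizeof b };
   char buf[sizeof h + sizeof b];

   memset(&act, 0, sizeof act);
   act.sa_handler = release_handler;
   sigemptyset(&act.sa_mask);
   if (sigaction(SIGUSR1, &act, NULL) == -1)
      return -1;
   sigemptyset(&usr1);
   sigaddset(&usr1, SIGUSR1);
   if (sigprocmask(SIG_BLOCK, &usr1, &oldmask) == -1)   /* block before announcing */
      return -1;
   waitmask = oldmask;
   sigdelset(&waitmask, SIGUSR1);     /* the mask used while suspended */
   released = 0;
   memcpy(buf, &h, sizeof h);
   memcpy(buf + sizeof h, &b, sizeof b);
   if (write(outfd, buf, sizeof buf) != (ssize_t)sizeof buf) {
      sigprocmask(SIG_SETMASK, &oldmask, NULL);
      return -1;
   }
   while (!released)
      sigsuspend(&waitmask);          /* unblock SIGUSR1 and sleep atomically */
   return sigprocmask(SIG_SETMASK, &oldmask, NULL);
}
```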
Write a dummy task program to generate appropriate broadcast and barrier messages.
Implement signal handling so that the dispatcher shuts down gracefully when it receives Ctrl-C. Also add code to handle TERMINATE packets.
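One common way to make Ctrl-C safe in a threaded program is to block SIGINT in every thread and accept it synchronously with sigwait in a dedicated thread. That technique is swapped in here as an illustration rather than taken from the text; start_signal_thread and signal_thread are hypothetical, and the actual shutdown work is left as a comment.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <signal.h>
#include <unistd.h>

static sigset_t shutdown_set;

/* Dedicated thread: receives SIGINT synchronously, so no asynchronous handler can
   interrupt another thread while it holds a tasks entry lock. */
static void *signal_thread(void *arg) {
   int *caught = arg;
   int signo;

   if (sigwait(&shutdown_set, &signo) == 0) {
      static const char msg[] = "Dispatcher shutting down\n";
      ssize_t ignored = write(STDERR_FILENO, msg, sizeof msg - 1);
      (void)ignored;
      *caught = signo;
      /* a full dispatcher would now close each writefd so tasks see end-of-file,
         wait for the children, and then exit */
   }
   return NULL;
}

/* Call from main before creating any other thread, so every thread inherits the
   blocked mask and SIGINT is delivered only through sigwait. */
int start_signal_thread(pthread_t *tid, int *caught) {
   sigemptyset(&shutdown_set);
   sigaddset(&shutdown_set, SIGINT);
   if (pthread_sigmask(SIG_BLOCK, &shutdown_set, NULL) != 0)
      return -1;
   return pthread_create(tid, NULL, signal_thread, caught);
}
```

Because the shutdown code runs in an ordinary thread, it may lock mutexes and call functions that are not async-signal-safe.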
Add a sequence number to the packet format and implement in-order delivery of packets from each source-destination pair.
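One possible receiver for sequence-numbered packets is a small reorder buffer kept for each source-destination pair. In this sketch the window size, reorder_t, reorder_accept and the integer payload standing in for a packet are all hypothetical, and the sender is assumed to number each pair's packets 0, 1, 2, and so on.

```c
#define WINDOW 8    /* hypothetical: how far ahead of the next expected number a packet may be */

typedef struct {
   unsigned next;          /* next sequence number to deliver */
   int held[WINDOW];       /* 1 if the slot holds an early packet */
   int payload[WINDOW];    /* stand-in for a buffered packet */
} reorder_t;

/* Accept packet seq for one source-destination pair. Copies every packet that is now
   in order into out (room for WINDOW items) and returns how many were delivered,
   or -1 if seq is a duplicate or lies outside the window. */
int reorder_accept(reorder_t *r, unsigned seq, int payload, int out[]) {
   unsigned slot = seq % WINDOW;
   int n = 0;

   if (seq - r->next >= WINDOW || r->held[slot])
      return -1;                        /* already delivered, too far ahead, or duplicate */
   r->held[slot] = 1;
   r->payload[slot] = payload;
   while (r->held[r->next % WINDOW]) {  /* release the in-order run */
      slot = r->next % WINDOW;
      out[n++] = r->payload[slot];
      r->held[slot] = 0;
      r->next++;
   }
   return n;
}
```

Unsigned arithmetic makes seq - r->next wrap to a huge value for packets that were already delivered, so one comparison rejects both stale and too-early packets.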
The PVM system was developed by Oak Ridge National Laboratory and Emory University. The paper “PVM: A framework for parallel distributed computing” by V. S. Sunderam [118] provides an overview of the development and implementation of the PVM system. Other articles of interest include “Visualization and debugging in a heterogeneous environment” by Beguelin et al. [10] and “Experiences with network-based concurrent computing on the PVM system” by Geist and Sunderam [41]. The PVM distribution is available electronically from www.csm.ornl.gov/pvm.