Chapter 17. Project: The Not Too Parallel Virtual Machine

PVM (Parallel Virtual Machine) provides a high-level, but not transparent, system for a user to coordinate tasks spread across workstations on a network. This project describes a threaded implementation of the Not Too Parallel Virtual Machine (NTPVM) dispatcher, a simplified PVM system. The multithreaded implementation illustrates the interaction between threads and fork, providing a semirealistic application in which to explore complex thread interactions.

PVM History, Terminology, and Architecture

Grace Murray Hopper, a vocal early advocate of parallel computing, was fond of reminding her audiences that the way to pull a heavier load was not to grow a bigger ox but to hitch more oxen to the load. Seymour Cray, a pioneer in computer architecture, is reported to have later countered, “If you were plowing a field, which would you rather use, two strong oxen or 1024 chickens?” The chickens versus oxen debate continues to rage. IBM’s Blue Gene Project involves building a 64,000-processor machine with petaflop capabilities (a thousand trillion operations per second) based on relatively low-powered, embedded PowerPC chips [14]. On the other hand, the NEC Earth-Simulator, which was rated as the world’s fastest computer in 2002, uses only 640 nodes. Each “NEC oxen node” consists of 8 tightly coupled vector processors [135].

Another important development in the parallel/distributed computing arena is the move to harness cheap workstations to solve large problems. Programming libraries, such as PVM (Parallel Virtual Machine) [118] and MPI (Message Passing Interface) [43], let groups of heterogeneous, interconnected machines act as a transparent parallel-computing environment by supplying a cross-platform message-passing facility with higher-level services built on top. These systems allow users to solve large problems on networks of workstations by providing the illusion of a single parallel machine. PVM operates at the task level and presents a message-passing abstraction that hides the details of the network and individual machines that make up the virtual machine. PVM/MPI libraries have become the mainstay of distributed scientific computing because they allow researchers to develop platform-independent software. However, programs based on this paradigm are hard for nonexperts to debug and optimize.

A new notion of “computing as a utility” has recently emerged in the form of grid computing [38]. The Open Grid Services Architecture provides a higher-level layer of services built over message-passing libraries and native host runtime systems. These higher-level abstractions are quickly bringing distributed computing into the mainstream.

This chapter project develops a PVM-like library for managing tasks. We begin by introducing PVM terminology and providing an overview of the PVM architecture.

The basic unit of computation in PVM is called a task and is analogous to a UNIX process. A PVM program calls PVM library functions to create and coordinate tasks. The tasks can communicate by passing messages to other tasks through calls to PVM library functions. Tasks that cooperate, either through communication or synchronization, are organized into groups called computations. PVM supports direct communication, broadcast and barriers within a computation.

Figure 17.1 shows a logical view of a typical PVM system. A PVM application generally starts with an input and partitioning task that controls the problem solution. The user specifies in this task how other tasks cooperate to solve the problem. The input and partitioning task creates several computations. Tasks within each computation share data and communicate with each other. The PVM application also has a dedicated task to handle output and user display. The other tasks in the PVM application forward their output to this task for display on the application’s console.


Figure 17.1. Logical view of an application running on a PVM virtual machine.

To run a PVM application, a user first designates the pool of machines or hosts that make up the virtual machine and then starts the PVM control daemon, pvmd, on each of these hosts. The control daemon communicates with the user’s console, handles communication and controls the tasks on its machine. To send input to a particular task, PVM sends the data to the pvmd daemon on the destination host, which then forwards it to the appropriate task. Similarly, a task produces output by sending a message to its pvmd, which in turn forwards it to the console’s pvmd and on to the application’s output task. The underlying message passing is transparent, so the user sees only that a particular task has sent a message to the console.

Figure 17.2 shows how an application might be mapped onto the virtual machine. The tasks that make up a logical computation are not necessarily mapped to the same host but might be spread across all the hosts on the virtual machine. Host 1 of Figure 17.2 runs tasks from three computations: one computation consists of a single task, one has two tasks, and one also has tasks on host 2.


Figure 17.2. Schematic of a PVM.

The Not Too Parallel Virtual Machine

The Not Too Parallel Virtual Machine (NTPVM) is a dispatcher that shares many characteristics of a PVM control daemon, pvmd. The NTPVM dispatcher is responsible for creating and managing tasks on a single host, as shown schematically in Figure 17.3. The dispatcher receives requests through its standard input and responds through its standard output. (Later, standard input and standard output can be redirected to network communication ports.) The dispatcher might receive a request to create a task or to forward data to a task under its control.


Figure 17.3. Schematic of the NTPVM dispatcher.

A task is just a process that executes a specified program. Each task is identified by a computation ID and a task ID. When the dispatcher receives a request to create a task with a particular computation ID and task ID, it creates a pair of pipes and forks a child to execute the task. Figure 17.4 shows the communication layout between a task and its dispatcher. The pipe that carries communication from the dispatcher to the child task is labeled writefd on the dispatcher end. The child redirects its standard input to this pipe. Similarly, the pipe that carries communication from the child to the dispatcher is labeled readfd on the dispatcher end. The child redirects its standard output to this pipe.


Figure 17.4. NTPVM dispatcher communicates with its children through pipes.

The dispatcher supports delivery of input data to the tasks, delivery of output from the tasks and broadcast of data to tasks that have the same computation ID. The dispatcher also supports numbered barriers and cancellation for tasks with the same computation ID. NTPVM is simpler than the real PVM in several respects. PVM has in-order message delivery and allows any task to communicate with other tasks in its computation. It has a buffering mechanism for holding messages. PVM also provides sophisticated computation monitoring tools. NTPVM delivers messages whenever it gets them, does not support point-to-point task communication, and has primitive monitoring capabilities.

NTPVM Project Overview

The tasks in NTPVM are independent processes grouped into units called computations. The dispatcher is responsible for creating and managing tasks. In general, the tasks of a computation do not have to reside on the same machine, and the specification of the project is designed with this extension in mind. However, a single dispatcher controls all the computations for the project described in this chapter.

The dispatcher communicates with the outside world by reading packets from its standard input and writing packets to its standard output. The dispatcher might receive a packet requesting that it create a new task, or it might receive a data packet intended for a task under its control. The dispatcher forwards output generated by the tasks under its control to its own standard output in the form of packets. For the first four parts of the project, the tasks send ASCII data and the dispatcher wraps the data in a packet. Later, the tasks generate the packets themselves.

Program 17.1 shows the ntpvm.h header file that contains the relevant type definitions for the dispatcher. Include this file in all the programs in this project.

The dispatcher packets include a computation ID, a task ID, a packet type, a packet length and the packet information. The first four items make up a fixed-length packet header that is stored in a structure of type taskpacket_t. Assume that the information portion of the packet contains no more than MAX_PACK_SIZE bytes.

The dispatcher keeps information about each active task in a global tasks array of type ntpvm_task_t, which should be implemented as an object with appropriate functions for accessing and modifying it. When the description refers to “modifying” or “accessing” information in the tasks object, it means calling one of the object’s public functions to perform the action. Do not allow the dispatcher to execute more than MAX_TASKS simultaneous tasks. Initially, set the compid member of each element of the tasks array to –1 to indicate that the slot is empty.

Program 17.1. ntpvm.h

The ntpvm.h header file.

#include <pthread.h>
#include <sys/types.h>
#define MAX_PACK_SIZE 1024
#define MAX_TASKS 10
#define NUMTYPES 6

typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE,
                    TERMINATE, BARRIER} packet_t;

typedef struct {
     int compid;
     int taskid;
     packet_t type;
     int length;
} taskpacket_t;

typedef struct {
     int compid;                            /* computation ID for task */
     int taskid;                               /* task ID for the task */
     int writefd;                        /* holds dispatcher->child fd */
     int readfd;                         /* holds child->dispatcher fd */
     int recvbytes;
     int recvpackets;
     int sentbytes;
     int sentpackets;
     pid_t taskpid;                   /* process ID of the forked task */
     pthread_t tasktid;             /* thread ID of task output thread */
     int barrier;         /* -1 if not at barrier, else barrier number */
     pthread_mutex_t mlock;                  /* mutex lock for element */
     int endinput;                   /* true if no more input for task */
} ntpvm_task_t;

There are six types of dispatcher packets in all: NEWTASK, DATA, BROADCAST, DONE, TERMINATE and BARRIER. A packet consists of a header structure of type taskpacket_t followed by a data field that is an array whose size is specified by the length field of the header. The maximum value of length is MAX_PACK_SIZE. The dispatcher interprets the packet types as follows.

  1. When the dispatcher receives a NEWTASK packet on standard input, it initiates a new task. The information portion of this packet gives the command line to be executed by the forked child task. The dispatcher creates two pipes and forks a child that calls execvp for the specified command.

  2. The dispatcher treats the DATA packets that it receives on standard input as input data for the task identified by the computation ID and task ID members of the packet header. For the first four parts of the project, the dispatcher strips off the packet header and writes the actual packet data to writefd of the appropriate task.

  3. When a task writes data to its standard output, the dispatcher forwards the data to standard output. The first four parts of this project run standard UNIX utilities as the tasks. Since these commands produce just ASCII text as output, the dispatcher packages the data into DATA packets before sending it to standard output. Starting with Part V, the tasks send DATA packets themselves.

  4. When the dispatcher receives a DONE packet on standard input, it closes the writefd file descriptor for the task identified by the computation ID and task ID members of the packet header. The corresponding task then detects end-of-file on its standard input.

  5. When the dispatcher detects end-of-file on the readfd descriptor of a task, it performs the appropriate cleanup and sends a DONE packet on standard output to signify that the task has completed.

  6. The dispatcher forwards any BROADCAST packets from standard input to all tasks in the specified computation.

  7. If a task sends a BROADCAST packet to the dispatcher, the dispatcher forwards the request to all tasks in the same computation and also forwards the request on its standard output. In this way, all the tasks within a computation receive the message.

  8. If the dispatcher receives a TERMINATE packet on its standard input, it kills the task identified by the packet’s computation ID and task ID. If task ID is –1, the dispatcher kills all tasks in the specified computation. The dispatcher handles a TERMINATE packet received from readfd in a similar way. However, if no task ID matches the packet or if task ID is –1, the dispatcher also writes the TERMINATE packet to standard output.

  9. The BARRIER packets synchronize tasks of a computation at a particular point in their execution.

The NTPVM project has the following parts:

Part I: Setup of I/O and testing [Section 17.4].

Part II: Single task with no input (handle NEWTASK and outgoing data) [Section 17.5].

Part III: One task at a time (handle NEWTASK, DATA and DONE packets) [Section 17.6].

Part IV: Multiple tasks and computations (handle NEWTASK, DATA and DONE packets) [Section 17.7].

Part V: Task synchronization (handle BROADCAST and BARRIER packets) [Section 17.8].

Part VI: Cleanup (handle TERMINATE packets and signals) [Section 17.9].

Part VII: Ordered message delivery [Section 17.10].

In the first four parts of the project, the child tasks do not communicate by using packets, and the dispatcher strips off the packet headers before writing to writefd. This format allows the dispatcher to run ordinary UNIX utilities such as cat or ls as tasks. In Part V, the tasks communicate with the dispatcher by using packets. At that point, the project requires specific task programs for NTPVM testing. The remainder of this section gives examples of different types of packets and methods the dispatcher uses to handle them.

NEWTASK packets

The dispatcher waits for a NEWTASK packet from standard input. Such a packet includes a computation ID, a task ID and a command-line string.

Example 17.1. 

The following NEWTASK packet requests that task 2 in computation 3 be created to execute ls -l.

    Computation ID:      3
    Task ID:             2
    Packet Type:         NEWTASK
    Packet Data Length:  5
    Packet Information:  ls -l

The data in the packet of Example 17.1 is not null-terminated. The dispatcher must copy the data into a null-terminated string before handing it to makeargv or execvp.
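The length field, not a null byte, marks the end of the data. The conversion therefore amounts to copying the data and appending a terminator. The following is a minimal sketch. The helper name packdata_to_string is hypothetical and is not part of the project specification.

```c
#include <string.h>

#define MAX_PACK_SIZE 1024            /* same limit as in ntpvm.h */

/* Hypothetical helper: copy len bytes of packet data into str and add the
 * terminating null byte.  str must have room for MAX_PACK_SIZE + 1 bytes.
 * Returns 0 on success or -1 if len is out of range. */
int packdata_to_string(const unsigned char *data, int len, char *str) {
   if ((len < 0) || (len > MAX_PACK_SIZE))
      return -1;
   memcpy(str, data, (size_t)len);
   str[len] = '\0';
   return 0;
}
```

The destination buffer needs one byte more than the largest packet, which is why the comment asks for MAX_PACK_SIZE + 1 bytes.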

The dispatcher asks the tasks object to find a free entry and to store the information about the new task. If a task with the same computation and task IDs is already in the tasks array, the dispatcher discards the packet and reports an error. In the new entry, the sentpackets, sentbytes, recvpackets, recvbytes and endinput members are set to 0, and the barrier member is set to –1 to signify that the task is not waiting at a barrier.
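One way to organize the tasks object is a small set of functions that hide the array behind a lock. The sketch below is illustrative only. The function names (tasks_init, tasks_add, tasks_find, tasks_remove) are hypothetical, and the entry type is a trimmed-down stand-in for ntpvm_task_t. The sketch also uses a single mutex for the whole table instead of the per-element mlock member, which is a simplification. It rejects duplicate (computation ID, task ID) pairs and initializes a new entry as described above.

```c
#include <pthread.h>

#define MAX_TASKS 10                      /* same limit as in ntpvm.h */

/* Trimmed-down stand-in for ntpvm_task_t with only the members used here. */
typedef struct {
   int compid;                            /* -1 marks an empty slot */
   int taskid;
   int recvbytes;
   int recvpackets;
   int sentbytes;
   int sentpackets;
   int barrier;                           /* -1 if not at a barrier */
   int endinput;
} task_entry_t;

static task_entry_t tasks[MAX_TASKS];
static pthread_mutex_t taskslock = PTHREAD_MUTEX_INITIALIZER;

/* Return the slot holding (compid, taskid), or -1; caller holds taskslock. */
static int findslot(int compid, int taskid) {
   int i;
   for (i = 0; i < MAX_TASKS; i++)
      if ((tasks[i].compid != -1) && (tasks[i].compid == compid) &&
          (tasks[i].taskid == taskid))
         return i;
   return -1;
}

void tasks_init(void) {                   /* mark every slot empty */
   int i;
   pthread_mutex_lock(&taskslock);
   for (i = 0; i < MAX_TASKS; i++)
      tasks[i].compid = -1;
   pthread_mutex_unlock(&taskslock);
}

/* Store a new task and return its slot.  Return -1 if the IDs are already
 * in use, if no slot is free, or if compid is the reserved value -1. */
int tasks_add(int compid, int taskid) {
   int i;
   int slot = -1;
   if (compid == -1)
      return -1;
   pthread_mutex_lock(&taskslock);
   if (findslot(compid, taskid) == -1) {
      for (i = 0; i < MAX_TASKS; i++)
         if (tasks[i].compid == -1) {
            slot = i;
            break;
         }
   }
   if (slot != -1) {
      tasks[slot].compid = compid;
      tasks[slot].taskid = taskid;
      tasks[slot].recvbytes = tasks[slot].recvpackets = 0;
      tasks[slot].sentbytes = tasks[slot].sentpackets = 0;
      tasks[slot].endinput = 0;
      tasks[slot].barrier = -1;
   }
   pthread_mutex_unlock(&taskslock);
   return slot;
}

int tasks_find(int compid, int taskid) {  /* slot index or -1 */
   int slot;
   pthread_mutex_lock(&taskslock);
   slot = findslot(compid, taskid);
   pthread_mutex_unlock(&taskslock);
   return slot;
}

void tasks_remove(int slot) {             /* free a slot for reuse */
   pthread_mutex_lock(&taskslock);
   if ((slot >= 0) && (slot < MAX_TASKS))
      tasks[slot].compid = -1;
   pthread_mutex_unlock(&taskslock);
}
```

Returning a slot index instead of a pointer keeps the array private to the object's source file. A threaded dispatcher would still need to decide how long a caller may rely on an index after the lock is released.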

The dispatcher then creates two pipes and uses two of the four resulting pipe file descriptors for communication with the child task. These descriptors are stored in the readfd and writefd members of the tasks array entry. The dispatcher forks a child and stores the child process ID in the taskpid member of the tasks entry. The dispatcher closes unused pipe file descriptors and then waits for I/O either from its standard input or from the readfd descriptors of its tasks.

The child task forked by the dispatcher redirects its standard input and output to the pipes and closes the unused file descriptors. The child then calls execvp to execute the command string. Use the makeargv function of Program 2.2 on page 37 to create an argument array for input to execvp.

DATA packets

When the dispatcher reads a DATA packet from standard input, it asks the tasks object to determine whether the packet’s task ID and computation ID match those of any entry in the tasks array. The dispatcher discards the packet if no entry matches. Otherwise, the dispatcher updates the recvpackets and recvbytes members of the task’s entry in the tasks array.

For the first four parts of the project, the tasks are standard UNIX utilities that accept ASCII input. The dispatcher forwards the information portion of the packet to the task on the task’s writefd descriptor. In Parts V, VI and VII the tasks receive the full data packets directly.

Example 17.2. 

After receiving the following DATA packet, the dispatcher sends the words This is my data to task 2 in computation 3.

    Computation ID:      3
    Task ID:             2
    Packet Type:         DATA
    Packet Data Length:  15
    Packet Data:         This is my data

The dispatcher also forwards data received from individual tasks to its standard output in the form of DATA packets. For the first four parts of the project, the dispatcher interprets input from readfd as raw output from the task. It creates a DATA packet with the task’s computation ID and task ID and uses the information read from readfd as the information portion of the packet. The dispatcher then writes the DATA packet to its standard output. Starting with Section 17.8, each task reads and writes its data in packet format. In these sections, the dispatcher copies the DATA packets to its standard output.
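For the first four parts, forwarding task output reduces to a read loop that wraps each chunk in a DATA packet, followed by a DONE packet at end-of-file. The sketch below assumes a hypothetical forward_output function. Its sendpacket helper is a simplified stand-in for putpacket, and the definitions are copied from ntpvm.h so that the block compiles on its own. The caller remains responsible for closing readfd, waiting for the child and updating the tasks object.

```c
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_PACK_SIZE 1024                /* definitions copied from ntpvm.h */
typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE,
                    TERMINATE, BARRIER} packet_t;
typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Write exactly size bytes, restarting after signals and short writes. */
static int writeall(int fd, const void *buf, size_t size) {
   size_t done = 0;
   ssize_t n;
   while (done < size) {
      n = write(fd, (const char *)buf + done, size - done);
      if ((n == -1) && (errno == EINTR))
         continue;
      if (n == -1)
         return -1;
      done += (size_t)n;
   }
   return 0;
}

/* Write a header followed by len bytes of data (a stand-in for putpacket). */
static int sendpacket(int fd, int compid, int taskid, packet_t type,
                      const unsigned char *data, int len) {
   taskpacket_t pack;
   pack.compid = compid;
   pack.taskid = taskid;
   pack.type = type;
   pack.length = len;
   if (writeall(fd, &pack, sizeof(pack)) == -1)
      return -1;
   return (len > 0) ? writeall(fd, data, (size_t)len) : 0;
}

/* Hypothetical helper for the first four parts: wrap raw task output from
 * readfd in DATA packets on outfd, then send DONE at end-of-file.  Returns
 * the number of DATA packets sent, or -1 on error. */
int forward_output(int readfd, int outfd, int compid, int taskid) {
   unsigned char buf[MAX_PACK_SIZE];
   ssize_t n;
   int packets = 0;

   for ( ; ; ) {
      n = read(readfd, buf, sizeof(buf));   /* at most MAX_PACK_SIZE bytes */
      if ((n == -1) && (errno == EINTR))
         continue;
      if (n == -1)
         return -1;
      if (n == 0)                           /* the task closed its output */
         break;
      if (sendpacket(outfd, compid, taskid, DATA, buf, (int)n) == -1)
         return -1;
      packets++;
   }
   if (sendpacket(outfd, compid, taskid, DONE, buf, 0) == -1)
      return -1;
   return packets;
}
```

Packet boundaries follow read boundaries, not lines. A single DATA packet may hold part of a line or several lines, depending on how the task's output arrives in the pipe. Because each read asks for at most MAX_PACK_SIZE bytes, no packet exceeds the size limit.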

DONE packets

When the dispatcher receives a DONE packet on standard input, it sets the corresponding task’s endinput member in the tasks array and closes the writefd descriptor for the task. The dispatcher discards any subsequent DONE or DATA packets that arrive for the task.
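The DONE handling just described is small enough to show directly. In this sketch, handle_done and the trimmed task_entry_t are hypothetical stand-ins for a function of the tasks object and for ntpvm_task_t. A DATA handler would check the same endinput flag to discard data that arrives after DONE.

```c
#include <unistd.h>

/* Minimal stand-in for the members of ntpvm_task_t used here. */
typedef struct {
   int compid;
   int taskid;
   int writefd;                   /* dispatcher->child descriptor */
   int endinput;                  /* true once the task's input is closed */
} task_entry_t;

/* Handle a DONE packet from standard input for one task.  Returns 0 if the
 * task's input was closed, 1 if the packet was discarded because input had
 * already ended, or -1 if close failed. */
int handle_done(task_entry_t *task) {
   int fd;
   if (task->endinput)
      return 1;                   /* later DONE packets are discarded */
   task->endinput = 1;
   fd = task->writefd;
   task->writefd = -1;            /* never close the same descriptor twice */
   return (close(fd) == -1) ? -1 : 0;
}
```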

Example 17.3. 

The following DONE packet specifies that there is no more input data for task 2 in computation 3.

    Computation ID:      3
    Task ID:             2
    Packet Type:         DONE
    Packet Data Length:  0
    Packet Data:         (none)

When the dispatcher receives an end-of-file indication on a readfd descriptor, it closes that descriptor and forwards a DONE packet on its standard output. If the writefd descriptor for the task is still open, the dispatcher closes it. The dispatcher must eventually call wait on the child task process and set the compid member of the tasks array entry to –1 so that the array entry can be reused.

If the dispatcher receives an end-of-file indication on its own standard input, it closes the writefd descriptors of all active tasks and sets the endinput member of the tasks array entry for each active task to 1. When it has received an end-of-file indication on the readfd descriptors for all active tasks, the dispatcher waits for each task and exits. The dispatcher should also periodically wait for all its completed children.
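The dispatcher can do this periodic waiting without blocking. It calls waitpid with WNOHANG until waitpid reports that no more terminated children are available. The following is a minimal sketch of that approach. The name reap_children is hypothetical.

```c
#include <errno.h>
#include <sys/types.h>
#include <sys/wait.h>

/* Reap, without blocking, every child that has already terminated.
 * Returns how many children were reaped.  A full dispatcher would also map
 * each returned pid back to its tasks entry (omitted here). */
int reap_children(void) {
   int count = 0;
   int status;
   pid_t pid;

   for ( ; ; ) {
      pid = waitpid(-1, &status, WNOHANG);
      if (pid > 0) {
         count++;
         continue;
      }
      if ((pid == -1) && (errno == EINTR))
         continue;
      break;         /* 0: remaining children still running; -1: none left */
   }
   return count;
}
```

Because waitpid(-1, ...) reaps any child, code elsewhere in the dispatcher that waits for a specific taskpid must be prepared for waitpid to fail with ECHILD when that child has already been reaped here.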

I/O and Testing of Dispatcher

This section develops the dispatcher I/O functions and a debugging setup. The dispatcher receives input data from standard input by calling getpacket and sends output data on standard output by calling putpacket, as shown in Figure 17.5. A packet is always transferred in two parts. First, the dispatcher reads or writes a header of type taskpacket_t. It then uses the length member of the header to determine how many bytes of packet data follow, and it reads or writes exactly that many bytes as the data portion of the packet. Assume that the packet data field contains no more than MAX_PACK_SIZE bytes so that the dispatcher can use a fixed-length buffer of MAX_PACK_SIZE bytes to hold the packet data during input and output.


Figure 17.5. Basic dispatcher I/O.

The getpacket function has the following prototype.

int getpacket(int fd, int *compidp, int *taskidp,
               packet_t *typep, int *lenp, unsigned char *buf);

The getpacket function reads a taskpacket_t header from fd and then reads into buf the number of bytes specified by the length member. If successful, getpacket returns 0. If unsuccessful, getpacket returns –1 and sets errno. The getpacket function sets *compidp, *taskidp, *typep and *lenp from the compid, taskid, type and length members of the packet header, respectively. If getpacket receives an end-of-file while trying to read a packet, it returns –1 and sets errno. Because read does not set errno when it encounters end-of-file, getpacket must choose an appropriate value itself. There is no standard error number to represent end-of-file. One possibility is to use EINVAL.

The putpacket function has the following prototype.

int putpacket(int fd, int compid, int taskid,
               packet_t type, int len, unsigned char *buf);

The putpacket function assembles a taskpacket_t header from compid, taskid, type and len. It then writes the packet header to fd followed by len bytes from buf. If successful, putpacket returns 0. If unsuccessful, putpacket returns –1 and sets errno.
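A minimal sketch of getpacket and putpacket follows. It copies the definitions from ntpvm.h so that it compiles on its own. In place of the restart library functions of Appendix B, it uses two hypothetical helpers, readall and writeall, which restart after EINTR and after partial transfers. As the specification suggests, getpacket reports end-of-file as an error with errno set to EINVAL. It also rejects a header whose length is negative or larger than MAX_PACK_SIZE.

```c
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_PACK_SIZE 1024                /* definitions copied from ntpvm.h */
typedef enum ptype {NEWTASK, DATA, BROADCAST, DONE,
                    TERMINATE, BARRIER} packet_t;
typedef struct {
   int compid;
   int taskid;
   packet_t type;
   int length;
} taskpacket_t;

/* Read exactly size bytes.  Returns size, 0 on end-of-file before any byte,
 * or -1 with errno set (EINVAL for end-of-file partway through). */
static ssize_t readall(int fd, void *buf, size_t size) {
   size_t done = 0;
   ssize_t n;
   while (done < size) {
      n = read(fd, (char *)buf + done, size - done);
      if ((n == -1) && (errno == EINTR))
         continue;                       /* interrupted by a signal: retry */
      if (n == -1)
         return -1;
      if (n == 0) {                      /* end-of-file */
         if (done > 0)
            errno = EINVAL;
         return (done > 0) ? -1 : 0;
      }
      done += (size_t)n;
   }
   return (ssize_t)done;
}

/* Write exactly size bytes, restarting after signals and short writes. */
static int writeall(int fd, const void *buf, size_t size) {
   size_t done = 0;
   ssize_t n;
   while (done < size) {
      n = write(fd, (const char *)buf + done, size - done);
      if ((n == -1) && (errno == EINTR))
         continue;
      if (n == -1)
         return -1;
      done += (size_t)n;
   }
   return 0;
}

int getpacket(int fd, int *compidp, int *taskidp,
              packet_t *typep, int *lenp, unsigned char *buf) {
   taskpacket_t pack;
   ssize_t n;

   n = readall(fd, &pack, sizeof(pack));
   if (n == 0)
      errno = EINVAL;                    /* end-of-file: no standard errno */
   if (n != (ssize_t)sizeof(pack))
      return -1;
   if ((pack.length < 0) || (pack.length > MAX_PACK_SIZE)) {
      errno = EINVAL;
      return -1;
   }
   if (pack.length > 0) {
      n = readall(fd, buf, (size_t)pack.length);
      if (n == 0)
         errno = EINVAL;                 /* end-of-file before the data */
      if (n != pack.length)
         return -1;
   }
   *compidp = pack.compid;
   *taskidp = pack.taskid;
   *typep = pack.type;
   *lenp = pack.length;
   return 0;
}

int putpacket(int fd, int compid, int taskid,
              packet_t type, int len, unsigned char *buf) {
   taskpacket_t pack;

   if ((len < 0) || (len > MAX_PACK_SIZE)) {
      errno = EINVAL;
      return -1;
   }
   pack.compid = compid;
   pack.taskid = taskid;
   pack.type = type;
   pack.length = len;
   if (writeall(fd, &pack, sizeof(pack)) == -1)
      return -1;
   if ((len > 0) && (writeall(fd, buf, (size_t)len) == -1))
      return -1;
   return 0;
}
```

Because the header and the data go out in two separate write calls, two threads that call putpacket on the same descriptor can interleave their packets. A threaded dispatcher would need to serialize output on standard output, for example by holding a mutex for the duration of each putpacket call.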

Example 17.4. 

The following program uses getpacket and putpacket to copy packets from standard input to standard output.

#include <unistd.h>
#include "ntpvm.h"

int getpacket(int, int *, int *, packet_t *, int *, unsigned char *);
int putpacket(int, int, int, packet_t, int, unsigned char *);

int main(void) {
   unsigned char buf[MAX_PACK_SIZE];
   int compid;
   int taskid;
   int tdatalen;
   int tin, tout;
   packet_t type;

   tin = STDIN_FILENO;
   tout = STDOUT_FILENO;
   while (getpacket(tin, &compid, &taskid, &type, &tdatalen, buf) != -1) {
      if (putpacket(tout, compid, taskid, type, tdatalen, buf) == -1)
         break;
   }
   return 0;
}

The specification for Part I of the project is as follows.

  1. Write the getpacket and putpacket functions.

  2. Compile and run lint on the program to make sure that there are no syntax errors.

  3. Test the program, using one of the methods described below.

  4. Add debugging messages to the loop of the main program to show what values are being read and written. All debugging messages should go to standard error.

The hardest part of the NTPVM project is testing the dispatcher. The dispatcher communicates through standard input and standard output, using packets that have non-ASCII components. During debugging, the dispatcher should also produce messages on standard error that report its progress. With a small amount of extra work, you can keep the dispatcher’s input, its output and these informative messages apart by displaying each of the three streams in ASCII format on a different screen.

Program 17.2 shows the a2ts filter, which reads ASCII characters from standard input, constructs a task packet and writes it to standard output. Because a2ts sends all of its prompts to standard error, it can be run either interactively or with standard input redirected from a file.

Program 17.2. a2ts.c

The filter a2ts prompts for information and writes a task packet to standard output. Some error checking is omitted.

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ntpvm.h"
#define MAX_LINE_SIZE 100
#define TERMINATE_STRING "!!!!!\n"

static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                         "Terminate", "Barrier"};

int main(void)  {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   char *bufptr;
   int c;
   int i;
   int linelen;
   taskpacket_t pack;
   int tasktype;
   int wsize;

   wsize = sizeof(taskpacket_t);
   fprintf(stderr, "Ready for first packet\n");
   for( ; ; ) {                       /* loop with menu for interactive input */
      fprintf(stderr, "Enter compid:");
      if (scanf("%d", &pack.compid) == EOF)
         break;
      fprintf(stderr, "Enter taskid:");
      scanf("%d", &pack.taskid);
      fprintf(stderr, "Enter task type:\n");
      for (i=0; i< NUMTYPES; i++)
         fprintf(stderr, "   %d = %s\n", i, typename[i]);
      scanf("%d", &tasktype);
      while (((c = getchar()) != '\n') && (c != EOF))
         ;                              /* discard the rest of the type line */
      pack.type = tasktype;
      pack.length = 0;
      bufptr = buf;
      *bufptr = 0;
      fprintf(stderr, "Enter first line of data (%.*s to end):\n",
         (int)strlen(TERMINATE_STRING) - 1, TERMINATE_STRING);
                  /* read data lines through stdio too: scanf may buffer ahead */
      while (fgets(bufptr, MAX_LINE_SIZE, stdin) != NULL) {
         if (strcmp(TERMINATE_STRING, bufptr) == 0)
            break;
         linelen = (int)strlen(bufptr);
         bufptr = bufptr + linelen;
         pack.length = pack.length + linelen;
         if (pack.length > MAX_PACK_SIZE) {
            fprintf(stderr, "**** Maximum packet size exceeded\n");
            return 1;
         }
         fprintf(stderr, "Received %d, total=%d, Enter line (%.*s to end):\n",
             linelen, pack.length, (int)strlen(TERMINATE_STRING) - 1,
             TERMINATE_STRING);
      }
      fprintf(stderr, "Writing packet header: %d %d %d %d\n",
          pack.compid, pack.taskid, (int)pack.type, pack.length);
      if (write(STDOUT_FILENO, &pack, wsize) != wsize) {
         fprintf(stderr, "Error writing packet\n");
         return 1;
      }
      fprintf(stderr, "Writing %d bytes\n", pack.length);
      if (write(STDOUT_FILENO, buf, pack.length) != pack.length) {
         fprintf(stderr,"Error writing packet\n");
         return 1;
      }
      fprintf(stderr, "Ready for next packet\n");
   }
   fprintf(stderr, "a2ts exiting normally\n");
   return 0;
}

The ts2a filter of Program 17.3 reads a task packet from standard input and writes the contents of the packet to standard output in ASCII format. For this project, assume that the data portion of a task packet always contains ASCII information.

Example 17.5. 

The ts2a program assumes that header and data will each be read with a single call to read. How would you make this more robust?

Answer:

Use the readblock function from the restart library described in Appendix B.

Program 17.3. ts2a.c

The ts2a filter reads a packet from standard input and writes the header and data to standard output in ASCII format. Some error checking is omitted.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ntpvm.h"
#define MAX_LINE_SIZE 100

static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                         "Terminate", "Barrier"};

int main(void) {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   int bytesread;
   taskpacket_t pack;
   int wsize;

   wsize = sizeof(taskpacket_t);
   fprintf(stderr, "***** Waiting for first packet\n");
   for( ; ; ) {
      bytesread =  read(STDIN_FILENO, &pack, wsize);
      if (bytesread == 0) {
         fprintf(stderr, "End-of-file received\n");
         break;
      }
      if (bytesread != wsize) {
         fprintf(stderr, "Error reading packet header\n");
         return 1;
      }
      if ( (pack.type < 0) || (pack.type >= NUMTYPES) ) {
         fprintf(stderr, "Got invalid packet\n");
         return 1;
      }
      printf("Received packet header of type %s\n",typename[pack.type]);
      printf("   compid = %d, taskid = %d, length = %d\n",
             pack.compid, pack.taskid, pack.length);
      fflush(stdout);
      if ((pack.length < 0) || (pack.length > MAX_PACK_SIZE)) {
         fprintf(stderr, "Task data length is invalid\n");
         return 1;
      }
      if (read(STDIN_FILENO, buf, pack.length) != pack.length) {
         fprintf(stderr, "Error reading packet data\n");
         return 1;
      }
      write(STDOUT_FILENO, buf, pack.length);    /* data is assumed ASCII */
      fprintf(stderr, "***** Waiting for next packet\n");
   }
   return 0;
}

Example 17.6. 

The following command prompts for the fields of a packet. It then echoes the packet to standard output in ASCII format.

a2ts | ts2a

The a2ts program of Example 17.6 interactively prompts for packet information and writes the information as a binary packet to its standard output. The standard output of a2ts is piped into standard input of ts2a. The ts2a program reads binary packets from its standard input and outputs them in ASCII format to its standard output. Input entered to a2ts will be interleaved with output from ts2a, but this should not be a problem since ts2a will not produce any output until a2ts has received an entire packet.

Example 17.7. 

The following command shows a possible method of testing the dispatcher interactively. For now, use the testpacket program of Example 17.4 instead of the dispatcher.

a2ts | dispatcher | ts2a

Example 17.7 pipes standard output of a2ts into standard input of the dispatcher and standard output of the dispatcher into ts2a. The command line of Example 17.7 allows a user to enter ASCII data and to display the task packet output in ASCII. Unfortunately, real tests produce too much data from different sources, making it difficult to distinguish information from different programs. Input to a2ts and output from ts2a will be interleaved with error messages sent to standard error. The next two subsections propose two different methods for handling this problem.

Testing with multiple windows

The first strategy for improving the usability of a2ts and ts2a in testing the dispatcher is to use separate windows, as shown in Figure 17.6. The dispatcher, which runs in the dispatcher window, redirects its standard input to the named pipe inpipe and its standard output to the named pipe outpipe. The output from the dispatcher’s standard error still appears in the dispatcher window. The a2ts program reads from standard input in the input window and writes to its standard output, which is redirected to the named pipe inpipe. Enter packets in ASCII format in this window. The ts2a program redirects its standard input to the named pipe outpipe. As the dispatcher runs, ts2a displays dispatcher output in the output window.

Figure 17.6 shows the setup for the three windows. Be sure to use the same working directory for all three windows. The procedure for running the dispatcher is as follows.


Figure 17.6. Use three windows to debug the NTPVM dispatcher.

  1. Create two named pipes in the dispatcher window by executing the following commands.

    mkfifo outpipe
    mkfifo inpipe
    
  2. Start the dispatcher in the dispatcher window by executing the following command.

    dispatcher < inpipe > outpipe
    

    This window displays only the messages that the dispatcher sends to standard error, since both standard input and standard output are redirected.

  3. In the output window, execute the following command.

    ts2a < outpipe
    

    This window displays the packets coming from the standard output of the dispatcher.

  4. In the input window, execute the following command.

    a2ts > inpipe
    

    This window displays the prompts for the user to enter packets. The a2ts program converts the entered information from ASCII to packet format and writes it to the standard input of the dispatcher.

Figure 17.7 shows the layout of the windows for the debugging. If you do not have a workstation that supports multiple windows, try to persuade your system administrator to install a program such as screen, which supports multiple screens on an ASCII terminal.


Figure 17.7. Logical process layout for debugging the dispatcher.

Testing with remote logging

The second strategy for testing the dispatcher uses the remote logging facility discussed in Section 10.3.4 and in Appendix D. Replace the ts2a program with the ts2log program of Program 17.4. The ts2log program uses the readblock function of the restart library described in Appendix B.

Example 17.8. 

The following command shows how to test the dispatcher by using remote logging.

a2ts | dispatcher | ts2log

The dispatcher should also log events. It could send the packets to standard output and have the ts2log program receive them through redirection. Alternatively, the dispatcher could log them directly.

Program 17.4. ts2log.c

A program that logs packets using the remote logging utilities. Some error checking is omitted.

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "ntpvm.h"
#include "restart.h"
#include "rlogging.h"
#define MAX_LINE_SIZE 100

/* printable names, indexed by packet type */
static char *typename[] = {"Start Task", "Data", "Broadcast", "Done",
                           "Terminate", "Barrier"};

int main(void) {
   char buf[MAX_PACK_SIZE + MAX_LINE_SIZE];
   int bytesread;
   LFILE *lf;
   taskpacket_t pack;
   int wsize;

   wsize = sizeof(taskpacket_t);
   lf = lopen(NULL, 0);                      /* connect to the default logger */
   if (lf == NULL) {
      fprintf(stderr, "Failed to open remote logger.\n");
      return 1;                              /* every lprintf below needs lf */
   }
   for ( ; ; ) {
      bytesread = readblock(STDIN_FILENO, &pack, wsize);   /* packet header */
      if (bytesread == 0) {                  /* end-of-file between packets */
         lprintf(lf, "End-of-file received\n");
         break;
      }
      if (bytesread != wsize) {
         lprintf(lf, "Error reading packet header\n");
         return 1;
      }
      if ( (pack.type < 0) || (pack.type >= NUMTYPES) ) {
         fprintf(stderr, "Got invalid packet\n");
         return 1;
      }
      lprintf(lf, "%s %s\n   compid = %d\n   taskid = %d\n   length = %d\n",
              "Received packet header of type",
              typename[pack.type], pack.compid, pack.taskid, pack.length);
      if ( (pack.length < 0) || (pack.length > MAX_PACK_SIZE) ) {
         lprintf(lf, "Task data has an invalid length\n");
         return 1;
      }
      if (readblock(STDIN_FILENO, buf, pack.length) != pack.length) {
         lprintf(lf, "Error reading packet data\n");
         return 1;
      }
      /* buf is not NUL-terminated and must never be used as a format string */
      lprintf(lf, "%.*s", (int)pack.length, buf);
   }
   return 0;
}
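Program 17.4 relies on two properties of readblock: it returns 0 only when end-of-file occurs before any byte of a block arrives, and it never returns a short count. The sketch below illustrates that contract for readers who do not have the restart library at hand. The name sketch_readblock is hypothetical, and the Appendix B implementation may differ in details such as the errno value it sets for a truncated block.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical sketch: read exactly size bytes from fd.
 * Returns size on success, 0 if end-of-file occurs before any byte is
 * read, and -1 on an error or when end-of-file cuts a block short. */
ssize_t sketch_readblock(int fd, void *buf, size_t size) {
    char *p = buf;
    size_t total = 0;

    while (total < size) {
        ssize_t n = read(fd, p + total, size - total);
        if (n == -1) {
            if (errno == EINTR)      /* restart reads interrupted by signals */
                continue;
            return -1;
        }
        if (n == 0) {                /* end-of-file */
            if (total == 0)
                return 0;            /* clean end-of-file between blocks */
            errno = EINVAL;          /* partial block: report an error */
            return -1;
        }
        total += (size_t)n;
    }
    return (ssize_t)total;
}
```

Because a truncated header and a clean end-of-file are reported differently, ts2log can tell a dispatcher that exited normally from one that died partway through writing a packet.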

Single Task with No Input

This part of the project uses a single task that has no input, so that you can test the code that creates the task and its communication pipes without the added complication of monitoring multiple file descriptors for input. The task outputs ASCII text rather than packets.

The dispatcher reads a single NEWTASK packet from standard input, creates the appropriate pipes, and forks the child that executes the task. The dispatcher then monitors the readfd pipe file descriptor for output from the task and forwards what it reads as DATA packets on standard output. When the dispatcher encounters an end-of-file on readfd, it waits for the child task to exit and then exits.

Implement the NTPVM dispatcher as described above. The dispatcher does the following.

  1. Read a packet from standard input, using getpacket. If the packet is not a NEWTASK packet, then exit after outputting an error message.

  2. Create a pipe for communication with a child task.

  3. Fork a child to execute the command given in the NEWTASK packet of step 1. The child should redirect standard input and output to the pipe and close all pipe file descriptors before executing the command. Use the makeargv function of Program 2.2 on page 37 to construct the argument array in the child. If an error occurs, the child just exits after printing an informative message.

  4. Have the parent close all unneeded pipe descriptors so that the parent can detect end-of-file on readfd.

  5. Wait for output from the child on readfd. For this part of the assignment, the child will be executing standard UNIX commands. Assume that the child outputs only text. The dispatcher reads the child task’s output from readfd, wraps this output in a DATA packet, and sends the packet to standard output by calling putpacket.

  6. If getpacket returns an error, assume that this is an end-of-file. Close the readfd and writefd descriptors for the task. Send a DONE packet to standard output identifying the task and exit.
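Steps 2 through 5 amount to the classic pipe, fork, dup2 and exec pattern. The following is a minimal sketch under several assumptions: the caller supplies a ready-made argv (standing in for makeargv), the hypothetical forward callback stands in for wrapping each chunk in a DATA packet and calling putpacket, and only standard output is redirected because this task reads no input. The names run_single_task, forward_fn, struct capture and capture_forward are illustrative only.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical stand-in for "wrap in a DATA packet and call putpacket". */
typedef void (*forward_fn)(const char *buf, ssize_t len, void *ctx);

/* Run argv with its standard output on a pipe, pass everything the task
 * writes to forward(), then reap the task. Returns the wait status or -1. */
int run_single_task(char *const argv[], forward_fn forward, void *ctx) {
    int fd[2];
    pid_t child;
    char buf[512];
    ssize_t n;
    int status;

    if (pipe(fd) == -1)                      /* step 2 */
        return -1;
    if ((child = fork()) == -1) {            /* step 3 */
        close(fd[0]);
        close(fd[1]);
        return -1;
    }
    if (child == 0) {                        /* child: becomes the task */
        if (dup2(fd[1], STDOUT_FILENO) == -1)
            _exit(1);
        close(fd[0]);                        /* close all pipe descriptors */
        close(fd[1]);
        execvp(argv[0], argv);
        perror("Child failed to execvp the task");
        _exit(1);
    }
    close(fd[1]);                            /* step 4: parent keeps only readfd */
    for (;;) {                               /* step 5 */
        n = read(fd[0], buf, sizeof buf);
        if (n == -1 && errno == EINTR)
            continue;
        if (n <= 0)                          /* end-of-file or error */
            break;
        forward(buf, n, ctx);
    }
    close(fd[0]);
    while (waitpid(child, &status, 0) == -1)
        if (errno != EINTR)
            return -1;
    return status;
}

/* Example consumer for testing: collects the task's output in memory. */
struct capture {
    char data[256];
    size_t len;
};

void capture_forward(const char *buf, ssize_t len, void *ctx) {
    struct capture *c = ctx;
    for (ssize_t i = 0; i < len && c->len < sizeof c->data; i++)
        c->data[c->len++] = buf[i];
}
```

The close(fd[1]) in the parent is step 4: if the parent kept its copy of the write end open, read on readfd would never return 0 and the dispatcher would hang after the task finished.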

The dispatcher should liberally use standard error or the remote logging facility to display informative messages about what it is doing. For example, when it receives something from readfd, the dispatcher should display information about the source task, the number of bytes read and the message read. It is worthwhile to invest time in designing a readable layout for the informative messages so that all the relevant information is available at a glance.

Test the program by using ls -l as the command to be executed.

Sequential Tasks

This section describes the behavior of the dispatcher when the child task has both input and output. Although the dispatcher handles only one task at a time, it must monitor two input file descriptors. Complete Section 17.5 before starting this part.

The dispatcher keeps information about the child task in the tasks array. For simplicity, the discussion refers to members of an ntpvm_task_t element of this array, such as readfd, without their qualifying structure. Implement the tasks array as an object with appropriate access functions. The tasks array and its access functions should be in a file separate from the dispatcher main program. The array and its access functions are referred to as the tasks object, and an individual element of the tasks array is referred to as an entry of the tasks object. Because this part allows only one task at a time, the tasks object does not yet need an array of tasks.

Figure 17.8 suggests the structure of a threaded NTPVM dispatcher. An input thread monitors standard input and processes the incoming packets. An output thread monitors the readfd descriptor for input from the child task and writes this information to standard output.

Figure 17.8. Schematic of a threaded NTPVM dispatcher for a single task.

The input and output threads share the tasks object and must synchronize their access to this structure. One possible approach is to use a single mutex lock to protect the entire tasks object. This choice reduces the potential parallelism, because only one thread at a time can access the tasks object. Since mutex locks are inexpensive, we instead use a separate mutex lock for each element of the tasks array.
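A minimal sketch of a tasks object with one mutex per entry follows. The entry layout, the type name task_entry_t and the access functions tasks_init, tasks_add_recv, tasks_add_sent and tasks_get_counts are hypothetical; only the member names come from the text. Each access function locks just the entry it touches, so the input thread can update one task while an output thread updates another.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <sys/types.h>

#define MAX_TASKS 10

/* Hypothetical entry layout; member names follow the text. */
typedef struct {
    pthread_mutex_t lock;          /* protects all other members of this entry */
    int compid, taskid;            /* compid == -1 marks an unused entry */
    int readfd, writefd;
    pid_t pid;
    int endinput;
    long recvpackets, recvbytes;   /* updated by the input thread */
    long sentpackets, sentbytes;   /* updated by the output thread */
} task_entry_t;

static task_entry_t tasks[MAX_TASKS];   /* private to the tasks object's file */

/* Mark every entry unused. Call once, before any thread starts. */
int tasks_init(void) {
    for (int i = 0; i < MAX_TASKS; i++) {
        if (pthread_mutex_init(&tasks[i].lock, NULL) != 0)
            return -1;
        tasks[i].compid = tasks[i].taskid = -1;
        tasks[i].readfd = tasks[i].writefd = -1;
        tasks[i].pid = -1;
        tasks[i].endinput = 0;
        tasks[i].recvpackets = tasks[i].recvbytes = 0;
        tasks[i].sentpackets = tasks[i].sentbytes = 0;
    }
    return 0;
}

/* Record one DATA packet of nbytes copied to task key's writefd. */
int tasks_add_recv(int key, long nbytes) {
    if (key < 0 || key >= MAX_TASKS)
        return -1;
    pthread_mutex_lock(&tasks[key].lock);      /* locks only this entry */
    tasks[key].recvpackets++;
    tasks[key].recvbytes += nbytes;
    pthread_mutex_unlock(&tasks[key].lock);
    return 0;
}

/* Record one DATA packet of nbytes read from task key's readfd. */
int tasks_add_sent(int key, long nbytes) {
    if (key < 0 || key >= MAX_TASKS)
        return -1;
    pthread_mutex_lock(&tasks[key].lock);
    tasks[key].sentpackets++;
    tasks[key].sentbytes += nbytes;
    pthread_mutex_unlock(&tasks[key].lock);
    return 0;
}

/* Copy entry key's counters while holding its lock. */
int tasks_get_counts(int key, long *recvp, long *recvb, long *sentp, long *sentb) {
    if (key < 0 || key >= MAX_TASKS)
        return -1;
    pthread_mutex_lock(&tasks[key].lock);
    *recvp = tasks[key].recvpackets;
    *recvb = tasks[key].recvbytes;
    *sentp = tasks[key].sentpackets;
    *sentb = tasks[key].sentbytes;
    pthread_mutex_unlock(&tasks[key].lock);
    return 0;
}
```

With a single lock for the whole object, updates to entries 0 and 3 would serialize even though they share no data; with per-entry locks they proceed independently.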

The input thread

The input thread monitors standard input and takes action according to the input it receives. Write an input function that executes the following steps in a loop until it encounters an end-of-file on standard input.

  1. Read a packet from standard input by using getpacket.

  2. Process the packet.

After falling through the loop, close writefd and call pthread_exit.

Processing a packet depends on the packet type.

NEWTASK
  1. If a child task is already executing, discard the packet and output an error message.

  2. Otherwise, if no child task exists, create two pipes to handle the task’s input and output.

  3. Update the tasks object, and fork a child. The child should redirect its standard input and output to the pipes and use the makeargv function of Program 2.2 to construct the argument array before calling execvp to execute the command given in the packet.

  4. Create a detached output thread by calling pthread_create. Pass a key for the tasks entry of this task as an argument to the output thread. The key is just the index of the appropriate tasks array entry.

DATA
  1. If the packet’s computation and task IDs don’t match those of the executing task or if the task’s endinput is true, output an error message and discard the packet.

  2. Otherwise, copy the data portion to writefd.

  3. Update the recvpackets and recvbytes members of the appropriate task entry of the tasks object.

DONE
  1. If the packet’s computation and task IDs do not match those of the executing task, output an error message and discard the packet.

  2. Otherwise, close the writefd descriptor if it is still open.

  3. Set the endinput member for this task entry.

BROADCAST, BARRIER or TERMINATE

  1. Output an error message.

  2. Discard the packet.

Example 17.9. 

When a process that contains multiple threads creates a child by calling fork, how many threads exist in the child?

Answer:

Although fork creates a copy of the process, the child does not inherit the threads of the parent. POSIX specifies that the child has only one thread of execution—the thread that called fork.
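The sketch below makes this rule observable. A worker thread increments a counter continuously; after fork, the child checks that the counter no longer changes, which can only be the case if the worker thread was not duplicated. The names ticks, worker and child_has_single_thread are hypothetical.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <sched.h>
#include <stdatomic.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <time.h>
#include <unistd.h>

static atomic_long ticks;       /* advanced only by the worker thread */
static atomic_int stop;

static void *worker(void *arg) {
    (void)arg;
    while (!atomic_load(&stop))
        atomic_fetch_add(&ticks, 1);
    return NULL;
}

/* Returns 1 if the child of fork saw no worker thread, 0 if it did,
 * and -1 on error. */
int child_has_single_thread(void) {
    pthread_t tid;
    pid_t pid;
    int status;

    if (pthread_create(&tid, NULL, worker, NULL) != 0)
        return -1;
    while (atomic_load(&ticks) == 0)    /* wait until the worker is running */
        sched_yield();

    pid = fork();
    if (pid == 0) {
        /* Only the thread that called fork exists here, so ticks is frozen.
         * Stick to async-signal-safe calls until exec or _exit. */
        long before = atomic_load(&ticks);
        struct timespec delay = {0, 50000000};   /* 50 ms */
        nanosleep(&delay, NULL);
        _exit(atomic_load(&ticks) == before ? 0 : 1);
    }

    atomic_store(&stop, 1);             /* parent: stop and reap the worker */
    pthread_join(tid, NULL);
    if (pid == -1 || waitpid(pid, &status, 0) == -1)
        return -1;
    return WIFEXITED(status) && WEXITSTATUS(status) == 0;
}
```

One practical consequence for the dispatcher: between fork and execvp, POSIX guarantees only async-signal-safe functions in the child of a multithreaded process, because a lock held by another thread at the moment of the fork (for example, an internal lock of the memory allocator) stays locked in the child with no thread left to release it. Allocating memory in the child before execvp is therefore outside what POSIX guarantees, even though it often works in practice.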

The output thread

The output thread handles input from the readfd descriptor of a particular task. The output thread receives a tasks object key to the task it monitors as a parameter. Write an output function that executes the following steps in a loop until it encounters an end-of-file on readfd.

  1. Read data from readfd.

  2. Call putpacket to construct a DATA packet and send it to standard output.

  3. Update the sentpackets and sentbytes members of the appropriate task entry in the tasks object.

After falling through the loop because of an end-of-file or an error on readfd, the output thread does the following.

  1. Close the readfd and writefd descriptors for the task.

  2. Execute wait for the child task.

  3. Send a DONE packet with the appropriate computation and task IDs to standard output.

  4. Output information about the finished task to standard error or to the remote logger. Include the computation ID, the task ID, the total bytes sent by the task, the total packets sent by the task, the total bytes received by the task and the total packets received by the task.

  5. Deactivate the task entry by setting the computation ID to –1.

  6. Call pthread_exit.
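The following sketch shows one way to structure the output thread. Several details are assumptions rather than part of the NTPVM specification: the entry layout and the type name task_entry_t are hypothetical, the thread receives a pointer to its entry instead of an index key, and send_packet with its four-int header is a stand-in for putpacket and the taskpacket_t format of ntpvm.h. The global out_fd exists only so that the usage test can redirect output.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

enum { PKT_DATA = 1, PKT_DONE = 3 };    /* hypothetical type codes */

typedef struct {                        /* hypothetical entry layout */
    pthread_mutex_t lock;
    int compid, taskid, readfd, writefd;
    pid_t pid;
    long sentpackets, sentbytes, recvpackets, recvbytes;
} task_entry_t;

int out_fd = STDOUT_FILENO;             /* dispatcher output */

/* Write len bytes, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len) {
    const char *p = buf;
    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n == -1 && errno == EINTR)
            continue;
        if (n == -1)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Stand-in for putpacket: a four-int header followed by the data. */
static int send_packet(int type, int compid, int taskid, const char *data, int len) {
    int hdr[4] = {compid, taskid, type, len};
    if (write_all(out_fd, hdr, sizeof hdr) == -1)
        return -1;
    return write_all(out_fd, data, (size_t)len);
}

/* Output thread for one task; arg points at the task's entry. */
void *output_thread(void *arg) {
    task_entry_t *t = arg;
    char buf[512];
    ssize_t n;
    int compid, taskid;

    pthread_mutex_lock(&t->lock);                /* copy IDs for the DONE packet */
    compid = t->compid;
    taskid = t->taskid;
    pthread_mutex_unlock(&t->lock);

    for (;;) {                                   /* loop until end-of-file */
        n = read(t->readfd, buf, sizeof buf);
        if (n == -1 && errno == EINTR)
            continue;
        if (n <= 0)
            break;                               /* end-of-file or error */
        if (send_packet(PKT_DATA, compid, taskid, buf, (int)n) == -1)
            break;
        pthread_mutex_lock(&t->lock);
        t->sentpackets++;
        t->sentbytes += n;
        pthread_mutex_unlock(&t->lock);
    }

    pthread_mutex_lock(&t->lock);                /* step 1: close descriptors */
    close(t->readfd);
    t->readfd = -1;
    if (t->writefd != -1) {
        close(t->writefd);
        t->writefd = -1;
    }
    pthread_mutex_unlock(&t->lock);

    while (waitpid(t->pid, NULL, 0) == -1 && errno == EINTR)
        ;                                        /* step 2: reap the task */
    send_packet(PKT_DONE, compid, taskid, NULL, 0);   /* step 3 */

    pthread_mutex_lock(&t->lock);                /* steps 4 and 5 */
    fprintf(stderr, "comp %d task %d: sent %ld bytes in %ld packets, "
            "received %ld bytes in %ld packets\n", compid, taskid,
            t->sentbytes, t->sentpackets, t->recvbytes, t->recvpackets);
    t->compid = -1;                              /* deactivate the entry */
    pthread_mutex_unlock(&t->lock);
    return NULL;                                 /* same as pthread_exit(NULL) */
}
```

Copying compid and taskid at the start means the DONE packet still carries the right IDs even though the entry is being torn down around it.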

Test the program by starting tasks to execute various cat and ls -l commands. Try other filters such as sort to test the command-line parsing. For this part you should not enter a new command until the previous command has completed.

Concurrent Tasks

Modify the program to allow multiple computations and tasks. Use a MAX_TASKS value of 10 for this part. A new NEWTASK packet may come in before the data from previous tasks has been completely transmitted.

When a new NEWTASK packet comes in, find an available slot in the tasks object, create a new set of pipes, and fork a new child to execute the command. Don’t enter any duplicates in the tasks array.
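Finding a slot and rejecting duplicates can be done in a single pass over the table. In the sketch below, slot_t and tasks_claim are hypothetical, and a free slot is marked by compid == -1, matching the output thread's deactivation step. Locking is omitted for brevity; in the threaded dispatcher each entry's mutex would be held while its IDs are examined or set.

```c
#define MAX_TASKS 10

typedef struct {        /* hypothetical: only the IDs the search needs */
    int compid;         /* -1 marks a free slot */
    int taskid;
} slot_t;

/* Claim a free slot for (compid, taskid). Returns the slot index, or -1
 * if that pair is already present or the table is full. */
int tasks_claim(slot_t tbl[], int n, int compid, int taskid) {
    int free_slot = -1;

    for (int i = 0; i < n; i++) {
        if (tbl[i].compid == -1) {
            if (free_slot == -1)
                free_slot = i;                  /* remember the first free slot */
        } else if (tbl[i].compid == compid && tbl[i].taskid == taskid) {
            return -1;                          /* duplicate: reject */
        }
    }
    if (free_slot != -1) {
        tbl[free_slot].compid = compid;
        tbl[free_slot].taskid = taskid;
    }
    return free_slot;
}
```

The scan must finish before claiming, because a duplicate may sit after the first free slot.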

Figure 17.9 shows a schematic of a threaded NTPVM dispatcher that supports multiple simultaneous tasks. When another request comes in, the input thread creates a new output thread. Since multiple output threads write to standard output, define an additional mutex lock to synchronize output on the dispatcher’s standard output.
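The extra lock is needed because a packet is a header followed by data, usually written with more than one write call, and POSIX makes a write to a pipe atomic only when it is no larger than PIPE_BUF bytes. The sketch below holds one output mutex across the whole packet. The names put_packet_locked and out_lock and the four-int header are hypothetical stand-ins for putpacket and the taskpacket_t format.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <unistd.h>

static pthread_mutex_t out_lock = PTHREAD_MUTEX_INITIALIZER;  /* one per dispatcher */

/* Write len bytes, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len) {
    const char *p = buf;
    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n == -1 && errno == EINTR)
            continue;
        if (n == -1)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Write header and data as one unit with respect to other output threads. */
int put_packet_locked(int fd, int type, int compid, int taskid,
                      const char *data, int len) {
    int hdr[4] = {compid, taskid, type, len};
    int rc;

    pthread_mutex_lock(&out_lock);       /* no other packet can slip in between */
    rc = write_all(fd, hdr, sizeof hdr);
    if (rc == 0)
        rc = write_all(fd, data, (size_t)len);
    pthread_mutex_unlock(&out_lock);
    return rc;
}
```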

Figure 17.9. Schematic of a threaded NTPVM dispatcher.

Packet Communication, Broadcast and Barriers

Once the dispatcher handles multiple simultaneous tasks, implement the handling of the BROADCAST and BARRIER packets. The child tasks now have to communicate with the dispatcher in packet format so that the dispatcher and its tasks can distinguish control information (broadcast or barrier) from data information.

When the dispatcher receives a BROADCAST request from standard input, it forwards the packet on the writefd descriptors for each task whose computation ID matches that of the BROADCAST packet. If the dispatcher receives a BROADCAST request from one of the readfd descriptors, it forwards the packet on the writefd descriptors for each task whose computation ID matches that in the BROADCAST packet. Since, in a future extension, tasks from the computation may reside on other hosts, the dispatcher also forwards the packet on its standard output.

When the dispatcher receives a BARRIER packet from a task, it sets the barrier member for that task to the barrier number specified by the packet data. When all the tasks in a computation have reported that they are waiting for the barrier, the dispatcher sends a BARRIER message on standard output.
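Deciding when a computation has reached a barrier is a scan over that computation's entries. The hypothetical barrier_entry_t below keeps only the two members the check needs, and barrier_reached is likewise a hypothetical name; in the dispatcher, each entry's lock would be held while it is examined. The check sees only the tasks on this host.

```c
#include <stdbool.h>

typedef struct {       /* hypothetical: the two members the check needs */
    int compid;        /* -1 marks an unused entry */
    int barrier;       /* -1 when the task is not waiting at a barrier */
} barrier_entry_t;

/* True when computation compid has at least one task here and every one
 * of them is waiting at barrier number b. */
bool barrier_reached(const barrier_entry_t tbl[], int n, int compid, int b) {
    int members = 0;

    if (compid == -1)              /* -1 would match unused entries */
        return false;
    for (int i = 0; i < n; i++) {
        if (tbl[i].compid != compid)
            continue;
        members++;
        if (tbl[i].barrier != b)
            return false;          /* some task has not arrived yet */
    }
    return members > 0;
}
```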

When the dispatcher reads a BARRIER packet for that barrier number from standard input, it resets the barrier member to –1 and sends a SIGUSR1 signal to all the tasks in the computation. The BARRIER packet from standard input signifies that all tasks in the computation are waiting at the designated barrier and that they can be released. Assume that the dispatcher never receives a second BARRIER packet from standard input before it has forwarded a corresponding BARRIER packet on standard output.

Implement the barrier on the task side by blocking the SIGUSR1 signal, writing a BARRIER packet to standard output, and then executing sigsuspend in a loop until the SIGUSR1 signal arrives. Example 8.26 shows how this is done.
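A minimal task-side sketch of this protocol follows. Blocking SIGUSR1 before announcing the barrier is what makes it race-free: if the dispatcher's release arrives before the task calls sigsuspend, the signal stays pending and sigsuspend returns at once. The function name barrier_wait is hypothetical, and writing the bare barrier number to fd stands in for writing a real BARRIER packet.

```c
#define _POSIX_C_SOURCE 200809L
#include <signal.h>
#include <sys/types.h>
#include <unistd.h>

static volatile sig_atomic_t released = 0;

static void on_release(int signo) {
    (void)signo;
    released = 1;                        /* only async-signal-safe work here */
}

/* Announce barrier on fd (stand-in for a BARRIER packet), then wait
 * until SIGUSR1 arrives. Returns 0 on release, -1 on error. */
int barrier_wait(int fd, int barrier) {
    struct sigaction act;
    sigset_t usr1, oldmask, waitmask;

    act.sa_handler = on_release;
    act.sa_flags = 0;
    sigemptyset(&act.sa_mask);
    if (sigaction(SIGUSR1, &act, NULL) == -1)
        return -1;

    sigemptyset(&usr1);
    sigaddset(&usr1, SIGUSR1);
    if (sigprocmask(SIG_BLOCK, &usr1, &oldmask) == -1)  /* block before announcing */
        return -1;
    released = 0;
    if (write(fd, &barrier, sizeof barrier) != (ssize_t)sizeof barrier) {
        sigprocmask(SIG_SETMASK, &oldmask, NULL);
        return -1;
    }
    waitmask = oldmask;                  /* the old mask, with SIGUSR1 allowed */
    sigdelset(&waitmask, SIGUSR1);
    while (!released)
        sigsuspend(&waitmask);           /* atomically unblock SIGUSR1 and wait */
    sigprocmask(SIG_SETMASK, &oldmask, NULL);
    return 0;
}
```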

Write a dummy task program to generate appropriate broadcast and barrier messages.

Example 17.10. 

What complications do BROADCAST packets present from a synchronization point of view?

Answer:

Since BROADCAST packets may have to be forwarded to other tasks, the input and output threads now share the writefd descriptor associated with those tasks.
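One way to handle the shared descriptor is to hold an entry's mutex for the whole time its writefd is in use. Then an output thread cannot close the descriptor (and have its number reused by a later open) while another thread is writing to it, and DATA and BROADCAST bytes destined for one task cannot interleave. The sketch below applies this to forwarding. bcast_entry_t and broadcast are hypothetical, and the copy that the text says also goes to standard output is omitted.

```c
#define _POSIX_C_SOURCE 200809L
#include <errno.h>
#include <pthread.h>
#include <stddef.h>
#include <unistd.h>

typedef struct {              /* hypothetical: only what forwarding needs */
    pthread_mutex_t lock;     /* guards compid and writefd */
    int compid;
    int writefd;              /* -1 once closed */
} bcast_entry_t;

/* Write len bytes, retrying after partial writes and EINTR. */
static int write_all(int fd, const void *buf, size_t len) {
    const char *p = buf;
    while (len > 0) {
        ssize_t n = write(fd, p, len);
        if (n == -1 && errno == EINTR)
            continue;
        if (n == -1)
            return -1;
        p += n;
        len -= (size_t)n;
    }
    return 0;
}

/* Forward len bytes to every task of computation compid; returns the
 * number of tasks that received the packet. */
int broadcast(bcast_entry_t tbl[], int n, int compid, const void *buf, size_t len) {
    int sent = 0;
    for (int i = 0; i < n; i++) {
        pthread_mutex_lock(&tbl[i].lock);     /* writefd cannot be closed under us */
        if (tbl[i].compid == compid && tbl[i].writefd != -1 &&
            write_all(tbl[i].writefd, buf, len) == 0)
            sent++;
        pthread_mutex_unlock(&tbl[i].lock);
    }
    return sent;
}
```

Two cautions apply to this design. A write to a full pipe blocks while the entry's lock is held, stalling every thread that needs that entry. And writing to a pipe whose task has already exited raises SIGPIPE, which terminates the dispatcher by default, so ignoring SIGPIPE and checking for EPIPE is worth considering.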

Termination and Signals

Implement signal handling so that the dispatcher shuts down gracefully when it receives Ctrl-C. Also add code to handle TERMINATE packets.
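A common pattern for signals in a threaded program is to block them in every thread and let one dedicated thread accept them with sigwait. The sketch below follows that pattern; start_shutdown_thread, shutdown_requested and shutdown_signo are hypothetical names. Which cleanup the dispatcher then performs, such as terminating child tasks and sending DONE packets, is a design decision the sketch leaves open.

```c
#define _POSIX_C_SOURCE 200809L
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <unistd.h>

atomic_int shutdown_requested;      /* set once SIGINT or SIGTERM arrives */
int shutdown_signo;                 /* which signal; read it after joining */

static sigset_t shutdown_set;

static void *signal_thread(void *arg) {
    int signo;
    (void)arg;
    if (sigwait(&shutdown_set, &signo) == 0) {
        shutdown_signo = signo;
        atomic_store(&shutdown_requested, 1);
        /* The dispatcher's shutdown work would start here. */
    }
    return NULL;
}

/* Call from main before creating any other thread, so that every thread
 * inherits a mask in which SIGINT and SIGTERM are blocked. */
int start_shutdown_thread(pthread_t *tid) {
    sigemptyset(&shutdown_set);
    sigaddset(&shutdown_set, SIGINT);
    sigaddset(&shutdown_set, SIGTERM);
    if (pthread_sigmask(SIG_BLOCK, &shutdown_set, NULL) != 0)
        return -1;
    return pthread_create(tid, NULL, signal_thread, NULL) == 0 ? 0 : -1;
}
```

Two interactions with fork are worth noting. The signal mask is inherited across fork and preserved across exec, so each child should unblock these signals (for example with sigprocmask) before calling execvp. And Ctrl-C at a terminal sends SIGINT to the whole foreground process group, so the child tasks receive it too unless they are moved to another process group.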

Ordered Message Delivery

Add a sequence number to the packet format and implement in-order delivery of packets from each source-destination pair.
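One standard way to implement in-order delivery is a small reorder buffer per source-destination pair: deliver a packet if it carries the next expected sequence number, hold it if it arrives early, and then release any held successors. In the sketch below, reorder_t, WINDOW, MAXDATA, reorder_accept and append_deliver are hypothetical, and packet data is simplified to short strings.

```c
#include <stdbool.h>
#include <string.h>

#define WINDOW 8            /* hypothetical: how far ahead a packet may arrive */
#define MAXDATA 64          /* hypothetical: longest payload, including NUL */

typedef struct {            /* one per source-destination pair */
    unsigned next;          /* next sequence number to deliver */
    bool held[WINDOW];      /* slot holds an early packet */
    char data[WINDOW][MAXDATA];
} reorder_t;

typedef void (*deliver_fn)(const char *data, void *ctx);

/* Accept packet seq and deliver it, plus any held successors, in order.
 * Returns 0, or -1 for a duplicate, a packet too far ahead, or an
 * over-long payload. */
int reorder_accept(reorder_t *r, unsigned seq, const char *data,
                   deliver_fn deliver, void *ctx) {
    unsigned offset = seq - r->next;    /* old packets wrap to huge values */
    unsigned slot = seq % WINDOW;

    if (offset >= WINDOW || r->held[slot] || strlen(data) >= MAXDATA)
        return -1;
    strcpy(r->data[slot], data);
    r->held[slot] = true;
    while (r->held[r->next % WINDOW]) { /* release the in-order run */
        slot = r->next % WINDOW;
        deliver(r->data[slot], ctx);
        r->held[slot] = false;
        r->next++;
    }
    return 0;
}

/* Example consumer: appends each delivered payload to a string buffer. */
void append_deliver(const char *data, void *ctx) {
    strcat(ctx, data);
}
```

Because WINDOW divides 2^32, the slot mapping stays consistent when the unsigned sequence number wraps around.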

Additional Reading

The PVM system was developed by Oak Ridge National Laboratory and Emory University. The paper “PVM: A framework for parallel distributed computing” by V. S. Sunderam [118] provides an overview of the development and implementation of the PVM system. Other articles of interest include “Visualization and debugging in a heterogeneous environment” by Beguelin et al. [10] and “Experiences with network-based concurrent computing on the PVM system” by Geist and Sunderam [41]. The PVM distribution is available electronically from www.csm.ornl.gov/pvm.
