Networked Threads

We’ve seen how to make separate threads of control in a Java applet or application, and we’ve discussed the various ways that the Java API allows you to manage threads at runtime. Now we’ll go over some of the issues that arise with multithreaded distributed applications, and how the Java environment helps you deal with them.

Asynchronous Agents

The threaded implementation of our Solver interface in Example 4.1 shows how multithreaded servers can be implemented in Java. This allows our server to respond to clients asynchronously and to service their requests in parallel, which can reduce the amount of time a client has to wait for a response. The alternative is to have a server with only one thread servicing clients on a first-come, first-served basis. So if client A is the first client to make a request, the server begins processing it right away. If client B makes a request while the server is processing client A’s job, then B will have to wait for the server to finish A’s job before its own job can be started. In fact, client B won’t even get an acknowledgment from the server until client A’s job is done. With the multithreaded server, an independent thread can listen for client requests and acknowledge them almost immediately (or as soon as the thread scheduler gives it a CPU time slice). And with the jobs being allocated to separate threads for processing, the CPU resources will be shared between the two jobs, and B’s job will potentially finish sooner (though client A’s job might finish later, since it is now getting less than 100% of the CPU).
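The difference between the two server styles can be sketched without any networking. The following is a minimal illustration, not the Solver code from Example 4.1: SolverJob, ThreadPerJobServer, and ThreadPerJobDemo are hypothetical names, and a CountDownLatch stands in for a long-running job from client A.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for a unit of solver work (not the book's Solver interface).
interface SolverJob {
    void solve() throws InterruptedException;
}

// Hands each job to its own thread and returns at once, so a long job from
// client A does not delay the acknowledgment of client B's request.
class ThreadPerJobServer {
    Thread submit(String client, SolverJob job) {
        Thread worker = new Thread(() -> {
            try {
                job.solve();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "job-" + client);
        worker.start();
        return worker; // returning immediately plays the role of the acknowledgment
    }
}

class ThreadPerJobDemo {
    // Returns true if B's job completed while A's job was still running.
    static boolean bFinishesWhileARuns() {
        CountDownLatch releaseA = new CountDownLatch(1);
        ThreadPerJobServer server = new ThreadPerJobServer();
        Thread a = server.submit("A", () -> releaseA.await()); // A blocks until released
        Thread b = server.submit("B", () -> { });              // B is trivial
        try {
            b.join();                              // B finishes without waiting for A
            boolean aStillRunning = a.isAlive();
            releaseA.countDown();                  // now let A finish
            a.join();
            return aStillRunning;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            releaseA.countDown();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("B finished while A was still running: "
                           + bFinishesWhileARuns());
    }
}
```

With a single-threaded server, B's job could not even start until A's job had returned; here B completes while A is still blocked.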

Threads are useful in any distributed system where we want an agent to respond to asynchronous messages. By isolating communications in a separate thread, the other threads in the process can continue to do useful work while the communications thread blocks on a socket waiting for messages. The client process shown in Example 4.3 only has a single thread, since it doesn’t really have anything else to do but wait for the server to send a response. But we could easily reuse these classes in a multithreaded client as a single communications thread, or as multiple threads talking to multiple servers.
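As a rough sketch of this idea, the communications thread below does nothing but block on a stream and hand complete messages to a queue that other threads can consume at their leisure. MessageReader and MessageReaderDemo are hypothetical names, not classes from the book's examples. The demo reads from an in-memory stream so that it runs anywhere; in a real client the source would be a socket's input stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical communications thread body: it blocks on the stream so that
// the other threads in the process do not have to.
class MessageReader implements Runnable {
    private final DataInputStream in;
    private final BlockingQueue<String> inbox;

    MessageReader(InputStream source, BlockingQueue<String> inbox) {
        this.in = new DataInputStream(source);
        this.inbox = inbox;
    }

    public void run() {
        try {
            while (true) {
                inbox.put(in.readUTF()); // blocks until a full message arrives
            }
        } catch (EOFException e) {
            // The peer closed the stream; nothing more to read.
        } catch (IOException e) {
            System.err.println("MessageReader: " + e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class MessageReaderDemo {
    // Encodes the messages with writeUTF, reads them back on a separate
    // thread, and returns what the reader delivered to the inbox.
    static List<String> roundTrip(String... messages) {
        List<String> received = new ArrayList<>();
        try {
            ByteArrayOutputStream wire = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(wire);
            for (String m : messages) {
                out.writeUTF(m);
            }
            out.flush();

            BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
            Thread comm = new Thread(
                new MessageReader(new ByteArrayInputStream(wire.toByteArray()), inbox),
                "comm");
            comm.start();
            // The calling thread is free to do other work here.
            comm.join();
            inbox.drainTo(received);
        } catch (IOException | InterruptedException e) {
            System.err.println("roundTrip failed: " + e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("suspend", "resume"));
    }
}
```

A client talking to several servers could start one such reader per connection, all feeding the same inbox.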

Distributed ThreadGroups

You can probably imagine situations where it would be useful to define a ThreadGroup that includes Threads from several agent processes in a distributed application. A distributed database application, for example, might be designed such that each agent contains a thread responsible for routing SQL calls to one of the databases in the system. If one of those databases becomes temporarily unavailable, perhaps for some administrative task, then you might like to be able to perform a batch suspend() on all of the threads in the distributed system responsible for that database, to guarantee that the suspended threads don’t attempt database connections until the database is fully online. When the database administration is complete and we get confirmation that the database is online, we can send a batch resume() to the distributed thread group to activate the threads again.

Unfortunately, since the ThreadGroup class in the java.lang package is implemented with nearly all of the critical methods defined as final, we can’t just extend this class to implement a distributed thread group. However, we can implement a distributed thread group by defining a class that can handle the network communications and use the existing ThreadGroup interface.

Example 4.4 shows the DistThreadGroup class, which represents a group of threads distributed across the network. It basically acts as a local agent for a set of ThreadGroups across the network, which might also include a ThreadGroup on the local host. The DistThreadGroup has two major tasks:

  • When a state change is requested locally (e.g., to suspend the thread group), it broadcasts the request to all other thread groups in the distributed group so that the entire distributed thread group changes state.

  • It listens to a port on the local host for messages from other agents to change its state.

Example 4-4. A Distributed Thread Group
package dcj.utils.Thread;

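import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Enumeration;
import java.util.Hashtable;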

/*
 * DistThreadGroup
 *
 *     Local representation of a group of threads distributed across 
 * processes on the network. Allows for the definition and control of 
 * distributed threads.  
 *
 */
 
public class DistThreadGroup extends Thread {
  // Protected instance variables
  protected ThreadGroup  localGroup;
  protected Hashtable    remoteGroups = new Hashtable();
  protected ServerSocket incoming;
  protected int          localPort;
  
  // Class variables
  static final int       hostIdx = 0;
  static final int       portIdx = 1;

  // Public constructors
  public DistThreadGroup(ThreadGroup g, int port) {
    localGroup = g;
    localPort = port;
  }

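  public DistThreadGroup(int port) {
    // ThreadGroup has no public no-argument constructor, so we supply
    // a name (the name used here is arbitrary)
    localGroup = new ThreadGroup("DistThreadGroup-" + port);
    localPort = port;
  }

  public DistThreadGroup(String rHost, int rPort, String gname, int port) {
    localGroup = new ThreadGroup("DistThreadGroup-" + port);
    localPort = port;
    Add(gname, rHost, rPort);
  }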

  // Add a remote thread group to this group
  public void Add(String gname, String host, int port) {
    RmtThreadGroup rg = new RmtThreadGroup(host, port);
    remoteGroups.put(gname, rg);
  }
  
  // Remove a thread group from this group
  public void Remove(String gname) {
    remoteGroups.remove(gname);
  }  

  // Get the local thread group belonging to this distributed group
  public ThreadGroup GetLocalGroup() {
    return localGroup;
  }
  
  // Implementation of Thread::run - checks its port on the current machine
  // waiting for messages from remote members of this group.
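  public void run() {
    try {
      incoming = new ServerSocket(localPort);
      while (true) {
        Socket peer = incoming.accept();
        try {
          DataInputStream is = new DataInputStream(peer.getInputStream());
          String input = is.readUTF();
          // The sender has already broadcast the command to the other
          // agents, so apply it to the local group only (bcast == false)
          if (input.equals("suspend"))
            suspend(false);
          else if (input.equals("resume"))
            resume(false);
          //
          // Check for other messages here ("stop", "start", etc.)
          // ...
          else {
            System.out.println("DistThreadGroup: Received unknown command \""
                               + input + "\"");
          }
        }
        finally {
          peer.close();
        }
      }
    }
    catch (IOException e) {
      System.out.println("DistThreadGroup: Listener on port " + localPort
                         + " failed: " + e);
    }
  }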

  // Suspend the group of threads. If requested, the suspend
  // command is sent to the remote threads first, then the local group
  // is suspended.
  public synchronized void suspend(boolean bcast) {
    if (bcast) 
      broadcastCmd("suspend");
      
    if (localGroup != null)
      localGroup.suspend();
  }
  
  // Resume the group of threads. If requested, the resume 
  // command is sent to the remote threads first, then the 
  // local group is resumed.
  public synchronized void resume(boolean bcast) {
    if (bcast)
      broadcastCmd("resume");
      
    if (localGroup != null)
      localGroup.resume();
  }
  
  //
  //  Implement other methods corresponding to ThreadGroup methods here
  //  (e.g. stop(), setMaxPriority())
  //  ...
  
  // Broadcast the given message to the remote thread groups.
  protected void broadcastCmd(String cmd) {
    Enumeration e = remoteGroups.elements();
    while (e.hasMoreElements()) {
      RmtThreadGroup rg = (RmtThreadGroup)e.nextElement();
      try {
        Socket s = new Socket(rg.getHost(), rg.getPort());
        DataOutputStream os = new DataOutputStream(s.getOutputStream());
        os.writeUTF(cmd);
        os.flush();
        s.close();
      }
      catch (Exception ex) {
        // Report the failure and keep going with the remaining groups
        System.out.println("DistThreadGroup: Failed to " + cmd
                           + " group at \"" + rg.getHost() + ":"
                           + rg.getPort() + "\"");
      }
    }
  }
}

The DistThreadGroup represents the distributed thread group using a local ThreadGroup and a hashtable of remote thread groups. The remote thread groups are represented by a RmtThreadGroup class, which for this example is simply a host/port pair, as shown in Example 4.5. The host and port number pairs indicate how to contact the DistThreadGroups running on the remote host, and they are keyed in the hashtable using a name, which is just a way to refer to the remote group locally.

Example 4-5. A Utility Class for Tracking Remote Thread Groups
package dcj.utils.Thread;

import java.lang.String;

public class RmtThreadGroup {
  protected String host = "";
  protected int port = 0;
 
  public RmtThreadGroup() {}
  
  public RmtThreadGroup(String h, int p) {
    host = h;
    port = p;
  }

  public String getHost() { return host; }
  public int getPort() { return port; }
  public void setHost(String h) { host = h; }
  public void setPort(int p) { port = p; }
}

When a state change is made to the DistThreadGroup by calling one of its methods, the change is broadcast to the remote thread groups, then the change is made to the local thread group. To broadcast the change, we sequentially open a socket to each remote thread group’s host and port number, then send a message to the remote group indicating the change to make. The only methods shown in the example are the suspend() and resume() methods, but you can imagine how the other ThreadGroup methods would be implemented. If its bcast argument is true, then the suspend() and resume() methods use the broadcastCmd() method to send the same command to each remote thread group. The broadcastCmd() method iterates through the contents of the hashtable, and for each host/port pair, it opens up a socket to the host, attaches a DataOutputStream to the output stream of the socket, and sends the command string to the remote process. After the command has been broadcast to the remote groups, then the suspend() and resume() methods call the corresponding method on the local ThreadGroup, either suspending or resuming all of its threads.

Each DistThreadGroup is also a Thread, whose run() method listens on a port for messages coming in from remote thread groups, telling it to change its state. When a connection is made on its port, the DistThreadGroup checks for a message on the socket’s input stream. It then calls the appropriate method as indicated by the message. Note that when receiving state-change messages over the socket, the DistThreadGroup calls the local version of the state-change method with the bcast argument set to false. We assume that the agent originating the message will broadcast it to the other agents in the distributed group, so the receiving agent doesn’t need to repeat the broadcast.

One flaw in this design is that the DistThreadGroup could be added locally to the ThreadGroup that it’s managing. If a request to suspend the distributed group is received, then the DistThreadGroup will suspend the remote groups, and then suspend the local group, and in the process suspend itself. If a remote agent later sends a resume message, it won’t be processed, because the DistThreadGroup’s own listening thread is still suspended. We won’t be able to resume the distributed thread group until some other local thread calls the resume() method directly on either the DistThreadGroup or its local ThreadGroup.
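One way to guard against this situation is to check, before starting the listener, whether its thread belongs to the group it manages. The helper below is a hypothetical addition, not part of Example 4.4; it relies only on Thread.getThreadGroup() and ThreadGroup.parentOf(), which reports whether a group is the given group or one of its ancestors.

```java
// Hypothetical guard, not part of Example 4.4.
class GroupGuard {
    // True if an operation on 'managed' (such as a batch suspend) would
    // also affect thread t, because t lives in that group or a subgroup.
    static boolean isManagedBy(Thread t, ThreadGroup managed) {
        ThreadGroup tg = t.getThreadGroup(); // may be null once t has terminated
        return tg != null && managed.parentOf(tg);
    }
}

class GroupGuardDemo {
    public static void main(String[] args) {
        ThreadGroup workers = new ThreadGroup("workers");
        Thread inside = new Thread(workers, () -> { }, "inside");
        Thread outside = new Thread(() -> { }, "outside");

        System.out.println(GroupGuard.isManagedBy(inside, workers));  // true
        System.out.println(GroupGuard.isManagedBy(outside, workers)); // false
    }
}
```

A DistThreadGroup constructor could apply such a check to itself and refuse, or at least warn, when it would end up suspending its own listener.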

Improving Efficiency with Thread Priorities

As we mentioned previously, thread priorities don’t guarantee a particular processing order. They only influence the thread scheduling algorithm. But this influence can be a powerful way to control the perceived performance of your application, especially in situations where you have a good idea of how quickly you want the threads to run relative to each other.

As always, things are more complicated in a distributed system, where there are processes located across the network, each containing its own threads and thread priorities. Presumably, these processes are working together to complete some job, so ideally we’d like to have the thread priorities coordinated to match the relationships between the processes and threads in the system. What this means exactly depends on the job you’re trying to do and the environment in which you’re trying to do it.

Let’s assume that you’re running your distributed system in an ideal environment: each host has the same operating system, CPU resources, memory, current load, etc. Now suppose that you’re running a group of identical agent processes on this cluster of hosts, with the same number of agents on each host. Maybe each agent is solving a piece of a large problem, like a finite-element analysis; or perhaps each agent represents a node in a replicated database system. Under these assumptions, we should be able to come up with optimal thread priorities for the threads in each agent in the system. Since the hosts and the agents are completely homogeneous, we can use a single host and agent, figure out the best thread priorities for that agent, and then use those priorities on all of the other agents in the distributed system.

In the distributed solver, for example, each agent is made up of two threads: one responsible for solving its piece of the problem, and another responsible for communicating with other agents in the system (broadcasting status or results, for example). The RunnableSolverServer classes are threads that listen for client requests and the RunnableSolver classes are threads that are “spun off” by the server to handle each request coming from remote clients. In general, we want to give the communication thread a higher priority than the CPU threads, so that it has a chance of getting some CPU time to check for requests, or to send off a few messages before the computation continues. Unless the computing job we give the RunnableSolvers is really trivial, the RunnableSolver threads are going to be running full-speed almost continuously, demanding as much CPU time as they can get. If we give them a higher priority than the I/O thread, the I/O thread will probably be blocked for long periods of time waiting for the computing threads either to finish or to yield() to other threads running on the same CPU.
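In code, this arrangement amounts to a pair of setPriority() calls made before the threads start. The sketch below uses a hypothetical AgentThreads class rather than the RunnableSolverServer and RunnableSolver classes themselves, and the one-step priority difference is only an illustrative choice.

```java
// Hypothetical pairing of a compute thread and a communications thread.
class AgentThreads {
    final Thread solver;
    final Thread comm;

    AgentThreads(Runnable solverWork, Runnable commWork) {
        solver = new Thread(solverWork, "solver");
        comm = new Thread(commWork, "comm");
        // The compute thread runs almost continuously, so the mostly idle
        // communications thread gets the higher priority.
        solver.setPriority(Thread.NORM_PRIORITY);
        comm.setPriority(Thread.NORM_PRIORITY + 1);
    }

    void start() {
        comm.start();
        solver.start();
    }
}

class AgentThreadsDemo {
    public static void main(String[] args) {
        AgentThreads agent = new AgentThreads(
            () -> System.out.println("solving"),
            () -> System.out.println("listening"));
        System.out.println("solver=" + agent.solver.getPriority()
                           + " comm=" + agent.comm.getPriority());
        agent.start();
    }
}
```

Note that setPriority() silently caps the requested value at the maximum priority of the thread's group, so the effective priorities are worth checking with getPriority().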

If we can’t rely on homogeneous agents and hosts, finding the best set of priorities for the threads in each agent isn’t so simple. If we have several different kinds of agents composed of various types of threads, and these threads are interacting with each other in various ways, then we need to understand the major trends in these interactions in order to come up with optimal thread priorities for each agent. If we also have different types of hosts in our distributed system, we can’t make assumptions about the underlying system that the Java runtime is using (single- or multiple-CPU, time-slicing or not, other applications sharing CPU load, etc.), so we can’t foresee exactly how threads will be allocated CPU time and program it into our application.

In some cases the only way to make effective use of thread priorities is to have some way of monitoring the performance of your distributed system, and set thread priorities dynamically at runtime. Monitoring system performance is an issue of its own, and we won’t go into detail about it here. Setting thread priorities dynamically is supported by the Java API within a single process on a single Java virtual machine, but we have to come up with a way to set priorities on threads across a distributed system. One way to do this is to extend our DistThreadGroup class in Example 4.4 to allow us to both get and set priorities on threads in the entire distributed group. We could add getPriority() and setPriority() methods, for example, that take the remote group name as an argument, as well as the name of a thread in the remote thread group. These requests could be passed on to the remote group the same way we pass state-change messages. Another approach would be to make a version of DistThreadGroup that is also an RMI remote object. Then remote agents could get a stub for the DistThreadGroup and call the getPriority() and setPriority() methods directly.
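The receiving side of the message-based approach might look something like the following sketch. The message format ("setPriority", a thread name, and a priority, separated by whitespace) and the PriorityCommands class are assumptions made for illustration; the book does not define them. Thread names containing spaces would need a different encoding.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical handler for a "setPriority <threadName> <priority>" message.
class PriorityCommands {
    // Finds a live thread by name in the group or any of its subgroups.
    static Thread findThread(ThreadGroup group, String name) {
        // activeCount() is only an estimate, so leave some headroom
        Thread[] threads = new Thread[group.activeCount() * 2 + 4];
        int n = group.enumerate(threads);
        for (int i = 0; i < n; i++) {
            if (threads[i].getName().equals(name)) {
                return threads[i];
            }
        }
        return null;
    }

    // Applies the command to the local group; returns false if the message
    // is malformed or names a thread that is not running in the group.
    static boolean handle(ThreadGroup group, String message) {
        String[] parts = message.trim().split("\\s+");
        if (parts.length != 3 || !parts[0].equals("setPriority")) {
            return false;
        }
        Thread target = findThread(group, parts[1]);
        if (target == null) {
            return false;
        }
        try {
            int priority = Integer.parseInt(parts[2]);
            if (priority < Thread.MIN_PRIORITY || priority > Thread.MAX_PRIORITY) {
                return false;
            }
            target.setPriority(priority);
            return true;
        } catch (NumberFormatException e) {
            return false;
        }
    }
}

class PriorityCommandsDemo {
    // Starts one worker in a fresh group, applies the message, and returns
    // the worker's resulting priority.
    static int priorityAfter(String message) {
        CountDownLatch done = new CountDownLatch(1);
        ThreadGroup group = new ThreadGroup("solvers");
        Thread worker = new Thread(group, () -> {
            try {
                done.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "solver-1");
        worker.start();
        try {
            PriorityCommands.handle(group, message);
            return worker.getPriority();
        } finally {
            done.countDown(); // let the worker exit
        }
    }

    public static void main(String[] args) {
        System.out.println(priorityAfter("setPriority solver-1 7"));
    }
}
```

In DistThreadGroup, a handler like this would sit alongside the "suspend" and "resume" branches of run(), with the sending side reusing the same socket code as broadcastCmd().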

Now that I’ve shown how distributed thread priority manipulation is possible in the Java environment, I have to admit that it’s probably not something that will be commonly used. The overhead of monitoring and reasoning about the running state of the distributed system in order to calculate optimal thread priorities will usually outweigh the performance improvements that you’ll be able to get by trying to influence thread scheduling.

Synchronizing Distributed Threads

The ability to synchronize threads is a necessity in situations where data is accessible and modifiable by multiple threads. This synchronization is easily extensible to distributed situations, where data is accessible to multiple agents on the network. The simplest example is one where some runtime data in a Java process is accessible by multiple agents in a distributed application. If we wanted to allow clients to query our multithreaded solvers for their current problem sets, we could easily synchronize this access by making the Problem() method on the Solvers synchronized. Since this method is the only means for accessing a problem set, doing a local synchronization on it ensures that every accessor, whether it’s a local thread or an external agent making a request over a socket, will have synchronized access to the data.
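A minimal sketch of such a synchronized accessor follows. The class name SynchronizedSolver, the method names getProblem() and setProblem(), and the use of a double array as the problem set are all assumptions for illustration; they are not the Solver classes from the earlier examples.

```java
// Hypothetical solver state guarded by the object's own monitor.
class SynchronizedSolver {
    private double[] problem = new double[0];

    // Every reader goes through this method, whether it is a local thread
    // or a thread servicing a request that arrived over a socket.
    public synchronized double[] getProblem() {
        return problem.clone(); // copy, so callers cannot modify shared state
    }

    public synchronized void setProblem(double[] newProblem) {
        problem = newProblem.clone();
    }
}

class SynchronizedSolverDemo {
    public static void main(String[] args) {
        SynchronizedSolver solver = new SynchronizedSolver();
        solver.setProblem(new double[] { 1.0, 2.0, 3.0 });

        double[] snapshot = solver.getProblem();
        snapshot[0] = 99.0; // changes only the caller's copy
        System.out.println(solver.getProblem()[0]);
    }
}
```

Returning a copy is a design choice of this sketch: it keeps a remote request handler from holding a reference to data that another thread may be updating.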
