Networked Threads
We’ve seen how to make separate threads of control in a Java applet or application, and we’ve discussed the various ways that the Java API allows you to manage threads at runtime. Now we’ll go over some of the issues that arise with multithreaded distributed applications, and how the Java environment helps you deal with them.
Asynchronous Agents
The threaded implementation of our Solver interface in Example 4.1 shows how multithreaded servers can be implemented in Java. Threading allows our server to respond to clients asynchronously and to service their requests in parallel, which can reduce the amount of time a client has to wait for a response. The alternative is to have a server with only one thread servicing clients on a first-come, first-served basis. If client A is the first client to make a request, the server begins processing it right away. If client B makes a request while the server is processing client A’s job, then B will have to wait for the server to finish A’s job before its own job can be started. In fact, client B won’t even get an acknowledgment from the server until client A’s job is done. With the multithreaded server, an independent thread can listen for client requests and acknowledge them almost immediately (or as soon as the thread scheduler gives it a CPU time slice). Because the jobs are allocated to separate threads for processing, the CPU resources are shared between the two jobs, and B’s job may finish sooner (though client A’s job might finish later, since it now gets less than 100% of the CPU).
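The two server designs differ mainly in what the listening thread does after accept(). Below is a minimal sketch of the multithreaded variant using plain sockets. It assumes a toy protocol in which each client sends one string and receives one acknowledgment. ThreadPerRequestServer, start(), and request() are hypothetical names; they are not the Solver classes from Example 4.1. Each connection is handed to its own worker thread, so the listener returns to accept() at once and a second client is not held up by the first client's job.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical thread-per-request server: one listener thread, one worker per request.
class ThreadPerRequestServer implements Runnable {
    private final ServerSocket listener;

    private ThreadPerRequestServer(int port) throws IOException {
        listener = new ServerSocket(port); // port 0 asks the OS for any free port
    }

    // Create the server and start its listener on a daemon thread.
    static ThreadPerRequestServer start(int port) {
        try {
            ThreadPerRequestServer server = new ThreadPerRequestServer(port);
            Thread t = new Thread(server, "listener");
            t.setDaemon(true);
            t.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    int getPort() { return listener.getLocalPort(); }

    // Listener loop: accept, hand the connection to a new worker thread, accept again.
    @Override
    public void run() {
        try {
            while (true) {
                Socket client = listener.accept();
                new Thread(() -> handle(client)).start();
            }
        } catch (IOException e) {
            // accept() fails once the listener socket is closed; stop listening.
        }
    }

    // Worker: read one request string and reply with an acknowledgment.
    private void handle(Socket client) {
        try (Socket s = client) {
            String request = new DataInputStream(s.getInputStream()).readUTF();
            DataOutputStream out = new DataOutputStream(s.getOutputStream());
            out.writeUTF("ack:" + request);
            out.flush();
        } catch (IOException e) {
            System.err.println("worker failed: " + e);
        }
    }

    void close() {
        try {
            listener.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Client side: send one request and block until the reply arrives.
    static String request(int port, String msg) {
        try (Socket s = new Socket("localhost", port)) {
            new DataOutputStream(s.getOutputStream()).writeUTF(msg);
            return new DataInputStream(s.getInputStream()).readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A single-threaded server would call handle(client) directly inside the accept loop. Any client connecting during a long job would then wait in the connection backlog until that job finished.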
Threads are useful in any distributed system where we want an agent to respond to asynchronous messages. By isolating communications in a separate thread, the other threads in the process can continue to do useful work while the communications thread blocks on a socket waiting for messages. The client process shown in Example 4.3 only has a single thread, since it doesn’t really have anything else to do but wait for the server to send a response. But we could easily reuse these classes in a multithreaded client as a single communications thread, or as multiple threads talking to multiple servers.
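The following is a minimal sketch of isolating communications in a dedicated thread. It assumes a hypothetical CommThread that reads writeUTF-framed messages from any InputStream (typically socket.getInputStream()) and hands them to the rest of the process through a BlockingQueue. Worker threads only poll the queue, so they never block on the network.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical communications thread: it blocks on the stream so worker threads don't.
class CommThread extends Thread {
    private final DataInputStream in;
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();

    CommThread(InputStream stream) { // e.g. socket.getInputStream()
        super("comm");
        in = new DataInputStream(stream);
        setDaemon(true);
    }

    @Override
    public void run() {
        try {
            while (true) {
                inbox.put(in.readUTF()); // blocks here, not in the worker threads
            }
        } catch (EOFException e) {
            // The sender closed its end; no more messages.
        } catch (IOException | InterruptedException e) {
            System.err.println("comm thread stopped: " + e);
        }
    }

    // Non-blocking check that a worker can make between units of work.
    String poll() { return inbox.poll(); }

    // Wait up to 'millis' for a message; returns null on timeout.
    String await(long millis) {
        try {
            return inbox.poll(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Several CommThreads, one per server connection, could feed the same process in the multi-server case.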
Distributed ThreadGroups
You can probably imagine situations where it would be useful to define a ThreadGroup that includes Threads from several agent processes in a distributed application. A distributed database application, for example, might be designed such that each agent contains a thread responsible for routing SQL calls to one of the databases in the system. If one of those databases becomes temporarily unavailable, perhaps for some administrative task, then you might like to be able to perform a batch suspend() on all of the threads in the distributed system responsible for that database. This would guarantee that the suspended threads don’t attempt database connections until the database is fully online. When the database administration is complete and we get confirmation that the database is online, we can send a batch resume() to the distributed thread group to activate the threads again.
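Thread.suspend() and resume(), and their ThreadGroup counterparts, were deprecated in Java 1.2 because they are deadlock-prone, and recent JDKs no longer support them. A present-day version of this batch suspend/resume would more likely use a cooperative pause gate. Each database-routing thread checks the gate before opening a connection. The following is a minimal sketch; PauseGate and its methods are hypothetical names.

```java
// Hypothetical cooperative replacement for suspend()/resume().
class PauseGate {
    private boolean paused = false;

    // Called (e.g. on receipt of a "suspend" message) to hold worker threads.
    synchronized void pause() { paused = true; }

    // Called once the database is back online; wakes every waiting worker.
    synchronized void resume() {
        paused = false;
        notifyAll();
    }

    synchronized boolean isPaused() { return paused; }

    // Worker threads call this before each database connection attempt.
    synchronized void awaitIfPaused() throws InterruptedException {
        while (paused) {
            wait(); // releases the gate's lock while waiting
        }
    }
}
```

With suspend(), a thread can be stopped at an arbitrary instruction, possibly while holding locks. With the gate, a worker pauses only at the points where it calls awaitIfPaused().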
Unfortunately, since the ThreadGroup class in the java.lang package is implemented with nearly all of the critical methods defined as final, we can’t just extend this class to implement a distributed thread group. However, we can implement a distributed thread group by defining a class that handles the network communications and delegates to an ordinary ThreadGroup for the local threads.
Example 4.4 shows the DistThreadGroup class, which represents a group of threads distributed across the network. It acts as a local agent for a set of ThreadGroups across the network, which might also include a ThreadGroup on the local host. The DistThreadGroup has two major tasks:
When a state change is requested locally (e.g., to suspend the thread group), it broadcasts the request to the remote members of the distributed group so that the entire distributed thread group changes state.
It listens on a port on the local host for messages from other agents telling it to change its state.
package dcj.utils.Thread;

import java.io.*;
import java.net.*;
import java.util.*;

/*
 * DistThreadGroup
 *
 * Local representation of a group of threads distributed across
 * processes on the network. Allows for the definition and control of
 * distributed threads.
 *
 */

public class DistThreadGroup extends Thread {
  // Protected instance variables
  protected ThreadGroup localGroup;
  protected Hashtable<String, RmtThreadGroup> remoteGroups =
    new Hashtable<String, RmtThreadGroup>();
  protected ServerSocket incoming;
  protected int localPort;

  // Class variables
  static final int hostIdx = 0;
  static final int portIdx = 1;

  // Public constructors
  public DistThreadGroup(ThreadGroup g, int port) {
    localGroup = g;
    localPort = port;
  }

  public DistThreadGroup(int port) {
    localGroup = new ThreadGroup("DistThreadGroup");
    localPort = port;
  }

  public DistThreadGroup(String rHost, int rPort, String gname, int port) {
    localGroup = new ThreadGroup("DistThreadGroup");
    localPort = port;
    Add(gname, rHost, rPort);
  }

  // Add a remote thread group to this group
  public void Add(String gname, String host, int port) {
    RmtThreadGroup rg = new RmtThreadGroup(host, port);
    remoteGroups.put(gname, rg);
  }

  // Remove a thread group from this group
  public void Remove(String gname) {
    remoteGroups.remove(gname);
  }

  // Get the local thread group belonging to this distributed group
  public ThreadGroup GetLocalGroup() {
    return localGroup;
  }

  // Implementation of Thread::run - checks its port on the current machine
  // waiting for messages from remote members of this group.
  public void run() {
    try {
      incoming = new ServerSocket(localPort);
      while (true) {
        Socket peer = incoming.accept();
        try {
          DataInputStream is = new DataInputStream(peer.getInputStream());
          String input = is.readUTF();
          // The sender has already broadcast this command, so apply it
          // to the local group only (bcast = false).
          if (input.equals("suspend"))
            suspend(false);
          else if (input.equals("resume"))
            resume(false);
          //
          // Check for other messages here ("stop", "start", etc.)
          // ...
          else {
            System.out.println("DistThreadGroup: Received unknown command \""
                               + input + "\"");
          }
        }
        finally {
          peer.close();
        }
      }
    }
    catch (IOException e) {
      System.out.println("DistThreadGroup: Listener failed: " + e);
    }
  }

  // Suspend the group of threads. If requested, the suspend
  // command is sent to the remote threads first, then the local group
  // is suspended.
  public synchronized void suspend(boolean bcast) {
    if (bcast)
      broadcastCmd("suspend");
    if (localGroup != null)
      localGroup.suspend();
  }

  // Resume the group of threads. If requested, the resume
  // command is sent to the remote threads first, then the
  // local group is resumed.
  public synchronized void resume(boolean bcast) {
    if (bcast)
      broadcastCmd("resume");
    if (localGroup != null)
      localGroup.resume();
  }

  //
  // Implement other methods corresponding to ThreadGroup methods here
  // (e.g., stop())
  // ...

  // Broadcast the given message to the remote thread groups.
  protected void broadcastCmd(String cmd) {
    Enumeration<RmtThreadGroup> e = remoteGroups.elements();
    while (e.hasMoreElements()) {
      RmtThreadGroup rg = e.nextElement();
      try {
        Socket s = new Socket(rg.getHost(), rg.getPort());
        DataOutputStream os = new DataOutputStream(s.getOutputStream());
        os.writeUTF(cmd);
        os.flush();
        s.close();
      }
      catch (Exception ex) {
        System.out.println("DistThreadGroup: Failed to " + cmd + " group at \""
                           + rg.getHost() + ":" + rg.getPort() + "\"");
      }
    }
  }
}
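Example 4.4 relies on a very small wire format. Each connection carries exactly one command, written by broadcastCmd() with DataOutputStream.writeUTF() and read back in run() with DataInputStream.readUTF(). The sketch below round-trips that format through an in-memory buffer instead of a socket; CommandCodec is a hypothetical helper, not part of the dcj.utils.Thread package.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper illustrating the one-command-per-connection format.
class CommandCodec {
    // What the sender puts on the wire: writeUTF emits a 2-byte length,
    // followed by the string in modified UTF-8.
    static byte[] encode(String cmd) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(cmd);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // What the receiving listener reads back.
    static String decode(byte[] message) {
        try {
            return new DataInputStream(new ByteArrayInputStream(message)).readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

The length prefix limits a command to 65,535 encoded bytes, which is ample for verbs like "suspend" and "resume". Arguments such as a thread name and priority could be appended to the same string.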
The DistThreadGroup represents the distributed thread group using a local ThreadGroup and a hashtable of remote thread groups. The remote thread groups are represented by a RmtThreadGroup class, which for this example is simply a host/port pair, as shown in Example 4.5. The host and port number pairs indicate how to contact the DistThreadGroups running on the remote hosts, and they are keyed in the hashtable by a name, which is just a way to refer to the remote group locally.
package dcj.utils.Thread;

// Host/port pair identifying the listener of a remote DistThreadGroup
public class RmtThreadGroup {
  protected String host = "";
  protected int port = 0;

  public RmtThreadGroup() {}

  public RmtThreadGroup(String h, int p) {
    host = h;
    port = p;
  }

  public String getHost() { return host; }
  public int getPort() { return port; }

  public void setHost(String h) { host = h; }
  public void setPort(int p) { port = p; }
}
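The name-to-address bookkeeping can also be expressed with generics and a standard JDK type. This minimal sketch assumes a hypothetical RemoteGroupRegistry. It substitutes java.net.InetSocketAddress (created unresolved, so no DNS lookup happens when a group is added) for RmtThreadGroup so that the example is self-contained.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Hashtable;
import java.util.Map;

// Hypothetical registry: local group name -> host/port of that group's listener.
class RemoteGroupRegistry {
    private final Map<String, InetSocketAddress> groups = new Hashtable<>();

    void add(String name, String host, int port) {
        // createUnresolved defers name resolution until a connection is made.
        groups.put(name, InetSocketAddress.createUnresolved(host, port));
    }

    void remove(String name) { groups.remove(name); }

    InetSocketAddress get(String name) { return groups.get(name); }

    int size() { return groups.size(); }

    // Snapshot of all addresses, safe to iterate while other threads add or remove groups.
    Collection<InetSocketAddress> all() { return new ArrayList<>(groups.values()); }
}
```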
When a state change is made to the DistThreadGroup by calling one of its methods, the change is broadcast to the remote thread groups, and then the change is made to the local thread group. To broadcast the change, we sequentially open a socket to each remote thread group’s host and port number, then send a message to the remote group indicating the change to make. The only methods shown in the example are suspend() and resume(), but you can imagine how the other ThreadGroup methods would be implemented.
If their bcast argument is true, the suspend() and resume() methods use the broadcastCmd() method to send the same command to each remote thread group. The broadcastCmd() method iterates through the contents of the hashtable, and for each host/port pair, it opens a socket to the host, attaches a DataOutputStream to the output stream of the socket, and sends the command string to the remote process. After the command has been broadcast to the remote groups, the suspend() and resume() methods call the corresponding method on the local ThreadGroup, either suspending or resuming all of its threads.
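The sequential broadcast loop can be written so that every socket is closed, even when a write fails, and so that one unreachable host cannot stall the loop indefinitely. The following is a minimal sketch assuming a hypothetical Broadcaster.broadcast() that takes the remote addresses as InetSocketAddress values and returns how many groups were reached. The 2-second connect timeout is an arbitrary assumed value. As in Example 4.4, a failure is reported and the loop moves on to the next group.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.Collection;

class Broadcaster {
    static final int CONNECT_TIMEOUT_MS = 2000; // assumed value, tune per network

    // Send cmd to each remote listener in turn; return how many sends succeeded.
    static int broadcast(String cmd, Collection<InetSocketAddress> peers) {
        int delivered = 0;
        for (InetSocketAddress peer : peers) {
            try (Socket s = new Socket()) {
                // Re-create the address so unresolved entries get resolved here.
                s.connect(new InetSocketAddress(peer.getHostString(), peer.getPort()),
                          CONNECT_TIMEOUT_MS);
                DataOutputStream out = new DataOutputStream(s.getOutputStream());
                out.writeUTF(cmd);
                out.flush();
                delivered++;
            } catch (IOException e) {
                // One unreachable group shouldn't stop the broadcast to the others.
                System.err.println("Failed to " + cmd + " group at " + peer + ": " + e);
            }
        }
        return delivered;
    }
}
```

Because the sends are sequential, the total broadcast time grows with the number of groups. Spreading the sends across several threads is one possible refinement if that becomes a problem.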
Each DistThreadGroup is also a Thread, whose run() method listens on a port for messages coming in from remote thread groups, telling it to change its state. When a connection is made on its port, the DistThreadGroup reads a message from the socket’s input stream. It then calls the appropriate method as indicated by the message. Note that when receiving state-change messages over the socket, the DistThreadGroup calls the local version of the state-change method with the bcast argument set to false. We assume that the agent originating the message will broadcast it to the other agents in the distributed group, so the receiving agent doesn’t need to repeat the broadcast.
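Setting bcast to false on receipt is what keeps a message from echoing around the group forever. To show this, the following sketch simulates three fully connected agents inside one process and counts the messages sent. Agent is a hypothetical stand-in for a DistThreadGroup, and a direct method call stands in for the socket write.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-process stand-in for a DistThreadGroup agent.
class Agent {
    static int messagesSent = 0;
    final List<Agent> peers = new ArrayList<>();
    String state = "running";

    // Same shape as DistThreadGroup.suspend(boolean bcast).
    void suspend(boolean bcast) {
        if (bcast) {
            for (Agent peer : peers) {
                messagesSent++;
                peer.receive("suspend"); // stands in for a socket write
            }
        }
        state = "suspended";
    }

    // Incoming commands are applied locally only, so they are not re-broadcast.
    void receive(String cmd) {
        if (cmd.equals("suspend")) {
            suspend(false);
        }
    }
}
```

Suspending one agent sends exactly one message to each peer. If receive() called suspend(true) instead, each peer would broadcast back to the others. The calls would then never terminate, and in this simulation they would end in a StackOverflowError.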
One flaw in this design is that the DistThreadGroup could be added locally to the ThreadGroup that it’s managing. If the distributed group is then suspended, the DistThreadGroup will suspend the remote groups and then suspend the local group, suspending itself in the process. If a remote agent later sends a resume message, that message will never be processed, because the DistThreadGroup’s listener thread is still suspended. We won’t be able to resume the distributed thread group until some other local thread calls resume() directly on either the DistThreadGroup or its local ThreadGroup.
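One way to guard against this flaw is to check whether the listener thread belongs to the group it manages, either directly or through a subgroup. This check could run when a DistThreadGroup is constructed or started, and the constructor could refuse the configuration. The following is a minimal sketch using the real ThreadGroup.parentOf() and Thread.getThreadGroup() methods; the SelfSuspendCheck class is hypothetical.

```java
// Hypothetical guard against a DistThreadGroup managing its own listener thread.
class SelfSuspendCheck {
    // True if suspending 'managed' would also suspend 'listener'.
    static boolean wouldSuspendItself(Thread listener, ThreadGroup managed) {
        ThreadGroup g = listener.getThreadGroup(); // null once the thread has terminated
        // parentOf(g) is true when 'managed' is g itself or one of g's ancestors.
        return g != null && managed.parentOf(g);
    }
}
```

A DistThreadGroup constructor might throw an IllegalArgumentException when wouldSuspendItself(this, g) is true, before the listener is ever started.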
Improving Efficiency with Thread Priorities
As we mentioned previously, thread priorities don’t guarantee a particular processing order. They only influence the thread scheduling algorithm. But this influence can be a powerful way to control the perceived performance of your application, especially in situations where you have a good idea of how quickly you want the threads to run relative to each other.
As always, things are more complicated in a distributed system, where there are processes located across the network, each containing its own threads and thread priorities. Presumably, these processes are working together to complete some job, so ideally we’d like to have the thread priorities coordinated to match the relationships between the processes and threads in the system. What this means exactly depends on the job you’re trying to do and the environment in which you’re trying to do it.
Let’s assume that you’re running your distributed system in an ideal environment: each host has the same operating system, CPU resources, memory, current load, etc. Now suppose that you’re running a group of identical agent processes on this cluster of hosts, with the same number of agents on each host. Maybe each agent is solving a piece of a large problem, like a finite-element analysis; or perhaps each agent represents a node in a replicated database system. Under these assumptions, we should be able to come up with optimal thread priorities for the threads in each agent in the system. Since the hosts and the agents are completely homogeneous, we can use a single host and agent, figure out the best thread priorities for that agent, and then use those priorities on all of the other agents in the distributed system.
In the distributed solver, for example, each agent is made up of two threads: one responsible for solving its piece of the problem, and another responsible for communicating with other agents in the system (broadcasting status or results, for example). The RunnableSolverServer classes are threads that listen for client requests, and the RunnableSolver classes are threads that are “spun off” by the server to handle each request coming from remote clients. In general, we want to give the communication thread a higher priority than the computation threads, so that it has a chance of getting some CPU time to check for requests, or to send off a few messages before the computation continues. Unless the computing job we give the RunnableSolvers is really trivial, the RunnableSolver threads are going to be running at full speed almost continuously, demanding as much CPU time as they can get. If we give them a higher priority than the I/O thread, the I/O thread will probably be starved for long periods of time, waiting for the computing threads either to finish or to yield() to other threads running on the same CPU.
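The priority arrangement described above can be sketched as follows. The sketch assumes hypothetical factory methods in place of the book's RunnableSolverServer and RunnableSolver classes. The communications thread runs one step above Thread.NORM_PRIORITY and the compute threads one step below it. Thread.setPriority() silently caps the value at the thread group's maximum priority, and the scheduler may treat priorities as hints only. The sample compute job calls Thread.yield() between chunks of work, which is likewise only a hint.

```java
// Hypothetical setup: communications above normal priority, computation below it.
class PrioritySetup {
    static final int COMM_PRIORITY = Thread.NORM_PRIORITY + 1;
    static final int COMPUTE_PRIORITY = Thread.NORM_PRIORITY - 1;

    static Thread commThread(Runnable listener) {
        Thread t = new Thread(listener, "comm");
        t.setPriority(COMM_PRIORITY); // capped at the thread group's max priority
        return t;
    }

    static Thread computeThread(Runnable job, String name) {
        Thread t = new Thread(job, name);
        t.setPriority(COMPUTE_PRIORITY);
        return t;
    }

    // Hypothetical CPU-bound job that yields between chunks of work.
    static Runnable chunkedJob(int chunks, long[] result) {
        return () -> {
            long acc = 0;
            for (int c = 0; c < chunks; c++) {
                for (int i = 0; i < 100_000; i++) {
                    acc += i % 7;
                }
                Thread.yield(); // offer the CPU to other runnable threads
            }
            result[0] = acc;
        };
    }
}
```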
If we can’t rely on homogeneous agents and hosts, finding the best set of priorities for the threads in each agent isn’t so simple. If we have several different kinds of agents composed of various types of threads, and these threads are interacting with each other in various ways, then we need to understand the major trends in these interactions in order to come up with optimal thread priorities for each agent. If we also have different types of hosts in our distributed system, we can’t make assumptions about the underlying system that the Java runtime is using (single- or multiple-CPU, time-slicing or not, other applications sharing CPU load, etc.), so we can’t foresee exactly how threads will be allocated CPU time and program it into our application.
In some cases the only way to make effective use of thread priorities is to monitor the performance of your distributed system and to set thread priorities dynamically at runtime. Monitoring system performance is an issue of its own, and we won’t go into detail about it here. Setting thread priorities dynamically is supported by the Java API within a single process on a single Java virtual machine, but we have to come up with a way to set priorities on threads across a distributed system. One way to do this is to extend our DistThreadGroup class in Example 4.4 to allow us to both get and set priorities on threads in the entire distributed group. We could add getPriority() and setPriority() methods, for example, that take the remote group name as an argument, as well as the name of a thread in the remote thread group. These requests could be passed on to the remote group the same way we pass state-change messages, although getPriority() would also need the remote agent to send a reply back over the socket. Another approach would be to make a version of DistThreadGroup that is also an RMI remote object. Then remote agents could get a stub for the DistThreadGroup and call the getPriority() and setPriority() methods directly.
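The following is a minimal sketch of the message-based approach on the receiving side. It assumes a hypothetical text command of the form "setPriority <threadName> <priority>". The group name selects which remote agent the command is sent to, so it is not part of the message itself. The receiver finds the named live thread in its local ThreadGroup with ThreadGroup.enumerate(Thread[]) and applies the new priority.

```java
// Hypothetical handler for "setPriority <threadName> <priority>" messages.
class PriorityCommand {
    // Returns true if a matching live thread was found and updated.
    static boolean apply(String msg, ThreadGroup group) {
        String[] parts = msg.trim().split("\\s+");
        if (parts.length != 3 || !parts[0].equals("setPriority")) {
            return false; // not a priority command
        }
        // Throws NumberFormatException for a non-numeric priority.
        int priority = Integer.parseInt(parts[2]);
        // enumerate() copies live threads (including those in subgroups);
        // activeCount() is only an estimate, so leave some slack in the array.
        Thread[] threads = new Thread[group.activeCount() + 8];
        int n = group.enumerate(threads);
        for (int i = 0; i < n; i++) {
            if (threads[i].getName().equals(parts[1])) {
                // Throws IllegalArgumentException outside MIN_PRIORITY..MAX_PRIORITY.
                threads[i].setPriority(priority);
                return true;
            }
        }
        return false;
    }
}
```

A DistThreadGroup's run() loop could route any message starting with "setPriority" to a handler like this, alongside the existing "suspend" and "resume" cases.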
Now that I’ve shown how distributed thread priority manipulation is possible in the Java environment, I have to admit that it’s probably not something that will be commonly used. The overhead that you’ll need to monitor and reason about the running state of the distributed system in order to calculate optimal thread priorities will usually outweigh the performance improvements that you’ll be able to get by trying to influence thread scheduling.
Synchronizing Distributed Threads
The ability to synchronize threads is a necessity in situations where data is accessible and modifiable by multiple threads. This synchronization extends easily to distributed situations, where data is accessible to multiple agents on the network. The simplest example is one where some runtime data in a Java process is accessible by multiple agents in a distributed application. If we wanted to allow clients to query our multithreaded solvers for their current problem sets, we could easily synchronize this access by making the Problem() methods on the Solvers synchronized. Since this method is the only means of accessing a problem set, synchronizing it locally ensures that every accessor, whether it’s a local thread or an external agent making a request over a socket, gets synchronized, one-at-a-time access to the data.
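The following is a minimal sketch of that idea, assuming a hypothetical ProblemHolder in place of the book's Solver classes. The accessor and the mutator share the object's lock, so a thread serving a remote query never sees a half-updated problem set. The accessor also returns a copy, so callers cannot modify the data outside the lock.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Solver's problem set.
class ProblemHolder {
    private final List<Double> problem = new ArrayList<>();

    // Called by the solving thread(s) as the problem is built up.
    synchronized void addTerm(double term) {
        problem.add(term);
    }

    // Called by any thread serving a query, local or remote; returns a snapshot copy.
    synchronized List<Double> problem() {
        return new ArrayList<>(problem);
    }
}
```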