So far in this chapter, we’ve demonstrated how to create and
synchronize threads at a low level, using Java language primitives. The
java.util.concurrent
package and subpackages introduced with Java 5.0 build on this
functionality, adding important threading utilities and codifying some
common design patterns by supplying standard implementations. Roughly in
order of generality, these areas include:
- Thread-aware Collections implementations: The java.util.concurrent package augments the Java Collections API with several implementations for specific threading models. These include timed wait and blocking implementations of the Queue interface, as well as nonblocking, concurrent-access optimized implementations of the Queue and Map interfaces. The package also adds “copy on write” List and Set implementations for extremely efficient “almost always read” cases. These may sound complex, but actually cover some fairly simple cases very well. We’ll cover the Collections API in Chapter 11.
- Executors: Executors run tasks, including Runnables, and abstract the concept of thread creation and pooling from the user. Executors are intended to be a high-level replacement for the idiom of creating new threads to service a series of jobs. Along with Executors, the Callable and Future interfaces are introduced, which expand upon Runnable to allow management, value return, and exception handling.
- Low-level synchronization constructs: The java.util.concurrent.locks package holds a set of classes, including Lock and Condition, that parallels the Java language-level synchronization primitives and promotes them to the level of a concrete API. The locks package also adds the concept of nonexclusive reader/writer locks, allowing for greater concurrency in synchronized data access.
- High-level synchronization constructs: This includes the classes CyclicBarrier, CountDownLatch, Semaphore, and Exchanger. These classes implement common synchronization patterns drawn from other languages and systems and can serve as the basis for new high-level tools.
- Atomic operations (sounds very James Bond, doesn’t it?): The java.util.concurrent.atomic package provides wrappers and utilities for atomic, “all-or-nothing” operations on primitive types and references. This includes simple combination atomic operations like testing a value before setting it and getting and incrementing a number in one operation.
With the possible exception of optimizations done by the Java VM for
the atomic
operations package, all of
these utilities are implemented in pure Java, on top of the standard Java
language synchronization constructs. This means that they are in a sense
only convenience utilities and don’t truly add new capabilities to the
language. Their main role is to offer standard patterns and idioms in Java
threading and make them safer and more efficient to use. A good example of
this is the Executor
utility, which
allows a user to manage a set of tasks in a predefined threading model
without having to delve into creating threads at all. Higher-level APIs
like this both simplify coding and allow for greater optimization of the
common cases.
We’ll look at each of these areas in the remainder of this chapter, with the exception of the Collections implementations. We’ll discuss those when we cover the Java Collections APIs in Chapter 11.
Before we dive in, we should give a shout-out to Doug Lea, the author of Concurrent Programming in Java (Addison-Wesley), who led the group that added these packages to Java and is largely responsible for creating them.
In this chapter, we’ve created a lot of Threads and hopefully shown how to use them
effectively. But in the grand scheme of things, threads are a fairly
low-level programming tool and, without care, can be error-prone. When
we recognize certain common patterns that developers reproduce over and
over again using threads, it’s natural to want to elevate a pattern to
the level of an API. One such related pair of patterns is the concept of
an executor service that manages tasks and that of
a thread pool that services tasks in an efficient
way.
Thread pools have been implemented and reimplemented by vast numbers of developers in one way or another over the years, and when you add in features like scheduling and different threading models, they can get quite complex. To address these issues, the java.util.concurrent package includes executor interfaces and a number of default implementations of the executor pattern for common threading models. This includes sophisticated scheduling as well as asynchronous collection of results from the tasks, if they require it. In general, you can use an Executor
as a replacement for creating one-off
threads anywhere you need to execute Runnable
objects. The advantage is that
understanding and modifying the behavior of your code later is a lot
easier when you work at this level.
For the simple case of running a number of tasks and watching for
their completion, we can consider the base Executor
interface, which executes Runnable
objects for us. A convenient thing
about Executor
is that its companion
utility class Executors
is a factory
for creating different kinds of Executor
implementations. We’ll talk about the
various types it can produce in a bit, but for now let’s use the method
called newFixedThreadPool(), which, as its name suggests, returns an Executor that is implemented using a thread pool of a fixed size:
Executor executor = Executors.newFixedThreadPool(3);   // 3 threads
List<Runnable> runnables = ...;
for (Runnable task : runnables)
    executor.execute(task);
Here, we are submitting a number of Runnable
tasks to our Executor
, which executes them using a pool
with a maximum of three threads. If our list contains more than three
tasks, then some of them will have to wait until a thread is free to
service them. So, what happens when we submit the fourth item? The
Executor
interface doesn’t really
specify that. It’s up to the particular implementation to decide.
Without specifying more about its type, we don’t know if an Executor
is going to queue tasks, or if it
will use a pool to service them. Some Executor
implementations may block or even
execute the Runnable
right on the
execute()
call in the
caller’s thread. But in this case (and for all Executor
implementations created for us by the Executors
factory methods), tasks are
effectively put onto an unbounded queue. In the example, our loop
submits all of the tasks immediately and they are queued by the executor
until the three threads have serviced them.
With just a line or two of code in our example, we’ve been able to throttle the concurrency of our task list and avoid the details of constructing any threads ourselves. Later, if we decide we’d rather execute the tasks one at a time, the change is trivial, as sketched below (allocate just one thread!).
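For instance, a minimal sketch of that change, reusing the runnables list from the example above:

// Same loop; tasks now run strictly one at a time, in submission order
Executor executor = Executors.newSingleThreadExecutor();
for (Runnable task : runnables)
    executor.execute(task);

Next, we’ll take a step up and look at manageable tasks that produce values and executors that can schedule tasks for us.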
Because the Runnable interface was created for Threads to consume, its API doesn’t allow for direct feedback to the caller. The new Callable interface, which is effectively a replacement for Runnable, rectifies this situation by providing a call() method that both returns a result and can throw exceptions. Callable is a generic interface that is parameterized by the type it returns. The following examples create a Callable that returns an integer:
class MyCallable implements Callable<Integer> {
    public Integer call() {
        return 2 + 2;
    }
}

// or anonymously
Callable<Integer> callable = new Callable<Integer>() {
    public Integer call() {
        return 2 + 2;
    }
};
There is also a convenience method for bridging Runnables to Callables in the Executors class. It takes a Runnable and a fixed value to return when it completes:
Callable<Integer> callable = Executors.callable(runnable, 42 /* return value */);
The new Future interface is used with Callable and serves as a handle to wait for and retrieve the result of the task or cancel the task before it is executed. A Future is returned by the submit() methods of an ExecutorService, which is essentially a beefed-up Executor. We’ll discuss ExecutorServices in the next section.
Future<Integer> result = executorService.submit(callable);
int val = result.get();   // blocks until the result is ready
Future
is also a generic
interface, which is parameterized by its return type. This explains
the somewhat cute name. For example, a Future<Integer>
could be read as “a
future integer.” Future has both blocking and timed-wait get()
methods to retrieve the result when it
is ready, as well as an isDone() test method and a cancel() method to stop the task. (If the task hasn’t started yet, cancelling guarantees it never runs; cancel(true) will additionally attempt to interrupt a task that is already running.) If the task has been cancelled, you get a CancellationException if you attempt to retrieve the result.
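To make that concrete, here’s a minimal sketch of managing a task through its Future, assuming the executorService and callable from above:

Future<Integer> result = executorService.submit(callable);
try {
    Integer val = result.get(5, TimeUnit.SECONDS);   // timed wait for the result
} catch (TimeoutException e) {
    result.cancel(true);    // give up and try to interrupt the task
} catch (CancellationException e) {
    // the task was cancelled before we could retrieve the result
} catch (InterruptedException e) {
    // we were interrupted while waiting
} catch (ExecutionException e) {
    // the task itself threw an exception; inspect e.getCause()
}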
Enough said about these interfaces. Next, we’ll look at the
ExecutorService
, which uses
them.
Our first Executor was little more than a sinkhole for Runnables and, as we described, required
knowledge of the implementation to know how it would handle tasks. By
contrast, an ExecutorService
is
intended to be an asynchronous task handler. Instead of an execute()
method, it has submit()
methods that accept a Callable
(or Runnable
) and return immediately with a
Future
object that can be used to
manage the task and collect the result later. In addition to that, an
ExecutorService
has a lifecycle
defined by its shutdown()
method and
related methods that can be used to stop the service gracefully after
tasks are completed.
ExecutorService extends Executor. In fact, all of the implementations returned by the Executors factory methods are actually ExecutorServices, including the one we used in our first example. We’ll look at these factory methods to see what kinds of services are offered.
Executors offers three types of ExecutorService implementations:

- newFixedThreadPool(int): This is the classic thread pool with a specified maximum pool size and an unbounded queue for task submission. If a thread dies for some reason while handling a task, a new one will be created to replace it. Threads are never removed from the pool until the service is shut down.
- newCachedThreadPool(): This pool uses an open-ended number of threads that grows and shrinks with demand. The main advantage of this service is that threads are cached for a period of time and reused, eliminating the overhead of creating new threads for short-lived tasks. Threads that are not used for one minute are removed. Tasks are submitted directly to threads; there is no real queuing.
- newSingleThreadExecutor(): This ExecutorService uses a single thread to execute tasks from an unbounded queue. In this sense, it is identical to a fixed thread pool with a pool size of 1.
Let’s look at a more realistic usage of an ExecutorService, drawn from the TinyHttpd example in Chapter 13. In that chapter, we create a mini-web server to illustrate features of the networking APIs. Here, we won’t show the networking details, but we’ll implement the main request dispatching loop for the example using a thread pool executor service. (Flip to Chapter 13 to see the implementation of the Runnable client-connection handler class. That class works equally well with both examples.) Here we go:
public class ExecutorHttpd {
    ExecutorService executor = Executors.newFixedThreadPool(3);

    public void start(int port) throws IOException {
        final ServerSocket ss = new ServerSocket(port);
        while (!executor.isShutdown())
            executor.submit(new TinyHttpdConnection(ss.accept()));
    }

    public void shutdown() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(30, TimeUnit.SECONDS);
        executor.shutdownNow();
    }

    public static void main(String[] argv) throws Exception {
        new ExecutorHttpd().start(Integer.parseInt(argv[0]));
    }
}
The ExecutorHttpd
class holds
an instance of a fixed thread pool ExecutorService
with three threads to
service client connections. In the start()
method of our class, we create a
ServerSocket
that accepts incoming
network connections. We then enter a loop that runs as long as our
service is not flagged to shut down. Inside the loop, we create a new
connection handler (a Runnable
instance of TinyHttpdConnection
)
for each connection and submit it to the executor. The shutdown()
method of our class illustrates a
graceful termination. First, we call shutdown()
on the executor, which causes the
service to stop accepting new tasks and allow the currently running
ones to complete. Then we wait a reasonable period of time for all web
requests to finish (30 seconds), using the awaitTermination()
method before trying a
less graceful ending with shutdownNow(). shutdownNow() attempts to interrupt or otherwise stop threads as quickly as possible. We leave things there, but the method actually returns a List of the tasks that never started. Finally, we have a main()
method that exercises our example by creating an instance of ExecutorHttpd
on a port specified as an
argument to the program.
In addition to its individual task submit()
methods, ExecutorService
also offers a set of
collective invokeAll()
and
invokeAny()
executor
methods that submit multiple tasks as a group and return results
either when they are all complete or when the first one completes,
respectively. With this, we could reproduce our first example using a
List of Callables like this:
List<Callable<Integer>> taskList = ...;
ExecutorService execService = Executors.newFixedThreadPool(3);
List<Future<Integer>> resultList = execService.invokeAll(taskList);
By contrast, the invokeAny()
method returns just the first successfully completed task’s result
(cancelling all the remaining unexecuted tasks):
int result = execService.invokeAny(taskList);
Both methods also offer timed wait versions that time out after a specified period of time.
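For instance, a sketch of the timed form of invokeAll(), using the taskList and execService from above; tasks that haven’t finished when the timeout expires are cancelled:

// Wait at most ten seconds for the whole group
List<Future<Integer>> results =
    execService.invokeAll(taskList, 10, TimeUnit.SECONDS);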
For tasks that you’d like to run at a future time or on
a periodic basis, use the ScheduledExecutorService
. ScheduledExecutorService
is an ExecutorService
with additional “schedule”
methods that take a delay for a Runnable
or Callable
or a period specification for a
Runnable
. Two additional factory
methods of Executors
produce
scheduled executors:
Executors.newScheduledThreadPool(int);
Executors.newSingleThreadScheduledExecutor();
These are exactly like the similarly named methods for regular executor services, with the exception of returning a scheduled executor type.
To execute a task in the future, you specify a delay from the current time. For example:
ScheduledExecutorService exec = Executors.newScheduledThreadPool(3);

// run one minute in the future
exec.schedule(runnable, 60, TimeUnit.SECONDS);

// run at a specified date and time
Calendar futureDate = ...;                                  // convert from Calendar
Date date = futureDate.getTime();                           //   to Date
long delay = date.getTime() - System.currentTimeMillis();   //   to relative millis
exec.schedule(runnable, delay, TimeUnit.MILLISECONDS);      // run at the specified date
For periodic work, there are two kinds of recurring schedules—fixed delay and fixed rate. Fixed delay means that a fixed amount of time elapses between the end of the task’s execution and the beginning of the next execution. Fixed rate means that the task should begin execution at fixed time intervals, regardless of how long the task takes. The difference comes into play when the time to execute the task is long relative to the interval. The following snippet schedules a logfile cleanup to occur in 12 hours and every 12 hours thereafter:
Runnable cleanup = new Runnable() {
    public void run() { cleanUpLogFiles(); }
};
long period = 12 * 60 * 60, delay = period;   // seconds
Future<?> logService = executionService.scheduleAtFixedRate(
    cleanup, delay, period, TimeUnit.SECONDS);
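If we wanted fixed-delay semantics instead, the equivalent call is a one-line change; a sketch using the same cleanup, delay, and period variables:

executionService.scheduleWithFixedDelay(
    cleanup, delay, period, TimeUnit.SECONDS);   // full period after each run completes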
Because the task for periodic schedules is a Runnable
, the Future
object does not return a useful value
(it returns null
) so we don’t
specify a parameter type in its generic type instantiation. The
Future
is still useful for
cancelling the task at a later time if we wish:
logService.cancel(false);   // cancel() requires a flag: don't interrupt a run in progress
We should mention that the ScheduledExecutorService
bears a great deal
of similarity to the java.util.Timer
class that we’ll discuss in
Chapter 11, especially with regard to the
periodic schedules. A java.util.Timer
is always single-threaded,
however.
A CompletionService
is
a lightweight queue-like frontend to an executor. The CompletionService
provides submit()
methods, which delegate their tasks
to a particular instance of Executor
, and then provides take()
and poll()
methods for
retrieving Future
results for
completed tasks. Think of a CompletionService
as a babysitter for the
Futures, allowing you to easily
gather up only completed results (as opposed to having to check each
Future
yourself to see which ones
have finished and in what order). ExecutorCompletionService
is a concrete
implementation of CompletionService
that takes an Executor
in its
constructor:
Executor executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService =
    new ExecutorCompletionService<Integer>(executor);

completionService.submit(callable);
completionService.submit(runnable, resultValue);
// poll for a result
Future<Integer> result = completionService.poll();
if (result != null) {
    // use value...
}

// block, waiting for a result
result = completionService.take();
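A typical consumption loop might look like this sketch, assuming we submitted taskCount tasks (exception handling elided, as in the snippets above):

for (int i = 0; i < taskCount; i++) {
    Future<Integer> done = completionService.take();   // blocks until some task finishes
    Integer value = done.get();                        // already complete; won't block
    // use value...
}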
At various times in this chapter, we’ve referred to the different executor services produced by the Executors factory as different implementations of ExecutorService. But these implementations are just different configurations of a single, highly flexible implementation of ExecutorService called ThreadPoolExecutor. You can use this implementation directly if you want; it offers some additional features. The primary constructor for ThreadPoolExecutor allows you to specify both a “core” thread pool size and a maximum size, as well as a thread timeout value for removing idle threads. The core size is a minimum number of threads that, once created, are allowed to live indefinitely. The constructor also allows you to provide the task queue (an implementation of BlockingQueue) on which new tasks are placed. This last feature allows you to govern the queuing policy yourself. You could specify a queue with a limited capacity:
ExecutorService executorService = new ThreadPoolExecutor(
    corePoolSize, maximumPoolSize, keepAliveTime, timeUnit, taskQueue);
The ThreadPoolExecutor
implementation also has methods that allow you to change the core and
maximum pool size while the service is active or to “prestart” the
core threads before the service is used.
Actually, these last features bring up an interesting issue. If
we know that our executor service is an implementation of ThreadPoolExecutor
, we can cast it at
runtime to get access to these extra methods and do things like change
the pool size. This may not be what the designers of some services had
in mind; in fact, it could be downright dangerous in the wrong hands.
For this reason, Executors
offers a
number of “unconfigurable” wrapper methods that act something like the
“unmodifiable” collection methods we’ll see in the Java Collections
API. These methods wrap an executor service in a delegator object that
does not expose the implementation to the caller:
ExecutorService tweakable = Executors.newFixedThreadPool(3);   // pool size required
ExecutorService safe = Executors.unconfigurableExecutorService(tweakable);
An application server might, for example, wrap a service to protect itself from individual applications modifying (intentionally or accidentally) a global service used by many applications.
We said that the Executor pattern is a general replacement for using Threads to run simple tasks. Although Executors shield us from Thread creation, there still may be cases where we want some control over how the threads used in our various thread pool implementations are constructed or set up. For this reason, and to standardize Thread production in general, the concurrency package adds an explicit factory API for thread creation.
The ThreadFactory interface provides a newThread() method. One of these factories is used by all service implementations that create threads. All of the factory methods of Executors have an additional form that accepts an explicit ThreadFactory as an argument. You can get the default thread factory used by these with the Executors.defaultThreadFactory() method. You could supply your own ThreadFactory to perform custom setup, such as setting ThreadLocal values or priorities.
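For example, here’s a minimal sketch of a custom factory that marks its threads as daemons (the name prefix is just for illustration):

ThreadFactory daemonFactory = new ThreadFactory() {
    public Thread newThread(Runnable r) {
        Thread t = Executors.defaultThreadFactory().newThread(r);
        t.setDaemon(true);                  // don't keep the VM alive for pool threads
        t.setName("worker-" + t.getId());   // illustrative naming scheme
        return t;
    }
};
ExecutorService service = Executors.newFixedThreadPool(3, daemonFactory);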
So far we’ve seen how the Java concurrency utilities can
be used to manage simple parallel programming scenarios. We’ve seen
that we can submit many tasks to an ExecutorService
and collect result values if
needed through Futures
. We’ve seen
that we can schedule tasks to run at specified times and with
specified frequencies. We’ve seen that we can delve into the details
of the pooling and control the degree of parallelism (how many threads
are used) if we wish. Later in this chapter, we’ll look at APIs that
help us coordinate threads so that we can do more complex jobs that
require cooperation or explicit phases of operation in their data
handling. In this section, we’ll look at an API that helps you
coordinate tasks in another way: by helping you take “scalable” tasks
and divide them up to match the processing power available at any
given time.
Let’s imagine that you have a task that performs a complex computation like rendering video or generating a complicated image. A natural place to start in parallelizing it would be to divide the work for one frame or image into a fixed number of parts and feed them to an executor service. The executor service would be tuned to have as many threads as you wish to use (perhaps the same number as the number of CPUs or “cores” on your machine) and would assign each part to its own thread. If each task (each chunk of the image) requires about the same amount of work to complete and nothing else is competing for time on your computer, then this scenario is pretty optimal. We’d expect that each part of the image would be finished at about the same time and we’d be able to stitch them all together effectively. But what if some parts of the image are dramatically harder to render than other parts? What if one chunk takes ten or a hundred or a thousand times as much CPU power as another? (Imagine how much faster it may be to render an empty part of an image, for example.) Then we may find ourselves in a situation where many of the threads sit idle while a few threads churn away doing all of the hard work. What can we do to address this?
Well, one approach would be to simply make our tasks more finely grained. We could make our individual jobs so small that no single one could possibly monopolize a thread for long. However, when tasks can vary in degree of difficulty by many orders of magnitude, this could lead to creating a very large number of tiny tasks and would probably be very inefficient, with threads switching jobs and, even worse, moving data around to accommodate the somewhat random order in which they would service the work requests. What is needed is a way for each task to keep itself busy, but allow other tasks to help when they get overloaded. Ideally, we’d also like to minimize discontinuities in the workflow and, for data-intensive tasks, to avoid giving threads jobs that require completely new data to be loaded at each turn.
The Fork/Join framework is a new API added in Java 7 that provides just this: a way for you to structure your tasks so that they can be split up as needed to keep all of the available threads busy working on data with as much continuity as possible. Specifically, the Fork/Join framework relies on tasks that can be split up recursively (i.e., divided in two or more parts, with those parts then subdivided if needed, and so on). When a task is deemed too large to be completed quickly, the task is simply split up and the (now smaller) pieces are placed into a queue for the current thread. The framework then implements what is known as a “work stealing” algorithm, allowing threads that are free to grab unstarted tasks from their neighbors’ queues. The combination of these techniques has some powerful advantages. First, it avoids unnecessary randomization of the workload. Threads will tend to work on contiguous parts of their tasks as long as they stay busy. For data-intensive tasks, this may mean less loading of data across threads. However, when necessary, a thread can grab work from a neighbor. And therein comes the second advantage: by the nature of the recursive splitting of tasks, the largest/least-broken-up pieces of tasks will sit at the bottom of each thread’s queue, which is exactly where a neighbor thread will look to steal work if needed. This means that when work is snatched, it will be redistributed in the largest possible chunks, further stabilizing the workload per thread and reducing stealing operations and context switching. It’s a very clever algorithm that originated in the Cilk programming language.
To show off the Fork/Join framework, we will do some image rendering, which we’ll use as an excuse to draw some fractals! Fractals are amazing mathematical shapes that arise from relatively simple iterative processes. The one that we’ll be drawing is called the Mandelbrot set. Our Mandelbrot example code will do its drawing using the Fork/Join framework to break up the job of drawing the image across the available number of processors and keep them all busy (Figure 9-6). Before we start, a few caveats. First, we won’t give a very good explanation of the drawing part that actually calculates the fractal. In the interest of keeping the example focused on the framework, we have compacted that code down to just a few lines that are very cryptic. Please see the footnotes for a link to a proper explanation of what it is doing. Next, our example will blindly break up the image chunks until they reach a fixed minimum size. While work stealing will indeed happen between threads in this case, a better algorithm might decide when to split a job based on the actual rendering performance, reducing the overhead of unnecessary splitting. (We won’t have a large amount of data driving the rendering, so we’re mainly focused on keeping the threads busy rather than minimizing splitting.)
The Fork/Join framework API centers on a ForkJoinPool
and
various implementations of a kind of Future
called a ForkJoinTask
. The
Fork/Join framework can be used in many different ways depending on
how you wish to structure the tasks and make decisions about their
division (forking) and collecting results (joining); however, we are
only going to look at one common case. We will be using a kind of
ForkJoinTask
called RecursiveAction
, which is just a ForkJoinTask
that returns no value. We will
subclass RecursiveAction
with our
MandelbrotTask
and implement the
one required abstract method: compute()
. Within the
compute()
method, we will simply
make a decision as to whether to split up the task or do the work
immediately. Here is the code:
public class Mandelbrot extends JFrame {
    @Override
    public void paint(Graphics g) {
        BufferedImage image = new BufferedImage(
            getWidth(), getHeight(), BufferedImage.TYPE_INT_RGB);
        ForkJoinPool pool = new ForkJoinPool();   // defaults to one thread per processor
        pool.invoke(new MandelbrotTask(
            image, 0, image.getWidth() - 1, 0, image.getHeight() - 1));
        g.drawImage(image, 0, 0, null);
    }

    public static void main(String[] args) {
        Mandelbrot mandy = new Mandelbrot();
        mandy.setSize(768, 768);
        mandy.setVisible(true);
    }
}
}
class MandelbrotTask extends RecursiveAction {
    private static double size = 3.0, offsetX = -0.7, thresholdSq = 100;
    private static int maxIterations = 30;
    private BufferedImage image;
    private int xStart, xEnd, yStart, yEnd;
    private static int taskSplitThreshold = 1024;

    MandelbrotTask(BufferedImage image, int xStart, int xEnd,
                   int yStart, int yEnd) {
        this.image = image;
        this.xStart = xStart;
        this.xEnd = xEnd;
        this.yStart = yStart;
        this.yEnd = yEnd;
    }
    public void render() {
        for (int x = xStart; x <= xEnd; x++) {
            for (int y = yStart; y <= yEnd; y++) {
                double r = x * size / image.getWidth() - size / 2 + offsetX;
                double i = y * size / image.getHeight() - size / 2;
                double zr = 0, zi = 0;
                int iter;
                for (iter = 0; iter < maxIterations; iter++) {
                    double nzr = zr * zr - zi * zi + r;
                    double nzi = 2 * zr * zi + i;
                    if (nzr * nzr + nzi * nzi > thresholdSq) {
                        break;
                    }
                    zr = nzr;
                    zi = nzi;
                }
                image.setRGB(x, y, Color.HSBtoRGB(
                    0.5f * iter / maxIterations, 1.0f, 1.0f));
            }
        }
    }
    @Override
    protected void compute() {
        int width = xEnd - xStart, height = yEnd - yStart;
        if (width * height < taskSplitThreshold) {
            render();
        } else {
            invokeAll(
                new MandelbrotTask(image, xStart, xStart + width / 2,
                    yStart, yStart + height / 2),
                new MandelbrotTask(image, xStart + width / 2 + 1, xEnd,
                    yStart, yStart + height / 2),
                new MandelbrotTask(image, xStart, xStart + width / 2,
                    yStart + height / 2 + 1, yEnd),
                new MandelbrotTask(image, xStart + width / 2 + 1, xEnd,
                    yStart + height / 2 + 1, yEnd)
            );
        }
    }
}
Try running the example and then dragging the window out to different sizes. Watch how it redraws as the window is dragged out to a large size. The fractal is generated by treating each point in the image as a complex number (a two-dimensional number) and applying a simple formula to it repeatedly: Z = Z² + C, where Z is initially zero and C is related to the coordinate of the point. Then we color-code the point based on how fast that value grows. In some areas of the image, the values will grow quickly and we’ll stop iterating on them; in other areas, we’ll go on until we reach a number (maxIterations) of iterations. This means that some regions will take longer than others to generate and some threads will therefore steal work from others.
The main()
method of the
Mandelbrot
class creates the main
window, a JFrame
, for us. (We saw
some simple GUI programming in the introduction to the book and we’ll
return to it in Chapter 16 when
we talk about Swing.) The main
thing that we need to know here is that the JFrame’s paint()
method is displaying a buffered
image and our various MandelbrotTask
s are competing to render
small rectangles of that image.
When the paint()
method is
invoked to redraw the screen, it creates a new ForkJoinPool
and constructs a single
MandelbrotTask
. The MandelbrotTask
encapsulates knowledge about
a region of the image to draw—initially the entire image—and contains
the method to render it. The initial MandelbrotTask
is passed to the ForkJoinPool
’s invoke()
method, which is a blocking form of
the submit
method that will wait
for the task to complete before returning. The paint()
method will then draw the fully
rendered image. As you can see, from the point of view of the paint()
method, it has prescribed one task
for the entire image and simply asked the ForkJoinPool
to invoke it. All of the
recursive division of labor is handled by the task in cooperation with
the framework.
Within the MandelbrotTask
’s
compute()
method, we check to see
how many pixels the task is being asked to render. If the number of
pixels exceeds a specified threshold, we split the region into four
quadrants and create a new MandelbrotTask
for each of them. The four
subtasks are then passed to the inherited invokeAll()
method,
which executes them and waits for all of them to complete before
moving on (it effectively performs a join operation on them). If the
number of pixels is under the threshold, the compute()
method
directly invokes the render()
method to
generate the fractal for that small portion of the image.
In our case, the division of tasks will proceed until the
threshold has been reached and each of the threads in the pool is busy
rendering regions of the screen. Then the tree of tasks will collapse
back up, with each subdivided MandelbrotTask
returning from its invokeAll()
method invocation until the
initial, top-level task is completed.
One last thing before we move on: an exercise for you if you are
really interested in this topic. If you would like to visualize which
threads are drawing which regions, you can do the following purely as
an experiment: within the render()
method, look up the name of the currently executing thread with the
Thread
getName()
method. While this name will not
be meaningful, it will be unique to a thread. Use a HashMap
to assign that name a unique number
and map it to that number each time you see it. Then use that number
to determine the color of the rendered pixel instead of the fractal
logic (or combine them to add a little tint or shade). This will allow
you to see which threads are rendering which patches of the screen. On
a fast system, this may not be very interesting, but if you stress the
rendering by dragging the image to a very large size you should see
some variations.
The java.util.concurrent.locks
package holds
classes that mimic and expand upon the built-in Java language
synchronization primitives, adding “read/write” locks among other
things. As we mentioned, these classes are utilities written in Java and
don’t strictly add anything new to the language semantics. However, they
do provide more flexible usage at the expense of some of the built-in
safety of Java language synchronization.
At the heart of the locks package are the Lock and Condition interfaces. Lock represents the same concept as a Java lock (monitor) that is associated with each object and class for use with synchronized methods and blocks. The Lock interface provides for exclusive access by the owner of the lock, allowing only one party to hold the lock at a time through the lock() and unlock() methods. In Java language synchronization, this is accomplished implicitly with the synchronized keyword:
// synchronized method
synchronized void writeData() { ... }

// synchronized block
synchronized (someObject) {
    ...
}
Upon entry to the synchronized method or block, Java acquires the lock and automatically releases it upon exiting. Even if an exception is thrown or the thread dies unexpectedly, Java automatically releases all of the locks it acquired. Using a Lock instead requires us (or allows us, depending on how you look at it) to explicitly lock when we want the resource and remember to unlock it when we are through. The locking is not tied to any particular scope, such as a single method or code block. To reproduce the effect of the synchronized method in the example, we’d use something like:
Lock lock = new ReentrantLock();

// method or block
lock.lock();
try {
    // body of method or block ...
} finally {
    lock.unlock();
}
The first caller to lock()
acquires the lock and proceeds. Subsequent calls by other threads block
until the lock is released. We perform the body of our locked operation
in a try/finally
block. This is
generally important in order to ensure that we always unlock before we
leave, but you are free to implement arbitrary protocols at your own
risk.
The lock implementation in this example is called ReentrantLock
. The name implies that this kind
of lock acts like Java locks do in that the lock is associated with the
caller’s thread. The owner of a lock may reacquire (“relock”) the lock
as many times as it wishes. For example, a recursive method that locks a
resource upon entry is fine.
In addition to the standard-looking lock()
method, the Lock
interface has tryLock()
methods that do not block or that
block for a specified period of time in order to acquire the lock. These
conditional and timed wait locking forms are something that ordinary
Java locks do not provide. The ReentrantLock
implementation also has a notion
of “fairness” that can be turned on or off when it is constructed. When
fair is on, the lock attempts to hand out the lock to callers in the
order in which they request it. Normal Java locks (and the default,
unfair policy of ReentrantLock
) do
not make this guarantee.
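A sketch of the timed form (tryLock() with a timeout throws InterruptedException, which we omit here as in the other snippets):

Lock lock = new ReentrantLock(true);         // "fair" ordering turned on
if (lock.tryLock(1, TimeUnit.SECONDS)) {     // wait up to one second for the lock
    try {
        // use the shared resource ...
    } finally {
        lock.unlock();
    }
} else {
    // couldn't acquire the lock in time; do something else
}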
The ReadWriteLock
interface is a gateway to two different locks, one for reading and one
for writing. The idea behind read/write locks is that for most
resources it is OK for many “readers” to be viewing data, as long as
it is not changing. Conversely, a writer of the data generally
requires exclusive access to it. This is just what read/write locks
do. Any number of readers may acquire the read lock as long as no
write lock is in place. Only one party may hold the write lock, and no
readers may hold read locks while the write lock is out. A writer may
have to wait for readers to finish before acquiring the write lock,
and readers may have to wait for a writer to finish before they are
allowed to acquire read locks:
ReadWriteLock rwLock = new ReentrantReadWriteLock(fair);

// reader thread 1
rwLock.readLock().lock();

// reader thread 2
rwLock.readLock().lock();

// writer thread
rwLock.writeLock().lock();   // blocks on threads 1 & 2
In this code snippet, two readers hold read locks while a writer
blocks waiting on the write lock. When both readers have unlock()
ed their read locks, the writer
gains exclusive access to the lock and any subsequent readers block
until the writer is finished.
The owner of a write lock can acquire a read lock, too, but not vice versa. Acquiring a read lock and then releasing the write lock is called downgrading the lock. Trying to acquire a write lock while holding a read lock (upgrading) is not allowed and causes a deadlock.
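A sketch of the downgrade idiom (updateData() and useData() are hypothetical placeholders):

rwLock.writeLock().lock();
try {
    updateData();                  // exclusive access while writing
    rwLock.readLock().lock();      // downgrade: acquire the read lock first...
} finally {
    rwLock.writeLock().unlock();   // ...then release the write lock
}
try {
    useData();                     // other readers may now proceed as well
} finally {
    rwLock.readLock().unlock();
}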
To complete the picture of Lock
as a parallel for Java language
synchronization, we need an analog to the wait()
, notify()
, and notifyAll()
mechanism. The Condition
interface
represents this functionality with its await()
, signal()
, and
signalAll()
methods.
A Condition
is associated with a
Lock
by the lock’s newCondition()
method. Unlike a normal Java lock, a Lock
may have multiple Condition
objects that represent multiple
wait sets of threads.
The Condition await()
method
is used just like the wait()
method
of a Java object within a synchronized
block:
Lock lock = ...;
Condition condition = lock.newCondition();

lock.lock();
condition.await();   // block, waiting for signal()
lock.unlock();

// meanwhile, in another thread...
lock.lock();
condition.signal();
lock.unlock();
Like wait()
, the Condition await()
method can be called only
when the thread is the owner of the lock associated with the condition
and the signal()
method may be
called only by another thread that has acquired the lock.
Interestingly, though, in this case, these restrictions are
implementation details of the java.util.concurrent
package; some other implementation of these classes could conceivably
change those restrictions in some way.
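To see why multiple wait sets matter, here is a minimal bounded-buffer sketch, essentially the classic example from the Condition documentation, that uses two Conditions on one Lock so producers and consumers can be signaled independently:

class BoundedBuffer<T> {
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Object[] items = new Object[16];
    private int count, putIndex, takeIndex;

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();             // wait in the "not full" wait set
            items[putIndex] = item;
            putIndex = (putIndex + 1) % items.length;
            count++;
            notEmpty.signal();               // wake one waiting consumer
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();            // wait in the "not empty" wait set
            T item = (T) items[takeIndex];
            takeIndex = (takeIndex + 1) % items.length;
            count--;
            notFull.signal();                // wake one waiting producer
            return item;
        } finally {
            lock.unlock();
        }
    }
}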
With the exception of the new reader/writer locks and some timed
wait lock methods, it may not seem that the locks package adds a great deal to Java.
However, if you delve into it deeper, you’ll find that it’s also a
toolkit for building new kinds of synchronization primitives and
higher-level constructs. The locks package opens up a concrete
implementation of Java’s synchronization mechanism for all to tinker
with and extend. A brief look at the implementation classes reveals
nifty methods like getOwner()
to tell
you which thread owns a lock or getReadLockCount()
to
tell you how many readers are working on your data. Lots of things are
possible with an open implementation like this, including specialized
synchronization packages that do things like automatically detect
deadlocks or tune themselves based on external information. There may
also be cases where using the explicit lock API provided by this
package performs better than language-level synchronization. But that
probably doesn’t justify the additional burden on developers except in
special cases. Next, we’ll move up a bit and look at some higher-level
synchronization tools.
The java.util.concurrent
package adds several
higher-level synchronization utilities borrowed from other languages,
including CountDownLatch
, Semaphore
, CyclicBarrier
, and Exchanger
.
The CountDownLatch
is
a very simple synchronization utility that allows any number of
threads to block, waiting for a countdown value to reach 0
before being “released” to continue their
activities. The CountDownLatch
is
initialized with the count when constructed. Thereafter, threads may
block using the await()
method or
block for a limited period of time using the timed wait version of
await()
. Any running thread may
decrement the counter at any time, whether threads are blocked or not.
Once the counter reaches 0, all waiting threads unblock and continue. Thereafter, any calls to await() do not block at all (the timed version of await() returns true immediately, indicating that the count has already passed). The count cannot be reset.
CountDownLatch latch = new CountDownLatch(2);   // count from 2

// thread 1
latch.await();        // blocks thread 1

// thread 2
latch.countDown();    // count is 1
latch.countDown();    // count is 0; thread 1 proceeds
Countdown latches are used in a wide variety of synchronization
schemes to coordinate a number of threads on one result or cause a
thread to wait for a number of other threads to produce results. Later
we’ll talk about a related utility, CyclicBarrier
, that explicitly waits for a
number of threads to synchronize in order to coordinate an
action.
Semaphores are a very old synchronization construct that
has been used in many other languages. Conceptually, a semaphore is a
pool of permits—intangible permission slips to
perform some activity. The semaphore is initialized with a specified
number of permits. Callers can then use the acquire()
and
release()
methods to
take and return these permits. Calling acquire()
when no permits are available
causes the caller to block until one is released. In this way, for
example, a semaphore could be used to limit access to some resource to
a specified number of threads:
int concurrentReaders = 5;
boolean fair = true;
Semaphore sem = new Semaphore(concurrentReaders, fair);

Data readData() throws InterruptedException {
    sem.acquire();
    // read data ...
    sem.release();
    return data;
}
In this code snippet, readData()
effectively limits itself to five
concurrent reading threads at any given time. Additional threads are
blocked in the acquire()
method
until a permit is free. In this sense, a semaphore is vaguely like a
lock with multiple owners. This is where the similarity ends,
however.
In actuality, a semaphore differs from a lock in several ways.
First, the “pool of permits” is really just a number. No actual value
is returned by acquire()
and no
association is made with the acquirer of the lock. This means that any
actual locking behavior is strictly cooperative (by convention in the
application). It also means that “permits” can be acquired and
released by different callers without respect to who actually
“acquired” them. It’s really just incrementing or decrementing the
number. Also, because there is no real association with an “owner,”
semaphores are not reentrant in the way that real locks are. That is,
if a thread calls acquire()
multiple times, it simply decrements the counter multiple times. This
behavior could be useful in some cases to count levels of recursion
for security APIs, for example, but is not like a lock, in which one
caller “owns” multiple permits. Finally, because the permits pool is
really just a number, calling acquire()
and release()
out of sync can increase the
permit pool beyond its starting point or decrement it below zero. It
can even be initialized with a negative number if you wish to require
releases before anyone acquires a permit.
In addition to acquire()
,
Semaphore
has a tryAcquire()
method that parallels the
tryLock()
method of Lock
. It returns immediately, acquiring a
permit if one was available and returning false
otherwise. Another form of tryAcquire()
accepts a timed wait period.
Semaphores also have a notion of “fairness” in the ordering of acquire
requests. By default, requests are not guaranteed to be ordered, but
if the “fair” flag is set when the Semaphore
is constructed, acquire()
doles out permits in
first-in-first-out (FIFO) order. The tradeoff is that ordering may
impact performance a bit, depending on the implementation.
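For example, a sketch of the timed form, reusing the sem from the earlier snippet:

if (sem.tryAcquire(2, TimeUnit.SECONDS)) {   // wait up to two seconds for a permit
    try {
        // read data ...
    } finally {
        sem.release();
    }
} else {
    // all permits busy; report the overload instead of blocking forever
}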
The CyclicBarrier
class is a synchronization point where a specified number of related
threads meet after finishing their activities. When all of the threads
have arrived, an optional, shared barrier action can be executed and
then all of the threads are “released” to continue. The class is
termed cyclic because it can then be used again
in the case where the threads repeat their activities in this manner.
CyclicBarrier
is an alternative to
using the join()
method, which
collects threads only after they have completed and returned from
their run()
method.
The following example, SiteTimer
, accepts a number of URLs on the
command line and times how long it takes to connect to each one,
printing the results in sorted order. It performs the connections in
parallel using a dedicated thread per site and uses a CyclicBarrier
for the threads to rendezvous
after each timing cycle. Then it prints the coordinated results before
they begin again. This example also illustrates a number of Java
features, including generics, collections, formatted printing,
autoboxing, and an inner class. Although we haven’t yet discussed
collections or the network portion of the example, the usage is fairly
simple, and you can return to the example after reading the relevant
chapters later in this book.
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.io.IOException;

public class SiteTimer {
    CyclicBarrier barrier;
    // Threads add results concurrently, so the list must be synchronized
    List<Result> results = Collections.synchronizedList(new ArrayList<Result>());

    private class Result implements Comparable<Result> {
        Long time;
        String site;

        Result(Long time, String site) {
            this.time = time;
            this.site = site;
        }

        public int compareTo(Result r) {
            return time.compareTo(r.time);
        }
    }

    static long timeConnect(String site) {
        long start = System.currentTimeMillis();
        try {
            new URL(site).openConnection().connect();
        } catch (IOException e) {
            return -1;
        }
        return System.currentTimeMillis() - start;
    }

    void showResults() {
        Collections.sort(results);
        for (Result result : results)
            System.out.printf("%-30.30s : %d\n", result.site, result.time);
        System.out.println("------------------");
    }

    public void start(String[] args) {
        Runnable showResultsAction = new Runnable() {
            public void run() {
                showResults();
                results.clear();
            }
        };
        barrier = new CyclicBarrier(args.length, showResultsAction);
        for (final String site : args)
            new Thread() {
                public void run() {
                    while (true) {
                        long time = timeConnect(site);
                        results.add(new Result(time, site));
                        try {
                            barrier.await();
                        } catch (BrokenBarrierException e) {
                            return;
                        } catch (InterruptedException e) {
                            return;
                        }
                    }
                }
            }.start();
    }

    public static void main(String[] args) throws IOException {
        new SiteTimer().start(args);
    }
}
The start()
method constructs
the barrier, specifying the number of threads that must be present
before the group has fully arrived and the action to perform when all
of the threads are ready. For each site, a thread is created that
loops, timing the connection to the site and adding a result object to
the list before blocking on the barrier’s await()
method. When
all of the threads reach the await()
method, the barrier action fires,
printing the results. All of the threads are then released to begin
the next cycle.
If any of the waiting threads is interrupted or times out (using
the timed wait version of the await()
method) the barrier is said to be
“broken” and all of the waiting threads receive a BrokenBarrierException
. In theory, the
barrier can be “fixed” by calling its reset()
method, but
this is complicated because only one thread from the group can reset
the barrier properly. A reset()
while any other thread is waiting causes the barrier to be broken and
the waiting threads to receive the exception again, so it is probably
best to start over at this point.
One more detail: the await()
method
returns an integer that indicates the order in which the threads
arrived at the barrier. This can be used to divide up work for the
next iteration of the threads. For example, if the threads’ jobs are
not identical, you could use the number to “elect” a leader thread or
divide the threads into two or more groups.
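A sketch of that idiom; note that getParties() - 1 marks the first thread to arrive and 0 the last:

int arrival = barrier.await();   // returns our arrival index
if (arrival == 0) {
    // we were the last to arrive; act as the "leader" for the next cycle
}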
No Star Trek jokes here. Java 7
introduced a new concurrency utility called Phaser
. Phaser
is very similar to the CyclicBarrier
except that it provides a bit
more flexibility. Phaser
draws its
name in part from the fact that it assigns a number to each cycle of
its threads (a phase number). Participating threads and bystanders can
read this number to monitor activity in the barrier. In CyclicBarrier
, the number of threads that
are tracked by the barrier is fixed; new threads cannot join the party
during its lifecycle. This differs from Phaser
, where the number of participants can
change over the life of the activity.
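A rough sketch of that flexibility, with parties registering and deregistering dynamically:

final Phaser phaser = new Phaser(1);     // register the main thread as a party
for (int i = 0; i < 3; i++) {
    phaser.register();                   // one additional party per worker
    new Thread() {
        public void run() {
            // phase 0 work ...
            phaser.arriveAndAwaitAdvance();   // rendezvous, like barrier.await()
            // phase 1 work ...
            phaser.arriveAndDeregister();     // drop out of the party
        }
    }.start();
}
phaser.arriveAndDeregister();            // the main thread bows out immediately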
The Exchanger
is a
synchronization point for a pair of threads to exchange data items. An
item of the same type is passed in each direction using the exchange()
method.
The first thread to arrive at the Exchanger blocks, waiting for its mate. When the second thread arrives, they each receive the other’s argument to
the exchange()
method. Any number
of actual threads may be using the Exchanger
; they are simply paired in some
order when they arrive. Exchanger
is a generic class that is parameterized by the type of object to be
passed:
Exchanger<ByteBuffer> xchange = new Exchanger<ByteBuffer>();

// thread 1
Buffer nextBuf = xchange.exchange(buffer1);   // blocks

// thread 2
Buffer nextBuf = xchange.exchange(buffer2);

// buffers exchanged; both threads continue...
The Exchanger
pattern is
primarily useful for reusing data objects or buffers between threads,
as indicated in this code snippet. Say that you have a reader thread
filling buffers with data and a writer thread writing the contents of
the buffers somewhere. Using an Exchanger
, the reader and writer can trade a
pair of buffers back and forth without creating new ones. This may
seem a bit arcane, but it has applications when using the NIO advanced
I/O package, which we discuss in Chapters 12 and
13.
We should note that the Exchanger
is similar to the SynchronousQueue
, which we’ll discuss in
Chapter 11 when we cover the Collections
API. The Exchanger
, however, passes
data in both directions, whereas SynchronousQueue
simply passes elements in
one direction.
The java.util.concurrent.atomic
package holds an
interesting set of wrapper classes for atomic, “all-or-nothing”
operations on certain primitive types and reference values. An atomic
operation is a kind of transaction where some sequence of events either
completes or fails as a unit and there is no potential for any
intermediate state to be seen. In this case, the transactions we’re
talking about are very simple operations that either set or get a value,
possibly in combination with a simple test or mathematical operation.
There are atomic wrapper classes for the following types: Booleans,
integers, and long values as well as arrays of integers and longs and
object references:
AtomicBoolean
AtomicInteger
AtomicIntegerArray
AtomicLong
AtomicLongArray
AtomicReference
AtomicReferenceArray
The AtomicBoolean class (which, by the way, has to compete with java.awt.Robot for coolest class name in Java) serves as a good example. At first glance, it seems like an oxymoron. After all, normal operations on Booleans in Java are atomic already. There is supposed to be no possible “in between” state for a Boolean to be misread by any feisty multithreaded code (as there theoretically could be for long and double values). Instead, the usefulness of the AtomicBoolean wrapper is in its combination operations: compareAndSet() and getAndSet():
AtomicBoolean bool = new AtomicBoolean(true);
bool.compareAndSet(expectedValue, newValue);
The compareAndSet()
method
first performs a comparison to an expected value (true
or false
in the case of a Boolean) and only if
the value matches does it assign the new value. The interesting thing is
that both of these operations happen “atomically,” together. This means
that there is no possibility of someone changing the value between the
time that we checked it and assigned the new value. That may sound like
a slim chance anyway, but it’s very important for guaranteeing the
semantics of flags. For example, suppose we have a master “shutdown”
switch in our application and the thread that sets it wants to perform
cleanup on the way out. Using compareAndSet()
to test first, we can
guarantee that only one thread can possibly set the flag and perform the
procedure.
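A sketch of that shutdown idiom:

AtomicBoolean shuttingDown = new AtomicBoolean(false);

void shutdown() {
    // Atomically flip the flag; only the single winning thread performs cleanup
    if (shuttingDown.compareAndSet(false, true)) {
        // clean up and exit ...
    }
}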
The getAndSet()
method
simply assigns the new value and returns the old value in the same, safe
way. It’s a little harder to see how this applies to a Boolean, so let’s
move on to AtomicInteger
and
AtomicLong
. These
numeric types have additional arithmetic combination operations:
int getAndIncrement()
int getAndDecrement()
int getAndAdd(int delta)
int incrementAndGet()
int decrementAndGet()
int addAndGet(int delta)
getAndIncrement() increments the value but returns the previous value; incrementAndGet() does the converse, returning the new value. These operations are very useful for generating unique serial numbers. For example:
AtomicInteger serialNum = new AtomicInteger(0);

public int nextSerialNumber() {
    return serialNum.getAndIncrement();
}
We could have accomplished the same thing by synchronizing the method, but this is simpler and may be much faster.
Object-type references can also be wrapped for atomic operations, including compareAndSet() and getAndSet(). The AtomicReference class is generic and parameterized by the type of reference it wraps:
AtomicReference<Node> ref = new AtomicReference<Node>(node);
ref.compareAndSet(null, newNode);
The compareAndSet()
method has a strange twin named weakCompareAndSet()
,
which has the dubious distinction that it simply may not work when
called. It is, however, nice enough to tell you when it doesn’t work
by returning false
. What’s the
point of this? Well, by allowing this fuzziness, Java may be able to
make the implementation of the weak method much faster than the
“certain” one. You can loop and retry the weak method instead and it
may improve performance on some architectures. This is all because the
Java VM may be able to map these kinds of atomic operations all the
way down to the hardware level for performance, but restrictions may
apply that make it difficult to guarantee.
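The retry idiom looks something like this sketch, using an AtomicInteger counter:

AtomicInteger counter = new AtomicInteger(0);

int old;
do {
    old = counter.get();
} while (!counter.weakCompareAndSet(old, old + 1));   // may fail spuriously; just retry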
The atomic
package
also supplies a set of “field update” utilities for each of the types
that it can wrap. These utilities use reflection (see Chapter 7) to perform the kinds of atomic
operations we described previously on “naked” primitive types that are
not already wrapped in their atomic wrapper classes. The field
updaters work on variables in an object by name and type. The catch is
that atomicity is guaranteed only with respect to other callers that
use the field updaters or the regular atomic wrapper classes. No
guarantees are made with respect to other threads that address the
variables in arbitrary ways.
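For example, a sketch using AtomicIntegerFieldUpdater (the target field must be a volatile int):

class Counter {
    volatile int hits;   // a "naked" field, not an AtomicInteger
    static final AtomicIntegerFieldUpdater<Counter> HITS =
        AtomicIntegerFieldUpdater.newUpdater(Counter.class, "hits");

    void record() {
        HITS.incrementAndGet(this);   // atomic with respect to other updater/wrapper users
    }
}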