We’ll now conclude the discussion of the NIO package we began in Chapter 12 by talking about nonblocking and selectable network communications. All our server examples in this chapter thus far have used a thread-bound pattern (one thread per I/O operation). In Java, this is very natural because of the ease with which we can create threads. It’s also very efficient, within limits. Problems arise when you try to build very large-scale servers using this style of client handling. While on a large machine it’s certainly possible to have hundreds or even thousands of threads (especially if they’re mostly idle, waiting for I/O), this is a resource-hungry solution. Every thread you start in Java consumes memory for its internal stack, and the performance of managing this number of threads is highly system-dependent.
An alternative approach is to take a lesson from the old, dark days before threading was available and use nonblocking I/O operations to manage numerous communications from a single thread. Better yet, the server we build in this section combines the two ideas: a single thread watches for work, and a configurable pool of threads performs it, taking advantage of machines with many processors.
At the heart of this process is the concept of selectable I/O. It’s not good enough to simply have nonblocking I/O operations if you have no way to efficiently poll for work to be done. The NIO package provides for efficient polling using selectable channels. A selectable channel allows for the registration of a special kind of listener called a selector that can check the readiness of the channel for operations, such as reading and writing or accepting or creating network connections.
The selector and the selection process are not typical Java listeners of the kind we’ll see elsewhere in this book but instead rather slavishly follow the conventions of C language systems. This is mainly for performance reasons: because this API is primarily intended for high-volume servers, it is bound very tightly to the traditional, underlying operating system facilities, with less regard for ease of use. This, combined with the other details of using the NIO package, means that this section is somewhat dense and that the server we create here is one of the longer and more complex examples in the book. Don’t be discouraged if you are a bit put off by this section. You can use the general techniques earlier in this chapter for most applications and reserve this knowledge for creating services that handle the very highest volumes of simultaneous client requests.
A selectable channel extends the abstract SelectableChannel class, which specifies that the channel can be set to a nonblocking mode and that it supports the select API that makes efficient polling possible. The primary implementations of selectable channels are those for working with the network: SocketChannel, ServerSocketChannel, and DatagramChannel. The only other standard selectable channels are the two ends of a Pipe (Pipe.SourceChannel and Pipe.SinkChannel), which can be used in an analogous way for intra-VM communication.
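As a small illustration of a selectable channel outside the network classes, the following sketch registers the source end of a Pipe with a Selector and checks that it becomes readable once bytes are written to the sink end. The class PipeSelectDemo and its method are hypothetical names of our own; the register and select calls it uses are described in the rest of this section.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class: a Pipe's source end is a selectable channel, so a
// Selector can report when data written to the sink end is ready to read.
class PipeSelectDemo {

    // Returns the ready ops of the source channel's key after writing to the sink.
    static int readyOpsAfterWrite() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);   // must be nonblocking to register
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);

            // Nothing has been written yet, so no key should be selected.
            if (selector.selectNow() != 0) {
                throw new IllegalStateException("source ready before any write");
            }

            sink.write(ByteBuffer.wrap("hello".getBytes()));
            selector.select(1000);              // wait up to one second
            return key.readyOps();
        }
    }

    public static void main(String[] args) throws IOException {
        int ops = readyOpsAfterWrite();
        System.out.println("readable: " + ((ops & SelectionKey.OP_READ) != 0));
    }
}
```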
At the heart of the process is the Selector object, which knows about a particular set of selectable channels and provides a select() method for determining their readiness for I/O operations. Conceptually, the process is simple: you register one or more channels with a selector and then poll it, asking it to tell you which set of channels is ready to go. In actuality, there are a few additional pieces involved.
First, the Selector does not work directly with channels but instead operates on SelectionKey objects. A SelectionKey object is created implicitly when the channel is registered with the Selector. It encapsulates the selectable channel as well as information about what types of operations (e.g., read, write) we are interested in waiting for. That information is held in the SelectionKey in a set of flags called the interest set, which can be changed by the application at any time. SelectionKeys are also used to return the results of a select operation. Each call to select() returns the number of keys whose ready sets were updated by that call, typically keys whose channels have just become ready for some type of I/O. The keys themselves are then retrieved with the selectedKeys() method.
Each key also has a set of flags called the ready set that indicates which operation of interest is actually ready (possibly more than one). For example, a SelectionKey interest set might indicate that we want to know when its channel is ready for reading or writing. After a select operation, if that key is in the set returned by the selector, we know that it is ready for one or more of those operations, and we can check the key’s ready set to find out which one.
Before we go on, a note on terminology. Although we keep saying that channels are registered with selectors, the API is (confusingly) arranged the other way around: you don’t call a method on the selector to add a channel; instead, you call register() on the channel and pass it the selector, as if the selector were being registered with each of the channels it manages. It’s easier to mentally spackle over this and keep thinking of channels being registered with selectors.
A Selector object is created using the Selector.open() method (Selector uses a factory pattern):

```java
Selector selector = Selector.open();
```
Before registering one or more channels with the selector, set them to nonblocking mode:

```java
SelectableChannel channelA = // ...
channelA.configureBlocking(false);
```
Next, register the channels:

```java
int interestOps = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
SelectionKey key = channelA.register(selector, interestOps);
```
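Registration only works on a channel that has already been switched to nonblocking mode; register() throws an IllegalBlockingModeException otherwise. A minimal sketch, using a hypothetical RegisterDemo class and an unbound ServerSocketChannel (registration does not require the channel to be bound), shows both cases:

```java
import java.io.IOException;
import java.nio.channels.IllegalBlockingModeException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo class: register() rejects a channel still in (the default)
// blocking mode and succeeds once the channel is nonblocking.
class RegisterDemo {

    static boolean registerWhileBlockingFails() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            try {
                ssc.register(selector, SelectionKey.OP_ACCEPT);  // still blocking
                return false;
            } catch (IllegalBlockingModeException expected) {
                return true;
            }
        }
    }

    static int registeredInterestOps() throws IOException {
        try (Selector selector = Selector.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            ssc.configureBlocking(false);
            SelectionKey key = ssc.register(selector, SelectionKey.OP_ACCEPT);
            return key.interestOps();   // the initial interest set we asked for
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("blocking register fails: " + registerWhileBlockingFails());
        System.out.println("interest ops: " + registeredInterestOps());
    }
}
```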
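When we register the channel, we have an opportunity to set the initial interest operations (or “interest ops”). These are defined by constant fields in the SelectionKey class: OP_READ (the channel is ready to read), OP_WRITE (ready to write), OP_CONNECT (a client socket is ready to finish connecting), and OP_ACCEPT (a server socket is ready to accept a connection).
These fields are bit flags; you can logically OR them together as in this example to express interest in more than one type of operation.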
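Because the constants are independent bits, you combine them with | and test for a single one with &. The sketch below decodes an ops value with a describe() helper of our own (a hypothetical name, not part of the API) and also prints each channel type’s validOps(), the set of operations that channel supports; asking register() for an operation outside that set throws an IllegalArgumentException.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class for working with SelectionKey bit flags.
class InterestOpsDemo {

    // True if the given flag bit is set in ops.
    static boolean hasOp(int ops, int flag) {
        return (ops & flag) != 0;
    }

    // Human-readable names for the flags set in ops, in a fixed order.
    static String describe(int ops) {
        StringBuilder sb = new StringBuilder();
        if (hasOp(ops, SelectionKey.OP_ACCEPT))  sb.append("ACCEPT ");
        if (hasOp(ops, SelectionKey.OP_CONNECT)) sb.append("CONNECT ");
        if (hasOp(ops, SelectionKey.OP_READ))    sb.append("READ ");
        if (hasOp(ops, SelectionKey.OP_WRITE))   sb.append("WRITE ");
        return sb.toString().trim();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(describe(SelectionKey.OP_READ | SelectionKey.OP_WRITE)); // READ WRITE
        try (SocketChannel sc = SocketChannel.open();
             ServerSocketChannel ssc = ServerSocketChannel.open()) {
            System.out.println(describe(sc.validOps()));   // CONNECT READ WRITE
            System.out.println(describe(ssc.validOps()));  // ACCEPT
        }
    }
}
```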
The result of the register() method is a SelectionKey object. We can use the key to change the interest ops at any time with its interestOps() method or to unregister the channel from the Selector with the key’s cancel() method.
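The following sketch walks a key through both operations on an unconnected, nonblocking SocketChannel; KeyLifecycleDemo is a hypothetical name. Note that cancel() invalidates the key at once, but the Selector drops it from its key set only during its next select operation.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class: changing a key's interest ops, then cancelling it.
class KeyLifecycleDemo {

    // Returns {interest ops after the change, 1 if the key is still valid
    // after cancel() else 0, number of keys the selector manages after a select}.
    static int[] run() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {  // unconnected is fine here
            channel.configureBlocking(false);
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT);

            // Replace the interest set; the next select operation will use it.
            key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
            int ops = key.interestOps();

            key.cancel();                 // the key becomes invalid immediately...
            int valid = key.isValid() ? 1 : 0;
            selector.selectNow();         // ...and leaves keys() at the next select
            return new int[] { ops, valid, selector.keys().size() };
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```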
This same key is also returned as the result of selection operations when its channel is ready. When the SelectionKey is returned, its ready set holds flags for the operations that can be performed without blocking. We can retrieve the value of the flags with the readyOps() method.
Convenience methods are available to test for each operation in the ready set: isReadable(), isWritable(), isConnectable(), and isAcceptable().
Depending on how you structure your application, it may not be necessary to save the SelectionKey at registration time. In our example, we let the Selector keep track of the keys for us, simply using them when they are ready. In fact, we go even further and put the SelectionKey to work by asking it to hold a reference for us! The key’s attach() method is a convenience method that can attach an arbitrary object to the key for use by our application. We show how this can be useful in a bit.
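Here is a minimal sketch of the attachment mechanism, assuming a hypothetical ConnectionState class standing in for per-connection state. It also uses the three-argument form of register(), which attaches an object in the same call, and shows that attach() returns the previous attachment.

```java
import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

// Hypothetical demo class for SelectionKey attachments.
class AttachDemo {

    // Hypothetical per-connection state object.
    static class ConnectionState {
        final String name;
        int requests;
        ConnectionState(String name) { this.name = name; }
    }

    static String roundTrip() throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel channel = SocketChannel.open()) {
            channel.configureBlocking(false);
            // register(selector, ops, attachment) attaches the object immediately.
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT,
                                                new ConnectionState("client-1"));

            // Later, for example when the key is selected, we get our state back.
            ConnectionState state = (ConnectionState) key.attachment();
            state.requests++;

            // attach() replaces the attachment and returns the previous one.
            ConnectionState previous =
                (ConnectionState) key.attach(new ConnectionState("client-2"));
            ConnectionState current = (ConnectionState) key.attachment();
            return previous.name + ":" + previous.requests + " -> " + current.name;
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip());   // client-1:1 -> client-2
    }
}
```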
After one or more channels are registered with the Selector, we can perform a select operation using one of its select() methods:

```java
int readyCount = selector.select();
```
Without arguments, the method blocks until at least one channel is ready for some operation, the Selector’s wakeup() method is called, or the selecting thread is interrupted. Alternatively, you can use the form of select() that takes a timeout (in milliseconds) to wait for a ready channel before returning. There is also selectNow(), which always returns immediately. All of these methods return the number of ready keys found, which may be zero.
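With no channels registered, nothing can ever become ready, which makes it easy to observe each variant’s return behavior in isolation. In this sketch (SelectVariantsDemo is a hypothetical name), note that a wakeup() issued while no select is in progress is remembered: the next select() returns immediately instead of blocking forever.

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Hypothetical demo class comparing selectNow(), select(timeout), and select().
class SelectVariantsDemo {

    // Returns the results of the three calls; with nothing registered, all are 0.
    static int[] run() throws IOException {
        try (Selector selector = Selector.open()) {
            int now = selector.selectNow();      // never blocks
            long start = System.nanoTime();
            int timed = selector.select(200);    // gives up after roughly 200 ms
            long waitedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println("select(200) waited about " + waitedMs + " ms");

            selector.wakeup();                   // no select in progress, so...
            int woken = selector.select();       // ...this call returns immediately
            return new int[] { now, timed, woken };
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```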
You can use select() and wakeup() somewhat like wait() and notify(). The wakeup is necessary because once a selection is started, it will not see any changes to its keys’ interest ops until the next invocation. If another thread changes the interest ops, it must use wakeup() to prompt the selecting thread to select() again. The Selector is also heavily synchronized; for example, calls to register new channels can block until an in-progress select is finished. Often it’s much easier to simply use select with a short timeout and a loop, like this:

```java
while (selector.select(50) == 0);  // recheck every 50 ms until a key is ready
```
However, if another thread is allowed to change the interest ops, you still need to use wakeup() to maximize throughput. Otherwise, in the worst case, you could end up waiting the full select wait period on every iteration, even when there is work to be done.
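One common way to combine interest-op changes from other threads with wakeup() is for the other thread to hand the change to the selecting thread through a queue and then call wakeup(), so that only the selecting thread ever modifies its keys. This is a sketch of that variation, not the approach LargerHttpd takes (there, worker threads call interestOps() directly); WakeupDemo, its queue, and the Pipe used to produce readiness are assumptions for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class: another thread requests an interest-op change by
// queueing it for the selecting thread and then calling wakeup().
class WakeupDemo {

    // Returns the byte the selecting thread read once the key became readable.
    static byte run() throws IOException, InterruptedException {
        Pipe pipe = Pipe.open();
        Queue<Runnable> pendingChanges = new ConcurrentLinkedQueue<>();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            // Empty interest set: select() cannot report this key yet.
            SelectionKey key = source.register(selector, 0);

            Thread other = new Thread(() -> {
                try {
                    sink.write(ByteBuffer.wrap(new byte[] { 42 }));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                pendingChanges.add(() -> key.interestOps(SelectionKey.OP_READ));
                selector.wakeup();   // without this, select() below could block forever
            });
            other.start();

            while (true) {
                selector.select();   // returns on readiness or wakeup()
                Runnable change;
                while ((change = pendingChanges.poll()) != null) {
                    change.run();    // apply interest-op changes on this thread
                }
                if (selector.selectedKeys().remove(key) && key.isReadable()) {
                    break;
                }
            }
            other.join();
            ByteBuffer buf = ByteBuffer.allocate(1);
            source.read(buf);
            return buf.get(0);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("read " + run());
    }
}
```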
Next, we can get the set of selected keys from the Selector with the selectedKeys() method and iterate through them, doing whatever our application dictates:

```java
Set<SelectionKey> readySet = selector.selectedKeys();
for (Iterator<SelectionKey> it = readySet.iterator(); it.hasNext(); ) {
    SelectionKey key = it.next();
    it.remove();  // remove the key from the selected-key set
    // use the key
}
```
The set of selected keys (readySet in our code) is returned to us as a java.util.Set, which we walk through with an Iterator (see Chapter 1). One important thing to note is that we’ve used the Iterator’s remove() method to remove the key from that set. The select() methods only add keys to the selected-key set, or add flags to the ready sets of keys already in it; they never remove keys, so we must clear the keys when we handle them. You can get the full set of keys a Selector is managing with the keys() method, but you should not attempt to remove keys from that set; use the cancel() method on individual keys instead. Or you can close the entire Selector with its close() method, unregistering all its keys.
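To see why the explicit removal matters, this sketch (SelectedKeysDemo is a hypothetical name) leaves unread data in a Pipe and inspects the selected-key set across several selects. A key that is never removed simply stays in the set, so a loop that forgets it.remove() would keep reprocessing keys whether or not they are still ready.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class: select() adds keys to the selected-key set but
// never removes them; the application must.
class SelectedKeysDemo {

    static boolean[] run() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open();
             Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false);
            SelectionKey key = source.register(selector, SelectionKey.OP_READ);
            sink.write(ByteBuffer.wrap(new byte[] { 1 }));  // left unread on purpose

            selector.select(1000);
            boolean selectedFirst = selector.selectedKeys().contains(key);

            // Selecting again without removing: the key is still in the set.
            selector.selectNow();
            boolean stillThere = selector.selectedKeys().contains(key);

            // Remove it as the iterator loop does; because the data is still
            // unread, the next select adds the key back.
            selector.selectedKeys().remove(key);
            boolean goneAfterRemove = !selector.selectedKeys().contains(key);
            selector.selectNow();
            boolean backAgain = selector.selectedKeys().contains(key);

            return new boolean[] { selectedFirst, stillThere, goneAfterRemove, backAgain };
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(java.util.Arrays.toString(run()));
    }
}
```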
Let’s put this information to use. In this section, we’ll create the big brother of TinyHttpd (our minimalist web server) called LargerHttpd. The LargerHttpd server is a nonblocking web server that uses SocketChannels and a pool of threads to service requests. In this example, a single thread executes a main loop that accepts new connections and checks the readiness of existing client connections for reading or writing. Whenever a client needs attention, the main loop places a job in a queue where a thread from our thread pool waits to service it. As we said, this example is a bit longer than we would like, but it is really the minimum that is necessary to show a realistic usage of the APIs:
```java
import java.io.*;
import java.util.*;
import java.util.concurrent.*;
import java.net.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.util.regex.*;

public class LargerHttpd {
    Selector clientSelector;

    public void run(int port, int threads) throws IOException {
        clientSelector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        InetSocketAddress sa =
            new InetSocketAddress(InetAddress.getLoopbackAddress(), port);
        ssc.bind(sa);
        ssc.register(clientSelector, SelectionKey.OP_ACCEPT);

        Executor executor = Executors.newFixedThreadPool(threads);

        while (true) {
            try {
                while (clientSelector.select(100) == 0);
                Set<SelectionKey> readySet = clientSelector.selectedKeys();
                for (Iterator<SelectionKey> it = readySet.iterator(); it.hasNext();) {
                    final SelectionKey key = it.next();
                    it.remove();
                    if (key.isAcceptable()) {
                        acceptClient(ssc);
                    } else {
                        // Stop selecting this client until its task finishes,
                        // so that only one pool thread services it at a time.
                        key.interestOps(0);
                        executor.execute(new Runnable() {
                            public void run() {
                                try {
                                    handleClient(key);
                                } catch (IOException e) {
                                    System.out.println(e);
                                    closeClient(key);  // don't leak a failed connection
                                }
                            }
                        });
                    }
                }
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }

    void acceptClient(ServerSocketChannel ssc) throws IOException {
        SocketChannel clientSocket = ssc.accept();
        if (clientSocket == null)  // a nonblocking accept() may find nothing pending
            return;
        clientSocket.configureBlocking(false);
        SelectionKey key = clientSocket.register(clientSelector, SelectionKey.OP_READ);
        HttpdConnection client = new HttpdConnection(clientSocket);
        key.attach(client);
    }

    void handleClient(SelectionKey key) throws IOException {
        HttpdConnection client = (HttpdConnection) key.attachment();
        if (key.isReadable()) {
            client.read(key);
        } else {
            client.write(key);
        }
        // The connection has reset its interest ops; make the selector notice.
        clientSelector.wakeup();
    }

    // Cancel the key and close the client socket after an I/O error.
    void closeClient(SelectionKey key) {
        key.cancel();
        try {
            key.channel().close();
        } catch (IOException e) {
            System.out.println(e);
        }
    }

    public static void main(String argv[]) throws IOException {
        // Port from the command line (default 1235), serviced by three threads.
        int port = argv.length > 0 ? Integer.parseInt(argv[0]) : 1235;
        new LargerHttpd().run(port, 3 /*threads*/);
    }
}

class HttpdConnection {
    static Charset charset = Charset.forName("8859_1");
    static Pattern httpGetPattern = Pattern.compile("(?s)GET /?(\\S*).*");
    SocketChannel clientSocket;
    ByteBuffer buff = ByteBuffer.allocateDirect(64 * 1024);
    String request;
    String response;
    FileChannel file;
    int filePosition;

    HttpdConnection(SocketChannel clientSocket) {
        this.clientSocket = clientSocket;
    }

    void read(SelectionKey key) throws IOException {
        // Read what is available; process once we see a newline or end of stream.
        // (The position check avoids get(-1) when a read returns no bytes.)
        if (request == null && (clientSocket.read(buff) == -1
                || (buff.position() > 0 && buff.get(buff.position() - 1) == '\n')))
            processRequest(key);
        else
            key.interestOps(SelectionKey.OP_READ);
    }

    void processRequest(SelectionKey key) {
        buff.flip();
        request = charset.decode(buff).toString();
        Matcher get = httpGetPattern.matcher(request);
        if (get.matches()) {
            request = get.group(1);
            if (request.endsWith("/") || request.equals(""))
                request = request + "index.html";
            System.out.println("Request: " + request);
            try {
                file = new FileInputStream(request).getChannel();
            } catch (FileNotFoundException e) {
                response = "404 Object Not Found";
            }
        } else
            response = "400 Bad Request";

        if (response != null) {
            buff.clear();
            charset.newEncoder().encode(CharBuffer.wrap(response), buff, true);
            buff.flip();
        }
        key.interestOps(SelectionKey.OP_WRITE);
    }

    void write(SelectionKey key) throws IOException {
        if (response != null) {            // sending an error message
            clientSocket.write(buff);
            if (buff.remaining() == 0)
                response = null;
        } else if (file != null) {         // sending a file, possibly in pieces
            int remaining = (int) file.size() - filePosition;
            long sent = file.transferTo(filePosition, remaining, clientSocket);
            if (sent >= remaining || remaining <= 0) {
                file.close();
                file = null;
            } else
                filePosition += sent;
        }
        if (response == null && file == null) {   // all done
            clientSocket.close();
            key.cancel();
        } else
            key.interestOps(SelectionKey.OP_WRITE);
    }
}
```
From a bird’s-eye view, the structure of LargerHttpd is the same as TinyHttpd. The main class, LargerHttpd, accepts connections, and a connection class, HttpdConnection, encapsulates a socket and handles the conversation with the client. However, this time, instead of each connection object being a Runnable serviced in its own thread, its functionality is broken into two primary methods called read() and write(). The job of our LargerHttpd is to accept new client socket connections, wrap them in an instance of HttpdConnection, and then watch the client’s status with a Selector. Whenever we detect that a client is ready to send or receive data, we hand off a Runnable task to our Executor. The task calls read() or write() on the corresponding client, based on the operation that is ready.
The HttpdConnection object encapsulates the state of the conversation with the client. Because its interface is rather coarse, it must keep track of whether it is waiting to read more input, generate a response, or write file output. The HttpdConnection also manages the interest set of its key so that it can effectively schedule itself to be woken up when it’s ready for reading or writing. The association between the HttpdConnection and the key is made by using the key’s attach() and attachment() methods.
LargerHttpd’s acceptClient() method does several things. First, it accepts the new socket connection. Next, it configures the socket for nonblocking mode and registers it with the selector with an initial interest set for reading. Finally, it creates the HttpdConnection for the socket and attaches the HttpdConnection object to the key for later retrieval.
The main loop of LargerHttpd is fairly straightforward. First, we set up the ServerSocketChannel. This is similar to setting up a plain ServerSocket, except that we must first create an InetSocketAddress object to hold the local loopback address and port combination of our server socket and then explicitly bind our socket to that address with the ServerSocketChannel’s bind() method. (Because we bind to the loopback address, only clients on the same machine can connect.) We also configure the server socket to nonblocking mode and register it with our main Selector so that we can select for client connections in the same loop that we use to select for client read and write readiness.
In the main select loop, we check to see whether the key is ready for an accept operation and if so, we call acceptClient(); if not, we set the key’s interest set to zero with the interestOps() method and dispatch the key to our handleClient() method via a Runnable task. It’s important that we change the interest set to zero to clear it before the next loop; otherwise, the selector would keep reporting the same client as ready on every pass, and we could dispatch several tasks for one client before the first had finished its work. Setting the interest ops to 0 and resetting them in the HttpdConnection object upon completion ensures that only one thread is handling a given client at a time.
For each operation that is ready, we dispatch a task to our Executor. The task calls handleClient(), passing it the selection key. From the key, we retrieve the associated HttpdConnection object and call the appropriate service method based on whether the key is ready for reading or writing. After that, it’s up to the connection object to do its job.
Each call to the read() method simply does what would be one iteration of a read loop in a thread-bound application. Each read gets as much data as available and checks to see whether we’ve reached the end of a line (a \n newline character). Upon reaching the end of a line, we dispatch the call to the processRequest() method, which turns the byte buffer into text and uses the same techniques as our TinyHttpd to parse the request into a file pathname. On each incomplete call to read(), we set the interest ops of our key back to OP_READ. Upon completing the read and processing the request, we switch to using OP_WRITE because we are now ready to send a response.
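The parsing step is easy to try on its own. The pattern string and the index.html rule below are copied from HttpdConnection.processRequest(); the RequestParseDemo class and its toPath() wrapper are hypothetical names of our own, and null stands in for the “400 Bad Request” case.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical demo class reproducing LargerHttpd's request parsing.
class RequestParseDemo {

    // Skip "GET ", an optional leading slash, then capture the path up to the
    // first whitespace; (?s) lets the trailing .* span the header lines.
    static final Pattern HTTP_GET = Pattern.compile("(?s)GET /?(\\S*).*");

    // Returns the file path to serve, or null for a request that doesn't match.
    static String toPath(String request) {
        Matcher get = HTTP_GET.matcher(request);
        if (!get.matches())
            return null;
        String path = get.group(1);
        if (path.endsWith("/") || path.equals(""))
            path = path + "index.html";   // directory requests get an index page
        return path;
    }

    public static void main(String[] args) {
        System.out.println(toPath("GET / HTTP/1.1\r\nHost: localhost\r\n\r\n"));  // index.html
        System.out.println(toPath("GET /docs/ HTTP/1.0\r\n"));                     // docs/index.html
        System.out.println(toPath("GET /a.html HTTP/1.1\r\n"));                    // a.html
        System.out.println(toPath("POST /a.html HTTP/1.1\r\n"));                   // null
    }
}
```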
The write() method keeps track of whether it’s sending a text response (error message) or a file by using the response and file instance variables. When sending a file, we use the FileChannel’s transferTo() method to transfer bytes from the file directly to the network socket without copying them into Java’s memory space. (This is indeed an efficient little web server.) Because the socket is nonblocking, a single transferTo() call may send only part of the file; in that case we advance filePosition and wait for the next OP_WRITE to send more. And that’s about it. When we’re done, we close the client socket and cancel our key, which causes it to be removed from the Selector’s key set during the next select operation (discarding our HttpdConnection object with it).
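The same transferTo() call can be exercised without a network. This sketch (TransferDemo and its methods are hypothetical names) sends a temporary file to an in-memory channel, looping because one call may move fewer bytes than requested. The loop suits a blocking target; with a nonblocking socket it could spin, which is why LargerHttpd instead records filePosition and returns to the selector.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo class for FileChannel.transferTo().
class TransferDemo {

    // Copies the whole file to target, calling transferTo() until done.
    static long sendFile(Path file, WritableByteChannel target) throws IOException {
        try (FileChannel fc = FileChannel.open(file, StandardOpenOption.READ)) {
            long position = 0;
            long size = fc.size();
            while (position < size) {
                position += fc.transferTo(position, size - position, target);
            }
            return position;
        }
    }

    // Writes text to a temp file, transfers it into memory, and returns the result.
    static String roundTrip(String text) throws IOException {
        Path tmp = Files.createTempFile("transfer-demo", ".txt");
        try {
            Files.write(tmp, text.getBytes(StandardCharsets.UTF_8));
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            sendFile(tmp, Channels.newChannel(out));
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        } finally {
            Files.delete(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(roundTrip("hello, transferTo"));
    }
}
```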
Our example showed SocketChannel used for nonblocking, selectable I/O in a typical server application. It’s less common to need nonblocking I/O from a client, but there is certainly no reason you can’t do it. Perhaps you’re writing a peer-to-peer (P2P) application that manages many connections from both sides.
For the client side of communications, one additional tool is provided: a nonblocking socket-connect operation. The process of creating a TCP connection from the client side involves a three-way handshake with the remote host. This process normally blocks until the connection is established. However, the NIO package provides an alternative that allows you to initiate the connection and then poll for its status. When set to nonblocking mode, a call to a SocketChannel’s connect() method returns immediately: true if the connection was established at once (as can happen for a local connection) and false if it is still pending. The connection is then attempted (and possibly succeeds or fails) in the background. Later, a Selector can be used, checking for the OP_CONNECT flag to see when the socket is ready to “finish connecting.” The connection is finished by invoking the SocketChannel’s finishConnect() method, which either returns true once the connection is established or throws an IOException indicating the failure. The process of finishing the connection is really more about collecting the results of the asynchronous connection (acknowledging its success or failure) than about doing work.
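A minimal sketch of the whole sequence, assuming a local ServerSocketChannel bound to an OS-chosen loopback port as the remote end (NonblockingConnectDemo is a hypothetical name). The server never calls accept(); the operating system completes the handshake and holds the connection in the listen backlog, which is enough for the client side to finish connecting.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical demo class: nonblocking connect, OP_CONNECT, finishConnect().
class NonblockingConnectDemo {

    static boolean connectLoopback() throws IOException {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {
            // Port 0 asks the OS for any free port on the loopback interface.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // true: connected at once; false: the handshake is still in progress.
            if (!client.connect(server.getLocalAddress())) {
                SelectionKey key = client.register(selector, SelectionKey.OP_CONNECT);
                boolean done = false;
                while (!done) {
                    selector.select(1000);
                    if (selector.selectedKeys().remove(key) && key.isConnectable()) {
                        // Collect the result: true when connected; an
                        // IOException is thrown if the attempt failed.
                        done = client.finishConnect();
                    }
                }
            }
            return client.isConnected();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("connected: " + connectLoopback());
    }
}
```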