Chapter 4. Integrating Networking into Our Own Async Runtime
In Chapter 3, we built our own async runtime queue to illustrate how async tasks run through an async runtime. However, we used only basic sleep and print operations. Simple operations are useful for illustration, but they are also limiting. In this chapter, we build on the async runtime we defined previously and integrate networking protocols so they can run on it.
By the end of this chapter, you will be able to use traits to integrate the hyper crate for HTTP requests into our runtime. This means that, after reading a crate's documentation, you will be able to take this example and integrate other third-party dependencies into our async runtime via traits. Finally, we will go to a lower level by using the mio crate to directly poll sockets in our futures. This will show you how to exercise fine-grained control over the way a socket is polled, read, and written to in our async runtime. With this exposure and further external reading, you will be able to implement your own custom networking protocols.
This is the hardest chapter to follow, and it is not essential if you are not planning to integrate networking into a custom runtime. If you are not feeling it, feel free to skip this chapter and come back after reading the rest of the book. This content is placed in this chapter because it builds off the code written in Chapter 3.
Before we progress through the chapter, we need the following additional dependencies alongside the dependencies we used in Chapter 3:
[dependencies]
hyper = { version = "0.14.26", features = ["http1", "http2", "client", "runtime"] }
smol = "1.3.0"
anyhow = "1.0.70"
async-native-tls = "0.5.0"
http = "0.2.9"
tokio = "1.14.0"
We are using these dependencies:
- hyper: This crate is a fast and popular HTTP implementation. We are going to use it to make requests. We need the client feature to allow us to make HTTP requests and the runtime feature to allow compatibility with a custom async runtime.
- smol: This crate is a small and fast async runtime. It is particularly good for lightweight tasks with low overhead.
- anyhow: This crate is an error-handling library.
- async-native-tls: This crate provides asynchronous Transport Layer Security (TLS) support.
- http: This crate provides types for working with HTTP requests and their responses.
- tokio: We used this crate before for demonstrating our async runtime and will use it again in this chapter.
As you can see, we are going to be using hyper for this example. This is to give you a different set of tools than those we used in previous examples and to demonstrate that tools like Tokio are layered into other commonly used libraries. Before we write any code, however, we must introduce executors and connectors.
Understanding Executors and Connectors
An executor is responsible for running futures to completion. It is the part of the runtime that schedules tasks and makes sure they run (or are executed) when they are ready. We need an executor when we introduce networking into our runtime because without it, our futures such as HTTP requests would be created but never actually run.
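As a rough sketch, the interface an executor exposes boils down to something like the following. This is only conceptual; hyper's own version is the hyper::rt::Executor trait, which we implement for our runtime later in this chapter.

use std::future::Future;

// Conceptual sketch: an executor receives a future and is responsible for
// getting it polled to completion.
trait Executor<F: Future> {
    fn execute(&self, fut: F);
}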
A connector in networking is a component that establishes a connection between our application and the service we want to connect to. It handles activities like opening TCP connections and maintaining them through the lifetime of the request.
Integrating hyper into Our Async Runtime
Now that you understand what executors and connectors are, let’s look at how these concepts are essential when integrating a library like hyper into our async runtime. Without an appropriate executor and connector, our runtime wouldn’t be able to handle the HTTP requests and connections that hyper relies on.
If we look at hyper's official documentation or various online tutorials, we might get the impression that we can perform a simple GET request with the hyper crate using the following code:
use hyper::{Request, Client};

let url = "http://www.rust-lang.org";
let uri: Uri = url.parse().unwrap();
let request = Request::builder()
    .method("GET")
    .uri(uri)
    .header("User-Agent", "hyper/0.14.2")
    .header("Accept", "text/html")
    .body(hyper::Body::empty())
    .unwrap();
let future = async {
    let client = Client::new();
    client.request(request).await.unwrap()
};
let test = spawn_task!(future);
let response = future::block_on(test);
println!("Response status: {}", response.status());
However, if we run the tutorial code, we would get the following error:
thread '<unnamed>' panicked at 'there is no reactor running,
must be called from the context of a Tokio 1.x runtime'
This is because, under the hood, hyper runs on the Tokio runtime by default, and no executor is specified in our code. If you were to use reqwest or another popular crate, chances are you would get a similar error.
To address the issue, we will create a custom executor that can handle our tasks within the custom async runtime we’ve built. Then we’ll build a custom connector to manage the actual network connections, allowing our runtime to seamlessly integrate with hyper and other similar libraries.
The first step is to import the following into our program:
use std::net::Shutdown;
use std::net::{TcpStream, ToSocketAddrs};
use std::pin::Pin;
use std::task::{Context, Poll};

use anyhow::{bail, Context as _, Error, Result};
use async_native_tls::TlsStream;
use http::Uri;
use hyper::{Body, Client, Request, Response};
use smol::{io, prelude::*, Async};
We can build our own executor as follows:
struct CustomExecutor;

impl<F: Future + Send + 'static> hyper::rt::Executor<F> for CustomExecutor {
    fn execute(&self, fut: F) {
        spawn_task!(async {
            println!("sending request");
            fut.await;
        }).detach();
    }
}
This code defines our custom executor and the behavior of the execute function. Inside this function, we call our spawn_task macro with an async block that awaits the future passed into execute. We then call detach on the resulting task; otherwise, the task would move out of scope and simply be dropped, closing the channel so our request would never complete. As you may recall from Chapter 3, detach sends the pointer of the task to a loop to be polled until the task has finished, before dropping the task.
We now have a custom executor that we can pass into the hyper client. However, our hyper client will still fail to make the request because it is expecting the connection to be managed by the Tokio runtime. To fully integrate hyper with our custom async runtime, we need to build our own async connector that handles network connections independently of Tokio.
Building an HTTP Connection
When it comes to networking requests, the protocols are well-defined and standardized. For instance, TCP has a three-step handshake to establish a connection before sending packets of bytes through that connection. Implementing the TCP connection from scratch has zero benefit unless you have very specific needs that the standardized connection protocols cannot provide. In Figure 4-1, we can see that HTTP and HTTPS are application layer protocols running over a transport protocol such as TCP.
With HTTP, we are sending over a body, header, and so forth. HTTPS has even more steps, as a certificate is checked and sent over to the client before the client starts sending over data. This is because the data needs to be encrypted. Considering all the back-and-forth in these protocols and waiting for responses, networking requests are a sensible target for async. We cannot get rid of the steps in networking without losing security and assurance that the connection is made. However, we can release the CPU from networking requests when waiting for responses with async.
For our connector, we are going to support HTTP and HTTPS, so we need the following enum:
enum CustomStream {
    Plain(Async<TcpStream>),
    Tls(TlsStream<Async<TcpStream>>),
}
The Plain
variant is an async TCP stream. Considering Figure 4-1, we can deduce that the Plain
variant supports HTTP requests. With the Tls
variant, we remember that HTTPS is merely a TLS layer between the TCP and the HTTP, which means that our Tls
variant supports HTTPS.
We can now use this custom stream enum to implement the hyper Service trait for a custom connector struct:
#[derive(Clone)]
struct CustomConnector;

impl hyper::service::Service<Uri> for CustomConnector {
    type Response = CustomStream;
    type Error = Error;
    type Future = Pin<Box<
        dyn Future<Output = Result<Self::Response, Self::Error>> + Send
    >>;

    fn poll_ready(
        &mut self,
        _: &mut Context<'_>
    ) -> Poll<Result<(), Self::Error>> {
        . . .
    }
    fn call(&mut self, uri: Uri) -> Self::Future {
        . . .
    }
}
The Service
trait defines the future for the connection. Our connection is a thread-safe future that returns our stream enum. This enum is either an async TCP connection or an async TCP connection that is wrapped in a TLS stream.
We can also see that our poll_ready
function just returns Ready
. This function is used by hyper to check whether a service is ready to process requests. If we return Pending
, the task will be polled until the service becomes ready. We return an error when the service can no longer process requests. Because we are using the Service
trait for a client call, we will always return Ready
for the poll_ready
. If we were implementing the Service
trait for a server, we could have the following poll_ready
function:
fn poll_ready(
    &mut self,
    cx: &mut Context<'_>
) -> Poll<Result<(), Error>> {
    Poll::Ready(Ok(()))
}
We can see that our poll_ready
function returns that the future is ready. We could, ideally, not bother defining poll_ready
, as our implementation makes calling it redundant. However, this function is a requirement for the Service
trait.
We can now move on to the response function, which is call
. The poll_ready
function needs to return Ok
before we can use call
. Our call
function has the following outline:
fn call(&mut self, uri: Uri) -> Self::Future {
    Box::pin(async move {
        let host = uri.host().context("cannot parse host")?;

        match uri.scheme_str() {
            Some("http") => {
                . . .
            }
            Some("https") => {
                . . .
            }
            scheme => bail!("unsupported scheme: {:?}", scheme),
        }
    })
}
Remember that pinning an async block gives us a future, so the value our pinned future resolves to is the async block's return value. For the HTTPS branch, we build the connection with the following code:
let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(443);

    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
let stream = async_native_tls::connect(host, stream).await?;
Ok(CustomStream::Tls(stream))
The port is 443 because this is the standard port for HTTPS. We then pass a closure into the unblock function; the closure returns the socket address. The unblock function runs blocking code on a thread pool so that we get an async interface over blocking code. While the socket address is being resolved, the thread is free to do something else. We then connect our async TCP stream and wrap it in a native TLS connection. Once the connection is established, we finally return the CustomStream enum.
When it comes to building our HTTP code, it is nearly the same. The port is 80 instead of 443, and the TLS connection is not required, resulting in returning Ok(CustomStream::Plain(stream))
.
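For reference, a minimal sketch of the Some("http") arm under those assumptions might look like the following; it mirrors the HTTPS code, minus the TLS handshake, and slots into the same match in our call function.

// Sketch of the Some("http") branch: same address resolution, default port 80,
// and no TLS wrapping before returning the plain stream.
let socket_addr = {
    let host = host.to_string();
    let port = uri.port_u16().unwrap_or(80);

    smol::unblock(move || (host.as_str(), port).to_socket_addrs())
        .await?
        .next()
        .context("cannot resolve address")?
};
let stream = Async::<TcpStream>::connect(socket_addr).await?;
Ok(CustomStream::Plain(stream))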
Our call
function is now defined. However, if we try to make an HTTPS call to a website with our stream enum or connection struct at this point, we will get an error message stating that we have not implemented the AsyncRead
and AsyncWrite
Tokio traits for our stream enum. This is because hyper requires these traits to be implemented in order for our connection enum to be used.
Implementing the Tokio AsyncRead Trait
The AsyncRead
trait is similar to the std::io::Read
trait but integrates with asynchronous task systems. When implementing our AsyncRead
trait, we have to define only the poll_read
function, which returns a Poll
enum as a result. If we return Poll::Ready
, we are saying that the data was immediately read and placed into the output buffer. If we return Poll::Pending
, we are saying that no data was read into the buffer that we provided. We are also saying that the I/O object is not currently readable but may become readable in the future. The return of Pending
results in the current future’s task being scheduled to be unparked when the object is readable. The final Poll
enum variant that we can return is Ready
but with an error that would usually be a standard I/O error.
Our implementation of the AsyncRead
trait is defined in the code here:
impl tokio::io::AsyncRead for CustomStream {
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        match &mut *self {
            CustomStream::Plain(s) => {
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
            CustomStream::Tls(s) => {
                Pin::new(s)
                    .poll_read(cx, buf.initialize_unfilled())
                    .map_ok(|size| {
                        buf.advance(size);
                    })
            }
        }
    }
}
Both of our stream variants receive essentially the same treatment; we pass in either the async TCP stream or an async TCP stream with TLS. We then pin this stream and execute the stream's poll_read
function, which performs a read and returns a Poll
enum indicating how much the buffer grew because of the read. Once the poll_read
is done, we execute map_ok
, which takes in FnOnce(T)
, which is either a function or a closure that can be called only once.
Note
In the context of map_ok
, the closure’s purpose is to advance the buffer by the size returned from poll_read
. This is a one-time operation for each read, and hence FnOnce
is sufficient and preferred. If the closure needed to be called multiple times, Fn
or FnMut
would be required. By using FnOnce
, we ensure that the closure can take ownership of the environment it captures, providing flexibility for what the closure can do. This is particularly useful in async programming, where ownership and lifetimes must be carefully managed.
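To make the distinction concrete, here is a small, standalone example (unrelated to hyper; the names run_once and consume are ours) of an FnOnce closure taking ownership of its environment:

// Illustrative only: an FnOnce closure may consume what it captures,
// so it is guaranteed to run at most once.
fn run_once<F: FnOnce() -> usize>(f: F) -> usize {
    f() // calling f consumes it; calling it a second time would not compile
}

fn main() {
    let data = vec![1, 2, 3];
    // `move` gives the closure ownership of `data`.
    let consume = move || data.len();
    println!("{}", run_once(consume));
}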
The map_ok is applied to the Poll result returned from poll_read. If the Poll result is Ready but with an error, Ready with the error is returned. If the Poll result is Pending, Pending is returned. We pass the context into poll_read so a waker is used if we have a Pending result. If we have a Ready with an Ok result, the closure is called with the value from the poll_read, and Ready Ok is returned from the map_ok function. The closure passed into our map_ok function advances the buffer.
A lot is going on under the hood, but essentially, our stream is pinned, a read is performed on the pinned stream, and if the read is successful, we advance the size of the buffer’s filled region because the read data is now in the buffer. The polling in poll_read
, and the matching of the Poll
enum in map_ok
, enable this read process to be compatible with an async runtime.
So we can now read into our buffer asynchronously, but we also need to write asynchronously for our HTTP request to be complete.
Implementing the Tokio AsyncWrite Trait
The AsyncWrite
trait is similar to std::io::Write
but interacts with asynchronous task systems. It writes bytes asynchronously and, like the AsyncRead
we just implemented, comes from Tokio.
When implementing the AsyncWrite
trait, we need the following outline:
impl tokio::io::AsyncWrite for CustomStream {
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        . . .
    }
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<io::Result<()>> {
        . . .
    }
    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<io::Result<()>> {
        . . .
    }
}
The poll_write
function should not be a surprise, but note that we also have poll_flush
and poll_shutdown
functions. All the functions return a variant of the Poll
enum and accept the context. Therefore, all of these functions can register the waker to put the task to sleep and wake it again when the connection is ready for writing, flushing, or shutting down.
We should start with our poll_write
function:
match &mut *self {
    CustomStream::Plain(s) => Pin::new(s).poll_write(cx, buf),
    CustomStream::Tls(s) => Pin::new(s).poll_write(cx, buf),
}
We are matching the stream, pinning the stream, and executing the poll_write
function of the stream. At this point in the chapter, it should not come as a surprise that poll_write
tries to write bytes from the buffer into an object. Like the read, if the write is successful, the number of bytes written is returned. If the object is not ready for writing, we will get Pending
; if we get 0
, this usually means that the object is no longer able to accept bytes.
Inside the poll_write
function of the stream, a loop is executed and a mutable reference to the I/O handle is obtained. The loop then tries to write to the underlying I/O, and each write attempt has a result that is handled. If the error of the write is io::ErrorKind::WouldBlock, the write could not complete immediately; in that case, Pending is returned so the future is polled again later, when the resource is available. Any other error is returned to the caller.
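As a rough illustration of that pattern (our own sketch, not the actual smol source), the following maps a single write attempt onto the Poll variants just described; a real implementation would also register the waker before returning Pending.

use std::io::{self, Write};
use std::task::Poll;

// One write attempt: success becomes Ready with the byte count, WouldBlock
// becomes Pending, and any other error is surfaced as Ready(Err).
fn poll_write_once<W: Write>(dst: &mut W, buf: &[u8]) -> Poll<io::Result<usize>> {
    match dst.write(buf) {
        Ok(n) => Poll::Ready(Ok(n)), // n == 0 usually means no more bytes are accepted
        Err(e) if e.kind() == io::ErrorKind::WouldBlock => Poll::Pending, // poll again later
        Err(e) => Poll::Ready(Err(e)),
    }
}

fn main() {
    let mut sink: Vec<u8> = Vec::new();
    if let Poll::Ready(Ok(n)) = poll_write_once(&mut sink, b"hello") {
        println!("wrote {} bytes", n);
    }
}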
Now that we have written poll_write
, we can define the body of the poll_flush
function:
match &mut *self {
    CustomStream::Plain(s) => Pin::new(s).poll_flush(cx),
    CustomStream::Tls(s) => Pin::new(s).poll_flush(cx),
}
This has the same outline as our poll_write
function. However, in this case, we call poll_flush
on our stream. A flush is like a write except we ensure that all the contents of the buffer immediately reach the destination. The underlying mechanism of the flush is exactly the same as the write with the loop, but the flush function will be called in the loop as opposed to the write function.
We can now move on to our final function, which is poll_shutdown:
match &mut *self {
    CustomStream::Plain(s) => {
        s.get_ref().shutdown(Shutdown::Write)?;
        Poll::Ready(Ok(()))
    }
    CustomStream::Tls(s) => Pin::new(s).poll_close(cx),
}
The way we implement the different types of our custom stream varies slightly. The Plain
stream is shut down directly. Once it is shut down, we return a Poll
that is ready. However, the Tls
stream is an async implementation by itself. Therefore, we need to pin it to avoid having it moved in memory, because it could be put on the task queue multiple times until the poll is complete. We call the poll_close
function, which will return a poll result by itself.
We have now implemented our async read and write traits for our hyper client. All we need to do now is connect and run HTTP requests to test our implementation.
Connecting and Running Our Client
In this section, we are wrapping up what we have done and testing it. We can create our connection request-sending function as follows:
impl hyper::client::connect::Connection for CustomStream {
    fn connected(&self) -> hyper::client::connect::Connected {
        hyper::client::connect::Connected::new()
    }
}

async fn fetch(req: Request<Body>) -> Result<Response<Body>> {
    Ok(Client::builder()
        .executor(CustomExecutor)
        .build::<_, Body>(CustomConnector)
        .request(req)
        .await?)
}
Now all we need to do is run our HTTP client on our async runtime in the main
function:
fn main() {
    Runtime::new().with_low_num(2).with_high_num(4).run();

    let future = async {
        let req = Request::get("https://www.rust-lang.org")
            .body(Body::empty())
            .unwrap();
        let response = fetch(req).await.unwrap();
        let body_bytes = hyper::body::to_bytes(response.into_body())
            .await
            .unwrap();
        let html = String::from_utf8(body_bytes.to_vec()).unwrap();
        println!("{}", html);
    };
    let test = spawn_task!(future);
    let _outcome = future::block_on(test);
}
And here we have it: we can run our code to get the HTML code from the Rust website. We can now say that our async runtime can communicate with the internet asynchronously, but what about accepting requests? We already covered implementing traits from other crates to get an async implementation. So let’s go one step lower and directly listen to events in sockets with the mio crate.
Introducing mio
When it comes to implementing async functionality with sockets, we cannot really get any lower than mio (metal I/O) without directly calling the OS. This low-level, nonblocking I/O library in Rust provides the building blocks for creating high-performance async applications. It acts as a thin abstraction over the OS’s asynchronous I/O capabilities.
The mio crate is so crucial because it serves as a foundation for other higher-level async runtimes, including Tokio. These higher-level libraries abstract the complexity away to make them easier to work with. The mio crate is useful for developers who need fine-grained control over their I/O operations and want to optimize for performance. Figure 4-2 shows how Tokio is built on mio.
Previously in this chapter, we connected hyper to our runtime. To get the full picture, we are now going to explore mio and integrate it in our runtime. Before we proceed, we need the following dependency in Cargo.toml:
mio = { version = "1.0.2", features = ["net", "os-poll"] }
We also need these imports:
use
mio
::
net
::{
TcpListener
,
TcpStream
};
use
mio
::{
Events
,
Interest
,
Poll
as
MioPoll
,
Token
};
use
std
::
io
::{
Read
,
Write
};
use
std
::
time
::
Duration
;
use
std
::
error
::
Error
;
use
std
::
future
::
Future
;
use
std
::
pin
::
Pin
;
use
std
::
task
::{
Context
,
Poll
};
Now that we have laid all the groundwork, we can move on to polling TCP sockets in futures.
Polling Sockets in Futures
The mio crate is built for handling many sockets (thousands), so we need a way to identify which socket triggered a notification. Tokens enable us to do this. When we register a socket with the event loop, we pass in a token, and that token is returned in the handler. The token is a tuple struct around usize. This works because every OS allows a pointer-sized amount of data to be associated with a socket. So, in the handler, we can keep a mapping where the token is the key and the socket is the value.
mio does not use callbacks here because we want a zero-cost abstraction, and tokens are the only way of achieving that. Callbacks, streams, and futures can all be built on top of mio.
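As a rough sketch, the token-to-socket lookup can be as simple as a map keyed by the token. The Connections type and socket_for method here are ours, not part of mio.

use std::collections::HashMap;
use mio::net::TcpStream;
use mio::Token;

// Hypothetical registry: each registered socket is stored under its token.
struct Connections {
    sockets: HashMap<Token, TcpStream>,
}

impl Connections {
    // Find the socket that triggered an event by its token.
    fn socket_for(&mut self, token: Token) -> Option<&mut TcpStream> {
        self.sockets.get_mut(&token)
    }
}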
With tokens, we now have the following steps:
- Register the sockets with the event loop.
- Wait for socket readiness.
- Look up the socket state by using the token.
- Operate on the socket.
- Repeat.
Our simple example negates the need for mapping, so we are going to define our tokens with the following code:
const SERVER: Token = Token(0);
const CLIENT: Token = Token(1);
Here, we just have to ensure that our tokens are unique. The integer passed into Token
is used to differentiate it from other tokens. Now that we have our tokens, we define the future that is going to poll the socket:
struct ServerFuture {
    server: TcpListener,
    poll: MioPoll,
}

impl Future for ServerFuture {
    type Output = String;

    fn poll(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>
    ) -> Poll<Self::Output> {
        . . .
    }
}
We are using TcpListener
to accept incoming data and MioPoll
to poll the socket and tell the future when the socket is readable. Inside our future poll
function, we can define the events and poll the socket:
let mut events = Events::with_capacity(1);
let _ = self.poll.poll(
    &mut events,
    Some(Duration::from_millis(200))
).unwrap();

for event in events.iter() {
    . . .
}
cx.waker().wake_by_ref();
return Poll::Pending
The poll will extract the events from the socket into the events iterator. We also set the socket poll to time out after 200 ms. If there are no events in the socket, we proceed without any events and return Pending
. We will continue polling until we get an event.
When we do get events, we loop through them. In the preceding code, we set the capacity to 1
, but we can increase the capacity to handle multiple events if needed. When processing an event, we need to clarify the event type. For our future, we need to ensure that the socket is readable and that the token is the SERVER
token:
if event.token() == SERVER && event.is_readable() {
    let (mut stream, _) = self.server.accept().unwrap();
    let mut buffer = [0u8; 1024];
    let mut received_data = Vec::new();

    loop {
        . . .
    }
    if !received_data.is_empty() {
        let received_str = String::from_utf8_lossy(&received_data);
        return Poll::Ready(received_str.to_string())
    }
    cx.waker().wake_by_ref();
    return Poll::Pending
}
The event is readable if the socket contains data. If our event is the correct one, we accept the TcpStream and define a received_data collection on the heap with Vec, using the buffer slice to perform the reads. If no data has arrived, we return Pending so the socket is polled again. Once we have data, we convert it to a string and return it with Ready. This means that our socket listener is finished after we have the data.
Note
If we wanted our socket to be continually polled throughout the lifetime of our program, we would spawn a detached task to pass the data into an async function to handle the data, as shown here:
if !received_data.is_empty() {
    spawn_task!(some_async_handle_function(&received_data)).detach();
    return Poll::Pending;
}
In our loop, we read the data from the socket:
loop {
    match stream.read(&mut buffer) {
        Ok(n) if n > 0 => {
            received_data.extend_from_slice(&buffer[..n]);
        }
        Ok(_) => {
            break;
        }
        Err(e) => {
            eprintln!("Error reading from stream: {}", e);
            break;
        }
    }
}
It does not matter if the received message is bigger than the buffer; our loop will continue to extract all the bytes to be processed, adding them onto our Vec
. If there are no more bytes, we can stop our loop to process the data.
We now have a future that will continue to be polled until it accepts data from a socket. After receiving the data, the future will terminate. We can also make this future continually poll the socket. We could argue that we could use this continual polling future to keep track of thousands of sockets if needed. We would have one socket per future and spawn thousands of futures into our runtime. Now that we have defined our TcpListener
logic, we can move on to our client logic to send data over the socket to our future.
Sending Data over the Socket
For our client, we are going to run everything in the main
function:
fn main() -> Result<(), Box<dyn Error>> {
    Runtime::new().with_low_num(2).with_high_num(4).run();
    . . .
    Ok(())
}
In our main
, we initially create our listener and our stream for the client:
let addr = "127.0.0.1:13265".parse()?;
let mut server = TcpListener::bind(addr)?;
let mut stream = TcpStream::connect(server.local_addr()?)?;
Our example requires just one stream, but we can create multiple streams if needed. We register our server with a mio poll and use the server and poll to spawn the listener task:
let poll: MioPoll = MioPoll::new()?;
poll.registry()
    .register(&mut server, SERVER, Interest::READABLE)?;

let server_worker = ServerFuture {
    server,
    poll,
};
let test = spawn_task!(server_worker);
Now our task is continually polling the TCP port for incoming events. We then create another poll with the CLIENT
token for writable events. If the socket is not full, we can write to it. If the socket is full, it is no longer writable and needs to be flushed. Our client poll takes the following form:
let mut client_poll: MioPoll = MioPoll::new()?;
client_poll.registry()
    .register(&mut stream, CLIENT, Interest::WRITABLE)?;
Note
With mio, we can also create polls that can trigger if the socket is readable or writable:
.register(&mut server, SERVER, Interest::READABLE | Interest::WRITABLE)?;
Now that we have created our poll, we can wait for the socket to become writable before writing to it. We use this poll call:
let mut events = Events::with_capacity(128);
let _ = client_poll.poll(&mut events, None).unwrap();
Note that there is a None
for the timeout. This means that our current thread will be blocked until an event is yielded by the poll call. Once we have the event, we send a simple message to the socket:
for event in events.iter() {
    if event.token() == CLIENT && event.is_writable() {
        let message = "that's so dingo!\n";
        let _ = stream.write_all(message.as_bytes());
    }
}
The message is sent, so we can block our thread and then print out the message:
let outcome = future::block_on(test);
println!("outcome: {}", outcome);
When running the code, you might get the following printout:
Error reading from stream: Resource temporarily unavailable (os error 35)
outcome: that's so dingo!
It works, but we get the initial error. This can be a result of nonblocking TCP listeners; mio is nonblocking. The Resource temporarily unavailable
error is usually caused by no data being available in the socket yet. This can happen right after the TCP stream is created, and it is not a problem, because we can handle this error in our read loop by returning Pending so the socket continues to be polled, as shown here:
use std::io::ErrorKind;
. . .
Err(ref e) if e.kind() == ErrorKind::WouldBlock => {
    cx.waker().wake_by_ref();
    return Poll::Pending;
}
Note
With the mio polling feature, we have implemented async communication through a TCP socket. We can also use mio to send data between processes via a UnixDatagram
. UnixDatagram
s are sockets that are restricted to sending data on the same machine. Because of this, UnixDatagram
s are faster, require less context switching, and do not have to go through the network stack.
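For a feel of datagram sockets, here is a minimal, Unix-only sketch using the standard library's blocking UnixDatagram; mio's version has the same shape but would be registered with a MioPoll and handled nonblockingly, like our TCP sockets above.

use std::os::unix::net::UnixDatagram;

// Minimal blocking sketch: two connected datagram sockets on the same machine.
fn main() -> std::io::Result<()> {
    let (tx, rx) = UnixDatagram::pair()?;
    tx.send(b"hello from the same machine")?;

    let mut buf = [0u8; 64];
    let n = rx.recv(&mut buf)?;
    println!("{}", String::from_utf8_lossy(&buf[..n]));
    Ok(())
}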
Summary
We finally managed to get our async runtime to do something apart from sleep and print. In this chapter, we explored how traits can help us integrate third-party crates into our runtime, and we have gone lower to poll TCP sockets via mio. When it comes to getting a custom async runtime running, nothing else is standing in your way as long as you have access to trait documentation. If you want to get a better grip on the async knowledge you’ve gained so far, you are in the position to create a basic web server that handles various endpoints. Implementing all your communication in mio would be difficult, but using it just for async programming is much easier. The hyper HttpListener
will cover the protocol complexity so you can focus on passing the requests as async tasks and on the response to the client.
For our journey in this book, we are exploring async programming as opposed to web programming. Therefore, we are going to move on to how we implement async programming to solve specific problems. We start in Chapter 5 with coroutines.