Using TLS in Rust: Getting Async I/O With tokio (Part 2)
Check out this second installment on using Async I/O and tokio!
In my last post, I got really frustrated with tokio's complexity and wanted to move to using mio directly. The advantage is that the programming model is pretty simple, even if actually working with it is hard: event loops tend to spread your logic over many different locations and make it hard to follow. I started down that path until I figured out just how much work it would take. I decided to give tokio a second chance, and at this point, I looked into attempts to provide async/await functionality to Rust.
It seems that at least some work is already available for this, using futures plus some Rust macros. That lets me write much more natural-looking code, and I actually managed to make it work.
Before I get to the code, I want to point out some concerns that I have right now. The futures-await crate (and indeed, all of tokio) seems to be in a state of flux. There is an await in tokio, and I think that there is some merging around all of those libraries into a single whole. What I don’t know, and can’t find any information about, is what I should actually be using and how all the pieces come together. I have to note that even with async/await, the programming model is still somewhat awkward, but it is at a level that I can live with. Here is how I built it.
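For completeness, this is roughly the setup needed to use the futures-await crate (a sketch; it requires a nightly compiler, and the exact feature gates and versions have shifted over time):

// Cargo.toml (sketch):
//   [dependencies]
//   futures-await = "0.1"
//   tokio = "0.1"

// futures-await only builds on nightly; the required feature gates have
// changed between compiler versions, but roughly:
#![feature(proc_macro, generators)]

extern crate futures_await as futures;
extern crate tokio;

use futures::prelude::*; // brings #[async] and await! into scope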
First, we need to accept connections, which is done like so:
#[async]
pub fn accept_connections(server: Arc<Server>, listener: TcpListener) -> std::result::Result<(), io::Error> {
    #[async]
    for connection in listener.incoming() {
        tokio::spawn(Server::handle_connection(server.clone(), connection).map_err(|_| ()));
    }
    Ok(())
}
Note that I have two #[async] annotations: one for the method as a whole, and one for the for loop. This just accepts the connection and spawns a task to handle it.
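One detail worth calling out: the map_err(|_| ()) is there because tokio::spawn (with futures 0.1) only accepts a future whose Item and Error are both (). If you would rather not drop the error on the floor, you could log it before discarding it; a small variant (not in the original code):

// tokio::spawn requires Future<Item = (), Error = ()>, so the connection
// error must be handled (or discarded) before spawning. Logging variant:
tokio::spawn(
    Server::handle_connection(server.clone(), connection)
        .map_err(|e| eprintln!("connection failed: {}", e)),
);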
The most interesting tidbits are in the actual processing of the connection:
#[async]
fn handle_connection(
    server: Arc<Server>,
    connection: TcpStream,
) -> std::result::Result<(), ConnectionError> {
    let acceptor = server.tls.clone();
    let stream = await!(acceptor.accept_async(connection))?;
    let auth_result = match Server::authenticate_certificate(server.clone(), stream.get_ref().ssl()) {
        // failed to auth
        Ok(Some((msg, err))) => Some((msg, err)),
        // failed to figure it out
        Err(e) => Some((e.to_string(), ConnectionError::InvalidCertiicateCert)),
        // successfully auth
        Ok(None) => None,
    };
    if let Some((msg, e)) = auth_result {
        await!(write_all(stream, msg.into_bytes()))?;
        return Err(e);
    }
    let (mut sender, receiver) = futures::sync::mpsc::channel(3);
    let (reader, writer) = stream.split();
    sender = await!(sender.send("OK".to_string()))?;
    tokio::spawn(Server::process_commands(server, reader, sender).map_err(|_| ()));
    tokio::spawn(Server::send_results(writer, receiver).map_err(|_| ()));
    Ok(())
}
You can see that this is fairly straightforward code. We first do the TLS handshake, then we validate the certificate. If there is an auth error, we send it to the user and back off. If we are successful, however, things get interesting.
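As an aside, the authenticate_certificate implementation isn't included in this post. Conceptually, it pulls the client certificate off the TLS session and checks its thumbprint against the allowed list. A rough sketch of what that could look like (the allowed_thumbprints field and the error messages are my assumptions, not the actual code):

use openssl::hash::MessageDigest;
use openssl::ssl::SslRef;

impl Server {
    // Sketch only: compare the peer certificate's SHA-1 thumbprint against the
    // server's allowed list. The allowed_thumbprints field name is assumed.
    fn authenticate_certificate(
        server: Arc<Server>,
        ssl: &SslRef,
    ) -> Result<Option<(String, ConnectionError)>, openssl::error::ErrorStack> {
        let cert = match ssl.peer_certificate() {
            Some(cert) => cert,
            None => {
                return Ok(Some((
                    "ERR No client certificate provided".to_string(),
                    ConnectionError::InvalidCertiicateCert,
                )))
            }
        };
        let digest = cert.digest(MessageDigest::sha1())?;
        let thumbprint: String = digest.iter().map(|b| format!("{:02x}", b)).collect();
        if server.allowed_thumbprints.contains(&thumbprint) {
            Ok(None) // authenticated successfully
        } else {
            Ok(Some((
                format!("ERR Certificate {} is not allowed", thumbprint),
                ConnectionError::InvalidCertiicateCert,
            )))
        }
    }
}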
Back to the connection handler: I create a channel, which allows me to split off the read and write portions of the task. This means that I can send results out of order if I want to, which is great for the actual protocol handling. The first thing to do is to send the OK string to the client, so they know that we successfully connected, and then we spawn the read/write tasks. The write task is pretty simple, overall:
#[async]
fn send_results(mut writer: WriteHalf<SslStream<TcpStream>>, receiver: Receiver<String>) -> std::result::Result<(), ConnectionError> {
    #[async]
    for msg in receiver {
        writer = await!(write_all(writer, msg))?.0;
        writer = await!(write_all(writer, b"\r\n\r\n"))?.0;
    }
    Ok(())
}
You can see the funny .0 references, which are an artifact of the fact that the write_all() function consumes the writer we pass to it and returns a (potentially different) writer in the result. This is pretty common for functional languages. I'm pretty sure that I can avoid the two calls to write_all for the postfix, but this is easier for now.
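If you want to fold the postfix into a single write, one option is to append it to the message bytes before calling write_all; a sketch (same types as above, at the cost of one extra allocation per message):

#[async]
fn send_results(mut writer: WriteHalf<SslStream<TcpStream>>, receiver: Receiver<String>) -> std::result::Result<(), ConnectionError> {
    #[async]
    for msg in receiver {
        // Append the terminator to the payload so a single write_all suffices.
        let mut buf = msg.into_bytes();
        buf.extend_from_slice(b"\r\n\r\n");
        writer = await!(write_all(writer, buf))?.0;
    }
    Ok(())
}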
Processing the commands is simple as well:
#[async]
fn process_commands(server: Arc<Server>, reader: ReadHalf<SslStream<TcpStream>>, mut sender: Sender<String>)
    -> std::result::Result<(), ConnectionError> {
    let cmds = FramedRead::new(reader, CommandCodec::new());
    #[async]
    for cmd in cmds {
        let cmd_to_run = server.cmd_handlers.get(&cmd.args[0])
            .map(|h| h.clone());
        match cmd_to_run {
            None => {
                sender = await!(sender.send(format!("ERR Unknown command {}", cmd.args[0])))?;
                return Err(ConnectionError::InvalidCommand { cmd: cmd.args[0].clone() });
            },
            Some(f) => {
                match f(cmd) {
                    Err(e) => {
                        sender = await!(sender.send(e.to_string()))?;
                        return Err(e);
                    },
                    Ok(v) => {
                        sender = await!(sender.send(v))?;
                    }
                }
            }
        }
    }
    Ok(())
}
For each command we support, we have an entry in the server configuration; we fetch the handler and invoke it. The result of the command will be written to the client by the write task. Right now, there is a 1:1 association between commands and responses, but this can now easily be broken.
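The Server type itself isn't shown in the post, but from the way process_commands uses it, the handler registry is presumably something along these lines (a sketch; the exact types are my assumption):

use std::collections::HashMap;

// Assumed shape of the registry: command name -> plain function pointer
// (Cmd and ConnectionError are the post's own types). The
// get(...).map(|h| h.clone()) in process_commands copies the handler out,
// so the borrow on the map ends before the handler is invoked.
type CmdHandler = fn(Cmd) -> Result<String, ConnectionError>;

pub struct Server {
    cmd_handlers: HashMap<String, CmdHandler>,
    // plus the TLS acceptor, the allowed thumbprints, and so on
}

impl Server {
    pub fn handle(&mut self, name: String, handler: CmdHandler) {
        self.cmd_handlers.insert(name, handler);
    }
}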
And finally, here is an actual command implementation and the code that runs the server itself:
fn echo(cmd: server::cmd::Cmd) -> std::result::Result<String, ConnectionError> {
    Ok(cmd.args[1].clone())
}

fn main() -> std::result::Result<(), server::err::ConnectionError> {
    let mut server = server::Server::new(
        "server.pem",
        "server.key",
        // allowed thumbprints
        &["1776821db1002b0e2a9b4ee3d5ee14133d367009"],
    )?;
    {
        Arc::get_mut(&mut server).unwrap()
            .handle("echo".to_string(), echo);
    }
    let listener = TcpListener::bind(&"127.0.0.1:4888".parse::<std::net::SocketAddr>()?)?;
    println!("Started");
    tokio::run(server::Server::accept_connections(server, listener).map_err(|_| ()));
    Ok(())
}
This is pretty simple now, and it gives us a nice model to program commands and responses.
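Adding another command is just a function plus a registration call; for example, a hypothetical ping command:

// A hypothetical second command, registered exactly like echo:
fn ping(_cmd: server::cmd::Cmd) -> std::result::Result<String, ConnectionError> {
    Ok("PONG".to_string())
}

// ... and in main(), alongside the echo registration:
// Arc::get_mut(&mut server).unwrap().handle("ping".to_string(), ping);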
I pushed the whole code to this branch if you care to look at it.
I have some more comments about this code, but I’ll reserve them for another post. Stay tuned!
Published at DZone with permission of Oren Eini, DZone MVB.