Implementing Rust futures when you only have async functions

Posted by nickm on 27 April 2021

Rust doesn't yet support asynchronous functions in traits, but several important async-related traits (like AsyncRead and Stream) define their interface using functions that return Poll. So what can you do when you have a function that is async, and you need to use it to implement one of these traits?

(I'll be assuming that you already know a little about pinning, futures, and async programming in Rust. That's not because they're easy “everybody-should-know-it” topics, but because I'm still learning them myself, and I don't understand them well enough to be a good teacher. You can probably keep reading if you don't understand them well.)

A little background

Here's a situation I ran into earlier this year. In the end, I only solved it with help from Daniel Franke, so I decided that I should write up the solution here in case it can help somebody else.

I've been working on Arti, an implementation of the Tor protocols in Rust. After a bunch of hacking, I finally got to the point where I had a DataStream type that provides an anonymous connection over the Tor network:

impl DataStream {
    pub async fn read(&mut self, buf: &mut [u8]) -> io::Result<usize>
    { ... }
    pub async fn write(&mut self, buf: &[u8]) -> io::Result<usize>
    { ... }
}

Now, there's a lot of complexity hiding in those ellipses. Tor isn't a simple protocol: when it's trying to read, it may need to wait for data to arrive. It may also need to send messages in response to arriving data. It might need to update internal state, or even tear down an entire Tor circuit because of a protocol error. Async functions made it possible to implement all of this stuff in a more-or-less comprehensible way, so rewriting those functions to explicitly return a typed future was not an option.

But I wanted DataStream to implement AsyncRead and AsyncWrite, so I could use it with other code in the Rust async ecosystem. So let's look at AsyncRead (because it's simpler than AsyncWrite). The only required method in AsyncRead is:

fn poll_read(
    self: Pin<&mut Self>,
    cx: &mut Context<'_>,
    buf: &mut [u8],
) -> Poll<io::Result<usize>>;

This poll_read() has to check whether there's data that can be read into buf immediately, without blocking. If there is, we read the data and return the number of bytes we read. Otherwise, we have to schedule ourselves on cx, and return Poll::Pending.[1]
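
To make that contract concrete, here's a minimal sketch that uses only the standard library. Mailbox, its push method, and CountingWaker are hypothetical names invented for this illustration; they aren't from Arti or from the futures crate. Only the shape of poll_read mirrors the trait method, and I've dropped the Pin because this toy type doesn't need it.

```rust
use std::collections::VecDeque;
use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical in-memory byte source, for illustration only.
pub struct Mailbox {
    data: VecDeque<u8>,
    waker: Option<Waker>,
}

impl Mailbox {
    pub fn new() -> Self {
        Mailbox { data: VecDeque::new(), waker: None }
    }

    /// Same shape as AsyncRead::poll_read, minus the Pin.
    pub fn poll_read(
        &mut self,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        if self.data.is_empty() {
            // Nothing to read right now: remember whom to wake,
            // then report Pending without blocking.
            self.waker = Some(cx.waker().clone());
            return Poll::Pending;
        }
        // Copy out as much as fits in the caller's buffer.
        let n = buf.len().min(self.data.len());
        for (slot, byte) in buf.iter_mut().zip(self.data.drain(..n)) {
            *slot = byte;
        }
        Poll::Ready(Ok(n))
    }

    /// New data arrived: store it and wake whoever was waiting.
    pub fn push(&mut self, bytes: &[u8]) {
        self.data.extend(bytes.iter().copied());
        if let Some(w) = self.waker.take() {
            w.wake();
        }
    }
}

/// A waker that only counts how many times it has been woken.
pub struct CountingWaker(pub AtomicUsize);

impl Wake for CountingWaker {
    fn wake(self: Arc<Self>) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}
```

Polling an empty Mailbox returns Poll::Pending and stores the waker; a later push wakes it, and the next poll_read copies bytes into buf. Notice that poll_read never keeps a reference to buf after it returns.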

Moving forward, getting stuck

Compare poll_read to the read function in DataStream. First off, there's a mismatch between how these two functions use their output buffers. Because DataStream::read is async, it returns a future that will hang on to its buffer until the future is finally ready. But poll_read has to return right away, and it can't store a reference to its buffer at all. So I started by defining a wrapper variant of DataStream that implements the behavior that poll_read would need:[2]

pub struct DataReaderImpl {
    s: DataStream,
    // Bytes we have read from `s` but not yet handed to the caller.
    pending: Vec<u8>,
}
impl DataReaderImpl {
    fn new(s: DataStream) -> DataReaderImpl {
        DataReaderImpl {
            s,
            pending: Vec::new(),
        }
    }
    // Load up to 1k into the pending buffer.
    async fn fill_buf(&mut self) -> io::Result<usize> {
        let mut data = vec![0;1024];
        let len = self.s.read(&mut data[..]).await?;
        data.truncate(len);
        self.pending.extend(data);
        Ok(len)
    }
    // pull bytes from the pending buffer into `buf`.
    fn extract_bytes(&mut self, buf: &mut [u8]) -> usize {
        let n = cmp::min(buf.len(), self.pending.len());
        buf[..n].copy_from_slice(&self.pending[..n]);
        self.pending.drain(0..n);
        n
    }
}

Then, I thought, it ought to be easy to write AsyncRead! Here was my first try:

// This won't work...
impl AsyncRead for DataReaderImpl {
    fn poll_read(mut self: Pin<&mut Self>,
                 cx: &mut Context<'_>,
                 buf: &mut [u8]) -> Poll<io::Result<usize>> {
        if self.pending.is_empty() {
            // looks like we need more bytes.
            let fut = self.fill_buf();
            futures::pin_mut!(fut);
            match fut.poll(cx) {
                Poll::Ready(Err(e)) =>
                    return Poll::Ready(Err(e)),
                Poll::Ready(Ok(n)) =>
                    if n == 0 {
                        return Poll::Ready(Ok(0)); // EOF
                    }
                Poll::Pending =>
                    todo!("crud, where do i put the future?"), // XXXX
            }
        }

        // We have some data; move it out to the caller.
        let n = self.extract_bytes(buf);
        Poll::Ready(Ok(n))
    }
}

Almost there! But what do I do if the future says it's pending? I need to store it and poll it again the next time this function is called. But to do that, I won't be able to pin the future to the stack! I'll have to store it in the structure instead. And since the future comes from an async function, it won't have a type that I can name; I'll have to store it as a Box<dyn Future>.

Oh hang on, it'll need to be pinned. And sometimes there won't be a read in progress, so I won't have a future at all. Maybe I store it in an Option<Pin<Box<dyn Future>>>?
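
Here's a rough sketch of what storing a future in an Option<Pin<Box<dyn Future>>> field looks like when the future doesn't borrow from the struct that holds it. Everything in it (Holder, compute, YieldOnce, NoopWaker) is a hypothetical name made up for the example, and it only uses the standard library.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical future: Pending on its first poll, Ready on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref(); // ask to be polled again
        Poll::Pending
    }
}

/// Hypothetical async operation. Note that it borrows nothing.
async fn compute(x: u32) -> u32 {
    YieldOnce(false).await;
    x * 2
}

/// Holds an operation that may span several calls to `poll_value`.
pub struct Holder {
    in_progress: Option<Pin<Box<dyn Future<Output = u32>>>>,
}

impl Holder {
    pub fn new() -> Self {
        Holder { in_progress: None }
    }

    pub fn poll_value(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        // Start an operation only if none is already running.
        if self.in_progress.is_none() {
            let fut: Pin<Box<dyn Future<Output = u32>>> = Box::pin(compute(21));
            self.in_progress = Some(fut);
        }
        let fut = self.in_progress.as_mut().expect("set just above");
        match fut.as_mut().poll(cx) {
            Poll::Ready(v) => {
                self.in_progress = None; // finished: drop the future
                Poll::Ready(v)
            }
            // Not done: the future stays in the field for next time.
            Poll::Pending => Poll::Pending,
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
pub struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```

This compiles only because compute takes its argument by value. If the stored future held a reference to Holder itself, the borrow checker would reject it.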

(This is the point where I had to take a break and figure out pin-projection[3].)

But after I played around with that for a while, I hit the final snag: ultimately, I was trying to create a self-referential structure[4], which you can't do in safe Rust. You see, the future returned by DataReaderImpl::fill_buf needs to hold a reference to the DataReaderImpl, and so the DataReaderImpl needs to outlive the future. That means you can't store the future in the DataReaderImpl. You can't even store it and the DataReaderImpl in the same struct: that creates self-reference.

So what could I do? Was I supposed to use unsafe code or some tricky crate to make a self-referential struct anyway? Was my solution fundamentally flawed? Was I even trying to do something possible‽

I asked for help on Twitter. Fortunately, Daniel Franke got back to me, looked at my busted code, and walked me through the actual answer.

Hold the future or the reader: not both!

Here's the trick: We define an enum that holds the DataReaderImpl or the future that its fill_buf function returns, but not both at once. That way, we never have a self-referential structure!

First we had to define a new variation on fill_buf that will take ownership of the reader when it's called, and return ownership once it's done:

impl DataReaderImpl {
    async fn owning_fill_buf(mut self) -> (Self, io::Result<usize>) {
        let r = self.fill_buf().await;
        (self, r)
    }
}
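
To show why taking ownership helps, here's a small standard-library-only sketch. Source, OwnedFill, start_fill, and NoopWaker are hypothetical stand-ins that I made up for the example; the real DataReaderImpl does a lot more. The point is that the future from the owning version borrows nothing, so it can be boxed as a plain dyn Future.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for DataReaderImpl.
pub struct Source {
    pub pending: Vec<u8>,
}

impl Source {
    /// Borrowing version: the returned future holds `&mut self`.
    pub async fn fill(&mut self) -> usize {
        self.pending.extend_from_slice(b"abc");
        3
    }

    /// Owning version: the future owns the Source and hands it back.
    pub async fn owning_fill(mut self) -> (Self, usize) {
        let n = self.fill().await;
        (self, n)
    }
}

/// A boxed future with no borrowed lifetime. Inside a Box, `dyn Future`
/// defaults to `dyn Future + 'static`, so only a future that borrows
/// nothing (like the one from owning_fill) fits here.
pub type OwnedFill = Pin<Box<dyn Future<Output = (Source, usize)>>>;

pub fn start_fill(src: Source) -> OwnedFill {
    Box::pin(src.owning_fill())
}

/// Waker that does nothing; enough for polling by hand.
pub struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```

Trying to write the same start_fill with src.fill() instead would fail to compile, because that future would hold a reference to a local variable that's about to go away.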

Then we had to define an enum that could hold either the future or the DataReaderImpl object, along with a wrapper struct to hold the enum.

type OwnedResult = (DataReaderImpl, io::Result<usize>);
enum State {
    Closed,
    Ready(DataReaderImpl),
    Reading(Pin<Box<dyn Future<Output=OwnedResult>>>),
}
struct DataReader {
    state: Option<State>
}

Note that the DataReader struct holds an Option<State>: we'll want to modify the state object destructively, so we'll need to take ownership of the state in poll_read and then replace it with something else.[5]
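
Here's a tiny sketch of the two ways to move a value out from behind a &mut reference: Option::take and std::mem::replace. The Light enum and both wrapper structs are hypothetical, invented just to keep the example small.

```rust
use std::mem;

/// Hypothetical two-state machine, for illustration only.
#[derive(Debug, PartialEq)]
pub enum Light {
    Off,
    On(String),
}

pub struct WithOption {
    pub state: Option<Light>,
}

impl WithOption {
    /// Take the state out (leaving None behind), then put a new one back.
    pub fn toggle(&mut self) {
        let old = self.state.take().expect("state is always put back");
        self.state = Some(match old {
            Light::Off => Light::On(String::from("lit")),
            // We own `old`, so we are free to move out of it.
            Light::On(_label) => Light::Off,
        });
    }
}

pub struct WithReplace {
    pub state: Light,
}

impl WithReplace {
    /// Same idea without the Option: swap in a cheap placeholder value.
    pub fn toggle(&mut self) {
        let old = mem::replace(&mut self.state, Light::Off);
        self.state = match old {
            Light::Off => Light::On(String::from("lit")),
            Light::On(_label) => Light::Off,
        };
    }
}
```

With the Option version, forgetting to put a state back shows up as a panic on the next call. With mem::replace, the placeholder silently becomes the new state, so it should be a value that is safe to be left in (for the State enum above, Closed would be the natural candidate).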

With this groundwork in place we could finally give an implementation of AsyncRead that works:

impl AsyncRead for DataReader {
    fn poll_read(mut self: Pin<&mut Self>,
                 cx: &mut Context<'_>,
                 buf: &mut [u8]) -> Poll<io::Result<usize>> {
        // We're taking this temporarily. We have to put
        // something back before we return.
        let state = self.state.take().unwrap();

        // We own the state, so we can destructure it.
        let mut future = match state {
            State::Closed => {
                self.state = Some(State::Closed);
                return Poll::Ready(Ok(0));
            }
            State::Ready(mut imp) => {
                let n = imp.extract_bytes(buf);
                if n > 0 {
                    self.state = Some(State::Ready(imp));
                    // We have data, so we can give it back now.
                    return Poll::Ready(Ok(n));
                }
                // Nothing available; launch a read and poll it.
                Box::pin(imp.owning_fill_buf())
            }
            // If we have a future, we need to poll it.
            State::Reading(fut) => fut,
        };

        // Now we have a future for an in-progress read.
        // Can it make any progress?
        match future.as_mut().poll(cx) {
            Poll::Ready((_imp, Err(e))) => { // Error
                self.state = Some(State::Closed);
                Poll::Ready(Err(e))
            }
            Poll::Ready((_imp, Ok(0))) => { // EOF
                self.state = Some(State::Closed);
                Poll::Ready(Ok(0))
            }
            Poll::Ready((mut imp, Ok(_))) => {
                // We have some data!
                let n = imp.extract_bytes(buf);
                self.state = Some(State::Ready(imp));
                debug_assert!(n > 0);
                Poll::Ready(Ok(n))
            }
            Poll::Pending => {
                // We're pending; remember the future
                // and tell the caller.
                self.state = Some(State::Reading(future));
                Poll::Pending
            }
        }
    }
}

Now when poll_read() takes ownership of the previous state, it owns either a DataReaderImpl or a future returned by owning_fill_buf(), but never both at once, so we don't have any self-reference problems. When poll_read() is done, it has to put a new valid state back before it returns.
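
If you'd like to play with the pattern without pulling in Tor, here's a stripped-down sketch of the same idea using only the standard library. Ticker, PollTicker, YieldOnce, and NoopWaker are all hypothetical names for this example; the structure (an async method, an owning wrapper, a two-state enum, and a poll function) follows the approach described above, but it is not the code from Arti.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Pending once, then Ready; simulates waiting for input.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            return Poll::Ready(());
        }
        self.0 = true;
        cx.waker().wake_by_ref();
        Poll::Pending
    }
}

/// Hypothetical resource whose only interface is an async method.
pub struct Ticker {
    count: u32,
}

impl Ticker {
    async fn tick(&mut self) -> u32 {
        YieldOnce(false).await;
        self.count += 1;
        self.count
    }

    /// Takes ownership, and gives it back along with the result.
    async fn owning_tick(mut self) -> (Self, u32) {
        let n = self.tick().await;
        (self, n)
    }
}

type TickFuture = Pin<Box<dyn Future<Output = (Ticker, u32)>>>;

/// Either the Ticker or the future that owns it; never both.
enum State {
    Idle(Ticker),
    Ticking(TickFuture),
}

pub struct PollTicker {
    state: Option<State>,
}

impl PollTicker {
    pub fn new() -> Self {
        PollTicker { state: Some(State::Idle(Ticker { count: 0 })) }
    }

    pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<u32> {
        // Take the state; every path below puts one back.
        let mut fut: TickFuture = match self.state.take().expect("state not restored") {
            State::Idle(ticker) => Box::pin(ticker.owning_tick()),
            State::Ticking(fut) => fut,
        };
        match fut.as_mut().poll(cx) {
            Poll::Ready((ticker, n)) => {
                self.state = Some(State::Idle(ticker));
                Poll::Ready(n)
            }
            Poll::Pending => {
                self.state = Some(State::Ticking(fut));
                Poll::Pending
            }
        }
    }
}

/// Waker that does nothing; enough for polling by hand.
pub struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}
```

Each tick takes two calls to poll_tick here: the first creates the future and gets Poll::Pending, and the second picks the stored future back up and finishes it. I wrote out the type of fut explicitly so that both match arms are coerced to the same boxed dyn Future type.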

Conclusions

For the current version of all this code, have a look at tor_proto::stream::data in Arti. Note that the code in Arti is more complex than what I have in this post, and some of that complexity is probably unnecessary: I've been learning more about Rust as I go along.

I hope that some day there's an easier way to do all of this (with real asynchronous traits, maybe?) but in the meantime, I hope that this write-up will be useful to somebody else.


[1]

We might also have to report an EOF as Poll::Ready(Ok(0)), or an error as Poll::Ready(Err(_)). But let's keep this simple.

[2]

At this point I started writing my code really inefficiently, since I was just trying to get it to work. In the interest of clarity, I'll leave it as inefficient code here too.

[3]

It didn't turn out to be what I needed in the end, but I'm glad I learned about it: it has been the answer for a lot of other problems later on.

[4]

Self-referential structures in Rust require unsafe code and pinning. I spent a semi-unpleasant hour or two looking through example code here just to see what would be involved, and tried learning the rental crate, in case it would help.

[5]

We could probably use std::mem::replace for this too, but I don't expect there would be a performance difference.