5 Rust programming

Initializing a project

Reference page

  1. cargo new [package_name].
  2. Different commands:
    • cargo build: compiles to target/debug. cargo build --release compiles with optimizations to target/release.
    • cargo run: build + run
    • cargo check: type-checks the code without producing a binary, so it is faster than cargo build
  3. Running a specific target:
    • File not registered in Cargo.toml: put it in the examples directory and run cargo run --example [name], e.g. cargo run --example hello-redis
    • Register a bin in Cargo.toml and run cargo run --bin [name] -- [args].

Modules and code organization

  1. Declaring mod A brings A into scope. mod takes a plain identifier, so a module elsewhere in the crate is reached through its path (crate::path_to::A) or brought into scope with use crate::path_to::A.
  2. Declaring pub mod A brings A into scope and lets the current module’s parents refer to it as module::A.
  3. Writing use A::submodule makes submodule::something_else available within the current module only.
  4. Writing pub use A::submodule re-exports it, so parents can use module::submodule.
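
The visibility rules above can be sketched with inline modules. The names a, private_import, reexport, and demo are hypothetical, chosen only for illustration; super:: is used so the snippet works wherever it is pasted, and at the crate root it is equivalent to crate::.

```rust
mod a {
    // `pub mod` lets code outside `a` name `a::submodule`.
    pub mod submodule {
        pub fn something_else() -> u32 {
            42
        }
    }
}

mod private_import {
    // Plain `use`: `submodule` is in scope inside this module only.
    use super::a::submodule;

    pub fn call() -> u32 {
        submodule::something_else()
    }
}

mod reexport {
    // `pub use`: parents can now write `reexport::submodule`.
    pub use super::a::submodule;
}

fn demo() -> (u32, u32) {
    // `private_import::submodule::something_else()` would not compile,
    // because the `use` inside `private_import` is private.
    (
        private_import::call(),
        reexport::submodule::something_else(),
    )
}
```

Calling demo() from main exercises both paths to the same function.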

Applying style linters

VSCode setting:

  1. rustup component add rustfmt
  2. Open user settings (JSON), and add
"[rust]": {
    "editor.defaultFormatter": "rust-lang.rust",
    "editor.formatOnSave": true
}
  3. In the command palette, run “Format Document” to confirm the formatter is picked up.

Async programming

  1. tokio tutorial and Rust book chapter.

Lazy asyncs, and tokio-main macro

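Unlike languages such as OCaml (whose similar construct is Deferred.t), Rust's async operations are lazy: calling an async function only builds a future, and nothing runs until that future is polled. An async return value is therefore analogous to a zero-argument closure. The macro #[tokio::main] is syntactic sugar for starting a runtime and blocking on an async main function: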

#[tokio::main]
async fn main() {
    println!("hello");
}

// Equivalent code: 
fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    rt.block_on(async {
        println!("hello");
    })
}
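
A minimal sketch of the laziness itself, using only the standard library. The block_on here is a hypothetical busy-polling stand-in for a real runtime (it is not Tokio's implementation), and lazy_demo is an illustrative name.

```rust
use std::cell::Cell;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for a busy-polling toy executor.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for a runtime's `block_on`: polls until ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Reports whether the async body had run before and after driving it.
fn lazy_demo() -> (bool, bool) {
    let ran = Cell::new(false);
    // Creating the future runs none of its body.
    let fut = async {
        ran.set(true);
    };
    let before = ran.get();
    // Only polling (here via `block_on`) executes the body.
    block_on(fut);
    let after = ran.get();
    (before, after)
}
```

lazy_demo() returns (false, true): the body has not run when the future is created, and has run once the future has been driven to completion.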

Important implication of lazy futures

The fundamental operation on a Rust future is poll. Different ways of driving futures lead to different polling behavior, and hence to sequential, concurrent, or parallel execution.

  1. When .await is called sequentially, e.g. f1.await; f2.await;, the underlying operations run sequentially: unlike in eager languages, f1 only “starts executing” when f1.await is reached, and f2 is not polled until f1 has completed. After obtaining a future:
    • future.await: drives this one future to completion; it does not imply concurrency by itself.
    • tokio::spawn: hands the future to the scheduler, which is free to run the task whenever it sees fit. On the multi-threaded runtime, tasks may run in parallel on different worker threads.
    • tokio::select!: makes concurrent progress on multiple futures and waits for the first one to complete; tokio::join! waits for all of them to complete.
  2. This is unlike eager-future languages (e.g. OCaml), where the work starts as soon as the future is created, so awaiting two already-created futures in sequence still lets them run concurrently.
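
The difference in polling order can be made visible with a small standard-library sketch. Everything here is an assumption made for illustration: block_on is a toy busy-polling executor, YieldOnce is a future that is pending exactly once, and joined hand-rolls the behavior that tokio::join! provides rather than using Tokio itself.

```rust
use std::cell::RefCell;
use std::future::{poll_fn, Future};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Returns `Pending` on the first poll and `Ready` on the second.
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}

async fn step(name: &'static str, log: &RefCell<Vec<String>>) {
    log.borrow_mut().push(format!("{name} start"));
    YieldOnce(false).await;
    log.borrow_mut().push(format!("{name} end"));
}

/// Sequential awaits: `b` is not polled until `a` has finished.
fn sequential() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    block_on(async {
        step("a", &log).await;
        step("b", &log).await;
    });
    log.into_inner()
}

/// Hand-rolled join: both futures are polled on every wake-up.
fn joined() -> Vec<String> {
    let log = RefCell::new(Vec::new());
    block_on(async {
        let mut a = Box::pin(step("a", &log));
        let mut b = Box::pin(step("b", &log));
        let (mut a_done, mut b_done) = (false, false);
        poll_fn(|cx| {
            if !a_done {
                a_done = a.as_mut().poll(cx).is_ready();
            }
            if !b_done {
                b_done = b.as_mut().poll(cx).is_ready();
            }
            if a_done && b_done {
                Poll::Ready(())
            } else {
                Poll::Pending
            }
        })
        .await;
    });
    log.into_inner()
}
```

sequential() logs a start, a end, b start, b end, while joined() logs a start, b start, a end, b end: the interleaving comes from how the futures are polled, not from .await itself.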

Minimal server implementation

Some networking basics:

  1. A server at a specific IP address exposes ports.
  2. A single port can serve many (thousands or more) sockets. Each socket denotes a specific connection: each time a client connects, the OS creates a unique socket for that conversation.
  3. The OS identifies each TCP connection by a unique 4-tuple of (source IP, source port, destination IP, destination port).
    • The destination (server) IP and port are the same for every connection, while each client connects from its own source IP and a random ephemeral port.
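
The 4-tuple can be observed with blocking sockets from the standard library. This is only a sketch: it assumes the loopback interface is available, binds to port 0 so the OS picks a free port, and two_connections is a hypothetical helper name.

```rust
use std::io;
use std::net::{SocketAddr, TcpListener, TcpStream};

/// Opens two client connections to one listener and returns, for each
/// accepted socket, its (server-side address, client-side address).
fn two_connections() -> io::Result<[(SocketAddr, SocketAddr); 2]> {
    // Port 0 asks the OS for any free port.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let server_addr = listener.local_addr()?;

    // Keep both clients alive until the sockets have been accepted.
    let _client1 = TcpStream::connect(server_addr)?;
    let _client2 = TcpStream::connect(server_addr)?;

    let (socket1, peer1) = listener.accept()?;
    let (socket2, peer2) = listener.accept()?;

    Ok([
        (socket1.local_addr()?, peer1),
        (socket2.local_addr()?, peer2),
    ])
}
```

Both accepted sockets report the same server-side address, while the two client-side addresses differ in their ephemeral port, which is what keeps the 4-tuples unique.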

Tokio notes:

  1. tokio::spawn takes a future (typically an async block), starts a task, and returns a JoinHandle which, when awaited, yields a Result<return_type, JoinError>. Each task requires only 64 bytes of memory.
  2. The future passed to tokio::spawn must implement the Send trait (and be 'static). Realistically, this means that every captured variable implements Send (i.e. is safe to transfer across threads).
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};

// https://tokio.rs/tokio/tutorial/spawning
// Copy to `src/main.rs` to run.

#[tokio::main]
async fn main() {
    // Bind the listener to the address
    let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();

    loop {
        // The second item contains the IP and port of the new connection.
        // This waits (without blocking the thread) until a client connects to the listener's port
        let (socket, socket_addr) = listener.accept().await.unwrap();
        // This prints out e.g. `127.0.0.1:58646`. Last number is random for each connection.
        println!("Got connection from {:?}", socket_addr);

        // This implementation blocks the main loop for each incoming socket connection
        // process(socket).await;
        // Alternatively, spawn a new task for each inbound socket
        tokio::spawn(async move {
            process(socket).await;
        });
    }
}

async fn process(socket: TcpStream) {
    // The `Connection` lets us read/write redis **frames** instead of
    // byte streams. The `Connection` type is defined by mini-redis.
    let mut connection = Connection::new(socket);

    if let Some(frame) = connection.read_frame().await.unwrap() {
        println!("GOT: {:?}", frame);

        // Respond with an error
        let response = Frame::Error("unimplemented".to_string());
        connection.write_frame(&response).await.unwrap();
    }
}

Rough-edge: Send

To be more precise, in the future passed to tokio::spawn, all data that is held across an .await point must implement the Send trait. Note that Rc<..> does not implement Send:

use tokio::task::yield_now;
use std::rc::Rc;

// This function would compile 
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        // The scope forces `rc` to drop before `.await`.
        {
            let rc = Rc::new("hello");
            println!("{}", rc);
        }

        // `rc` is no longer used. It is **not** persisted when
        // the task yields to the scheduler
        yield_now().await;
    });
}

// This function **would not** compile
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        let rc = Rc::new("hello");

        // `rc` is used after `.await`. It must be persisted to
        // the task's state.
        yield_now().await;

        println!("{}", rc);
    });
}
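
One way to fix the second version is to replace Rc with Arc, which is Send. The sketch below checks this with the standard library only: require_send is a hypothetical helper that imposes the same Send bound as tokio::spawn, and block_on is a toy busy-polling executor, not Tokio's.

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor: polls the future in a loop until it is ready.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

/// Accepts only `Send` futures, mirroring the bound on `tokio::spawn`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

fn send_demo() -> usize {
    let fut = require_send(async {
        let shared = Arc::new(String::from("hello"));
        // `shared` is used after this await, so it is stored in the future.
        ready(()).await;
        shared.len()
    });
    // Swapping `Arc` for `std::rc::Rc` above makes `require_send` reject
    // the future at compile time, because `Rc` is not `Send`.
    block_on(fut)
}
```

The check happens at compile time; at run time send_demo() simply returns the length of "hello".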

Rust futures under the hood

In Rust, a future is any type that implements the Future trait, which consists of an associated Output type and a poll method.

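use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            println!("Hello world");
            Poll::Ready("done")
        } else {
            // Ask to be polled again right away (a busy loop); see the Wakers section.
            cx.waker().wake_by_ref();
            Poll::Pending
        }
    }
}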

#[tokio::main]
async fn main() {
    let when = Instant::now() + Duration::from_millis(10);
    let future = Delay { when };

    let out = future.await;
    assert_eq!(out, "done");
}

Under the hood, the compiler turns the async main above into a state machine roughly like the following:

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

enum MainFuture {
    // Initialized, never polled
    State0,
    // Waiting on `Delay`, i.e. the `future.await` line.
    State1(Delay),
    // The future has completed.
    Terminated,
}

impl Future for MainFuture {
    type Output = ();
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<()> {
        use MainFuture::*;
        loop {
            match *self {
                State0 => {
                    let when = Instant::now() +
                        Duration::from_millis(10);
                    let future = Delay { when };
                    *self = State1(future);
                }
                State1(ref mut my_future) => {
                    match Pin::new(my_future).poll(cx) {
                        Poll::Ready(out) => {
                            assert_eq!(out, "done");
                            *self = Terminated;
                            return Poll::Ready(());
                        }
                        Poll::Pending => {
                            return Poll::Pending;
                        }
                    }
                }
                Terminated => {
                    panic!("future polled after completion")
                }
            }
        }
    }
}

Salient features:

  1. When polled (i.e. asked to make progress), it proceeds as far as possible.
    • In this case, up to the .await statement in the async main function.
  2. On reaching a child future, it polls that future: if the child is pending, the parent returns Poll::Pending as well; if the child is ready, the parent takes its value and continues.
  3. It proceeds this way until the last child future has completed.

The runtime therefore just needs to call poll in a smart way on the outermost future.

Wakers

Without the cx: &mut Context argument to fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;, we could only implement a continuously polling system, which burns CPU cycles. A future can instead call cx.waker().wake() to signal that its task is ready to make progress and should be polled again.

  • A future returning Poll::Pending must ensure that the waker is signaled at some point, else it may hang indefinitely.

This results in the new Delay poll method:

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<&'static str>
{
    if Instant::now() >= self.when {
        println!("Hello world");
        Poll::Ready("done")
    } else {
        // Get a handle to the waker for the current task
        let waker = cx.waker().clone();
        let when = self.when;
        // Spawn a timer thread (requires `use std::thread;`).
        thread::spawn(move || {
            let now = Instant::now();
            if now < when {
                thread::sleep(when - now);
            }
            waker.wake();
        });
        Poll::Pending
    }
}
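
To see the waker doing its job, the Delay future can be driven by a toy executor that sleeps between polls instead of spinning. This is a sketch under stated assumptions: ThreadWaker, block_on, and run_delay are hypothetical names, and parking the calling thread is just one simple way to wait for a wake-up, not how Tokio schedules tasks.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::{Duration, Instant};

/// Waker that unparks the thread running the executor.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Toy executor: polls, then parks until the waker fires.
/// Returns the output together with the number of polls performed.
fn block_on<F: Future>(fut: F) -> (F::Output, usize) {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    let mut polls = 0;
    loop {
        polls += 1;
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return (value, polls);
        }
        // Sleep instead of spinning; `wake` unparks this thread.
        thread::park();
    }
}

struct Delay {
    when: Instant,
}

impl Future for Delay {
    type Output = &'static str;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
        if Instant::now() >= self.when {
            return Poll::Ready("done");
        }
        // Hand a clone of the waker to a timer thread, as in the notes.
        let waker = cx.waker().clone();
        let when = self.when;
        thread::spawn(move || {
            let now = Instant::now();
            if now < when {
                thread::sleep(when - now);
            }
            waker.wake();
        });
        Poll::Pending
    }
}

/// Runs a `Delay` of `millis` milliseconds and reports what happened.
fn run_delay(millis: u64) -> (&'static str, usize, Duration) {
    let start = Instant::now();
    let delay = Delay {
        when: start + Duration::from_millis(millis),
    };
    let (out, polls) = block_on(delay);
    (out, polls, start.elapsed())
}
```

With this executor the future is typically polled only twice, once to register the waker and once after the timer fires, rather than continuously. The exact count is not guaranteed, since thread::park may wake spuriously.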