5 Rust programming
Initializing a project
`cargo new [package_name]`

Different commands:
- `cargo build`: compiles to `target/debug`; `cargo build --release` outputs to `target/release`.
- `cargo run`: build + run.
- `cargo check`: quick check without producing a binary.
Running a specific file:
- File not registered in `Cargo.toml`: put it in the `examples` directory and run `cargo run --example hello-redis`.
- File registered in `Cargo.toml`: register a `bin` entry and run `cargo run --bin [name] -- [--args]`.
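A minimal `Cargo.toml` registration for the second option might look like this (binary name and path are placeholders):

```toml
# Hypothetical example: registers an extra binary target in the package.
[[bin]]
name = "my_tool"
path = "src/bin/my_tool.rs"
```

Then `cargo run --bin my_tool -- --some-flag` builds and runs it, forwarding everything after `--` as program arguments.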
Modules and code organization
- Calling `mod A` or `mod crate::path_to::A` brings `A` into scope.
- Calling `pub mod A` brings `A` into scope and lets the current module's parents reference `module::A`.
- Calling `use A::submodule` directly makes `submodule::something_else` available in the scope of `module`.
- Calling `pub use A::submodule` (a re-export) allows parents to use `module::submodule`.
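The rules above can be sketched in a single file (module and function names here are made up for illustration):

```rust
// `mod a` brings module `a` into scope.
mod a {
    // `pub mod` lets code outside `a` reference `a::submodule`.
    pub mod submodule {
        pub fn something_else() -> &'static str {
            "hi"
        }
    }
}

// `use` shortens paths in this scope: `submodule::...` instead of `a::submodule::...`.
use a::submodule;

// `pub use` re-exports, so external code could reach this as `crate::subm::...`.
pub use a::submodule as subm;

fn main() {
    assert_eq!(submodule::something_else(), "hi");
    assert_eq!(subm::something_else(), "hi");
    println!("ok");
}
```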
Applying style linters
VSCode setup:
- `rustup component add rustfmt`
- Open user settings (JSON), and add the formatter configuration.
- In the command palette, run “Format Document”.
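The settings-JSON entry was not preserved in these notes; a plausible minimal sketch (the formatter id assumes the rust-analyzer extension) is:

```json
{
  "[rust]": {
    "editor.defaultFormatter": "rust-lang.rust-analyzer",
    "editor.formatOnSave": true
  }
}
```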
Async programming
Lazy asyncs, and tokio-main macro
Unlike languages such as OCaml (whose similar construct is `Deferred.t`), async operations in Rust are lazy. An async function's return value is analogous to a zero-argument closure. The macro `#[tokio::main]` provides the syntactic sugar to block on an async `main` function:
#[tokio::main]
async fn main() {
println!("hello");
}
// Equivalent code:
fn main() {
let mut rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
println!("hello");
})
}

Important implication of lazy futures
The fundamental future primitive in Rust is `poll`. Different future operations result in different polling behavior, and hence in different concurrency / parallelism characteristics.
- When `.await` is called sequentially, e.g. `f1.await; f2.await;`, the underlying tasks execute sequentially, since the first task essentially “starts executing” only when `f1.await` is called, unlike in eager languages.
- After obtaining a future:
  - `future.await`: this does not imply concurrency by itself.
  - `tokio::spawn`: the scheduler is free to run the task whenever it sees fit. For CPU-intensive tasks, this might result in parallelism.
  - `tokio::select!`: makes concurrent progress on multiple futures and waits for the first one to complete; `tokio::join!` waits for all of them to complete.
- This is unlike eager-future languages (e.g. OCaml), where sequential `.await`s immediately imply concurrency.
Minimal server implementation
Some networking basics:
- A server on a specific IP address exposes ports.
- A port can host many (thousands+) sockets. Each socket denotes a specific connection; each time a client connects, the OS creates a unique socket for that conversation.
- The OS identifies each TCP connection by a unique 4-tuple of (source IP, source port, destination IP, destination port).
- The server's IP and port are the same for every connection, while each client connects from its own IP and a random ephemeral port.
Tokio notes:
- `tokio::spawn` consumes a future (e.g. an async block), starts a task, and returns a `JoinHandle` which, upon `.await`, yields a `Result<return_type>`. Each task only requires 64 bytes of memory.
- The future passed to `tokio::spawn` must implement the `Send` trait. Realistically, this means that every captured variable implements `Send` (i.e. is safe to transfer across threads).
use mini_redis::{Connection, Frame};
use tokio::net::{TcpListener, TcpStream};
// https://tokio.rs/tokio/tutorial/spawning
// Copy to `src/main.rs` to run.
#[tokio::main]
async fn main() {
// Bind the listener to the address
let listener = TcpListener::bind("127.0.0.1:6379").await.unwrap();
loop {
// The second item contains the IP and port of the new connection.
// This blocks until some socket connects to the listener's port
let (socket, socket_addr) = listener.accept().await.unwrap();
// This prints out e.g. `127.0.0.1:58646`. Last number is random for each connection.
println!("Got connection from {:?}", socket_addr);
// This implementation blocks the main loop for each incoming socket connection
// process(socket).await;
// Alternatively, spawn a new task for each inbound socket
tokio::spawn(async move {
process(socket).await;
});
}
}
async fn process(socket: TcpStream) {
// The `Connection` lets us read/write redis **frames** instead of
// byte streams. The `Connection` type is defined by mini-redis.
let mut connection = Connection::new(socket);
if let Some(frame) = connection.read_frame().await.unwrap() {
println!("GOT: {:?}", frame);
// Respond with an error
let response = Frame::Error("unimplemented".to_string());
connection.write_frame(&response).await.unwrap();
}
}

Rough edge: Send
To be more precise, in the future passed to tokio::spawn, all data whose ownership spans an .await call must implement the Send trait. Note that Rc<..> does not implement Send:
use tokio::task::yield_now;
use std::rc::Rc;
// This function would compile
#[tokio::main]
async fn main() {
tokio::spawn(async {
// The scope forces `rc` to drop before `.await`.
{
let rc = Rc::new("hello");
println!("{}", rc);
}
// `rc` is no longer used. It is **not** persisted when
// the task yields to the scheduler
yield_now().await;
});
}
// This function **would not** compile
#[tokio::main]
async fn main() {
tokio::spawn(async {
let rc = Rc::new("hello");
// `rc` is used after `.await`. It must be persisted to
// the task's state.
yield_now().await;
println!("{}", rc);
});
}

Rust futures under the hood
In Rust, a future is any type that implements the `Future` trait, i.e. provides an associated `Output` type and a `poll` method.
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

struct Delay {
    when: Instant,
}
impl Future for Delay {
type Output = &'static str;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<&'static str> {
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Ignore this line for now.
cx.waker().wake_by_ref();
Poll::Pending
        }
    }
}
#[tokio::main]
async fn main() {
let when = Instant::now() + Duration::from_millis(10);
let future = Delay { when };
let out = future.await;
assert_eq!(out, "done");
}

Under the hood, the async `main` roughly returns a future of the following construction:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};
enum MainFuture {
// Initialized, never polled
State0,
// Waiting on `Delay`, i.e. the `future.await` line.
State1(Delay),
// The future has completed.
Terminated,
}
impl Future for MainFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<()> {
use MainFuture::*;
loop {
match *self {
State0 => {
let when = Instant::now() +
Duration::from_millis(10);
let future = Delay { when };
*self = State1(future);
}
State1(ref mut my_future) => {
match Pin::new(my_future).poll(cx) {
Poll::Ready(out) => {
assert_eq!(out, "done");
*self = Terminated;
return Poll::Ready(());
}
Poll::Pending => {
return Poll::Pending;
                    }
                }
            }
Terminated => {
panic!("future polled after completion")
            }
        }
    }
}

Salient features:
- When polled (i.e. computation should proceed), it proceeds as far as possible: in this case, up to the `.await` statement in the async `main` function.
- After encountering a child future, it polls the nested future and propagates its result.
- It proceeds until the last child future has completed.
The runtime therefore just needs to *call `poll` in a smart way* on the outermost future.
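To make this concrete, here is a deliberately dumb std-only sketch that busy-polls the outermost future with a no-op waker; a real runtime parks the thread and relies on the waker instead (names like `block_on` and `noop_waker` are just illustrative):

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// A waker whose `wake` does nothing; fine here because we never sleep.
fn noop_waker() -> Waker {
    fn clone(_: *const ()) -> RawWaker {
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    fn noop(_: *const ()) {}
    static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
    unsafe { Waker::from_raw(RawWaker::new(std::ptr::null(), &VTABLE)) }
}

// The "smart way" reduced to its dumbest form: poll in a loop until Ready.
fn block_on<F: Future>(mut fut: F) -> F::Output {
    // Safety: `fut` is shadowed and never moved after being pinned.
    let mut fut = unsafe { Pin::new_unchecked(&mut fut) };
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        std::thread::yield_now(); // spin politely between polls
    }
}

fn main() {
    let out = block_on(async { 1 + 2 });
    assert_eq!(out, 3);
    println!("ok");
}
```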
Wakers
Without the cx: &mut Context argument to fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output>;, we could only implement a continuously polling system that burns CPU cycles. Instead, a future (or a helper thread acting on its behalf) calls cx.waker().wake() to signal the runtime that the task can make progress and should be polled again.
- A future returning `Poll::Pending` must ensure that the waker is signaled at some point; otherwise the task may hang indefinitely.
This results in the new Delay poll method:
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<&'static str>
{
if Instant::now() >= self.when {
println!("Hello world");
Poll::Ready("done")
} else {
// Get a handle to the waker for the current task
let waker = cx.waker().clone();
let when = self.when;
// Spawn a timer thread.
thread::spawn(move || {
let now = Instant::now();
if now < when { thread::sleep(when - now); }
waker.wake();
});
Poll::Pending
    }
}