6.10 Async op

Overview

Like other asynchronous operations in JavaScript, async ops don't block the current execution until a result is available. Most of the ops implemented by Deno are sync ops. Only file system ops come in both variants, sync and async, because file system operations can take time. Wherever an async variant is available, the user decides whether to run the operation as a sync op or an async op.
The async op in our example is truncate. It is a simple op, but it is very useful for learning the principles behind async execution. Let's go step by step and understand how async ops work end to end. We'll cover both JS and Rust.

JS part

User code

In the JS space, the hello world v2 program makes a call to truncate a file:
await Deno.truncate('/var/tmp/test.log');

truncate

Truncate is an async function. That's why the user program has to await its completion.
Truncate calls jsonOpAsync to run the op asynchronously. For sync ops, we saw that jsonOpSync was used.
async function truncate(path, len) {
  await core.jsonOpAsync("op_truncate_async", { path, len: coerceLen(len) });
}
jsonOpAsync takes the same arguments as its sync equivalent. The truncate op produces no result, so truncate has no return statement.
However, if we look at some other async function, we can see how the result gets returned:
function realPath(path) {
  return core.jsonOpAsync("op_realpath_async", { path });
}
In our example, the following gets passed in the args:
{ path: "/var/tmp/test.log", len: 0 }
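To make it concrete how this args object comes about, here is a minimal sketch of the argument preparation. The body of coerceLen is an assumption (this section only shows that it is called); it is assumed here to map a missing or negative length to 0. buildTruncateArgs is a hypothetical helper used only for illustration.

```javascript
// Assumed behavior of coerceLen: a missing or negative length becomes 0.
// The real implementation is not shown in this section.
function coerceLen(len) {
  if (len == null || len < 0) {
    return 0;
  }
  return len;
}

// Hypothetical helper that builds the object passed to jsonOpAsync.
function buildTruncateArgs(path, len) {
  return { path, len: coerceLen(len) };
}

console.log(buildTruncateArgs("/var/tmp/test.log"));
console.log(buildTruncateArgs("/var/tmp/test.log", 10));
```

Under this assumption, calling truncate without a length is what produces len: 0 in the args above.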

jsonOpAsync

jsonOpAsync is very different from jsonOpSync, because jsonOpAsync won't get a response immediately from the low-level send function. Instead, the response comes sometime later, so jsonOpAsync needs a way to correlate the response with its corresponding request.
The jsonOpAsync function performs a number of steps:
async function jsonOpAsync(opName, args = {}, ...zeroCopy) {
  setAsyncHandler(opsCache[opName], jsonOpAsyncHandler);
  args.promiseId = nextPromiseId++;
  const argsBuf = encodeJson(args);
  dispatch(opName, argsBuf, ...zeroCopy);
  let resolve, reject;
  const promise = new Promise((resolve_, reject_) => {
    resolve = resolve_;
    reject = reject_;
  });
  promise.resolve = resolve;
  promise.reject = reject;
  promiseTable[args.promiseId] = promise;
  return processResponse(await promise);
}
jsonOpAsync performs significantly more steps than jsonOpSync. Let's go over all the steps in detail:
  • setAsyncHandler
    • For each op id, associate the async handler
    • Async handler is the same for all the ops: jsonOpAsyncHandler
  • Create a promise id
    • This is a simple incrementing number
  • Encode args
    • This is the same as jsonOpSync
    • Promise id is also sent
  • Dispatch operation
    • This is the same as jsonOpSync
  • Create a promise
    • Create a regular JS promise
  • Save id -> promise
    • Save the mapping in a table: promise id -> promise
  • await promise
    • Wait for the promise to resolve/reject
  • Decode response
    • This is the same as jsonOpSync
    • Get promise id back
  • Resolve promise
  • Process response
    • This is the same as jsonOpSync
The encode args, dispatch, decode response, and process response steps are the same as in jsonOpSync, so we won't discuss them here. We'll only go over the new functions.

setAsyncHandler

An async handler is associated with each op id. The async handler is generic and works for all the ops.
function setAsyncHandler(opId, cb) {
  maybeInit();
  assert(opId != null);
  asyncHandlers[opId] = cb;
}
The callback is stored against the op id. In our case, the callback is the function jsonOpAsyncHandler.

Create and save promise

A JS promise is created and saved against the promise id in a promise table. The promise id is sent to Rust and comes back when the low-level work is completed. Using the received promise id, the awaiting promise can be identified and resolved.
promiseTable[args.promiseId] = promise;
In our example, the promise table contains the following:
{ "2": Promise { <pending> } }
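The value of the promise table shows most clearly when responses arrive in a different order than the requests were made. The following is a minimal sketch of that situation. registerPromise and deliverResponse are hypothetical helpers that stand in for the relevant parts of jsonOpAsync and jsonOpAsyncHandler; nothing is actually sent to Rust here.

```javascript
// Sketch of the id -> promise table. All helper names are hypothetical.
const promiseTable = {};
let nextPromiseId = 1;

// Create a promise that can be resolved from outside, and register it.
function registerPromise() {
  const promiseId = nextPromiseId++;
  let resolve, reject;
  const promise = new Promise((resolve_, reject_) => {
    resolve = resolve_;
    reject = reject_;
  });
  promise.resolve = resolve;
  promise.reject = reject;
  promiseTable[promiseId] = promise;
  return { promiseId, promise };
}

// Stand-in for a response arriving from the lower layer.
function deliverResponse(res) {
  const promise = promiseTable[res.promiseId];
  delete promiseTable[res.promiseId];
  promise.resolve(res);
}

const first = registerPromise();
const second = registerPromise();

// Responses arrive in the opposite order of the requests.
deliverResponse({ ok: "second result", promiseId: second.promiseId });
deliverResponse({ ok: "first result", promiseId: first.promiseId });

const demo = Promise.all([first.promise, second.promise]).then(([a, b]) => {
  console.log(a.ok, "|", b.ok);
  return [a.ok, b.ok];
});
```

Each caller still receives its own result, because the lookup is done by promise id rather than by arrival order.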

handleAsyncMsgFromRust

For each async operation finished by the lower layer, a callback is made to a function called handleAsyncMsgFromRust. This function simply calls the associated async handler, which is the same for all the ops.
function handleAsyncMsgFromRust(opId, buf) {
  if (buf) {
    // This is the overflow_response case of deno::JsRuntime::poll().
    asyncHandlers[opId](buf);
    return;
  }
  while (true) {
    const opIdBuf = shift();
    if (opIdBuf == null) {
      break;
    }
    assert(asyncHandlers[opIdBuf[0]] != null);
    asyncHandlers[opIdBuf[0]](opIdBuf[1]);
  }
}
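The function has two paths: a direct one for a response passed as an argument, and a drain loop for responses waiting in the shared queue. The sketch below exercises both paths with a mock queue. The mock shift(), the op ids 7 and 9, and the recording handlers are assumptions made for illustration; the real shared queue is not a plain array.

```javascript
// Mock shared queue: entries are [opId, buf] pairs. This replaces the real
// shared queue and its shift() function for illustration only.
const mockQueue = [];
function shift() {
  return mockQueue.length > 0 ? mockQueue.shift() : null;
}

const asyncHandlers = {};
const received = [];

// Register a recording handler for each of two hypothetical op ids.
asyncHandlers[7] = (buf) => received.push(["op 7", buf]);
asyncHandlers[9] = (buf) => received.push(["op 9", buf]);

function handleAsyncMsgFromRust(opId, buf) {
  if (buf) {
    // Overflow case: the response is passed directly as an argument.
    asyncHandlers[opId](buf);
    return;
  }
  // Normal case: drain everything that is waiting in the queue.
  while (true) {
    const opIdBuf = shift();
    if (opIdBuf == null) {
      break;
    }
    asyncHandlers[opIdBuf[0]](opIdBuf[1]);
  }
}

mockQueue.push([7, "a"], [9, "b"]);
handleAsyncMsgFromRust(); // drains both queued responses
handleAsyncMsgFromRust(9, "big"); // overflow response, bypasses the queue
console.log(received);
```

A single call without arguments handles every queued response, so one callback from the lower layer can complete several ops.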

jsonOpAsyncHandler

This function gets the promise id from the response and looks it up in the promise table. It removes the entry from the table and resolves the waiting promise with the received data.
function jsonOpAsyncHandler(buf) {
  const res = decodeJson(buf);
  const promise = promiseTable[res.promiseId];
  delete promiseTable[res.promiseId];
  promise.resolve(res);
}
In our example, the following response comes back:
{ ok: {}, promiseId: 2 }
When the promise gets resolved, the await in jsonOpAsync completes, and processResponse is called to process the response.
return processResponse(await promise);
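Note that jsonOpAsyncHandler always resolves the promise, even when the op failed; turning a failed response into a rejection is left to processResponse. The sketch below illustrates that division of work. The body of processResponse and the shape of the error response ({ err: { message } }) are assumptions, since this section only shows a successful response; fakeOp is a hypothetical stand-in for an op call.

```javascript
// Assumed response shapes: { ok, promiseId } on success and
// { err: { message }, promiseId } on failure. The error shape is an assumption.
function processResponse(res) {
  if ("ok" in res) {
    return res.ok;
  }
  throw new Error(res.err.message);
}

// Hypothetical stand-in for an op call whose promise is resolved later
// by the async handler with the decoded response.
async function fakeOp(response) {
  const promise = new Promise((resolve) =>
    setTimeout(() => resolve(response), 0)
  );
  return processResponse(await promise);
}

const okResult = fakeOp({ ok: {}, promiseId: 2 });
const errResult = fakeOp({ err: { message: "No such file" }, promiseId: 3 })
  .catch((e) => e.message);

okResult.then((v) => console.log("resolved with", v));
errResult.then((m) => console.log("rejected with", m));
```

Because the throw happens inside an async function, the caller sees a rejected promise, which is what a user awaiting the op would catch.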
This is all that happens on the JS side. Now let's change gears and see what happens on the Rust side.

Rust part

Send

We've already seen the send function in the previous section. The send function works differently for async ops:
let op = OpTable::route_op(op_id, state.op_state.clone(), bufs);
assert_eq!(state.shared.size(), 0);
match op {
  Op::Sync(buf) if !buf.is_empty() => {
    rv.set(boxed_slice_to_uint8array(scope, buf).into());
  }
  Op::Sync(_) => {}
  Op::Async(fut) => {
    let fut2 = fut.map(move |buf| (op_id, buf));
    state.pending_ops.push(fut2.boxed_local());
    state.have_unpolled_ops.set(true);
  }
For async ops, send doesn't return an immediate response. Instead, send takes the future returned by the op, maps its output to an (op_id, buf) pair, and adds it to the pending_ops list. This list gets checked in every tick of the event loop. When the async op finishes, the future resolves and a callback goes to v8.
route_op is the same for both types of ops.

json_op_async

Although we've seen a preview of this op in one of the previous sections, it'd be useful to see the implementation again as we now know a lot more about async ops.
pub fn json_op_async<F, R>(op_fn: F) -> Box<OpFn>
where
  F: Fn(Rc<RefCell<OpState>>, Value, BufVec) -> R + 'static,
  R: Future<Output = Result<Value, AnyError>> + 'static,
{
  let try_dispatch_op =
    move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Result<Op, AnyError> {
      let args: Value = serde_json::from_slice(&bufs[0])?;
      let promise_id = args
        .get("promiseId")
        .and_then(Value::as_u64)
        .ok_or_else(|| type_error("missing or invalid `promiseId`"))?;
      let bufs = bufs[1..].into();
      use crate::futures::FutureExt;
      let fut = op_fn(state.clone(), args, bufs).map(move |result| {
        json_serialize_op_result(
          Some(promise_id),
          result,
          state.borrow().get_error_class_fn,
        )
      });
      Ok(Op::Async(Box::pin(fut)))
    };
  Box::new(move |state: Rc<RefCell<OpState>>, bufs: BufVec| -> Op {
    match try_dispatch_op(state.clone(), bufs) {
      Ok(op) => op,
      Err(err) => Op::Sync(json_serialize_op_result(
        None,
        Err(err),
        state.borrow().get_error_class_fn,
      )),
    }
  })
}
Unlike json_op_sync which directly called the op function and returned the result, json_op_async does the following:
  • Get promiseId from the args
    • This is the same promiseId that was allocated for this call of the op
    • This promiseId comes from JS space
  • Create a future for op execution
  • When the future is done,
    • Send promiseId and result back
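Handling promiseId is a critical job of the async wrapper. As we've seen earlier, the promiseId is mandatory for correlating requests and responses in JS space. If the args can't be parsed or the promiseId is missing, no future is created; the error is returned immediately as a sync response.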
The future created here would get polled in the event loop.
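The control flow of this wrapper can also be modeled in JavaScript, which may help readers who are less familiar with Rust futures. The sketch below is only a model of the steps listed above, not the Rust implementation: wrapAsyncOp, the kind/body result object, and the error response shape are all hypothetical names and shapes chosen for illustration.

```javascript
// JavaScript model of the wrapper's control flow; the real code is the Rust
// function above. wrapAsyncOp and the response shapes are hypothetical.
function wrapAsyncOp(opFn) {
  return (argsJson) => {
    let args, promiseId;
    try {
      args = JSON.parse(argsJson);
      promiseId = args.promiseId;
      if (!Number.isInteger(promiseId) || promiseId < 0) {
        throw new TypeError("missing or invalid `promiseId`");
      }
    } catch (err) {
      // Dispatch failed before the op started: answer synchronously,
      // without a promise id (mirrors the Op::Sync error branch).
      return {
        kind: "sync",
        body: JSON.stringify({ err: { message: err.message } }),
      };
    }
    // Attach the promise id to whatever the op eventually produces.
    const fut = Promise.resolve()
      .then(() => opFn(args))
      .then(
        (ok) => JSON.stringify({ promiseId, ok }),
        (err) => JSON.stringify({ promiseId, err: { message: err.message } })
      );
    return { kind: "async", body: fut };
  };
}

// Hypothetical op that succeeds with an empty object, like truncate.
const truncateOp = wrapAsyncOp(async (_args) => ({}));

const asyncCall = truncateOp(
  JSON.stringify({ path: "/var/tmp/test.log", len: 0, promiseId: 2 })
);
const badCall = truncateOp(JSON.stringify({ path: "/var/tmp/test.log" }));

asyncCall.body.then((body) => console.log(asyncCall.kind, body));
console.log(badCall.kind, badCall.body);
```

The point of the model is that the op function itself never sees or returns the promise id; the wrapper takes it out of the args on the way in and attaches it to the result on the way out.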

op_truncate_async

This is the implementation of the low-level op. As this is an async op, there are two ways to implement it:
  • Use tokio's async functions
  • Use tokio tasks
As this op doesn't have an equivalent tokio async function, Deno makes use of the tokio task to carry out the op asynchronously:
async fn op_truncate_async(
  state: Rc<RefCell<OpState>>,
  args: Value,
  _zero_copy: BufVec,
) -> Result<Value, AnyError> {
  let args: TruncateArgs = serde_json::from_value(args)?;
  let path = PathBuf::from(&args.path);
  let len = args.len;
  {
    let state = state.borrow();
    state.borrow::<Permissions>().check_write(&path)?;
  }
  tokio::task::spawn_blocking(move || {
    debug!("op_truncate_async {} {}", path.display(), len);
    let f = std::fs::OpenOptions::new().write(true).open(&path)?;
    f.set_len(len)?;
    Ok(json!({}))
  })
  .await
  .unwrap()
}
After checking the write permission, the only major step is to spawn a blocking task on tokio, which carries out the low-level file system operation without blocking the event loop. There is no return value from the truncate op, so the return is simply an empty object: {}.

Event loop

We've seen how the requests are made from JS and how responses are handled. We've also seen how the op gets executed in Rust. The only thing remaining to see is how the async op responses go back from Rust to JS or v8. This happens in the event loop.
Recall that the send function saves the async op's future into the pending_ops list, which gets polled in each tick of the event loop:
Op::Async(fut) => {
  let fut2 = fut.map(move |buf| (op_id, buf));
  state.pending_ops.push(fut2.boxed_local());
  state.have_unpolled_ops.set(true);
}
Here is a relevant portion of the event loop where ops are checked and handled:
pub fn poll_event_loop(
  &mut self,
  cx: &mut Context,
) -> Poll<Result<(), AnyError>> {
  // -- CODE OMITTED ---
  // Ops
  {
    let overflow_response = self.poll_pending_ops(cx);
    self.async_op_response(overflow_response)?;
    // -- CODE OMITTED ---
  }
  // -- CODE OMITTED ---
  let has_pending_ops = !state.pending_ops.is_empty();
  // -- CODE OMITTED ---
  if !has_pending_ops
  // -- CODE OMITTED ---
  {
    return Poll::Ready(Ok(()));
  }
  // Check if more async ops have been dispatched
  // during this turn of event loop.
  if state.have_unpolled_ops.get() {
    state.waker.wake();
  }
  if has_pending_module_evaluation {
    if has_pending_ops
    // -- CODE OMITTED ---
    {
      // pass, will be polled again
    } else {
      let msg = "Module evaluation is still pending but there are no pending ops or dynamic imports. This situation is often caused by unresolved promise.";
      return Poll::Ready(Err(generic_error(msg)));
    }
  }
  // -- CODE OMITTED ---
  Poll::Pending
}
}
The checking and handling of async ops happen in two steps:
  • Poll pending ops
    • Check the futures of the pending ops
  • Handle resolved futures
    • The future that has been resolved gets handled here
    • This makes a callback to v8 via shared queue or the recv callback
Pending async ops would get polled in every tick of the event loop. The resolved futures would be processed, while the unresolved futures would be polled in the next tick.

poll_pending_ops

Polling of pending ops checks the pending ops and pushes the response of each finished op to the shared queue. It returns only the response that didn't fit into the shared queue, if any.
fn poll_pending_ops(
  &mut self,
  cx: &mut Context,
) -> Option<(OpId, Box<[u8]>)> {
  let state_rc = Self::state(self.v8_isolate());
  let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
  loop {
    // -- CODE OMITTED --
    let pending_r = state.pending_ops.poll_next_unpin(cx);
    match pending_r {
      Poll::Ready(None) => break,
      Poll::Pending => break,
      Poll::Ready(Some((op_id, buf))) => {
        let successful_push = state.shared.push(op_id, &buf);
        if !successful_push {
          // If we couldn't push the response to the shared queue, because
          // there wasn't enough size, we will return the buffer via the
          // legacy route, using the argument of deno_respond.
          overflow_response = Some((op_id, buf));
          break;
        }
      }
    };
  }
  // -- CODE OMITTED --
  overflow_response
}
Polling of pending ops is quite straightforward:
  • If no future is ready yet, or there are no pending ops left,
    • stop polling for this tick
  • If a future is resolved,
    • Push op_id and buf to the shared queue
    • If the buf is too big to go into the shared queue,
      • return it as the overflow response, which goes to JS through recv
In chapter 4, we've already seen how a shared queue is used between Rust and JS. Once the responses are on the shared queue, async_op_response makes the callback to JS, where handleAsyncMsgFromRust handles the async messages from Rust.
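To make the split between the shared queue and the overflow route concrete, here is a small JavaScript model of one polling pass. It is a sketch under stated assumptions, not the Rust implementation: a pending op is modeled as an object with a done flag instead of a future, buffers are strings, and QUEUE_CAPACITY, makeSharedQueue, and pollPendingOps are hypothetical names.

```javascript
// JavaScript model of one polling pass; not the Rust implementation.
// Each pending op is { opId, done, buf }; `done` stands in for Poll::Ready.
const QUEUE_CAPACITY = 8; // hypothetical size limit, in characters

function makeSharedQueue() {
  const items = [];
  let used = 0;
  return {
    items,
    push(opId, buf) {
      if (used + buf.length > QUEUE_CAPACITY) {
        return false; // not enough room
      }
      used += buf.length;
      items.push([opId, buf]);
      return true;
    },
  };
}

function pollPendingOps(pendingOps, shared) {
  let overflowResponse = null;
  while (true) {
    const index = pendingOps.findIndex((op) => op.done);
    if (index === -1) {
      break; // nothing else is ready during this tick
    }
    const [op] = pendingOps.splice(index, 1);
    if (!shared.push(op.opId, op.buf)) {
      overflowResponse = [op.opId, op.buf];
      break; // hand this one back directly instead of queueing it
    }
  }
  return overflowResponse;
}

const shared = makeSharedQueue();
const pendingOps = [
  { opId: 1, done: true, buf: "abc" },
  { opId: 2, done: false, buf: "" },
  { opId: 3, done: true, buf: "0123456789" },
];

const overflow = pollPendingOps(pendingOps, shared);
console.log(shared.items, overflow, pendingOps.length);
```

In this run, op 1 lands on the queue, op 3 is too large and comes back as the overflow response, and op 2 stays pending for a later tick.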
--
That's all about async ops.