4.5 Shared queue

Overview

The primary use of the shared queue is to quickly send async op responses from Deno to V8, or in other words, from Rust to JavaScript. As Rust and V8 run in the same thread, they can share a memory block.
Here are some of the key properties of the shared queue:
  • It is a fixed-size shared memory block
  • It is accessible by both Rust and V8/JS
  • It has a header block that contains the metadata
  • It has a record block that contains the op response data
  • It is implemented as a queue
Under heavy load, the shared queue is a very efficient way to send op responses, as it avoids making one callback per response: a single callback is enough to notify V8 to process all the responses queued up in the shared queue.

Layout

As the shared queue is a raw memory block, there is a predefined format to it: a fixed layout that is agreed upon by both parties.
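Both sides agree on a set of index constants that describe this layout. Here is a sketch of those constants as they appear on the JS side (the values follow from the field descriptions below; the Rust side defines equivalent constants):

const MAX_RECORDS = 100;
// Positions in the 32-bit view of the buffer (each slot is 4 bytes)
const INDEX_NUM_RECORDS = 0; // header: number of records pushed
const INDEX_NUM_SHIFTED_OFF = 1; // header: number of records read
const INDEX_HEAD = 2; // header: byte offset of the next insertion
const INDEX_OFFSETS = 3; // offsets area: (end, op id) tuples
const INDEX_RECORDS = INDEX_OFFSETS + 2 * MAX_RECORDS; // = 203
const HEAD_INIT = 4 * INDEX_RECORDS; // = 812, byte offset of the first record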
The shared queue starts with a header that contains all the information and pointers needed to correctly read the queued records. The header values are set by Rust and reset by JS as records are read.
Here is the use of each of the fields present in the header:

NUM_RECORDS

This holds the number of records present in the shared queue. This number is incremented by Rust after a record is inserted, and reset to zero by JS once all the records have been read.
  • Default value: 0
  • Position index: 0
  • Memory used: 4 bytes
  • Purpose: Total number of records present in the queue
  • Incremented by Rust
  • Reset by JS

NUM_SHIFTED_OFF

This holds the number of records that have been shifted off or read by the reader. This is useful when there is more than one record in the queue.
  • Default value: 0
  • Position index: 1
  • Memory used: 4 bytes
  • Purpose: Number of records that have been read by JS
  • Incremented by JS

HEAD

This holds the byte offset at which the next record will be inserted. It moves forward as records are inserted, and it is reset by JS once all the records have been read.
  • Default value: 812
  • Position index: 2
  • Memory used: 4 bytes
  • Purpose: Byte offset at which the next record should be inserted
  • Incremented by Rust
  • Reset by JS

OFFSETS

This holds tuples of metadata (end pointer, op id) for the records inserted in the queue.
  • Default value: NA
  • Start position index: 3
  • Index range: 3 to 202 (byte offsets 12 to 811)
  • Memory used: 800 bytes (100 tuples of two 4-byte values)
  • Purpose: Metadata describing the records present in the records area
  • Inserted by Rust
  • Read by JS
Each entry in the offsets area is a tuple, and the area supports a maximum of 100 tuples. Each tuple consists of:
  • End index of the record
  • Op id for the record
Only the end index of a record is stored, not its start index. The start index isn't needed because a record always starts right after the previous record's end, rounded up to 4-byte alignment. For example, if a record ends at byte 830, the next record starts at byte 832.
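To make the alignment rule concrete, here is a tiny sketch of the computation (the same formula appears later in this section, in both the push function and the getOffset helper):

// The next record starts at the previous record's end,
// rounded up to a multiple of 4.
function nextStart(prevEnd) {
  return (prevEnd + 3) & ~3;
}
nextStart(830); // 832
nextStart(832); // 832 (already aligned)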

RECORDS

This is the area that holds the actual record data. Records are variable-sized: the area can hold up to 100 records, but the space taken varies depending on the size of the records.
  • Default value: NA
  • Start position index: 812 (byte offset)
  • Index range: byte offsets 812 to around 12800
  • Memory used: ~ 12000 bytes
  • Purpose: Actual record data
  • Inserted by Rust
  • Read by JS

Rust part

An async op response can be sent to V8/JS in one of the following two ways:
  • Via shared queue
  • Via traditional callback
There is simple logic to choose between the two:
  • Try to push the response data into the shared queue
    • If the push succeeds, call the recv callback function without params
  • If the push fails for any reason
    • Call the recv callback function with the response data
Here is abbreviated code from the event loop that processes the responses from the async ops:
fn poll_pending_ops(
  &mut self,
  cx: &mut Context,
) -> Option<(OpId, Box<[u8]>)> {
  let state_rc = Self::state(self.v8_isolate());
  let mut overflow_response: Option<(OpId, Box<[u8]>)> = None;
  loop {
    let mut state = state_rc.borrow_mut();
    // Now handle actual ops.
    state.have_unpolled_ops.set(false);
    let pending_r = state.pending_ops.poll_next_unpin(cx);
    match pending_r {
      Poll::Ready(None) => break,
      Poll::Pending => break,
      Poll::Ready(Some((op_id, buf))) => {
        let successful_push = state.shared.push(op_id, &buf);
        if !successful_push {
          overflow_response = Some((op_id, buf));
          break;
        }
      }
    };
  }
  // -- CODE OMITTED --
  overflow_response
}
  • For each async op that has completed
    • Push the response data to the shared queue
    • If the push fails
      • Save the response into overflow_response
The next step is to make the callback and/or process the overflow_response:
fn async_op_response(
  &mut self,
  maybe_overflown_response: Option<(OpId, Box<[u8]>)>,
) -> Result<(), AnyError> {
  let state_rc = Self::state(self.v8_isolate());
  let shared_queue_size = state_rc.borrow().shared.size();
  if shared_queue_size == 0 && maybe_overflown_response.is_none() {
    return Ok(());
  }
  let js_recv_cb_handle = state_rc
    .borrow()
    .js_recv_cb
    .clone()
    .expect("Deno.core.recv has not been called.");
  let context = self.global_context();
  let scope = &mut v8::HandleScope::with_context(self.v8_isolate(), context);
  let context = scope.get_current_context();
  let global: v8::Local<v8::Value> = context.global(scope).into();
  let js_recv_cb = js_recv_cb_handle.get(scope);
  let tc_scope = &mut v8::TryCatch::new(scope);
  if shared_queue_size > 0 {
    js_recv_cb.call(tc_scope, global, &[]);
    let shared_queue_size = state_rc.borrow().shared.size();
    assert_eq!(shared_queue_size, 0);
  }
  if let Some(overflown_response) = maybe_overflown_response {
    let (op_id, buf) = overflown_response;
    let op_id: v8::Local<v8::Value> =
      v8::Integer::new(tc_scope, op_id as i32).into();
    let ui8: v8::Local<v8::Value> =
      bindings::boxed_slice_to_uint8array(tc_scope, buf).into();
    js_recv_cb.call(tc_scope, global, &[op_id, ui8]);
  }
  match tc_scope.exception() {
    None => Ok(()),
    Some(exception) => exception_to_err_result(tc_scope, exception, false),
  }
}
There are three important parts in the above function:
  • If the shared queue is empty and there is no overflow_response
    • Return early
  • If the shared queue is not empty
    • Call back js_recv_cb without data
  • If overflow_response is present
    • Call back js_recv_cb with the response data
Sending via the shared queue is more efficient than sending via the traditional callback.
Finally, here is the push function of the shared queue data structure:
pub fn push(&mut self, op_id: OpId, record: &[u8]) -> bool {
  let off = self.head();
  assert_eq!(off % 4, 0);
  let end = off + record.len();
  let aligned_end = (end + 3) & !3;
  let index = self.num_records();
  if aligned_end > self.bytes().len() || index >= MAX_RECORDS {
    debug!("WARNING the sharedQueue overflowed");
    return false;
  }
  assert_eq!(aligned_end % 4, 0);
  self.set_meta(index, end, op_id);
  assert_eq!(end - off, record.len());
  self.bytes_mut()[off..end].copy_from_slice(record);
  let u32_slice = self.as_u32_slice_mut();
  u32_slice[INDEX_NUM_RECORDS] += 1;
  u32_slice[INDEX_HEAD] = aligned_end as u32;
  true
}

fn set_meta(&mut self, index: usize, end: usize, op_id: OpId) {
  let s = self.as_u32_slice_mut();
  s[INDEX_OFFSETS + 2 * index] = end as u32;
  s[INDEX_OFFSETS + 2 * index + 1] = op_id.try_into().unwrap();
}
The push function is quite straightforward (the sketch after this list illustrates the arithmetic):
  • Save the end pointer and op id into the offsets area via set_meta
    • End and op id are saved as a tuple
  • Copy the record bytes into the records area
  • Increment the number of records
  • Set head to the 4-byte-aligned end of the inserted record
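To make the bookkeeping arithmetic concrete, here is a small simulation of it in JS (an illustration only, not the real implementation; the op ids and record sizes are made up):

// Simulate the head/offsets bookkeeping for a sequence of pushes.
function simulatePush(records, headInit = 812) {
  let head = headInit;
  const offsets = [];
  for (const [opId, len] of records) {
    const end = head + len; // record occupies bytes [head, end)
    offsets.push([end, opId]); // the tuple stored in the offsets area
    head = (end + 3) & ~3; // head advances to the 4-byte-aligned end
  }
  return { head, offsets };
}

console.log(simulatePush([[1, 10], [2, 5]]));
// => { head: 832, offsets: [ [ 822, 1 ], [ 829, 2 ] ] }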

JS part

The JS space receives a callback whenever there are one or more new entries in the shared queue. Depending on the case, the callback either arrives without data (the responses are present in the shared queue) or carries the data itself (the response couldn't fit into the shared queue).
Once JS receives the callback, it reads all the records available in the shared queue. When finished, it resets all the pointers.
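Before any of this can happen, JS needs typed-array views over the shared buffer and has to register the callback with Rust. Here is a simplified sketch of that initialization (assuming the buffer is exposed as Deno.core.shared; the exact shape of the real code may differ):

let sharedBytes;
let shared32;
const asyncHandlers = []; // filled elsewhere, one handler per op id

function init() {
  const shared = Deno.core.shared; // the shared memory block set up by Rust
  sharedBytes = new Uint8Array(shared); // byte view, used to read record data
  shared32 = new Int32Array(shared); // 32-bit view, used to read the header
  // Register the callback that Rust invokes for async op responses.
  Deno.core.recv(handleAsyncMsgFromRust);
}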
JS receives the callback in both cases:
function handleAsyncMsgFromRust(opId, buf) {
  if (buf) {
    // This is the overflow_response case of deno::JsRuntime::poll().
    asyncHandlers[opId](buf);
    return;
  }
  while (true) {
    const opIdBuf = shift();
    if (opIdBuf == null) {
      break;
    }
    assert(asyncHandlers[opIdBuf[0]] != null);
    asyncHandlers[opIdBuf[0]](opIdBuf[1]);
  }
}
The logic is:
  • If buf has something
    • Process it (it must have come via overflow_response)
  • Else
    • Loop while there are records in the shared queue
      • Shift the queue to get the next record
      • Process it
The only thing remaining to see is the shift function and its helpers:
function shift() {
  const i = shared32[INDEX_NUM_SHIFTED_OFF];
  if (size() == 0) {
    assert(i == 0);
    return null;
  }
  const off = getOffset(i);
  const [opId, end] = getMeta(i);
  if (size() > 1) {
    shared32[INDEX_NUM_SHIFTED_OFF] += 1;
  } else {
    // This was the last unread record; reset the queue.
    reset();
  }
  assert(off != null);
  assert(end != null);
  const buf = sharedBytes.subarray(off, end);
  return [opId, buf];
}

function getMeta(index) {
  if (index >= numRecords()) {
    return null;
  }
  // Each tuple in the offsets area is (end offset, op id).
  const buf = shared32[INDEX_OFFSETS + 2 * index];
  const opId = shared32[INDEX_OFFSETS + 2 * index + 1];
  return [opId, buf];
}

function getOffset(index) {
  if (index >= numRecords()) {
    return null;
  }
  if (index == 0) {
    // The first record starts right after the header and offsets area.
    return HEAD_INIT;
  }
  // Any other record starts at the previous record's 4-byte-aligned end.
  const prevEnd = shared32[INDEX_OFFSETS + 2 * (index - 1)];
  return (prevEnd + 3) & ~3;
}

function size() {
  return shared32[INDEX_NUM_RECORDS] - shared32[INDEX_NUM_SHIFTED_OFF];
}
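Two helpers used above, numRecords and reset, are not shown in the listing. Here are minimal sketches that are consistent with the header layout described earlier:

function numRecords() {
  return shared32[INDEX_NUM_RECORDS];
}

function reset() {
  // Clear the header so the next push starts from an empty queue.
  shared32[INDEX_NUM_RECORDS] = 0;
  shared32[INDEX_NUM_SHIFTED_OFF] = 0;
  shared32[INDEX_HEAD] = HEAD_INIT; // records start again at byte 812
}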
Here is how the shift function works:
  • Get the start offset of the record (the first record starts at HEAD_INIT; any other record starts right after the previous record's end)
  • Read the record's metadata from the offsets area (a tuple containing the op id and the end offset)
  • If more unread records remain, increment NUM_SHIFTED_OFF; otherwise reset the header pointers (num records, num shifted off, head)
  • Return a view over the record's bytes, from its start offset to its end offset
As expected, the reading logic in JS mirrors the writing logic in Rust. The only difference is that it is a queue: records are written at one end and read from the other.