
⏱️ Add a hostcall to get the amount of vcpu time that has passed in the guest, in milliseconds #412

Merged · 13 commits · Aug 16, 2024
1 change: 1 addition & 0 deletions Cargo.lock


1 change: 1 addition & 0 deletions Cargo.toml
@@ -29,6 +29,7 @@ base64 = "0.21.2"
clap = { version = "^4.0.18", features = ["derive"] }
hyper = { version = "=0.14.26", features = ["full"] }
itertools = "0.10.5"
pin-project = "1.0.8"
rustls = { version = "0.21.5", features = ["dangerous_configuration"] }
rustls-pemfile = "1.0.3"
serde_json = "1.0.59"
1 change: 1 addition & 0 deletions cli/tests/integration/main.rs
@@ -25,3 +25,4 @@ mod upstream;
mod upstream_async;
mod upstream_dynamic;
mod upstream_streaming;
mod vcpu_time;
28 changes: 28 additions & 0 deletions cli/tests/integration/vcpu_time.rs
@@ -0,0 +1,28 @@
use crate::{
common::{Test, TestResult},
viceroy_test,
};
use hyper::{Request, Response, StatusCode};

viceroy_test!(vcpu_time_getter_works, |is_component| {
let req = Request::get("/")
.header("Accept", "text/html")
.body("Hello, world!")
.unwrap();

let resp = Test::using_fixture("vcpu_time_test.wasm")
.adapt_component(is_component)
.backend("slow-server", "/", None, |_| {
std::thread::sleep(std::time::Duration::from_millis(4000));
Response::builder()
.status(StatusCode::OK)
.body(vec![])
.unwrap()
})
.await
.against(req)
.await?;

assert_eq!(resp.status(), StatusCode::OK);
Ok(())
});
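
The vcpu_time_test.wasm fixture itself is not included in this diff. As a rough sketch only, a guest could reach the new hostcall through the raw ABI this PR defines further down (import module fastly_compute_runtime, export get_vcpu_ms, out-pointer plus status return); the helper below is hypothetical and the real fixture may look quite different:

// Hypothetical guest-side sketch; not the actual vcpu_time_test.wasm fixture.
#[link(wasm_import_module = "fastly_compute_runtime")]
extern "C" {
    // Mirrors the ABI defined later in this diff: the host writes the elapsed
    // vCPU time (in milliseconds) through the out-pointer and returns a
    // FastlyStatus code, where 0 means OK.
    fn get_vcpu_ms(vcpu_ms_out: *mut u64) -> i32;
}

fn vcpu_time_ms() -> Option<u64> {
    let mut ms = 0u64;
    // SAFETY: the hostcall only writes a single u64 through the pointer.
    let status = unsafe { get_vcpu_ms(&mut ms) };
    (status == 0).then_some(ms)
}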
1 change: 1 addition & 0 deletions cli/tests/trap-test/Cargo.lock


18 changes: 18 additions & 0 deletions crates/adapter/src/fastly/core.rs
@@ -268,6 +268,24 @@ pub mod fastly_abi {
}
}

pub mod fastly_compute_runtime {
use super::*;

#[export_name = "fastly_compute_runtime#get_vcpu_ms"]
pub fn get_vcpu_ms(vcpu_time_ms_out: *mut u64) -> FastlyStatus {
match crate::bindings::fastly::api::compute_runtime::get_vcpu_ms() {
Ok(time) => {
unsafe {
*vcpu_time_ms_out = time;
};
FastlyStatus::OK
}

Err(e) => e.into(),
}
}
}

pub mod fastly_uap {
use super::*;
use crate::bindings::fastly::api::uap;
1 change: 1 addition & 0 deletions lib/Cargo.toml
@@ -40,6 +40,7 @@ http-body = "^0.4.5"
hyper = { workspace = true }
itertools = { workspace = true }
lazy_static = "^1.4.0"
pin-project = { workspace = true }
regex = "^1.3.9"
rustls = "^0.21.1"
rustls-native-certs = "^0.6.3"
6 changes: 6 additions & 0 deletions lib/compute-at-edge-abi/compute-at-edge.witx
@@ -965,3 +965,9 @@
(result $err (expected (error $fastly_status)))
)
)

(module $fastly_compute_runtime
(@interface func (export "get_vcpu_ms")
(result $err (expected $vcpu_ms (error $fastly_status)))
)
)
1 change: 1 addition & 0 deletions lib/compute-at-edge-abi/typenames.witx
@@ -358,3 +358,4 @@
(typename $has u32)

(typename $body_length u64)
(typename $vcpu_ms u64)
Binary file modified lib/data/viceroy-component-adapter.wasm
Contributor: Did you rerun make adapter after merging main so that this includes both the vcpu hostcall and the inspect hostcall?

Contributor (Author): Yup! Rerunning it now shows me no changes with this PR.

10 changes: 10 additions & 0 deletions lib/src/component/compute_runtime.rs
@@ -0,0 +1,10 @@
use super::fastly::api::{compute_runtime, types};
use crate::session::Session;
use std::sync::atomic::Ordering;

#[async_trait::async_trait]
impl compute_runtime::Host for Session {
async fn get_vcpu_ms(&mut self) -> Result<u64, types::Error> {
Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000)
}
}
2 changes: 2 additions & 0 deletions lib/src/component/mod.rs
@@ -59,13 +59,15 @@ pub fn link_host_functions(linker: &mut component::Linker<ComponentCtx>) -> anyh
fastly::api::types::add_to_linker(linker, |x| x.session())?;
fastly::api::uap::add_to_linker(linker, |x| x.session())?;
fastly::api::config_store::add_to_linker(linker, |x| x.session())?;
fastly::api::compute_runtime::add_to_linker(linker, |x| x.session())?;

Ok(())
}

pub mod async_io;
pub mod backend;
pub mod cache;
pub mod compute_runtime;
pub mod config_store;
pub mod device_detection;
pub mod dictionary;
59 changes: 54 additions & 5 deletions lib/src/execute.rs
@@ -18,13 +18,19 @@ use {
upstream::TlsConfig,
Error,
},
futures::{
task::{Context, Poll},
Future,
},
hyper::{Request, Response},
pin_project::pin_project,
std::{
collections::HashSet,
fs,
io::Write,
net::{Ipv4Addr, SocketAddr},
path::{Path, PathBuf},
pin::Pin,
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::{Arc, Mutex},
thread::{self, JoinHandle},
@@ -365,13 +371,22 @@ impl ExecuteCtx {
let req_id = self
.next_req_id
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
let active_cpu_time_us = Arc::new(AtomicU64::new(0));

// Spawn a separate task to run the guest code. That allows _this_ method to return a response early
// if the guest sends one, while the guest continues to run afterward within its task.
let guest_handle = tokio::task::spawn(
self.run_guest(req, req_id, sender, local, remote)
.instrument(info_span!("request", id = req_id)),
);
let guest_handle = tokio::task::spawn(CpuTimeTracking::new(
active_cpu_time_us.clone(),
self.run_guest(
req,
req_id,
sender,
local,
remote,
active_cpu_time_us.clone(),
)
.instrument(info_span!("request", id = req_id)),
));

let resp = match receiver.await {
Ok(resp) => (resp, None),
@@ -428,6 +443,7 @@ impl ExecuteCtx {
sender: Sender<Response<Body>>,
local: SocketAddr,
remote: SocketAddr,
active_cpu_time_us: Arc<AtomicU64>,
) -> Result<(), ExecutionError> {
info!("handling request {} {}", req.method(), req.uri());
let start_timestamp = Instant::now();
@@ -437,6 +453,7 @@
sender,
local,
remote,
active_cpu_time_us,
&self,
self.backends.clone(),
self.device_detection.clone(),
@@ -586,13 +603,15 @@ impl ExecuteCtx {
let (sender, receiver) = oneshot::channel();
let local = (Ipv4Addr::LOCALHOST, 80).into();
let remote = (Ipv4Addr::LOCALHOST, 0).into();
let active_cpu_time_us = Arc::new(AtomicU64::new(0));

let session = Session::new(
req_id,
req,
sender,
local,
remote,
active_cpu_time_us.clone(),
&self,
self.backends.clone(),
self.device_detection.clone(),
@@ -637,7 +656,8 @@ impl ExecuteCtx {
.map_err(ExecutionError::Typechecking)?;

// Invoke the entrypoint function and collect its exit code
let result = main_func.call_async(&mut store, ()).await;
let result =
CpuTimeTracking::new(active_cpu_time_us, main_func.call_async(&mut store, ())).await;

// If we collected a profile, write it to the file
write_profile(&mut store, self.guest_profile_path.as_ref().as_ref());
@@ -710,3 +730,32 @@ fn configure_wasmtime(

config
}

#[pin_project]
struct CpuTimeTracking<F> {
#[pin]
future: F,
time_spent: Arc<AtomicU64>,
}

impl<F> CpuTimeTracking<F> {
fn new(time_spent: Arc<AtomicU64>, future: F) -> Self {
CpuTimeTracking { future, time_spent }
}
}

impl<E, F: Future<Output = Result<(), E>>> Future for CpuTimeTracking<F> {
type Output = F::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let me = self.project();

let start = Instant::now();
let result = me.future.poll(cx);
// 2^64 microseconds is over half a million years, so I'm not terribly
// worried about this cast.
let runtime = start.elapsed().as_micros() as u64;
let _ = me.time_spent.fetch_add(runtime, Ordering::SeqCst);
result
}
}
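
One note on how the tracking works: CpuTimeTracking only accumulates the time spent inside each poll of the wrapped future, so time the guest spends parked while it waits on a backend response or a timer does not count toward the reported vCPU time. That is presumably what the 4-second sleeping backend in the integration test above is meant to exercise.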
1 change: 1 addition & 0 deletions lib/src/linking.rs
@@ -303,6 +303,7 @@ pub fn link_host_functions(
wiggle_abi::fastly_uap::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_async_io::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_backend::add_to_linker(linker, WasmCtx::session)?;
wiggle_abi::fastly_compute_runtime::add_to_linker(linker, WasmCtx::session)?;
link_legacy_aliases(linker)?;
Ok(())
}
5 changes: 5 additions & 0 deletions lib/src/session.rs
@@ -12,6 +12,7 @@ use std::future::Future;
use std::io::Write;
use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Mutex};

use {
@@ -47,6 +48,8 @@ pub struct Session {
downstream_client_addr: SocketAddr,
/// The IP address and port that received this session.
downstream_server_addr: SocketAddr,
/// The amount of active CPU time this session has used so far, in microseconds.
pub active_cpu_time_us: Arc<AtomicU64>,
/// The compliance region that this request was received in.
///
/// For now this is just always `"none"`, but we place the field in the session
@@ -150,6 +153,7 @@ impl Session {
resp_sender: Sender<Response<Body>>,
server_addr: SocketAddr,
client_addr: SocketAddr,
active_cpu_time_us: Arc<AtomicU64>,
ctx: &ExecuteCtx,
backends: Arc<Backends>,
device_detection: Arc<DeviceDetection>,
@@ -176,6 +180,7 @@
downstream_req_handle,
downstream_req_body_handle,
downstream_req_original_headers,
active_cpu_time_us,
async_items,
req_parts,
resp_parts: PrimaryMap::new(),
1 change: 1 addition & 0 deletions lib/src/wiggle_abi.rs
@@ -51,6 +51,7 @@ macro_rules! multi_value_result {
mod backend_impl;
mod body_impl;
mod cache;
mod compute_runtime;
mod config_store;
mod device_detection_impl;
mod dictionary_impl;
14 changes: 14 additions & 0 deletions lib/src/wiggle_abi/compute_runtime.rs
@@ -0,0 +1,14 @@
use crate::error::Error;
use crate::session::Session;
use crate::wiggle_abi::fastly_compute_runtime::FastlyComputeRuntime;
use std::sync::atomic::Ordering;
use wiggle::GuestMemory;

impl FastlyComputeRuntime for Session {
fn get_vcpu_ms(&mut self, _memory: &mut GuestMemory<'_>) -> Result<u64, Error> {
// we internally track microseconds, because our wasmtime tick length
// is too short for ms to work. but we want to shrink this to ms to
// try to minimize timing attacks.
Ok(self.active_cpu_time_us.load(Ordering::SeqCst) / 1000)
}
}
9 changes: 9 additions & 0 deletions lib/wit/deps/fastly/compute.wit
@@ -1086,6 +1086,14 @@ interface reactor {
serve: func(req: request-handle, body: body-handle) -> result;
}

interface compute-runtime {
use types.{error};

type vcpu-ms = u64;

get-vcpu-ms: func() -> result<vcpu-ms, error>;
}

world compute {
import wasi:clocks/[email protected];
import wasi:clocks/[email protected];
@@ -1101,6 +1109,7 @@ world compute {
import async-io;
import backend;
import cache;
import compute-runtime;
import dictionary;
import geo;
import device-detection;