Skip to content

Commit

Permalink
Allow capturing logging endpoint messages
Browse files Browse the repository at this point in the history
Some integration tests need to capture log output in order to verify
that logging endpoints work correctly from guests, but it's also
potentially useful for people using Viceroy as a library.

The previous way that the integration tests did this was not reliable
when multiple tests ran in parallel, because there was only one global
hook for capturing logs. Exposing this as a configuration option on
execution contexts instead allows each test to set independent capture
buffers.

I guess this should be considered a breaking API change since
`viceroy_lib::logging::LOG_WRITER` was exported publicly from the crate.
  • Loading branch information
jameysharp committed Jun 28, 2024
1 parent c75ee9d commit ed292ae
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 53 deletions.
19 changes: 16 additions & 3 deletions cli/tests/integration/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

use futures::stream::StreamExt;
use hyper::{service, Body as HyperBody, Request, Response, Server, Uri};
use std::net::Ipv4Addr;
use std::{
collections::HashSet, convert::Infallible, future::Future, net::SocketAddr, path::PathBuf,
sync::Arc,
collections::HashSet,
convert::Infallible,
future::Future,
io::Write,
net::{Ipv4Addr, SocketAddr},
path::PathBuf,
sync::{Arc, Mutex},
};
use tracing_subscriber::filter::EnvFilter;
use viceroy_lib::config::UnknownImportBehavior;
Expand Down Expand Up @@ -56,6 +60,7 @@ pub struct Test {
geolocation: Geolocation,
object_stores: ObjectStores,
secret_stores: SecretStores,
capture_logs: Arc<Mutex<dyn Write + Send>>,
log_stdout: bool,
log_stderr: bool,
via_hyper: bool,
Expand All @@ -77,6 +82,7 @@ impl Test {
geolocation: Geolocation::new(),
object_stores: ObjectStores::new(),
secret_stores: SecretStores::new(),
capture_logs: Arc::new(Mutex::new(std::io::stdout())),
log_stdout: false,
log_stderr: false,
via_hyper: false,
Expand All @@ -98,6 +104,7 @@ impl Test {
geolocation: Geolocation::new(),
object_stores: ObjectStores::new(),
secret_stores: SecretStores::new(),
capture_logs: Arc::new(Mutex::new(std::io::stdout())),
log_stdout: false,
log_stderr: false,
via_hyper: false,
Expand Down Expand Up @@ -212,6 +219,11 @@ impl Test {
self
}

pub fn capture_logs(mut self, capture_logs: Arc<Mutex<dyn Write + Send>>) -> Self {
self.capture_logs = capture_logs;
self
}

/// Treat stderr as a logging endpoint for this test.
pub fn log_stderr(self) -> Self {
Self {
Expand Down Expand Up @@ -299,6 +311,7 @@ impl Test {
.with_geolocation(self.geolocation.clone())
.with_object_stores(self.object_stores.clone())
.with_secret_stores(self.secret_stores.clone())
.with_capture_logs(self.capture_logs.clone())
.with_log_stderr(self.log_stderr)
.with_log_stdout(self.log_stdout);

Expand Down
23 changes: 8 additions & 15 deletions cli/tests/integration/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,44 +3,37 @@ use {
hyper::StatusCode,
std::{
io::{self, Write},
sync::mpsc,
sync::{Arc, Mutex},
},
viceroy_lib::logging,
};

struct LogWriter(mpsc::Sender<Vec<u8>>);
struct LogWriter(Vec<Vec<u8>>);

impl Write for LogWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match self.0.send(buf.to_owned()) {
Ok(()) => Ok(buf.len()),
Err(_) => Err(io::ErrorKind::ConnectionReset.into()),
}
self.0.push(buf.to_owned());
Ok(buf.len())
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

fn setup_log_writer() -> mpsc::Receiver<Vec<u8>> {
let (send, recv) = mpsc::channel();
*logging::LOG_WRITER.lock().unwrap() = Box::new(LogWriter(send));
recv
}

#[tokio::test(flavor = "multi_thread")]
async fn logging_works() -> TestResult {
let log_recv = setup_log_writer();
let log_writer = Arc::new(Mutex::new(LogWriter(Vec::new())));
let resp = Test::using_fixture("logging.wasm")
.capture_logs(log_writer.clone())
.log_stderr()
.log_stdout()
.against_empty()
.await?;

assert_eq!(resp.status(), StatusCode::OK);

let read_log_line = || String::from_utf8(log_recv.recv().unwrap()).unwrap();
let mut logs = std::mem::take(&mut log_writer.lock().unwrap().0).into_iter();
let mut read_log_line = || String::from_utf8(logs.next().unwrap()).unwrap();

assert_eq!(read_log_line(), "inigo :: Who are you?\n");
assert_eq!(read_log_line(), "mib :: No one of consequence.\n");
Expand Down
35 changes: 25 additions & 10 deletions lib/src/execute.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
//! Guest code execution.

use std::time::SystemTime;

use wasmtime::GuestProfiler;

use crate::config::UnknownImportBehavior;

use {
crate::{
adapt,
body::Body,
component as compute,
config::{Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation},
config::{
Backends, DeviceDetection, Dictionaries, ExperimentalModule, Geolocation,
UnknownImportBehavior,
},
downstream::prepare_request,
error::ExecutionError,
linking::{create_store, link_host_functions, ComponentCtx, WasmCtx},
Expand All @@ -25,18 +22,19 @@ use {
std::{
collections::HashSet,
fs,
io::Write,
net::{IpAddr, Ipv4Addr},
path::{Path, PathBuf},
sync::atomic::{AtomicBool, AtomicU64, Ordering},
sync::Arc,
sync::{Arc, Mutex},
thread::{self, JoinHandle},
time::{Duration, Instant},
time::{Duration, Instant, SystemTime},
},
tokio::sync::oneshot::{self, Sender},
tracing::{event, info, info_span, warn, Instrument, Level},
wasmtime::{
component::{self, Component},
Engine, InstancePre, Linker, Module, ProfilingStrategy,
Engine, GuestProfiler, InstancePre, Linker, Module, ProfilingStrategy,
},
wasmtime_wasi::I32Exit,
};
Expand Down Expand Up @@ -80,6 +78,8 @@ pub struct ExecuteCtx {
dictionaries: Arc<Dictionaries>,
/// Path to the config, defaults to None
config_path: Arc<Option<PathBuf>>,
/// Where to direct logging endpoint messages, defaults to stdout
capture_logs: Arc<Mutex<dyn Write + Send>>,
/// Whether to treat stdout as a logging endpoint
log_stdout: bool,
/// Whether to treat stderr as a logging endpoint
Expand Down Expand Up @@ -217,6 +217,7 @@ impl ExecuteCtx {
tls_config: TlsConfig::new()?,
dictionaries: Arc::new(Dictionaries::default()),
config_path: Arc::new(None),
capture_logs: Arc::new(Mutex::new(std::io::stdout())),
log_stdout: false,
log_stderr: false,
next_req_id: Arc::new(AtomicU64::new(0)),
Expand Down Expand Up @@ -295,6 +296,18 @@ impl ExecuteCtx {
self
}

/// Where to direct logging endpoint messages. Defaults to stdout.
pub fn capture_logs(&self) -> Arc<Mutex<dyn Write + Send>> {
self.capture_logs.clone()
}

/// Set where to direct logging endpoint messages for this execution
/// context. Defaults to stdout.
pub fn with_capture_logs(mut self, capture_logs: Arc<Mutex<dyn Write + Send>>) -> Self {
self.capture_logs = capture_logs;
self
}

/// Whether to treat stdout as a logging endpoint.
pub fn log_stdout(&self) -> bool {
self.log_stdout
Expand Down Expand Up @@ -427,6 +440,7 @@ impl ExecuteCtx {
req,
sender,
remote,
&self,
self.backends.clone(),
self.device_detection.clone(),
self.geolocation.clone(),
Expand Down Expand Up @@ -580,6 +594,7 @@ impl ExecuteCtx {
req,
sender,
remote,
&self,
self.backends.clone(),
self.device_detection.clone(),
self.geolocation.clone(),
Expand Down
4 changes: 2 additions & 2 deletions lib/src/linking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,13 @@ fn make_wasi_ctx(ctx: &ExecuteCtx, session: &Session) -> WasiCtxBuilder {
.env("FASTLY_TRACE_ID", &format!("{:032x}", session.req_id()));

if ctx.log_stdout() {
wasi_ctx.stdout(LogEndpoint::new(b"stdout"));
wasi_ctx.stdout(LogEndpoint::new(b"stdout", ctx.capture_logs()));
} else {
wasi_ctx.inherit_stdout();
}

if ctx.log_stderr() {
wasi_ctx.stderr(LogEndpoint::new(b"stderr"));
wasi_ctx.stderr(LogEndpoint::new(b"stderr", ctx.capture_logs()));
} else {
wasi_ctx.inherit_stderr();
}
Expand Down
38 changes: 17 additions & 21 deletions lib/src/logging.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
use {
lazy_static::lazy_static,
std::{
io::{self, Write},
sync::Mutex,
},
use std::{
io::{self, Write},
sync::{Arc, Mutex},
};

/// A logging endpoint, which for Viceroy is just a name.
pub struct LogEndpoint(Vec<u8>);

lazy_static! {
/// The underlying writer to use for all log messages. It defaults to `stdout`,
/// but can be redirected for tests. We make this a static, rather than e.g.
/// a field in `ExecuteCtx`, because the `Write` implementation for `LogEndpoint`
/// doesn't have direct access to context data.
pub static ref LOG_WRITER: Mutex<Box<dyn Write + Send>> = Mutex::new(Box::new(io::stdout()));
#[derive(Clone)]
pub struct LogEndpoint {
name: Vec<u8>,
writer: Arc<Mutex<dyn Write + Send>>,
}

impl LogEndpoint {
/// Allocate a new `LogEndpoint` with the given name.
pub fn new(name: &[u8]) -> LogEndpoint {
LogEndpoint(name.to_owned())
pub fn new(name: &[u8], writer: Arc<Mutex<dyn Write + Send>>) -> LogEndpoint {
LogEndpoint {
name: name.to_owned(),
writer,
}
}

/// Write a log entry to this endpoint.
Expand All @@ -44,9 +40,9 @@ impl LogEndpoint {

// Accumulate log entry into a buffer before writing, while escaping newlines
let mut to_write =
Vec::with_capacity(msg.len() + self.0.len() + LOG_ENDPOINT_DELIM.len() + 1);
Vec::with_capacity(msg.len() + self.name.len() + LOG_ENDPOINT_DELIM.len() + 1);

to_write.extend_from_slice(&self.0);
to_write.extend_from_slice(&self.name);
to_write.extend_from_slice(LOG_ENDPOINT_DELIM);
for &byte in msg {
if byte == b'\n' {
Expand All @@ -57,7 +53,7 @@ impl LogEndpoint {
}
to_write.push(b'\n');

LOG_WRITER.lock().unwrap().write_all(&to_write)
self.writer.lock().unwrap().write_all(&to_write)
}
}

Expand All @@ -68,13 +64,13 @@ impl Write for LogEndpoint {
}

fn flush(&mut self) -> io::Result<()> {
LOG_WRITER.lock().unwrap().flush()
self.writer.lock().unwrap().flush()
}
}

impl wasmtime_wasi::StdoutStream for LogEndpoint {
fn stream(&self) -> Box<dyn wasmtime_wasi::HostOutputStream> {
Box::new(LogEndpoint(self.0.clone()))
Box::new(self.clone())
}

fn isatty(&self) -> bool {
Expand Down
16 changes: 14 additions & 2 deletions lib/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,19 @@ use {
ObjectStoreHandle, PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle,
PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle,
},
ExecuteCtx,
},
cranelift_entity::{entity_impl, PrimaryMap},
futures::future::{self, FutureExt},
http::{request, response, HeaderMap, Request, Response},
std::{collections::HashMap, future::Future, net::IpAddr, path::PathBuf, sync::Arc},
std::{
collections::HashMap,
future::Future,
io::Write,
net::IpAddr,
path::PathBuf,
sync::{Arc, Mutex},
},
tokio::sync::oneshot::Sender,
};

Expand Down Expand Up @@ -65,6 +73,8 @@ pub struct Session {
/// [parts]: https://docs.rs/http/latest/http/response/struct.Parts.html
/// [resp]: https://docs.rs/http/latest/http/response/struct.Response.html
resp_parts: PrimaryMap<ResponseHandle, Option<response::Parts>>,
/// Where to direct logging endpoint messages.
capture_logs: Arc<Mutex<dyn Write + Send>>,
/// A handle map for logging endpoints.
log_endpoints: PrimaryMap<EndpointHandle, LogEndpoint>,
/// A by-name map for logging endpoints.
Expand Down Expand Up @@ -131,6 +141,7 @@ impl Session {
req: Request<Body>,
resp_sender: Sender<Response<Body>>,
client_ip: IpAddr,
ctx: &ExecuteCtx,
backends: Arc<Backends>,
device_detection: Arc<DeviceDetection>,
geolocation: Arc<Geolocation>,
Expand Down Expand Up @@ -158,6 +169,7 @@ impl Session {
req_parts,
resp_parts: PrimaryMap::new(),
downstream_resp: DownstreamResponse::new(resp_sender),
capture_logs: ctx.capture_logs(),
log_endpoints: PrimaryMap::new(),
log_endpoints_by_name: HashMap::new(),
backends,
Expand Down Expand Up @@ -523,7 +535,7 @@ impl Session {
if let Some(handle) = self.log_endpoints_by_name.get(name).copied() {
return handle;
}
let endpoint = LogEndpoint::new(name);
let endpoint = LogEndpoint::new(name, self.capture_logs.clone());
let handle = self.log_endpoints.push(endpoint);
self.log_endpoints_by_name.insert(name.to_owned(), handle);
handle
Expand Down

0 comments on commit ed292ae

Please sign in to comment.