diff --git a/Cargo.lock b/Cargo.lock index 4649f71a..1b5d5663 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2431,6 +2431,7 @@ version = "0.12.1" dependencies = [ "anyhow", "async-trait", + "base64", "bytes", "bytesize", "cfg-if", diff --git a/cli/tests/trap-test/Cargo.lock b/cli/tests/trap-test/Cargo.lock index c57429c8..d1ec759f 100644 --- a/cli/tests/trap-test/Cargo.lock +++ b/cli/tests/trap-test/Cargo.lock @@ -2344,6 +2344,7 @@ version = "0.12.1" dependencies = [ "anyhow", "async-trait", + "base64", "bytes", "bytesize", "cfg-if", diff --git a/crates/adapter/src/fastly/core.rs b/crates/adapter/src/fastly/core.rs index 5f6bc58e..ffbd1114 100644 --- a/crates/adapter/src/fastly/core.rs +++ b/crates/adapter/src/fastly/core.rs @@ -2677,6 +2677,21 @@ pub mod fastly_kv_store { PreconditionFailed, PayloadTooLarge, InternalError, + TooManyRequests, + } + + impl From for KvError { + fn from(value: kv_store::KvStatus) -> Self { + match value { + kv_store::KvStatus::Ok => Self::Ok, + kv_store::KvStatus::BadRequest => Self::BadRequest, + kv_store::KvStatus::NotFound => Self::NotFound, + kv_store::KvStatus::PreconditionFailed => Self::PreconditionFailed, + kv_store::KvStatus::PayloadTooLarge => Self::PayloadTooLarge, + kv_store::KvStatus::InternalError => Self::InternalError, + kv_store::KvStatus::TooManyRequests => Self::TooManyRequests, + } + } } #[export_name = "fastly_kv_store#open"] @@ -2735,22 +2750,25 @@ pub mod fastly_kv_store { pending_handle: PendingObjectStoreLookupHandle, body_handle_out: *mut BodyHandle, metadata_out: *mut u8, - metadata_len: *mut usize, + metadata_len: usize, + nwritten_out: *mut usize, generation_out: *mut u32, kv_error_out: *mut KvError, ) -> FastlyStatus { let res = match kv_store::lookup_wait(pending_handle) { - Ok(Some(res)) => res, - Ok(None) => { + Ok((res, status)) => { unsafe { - *kv_error_out = KvError::NotFound; + *kv_error_out = status.into(); } - return FastlyStatus::OK; + let Some(res) = res else { + return FastlyStatus::OK; + }; + + res } Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2758,27 +2776,27 @@ pub mod fastly_kv_store { } }; - let max_len = unsafe { *metadata_len }; - with_buffer!( metadata_out, - max_len, - { res.metadata(u64::try_from(max_len).trapping_unwrap()) }, + metadata_len, + { res.metadata(u64::try_from(metadata_len).trapping_unwrap()) }, |res| { - let buf = handle_buffer_len!(res, metadata_len); + let buf = handle_buffer_len!(res, nwritten_out); unsafe { - *metadata_len = buf.as_ref().map(Vec::len).unwrap_or(0); + *nwritten_out = buf.as_ref().map(Vec::len).unwrap_or(0); } std::mem::forget(buf); } ); + let body = res.body(); + let generation = res.generation(); + unsafe { - *body_handle_out = res.body(); - *generation_out = res.generation(); - *kv_error_out = KvError::Ok; + *body_handle_out = body; + *generation_out = generation; } FastlyStatus::OK @@ -2839,18 +2857,16 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::insert_wait(pending_body_handle) { - Ok(_) => { + Ok(status) => { unsafe { - *kv_error_out = KvError::Ok; + *kv_error_out = status.into(); } FastlyStatus::OK } - // TODO: the wit interface doesn't return any KvError values Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2890,9 +2906,9 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::delete_wait(pending_body_handle) { - Ok(_) => { + Ok(status) => { unsafe { - *kv_error_out = KvError::Ok; + *kv_error_out = status.into(); } FastlyStatus::OK @@ -2900,7 +2916,6 @@ pub mod fastly_kv_store { Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; } @@ -2916,19 +2931,27 @@ pub mod fastly_kv_store { list_config: *const ListConfig, pending_body_handle_out: *mut PendingObjectStoreListHandle, ) -> FastlyStatus { - let mask = list_config_mask.into(); + let mask = kv_store::ListConfigOptions::from(list_config_mask); let config = unsafe { kv_store::ListConfig { mode: (*list_config).mode.into(), - cursor: { + cursor: if mask.contains(kv_store::ListConfigOptions::CURSOR) { let len = usize::try_from((*list_config).cursor_len).trapping_unwrap(); Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + } else { + Vec::new() + }, + limit: if mask.contains(kv_store::ListConfigOptions::LIMIT) { + (*list_config).limit + } else { + 0 }, - limit: (*list_config).limit, - prefix: { + prefix: if mask.contains(kv_store::ListConfigOptions::PREFIX) { let len = usize::try_from((*list_config).prefix_len).trapping_unwrap(); - Vec::from_raw_parts((*list_config).cursor as *mut _, len, len) + Vec::from_raw_parts((*list_config).prefix as *mut _, len, len) + } else { + Vec::new() }, } }; @@ -2957,10 +2980,10 @@ pub mod fastly_kv_store { kv_error_out: *mut KvError, ) -> FastlyStatus { match kv_store::list_wait(pending_body_handle) { - Ok(res) => { + Ok((res, status)) => { unsafe { - *kv_error_out = KvError::Ok; - *body_handle_out = res; + *kv_error_out = status.into(); + *body_handle_out = res.unwrap_or(INVALID_HANDLE); } FastlyStatus::OK @@ -2968,8 +2991,8 @@ pub mod fastly_kv_store { Err(e) => { unsafe { - // TODO: the wit interface doesn't return any KvError values *kv_error_out = KvError::Uninitialized; + *body_handle_out = INVALID_HANDLE; } e.into() diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 670141de..1361b6c1 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -64,6 +64,7 @@ wasmtime-wasi = { workspace = true } wasmtime-wasi-nn = { workspace = true } wat = { workspace = true } wiggle = { workspace = true } +base64 = { workspace = true } [dev-dependencies] tempfile = "3.6.0" diff --git a/lib/compute-at-edge-abi/compute-at-edge.witx b/lib/compute-at-edge-abi/compute-at-edge.witx index 231a402b..85d7e1b3 100644 --- a/lib/compute-at-edge-abi/compute-at-edge.witx +++ b/lib/compute-at-edge-abi/compute-at-edge.witx @@ -761,6 +761,7 @@ ) ) +;; NOTE: These are deprecated, use the fastly_kv_store hostcalls (module $fastly_object_store (@interface func (export "open") (param $name string) @@ -820,6 +821,79 @@ ) ) +(module $fastly_kv_store + (@interface func (export "open") + (param $name string) + (result $err (expected $kv_store_handle (error $fastly_status))) + ) + + (@interface func (export "lookup") + (param $store $kv_store_handle) + (param $key string) + (param $lookup_config_mask $kv_lookup_config_options) + (param $lookup_configuration (@witx pointer $kv_lookup_config)) + (param $handle_out (@witx pointer $kv_store_lookup_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "lookup_wait") + (param $handle $kv_store_lookup_handle) + (param $body_handle_out (@witx pointer $body_handle)) + (param $metadata_buf (@witx pointer (@witx char8))) + (param $metadata_buf_len (@witx usize)) + (param $nwritten_out (@witx pointer (@witx usize))) + (param $generation_out (@witx pointer u32)) + (param $kv_error_out (@witx pointer $kv_error)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "insert") + (param $store $kv_store_handle) + (param $key string) + (param $body_handle $body_handle) + (param $insert_config_mask $kv_insert_config_options) + (param $insert_configuration (@witx pointer $kv_insert_config)) + (param $handle_out (@witx pointer $kv_store_insert_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "insert_wait") + (param $handle $kv_store_insert_handle) + (param $kv_error_out (@witx pointer $kv_error)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "delete") + (param $store $kv_store_handle) + (param $key string) + (param $delete_config_mask $kv_delete_config_options) + (param $delete_configuration (@witx pointer $kv_delete_config)) + (param $handle_out (@witx pointer $kv_store_delete_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "delete_wait") + (param $handle $kv_store_delete_handle) + (param $kv_error_out (@witx pointer $kv_error)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "list") + (param $store $kv_store_handle) + (param $list_config_mask $kv_list_config_options) + (param $list_configuration (@witx pointer $kv_list_config)) + (param $handle_out (@witx pointer $kv_store_list_handle)) + (result $err (expected (error $fastly_status))) + ) + + (@interface func (export "list_wait") + (param $handle $kv_store_list_handle) + (param $body_handle_out (@witx pointer $body_handle)) + (param $kv_error_out (@witx pointer $kv_error)) + (result $err (expected (error $fastly_status))) + ) +) + (module $fastly_secret_store (@interface func (export "open") (param $name string) diff --git a/lib/compute-at-edge-abi/typenames.witx b/lib/compute-at-edge-abi/typenames.witx index 7d192415..a8e18fba 100644 --- a/lib/compute-at-edge-abi/typenames.witx +++ b/lib/compute-at-edge-abi/typenames.witx @@ -91,14 +91,26 @@ (typename $endpoint_handle (handle)) ;;; A handle to an Edge Dictionary. (typename $dictionary_handle (handle)) -;;; A handle to an Object Store. +;;; (DEPRECATED) A handle to an Object Store. (typename $object_store_handle (handle)) -;;; A handle to a pending KV lookup request. +;;; (DEPRECATED) A handle to a pending KV lookup request. (typename $pending_kv_lookup_handle (handle)) -;;; A handle to a pending KV insert request. +;;; (DEPRECATED) A handle to a pending KV insert request. (typename $pending_kv_insert_handle (handle)) -;;; A handle to a pending KV delete request. +;;; (DEPRECATED) A handle to a pending KV delete request. (typename $pending_kv_delete_handle (handle)) +;;; (DEPRECATED) A handle to a pending KV list. +(typename $pending_kv_list_handle (handle)) +;;; A handle to an KV Store. +(typename $kv_store_handle (handle)) +;;; A handle to a KV Store lookup. +(typename $kv_store_lookup_handle (handle)) +;;; A handle to a KV Store insert. +(typename $kv_store_insert_handle (handle)) +;;; A handle to a KV Store delete. +(typename $kv_store_delete_handle (handle)) +;;; A handle to a KV Store list. +(typename $kv_store_list_handle (handle)) ;;; A handle to a Secret Store. (typename $secret_store_handle (handle)) ;;; A handle to an individual secret. @@ -385,3 +397,97 @@ (field $workspace_len u32) ) ) + +(typename $kv_lookup_config_options + (flags (@witx repr u32) + $reserved + )) + +(typename $kv_lookup_config + (record + (field $reserved u32) + )) + +(typename $kv_delete_config_options + (flags (@witx repr u32) + $reserved + )) + +(typename $kv_delete_config + (record + (field $reserved u32) + )) + +(typename $kv_insert_config_options + (flags (@witx repr u32) + $reserved + $background_fetch + $if_generation_match + $metadata + $time_to_live_sec + )) + +(typename $kv_insert_mode + (enum (@witx tag u32) + $overwrite + $add + $append + $prepend)) + +(typename $kv_insert_config + (record + (field $mode $kv_insert_mode) + (field $if_generation_match u32) + (field $metadata (@witx pointer (@witx char8))) + (field $metadata_len u32) + (field $time_to_live_sec u32) + )) + +(typename $kv_list_config_options + (flags (@witx repr u32) + $reserved + $cursor + $limit + $prefix + )) + +(typename $kv_list_mode + (enum (@witx tag u32) + $strong + $eventual)) + +(typename $kv_list_config + (record + (field $mode $kv_list_mode) + (field $cursor (@witx pointer (@witx char8))) + (field $cursor_len u32) + (field $limit u32) + (field $prefix (@witx pointer (@witx char8))) + (field $prefix_len u32) + )) + +(typename $kv_error + (enum (@witx tag u32) + ;;; The $kv_error has not been set. + $uninitialized + ;;; There was no error. + $ok + ;;; KV store cannot or will not process the request due to something that is perceived to be a client error + ;;; This will map to the api's 400 codes + $bad_request + ;;; KV store cannot find the requested resource + ;;; This will map to the api's 404 codes + $not_found + ;;; KV store cannot fulfill the request, as definied by the client's prerequisites (ie. if-generation-match) + ;;; This will map to the api's 412 codes + $precondition_failed + ;;; The size limit for a KV store key was exceeded. + ;;; This will map to the api's 413 codes + $payload_too_large + ;;; The system encountered an unexpected internal error. + ;;; This will map to all remaining http error codes + $internal_error + ;;; Too many requests have been made to the KV store. + ;;; This will map to the api's 429 codes + $too_many_requests + )) diff --git a/lib/data/viceroy-component-adapter.wasm b/lib/data/viceroy-component-adapter.wasm index d0c37ee7..23c5936c 100755 Binary files a/lib/data/viceroy-component-adapter.wasm and b/lib/data/viceroy-component-adapter.wasm differ diff --git a/lib/src/component/error.rs b/lib/src/component/error.rs index 98e69cc6..64b7f4b9 100644 --- a/lib/src/component/error.rs +++ b/lib/src/component/error.rs @@ -1,9 +1,9 @@ use { - super::fastly::api::{http_req, types}, + super::fastly::api::{http_req, kv_store::KvStatus, types}, crate::{ config::ClientCertError, error::{self, HandleError}, - object_store::{KeyValidationError, ObjectStoreError}, + object_store::{KeyValidationError, KvStoreError, ObjectStoreError}, wiggle_abi::{DictionaryError, SecretStoreError}, }, http::{ @@ -12,6 +12,7 @@ use { status::InvalidStatusCode, uri::InvalidUri, }, + wasmtime_wasi::ResourceTableError, }; impl types::Error { @@ -95,6 +96,12 @@ impl From for types::Error { } } +impl From for types::Error { + fn from(_: std::string::FromUtf8Error) -> Self { + types::Error::InvalidArgument + } +} + impl From for types::Error { fn from(err: wiggle::GuestError) -> Self { use wiggle::GuestError::*; @@ -130,6 +137,46 @@ impl From for types::Error { } } +impl From for types::Error { + fn from(err: KvStoreError) -> Self { + use KvStoreError::*; + match err { + Uninitialized => panic!("{}", err), + Ok => panic!("{err} should never be converted to an error"), + BadRequest => types::Error::InvalidArgument, + NotFound => types::Error::OptionalNone, + PreconditionFailed => types::Error::InvalidArgument, + PayloadTooLarge => types::Error::InvalidArgument, + InternalError => types::Error::InvalidArgument, + TooManyRequests => types::Error::InvalidArgument, + } + } +} + +impl From for types::Error { + fn from(err: ResourceTableError) -> Self { + match err { + _ => panic!("{}", err), + } + } +} + +impl From for KvStatus { + fn from(err: KvStoreError) -> Self { + use KvStoreError::*; + match err { + Uninitialized => panic!("{}", err), + Ok => KvStatus::Ok, + BadRequest => KvStatus::BadRequest, + NotFound => KvStatus::NotFound, + PreconditionFailed => KvStatus::PreconditionFailed, + PayloadTooLarge => KvStatus::PayloadTooLarge, + InternalError => KvStatus::InternalError, + TooManyRequests => KvStatus::TooManyRequests, + } + } +} + impl From for types::Error { fn from(_: KeyValidationError) -> Self { types::Error::GenericError @@ -177,6 +224,7 @@ impl From for types::Error { // We delegate to some error types' own implementation of `to_fastly_status`. Error::DictionaryError(e) => e.into(), Error::ObjectStoreError(e) => e.into(), + Error::KvStoreError(e) => e.into(), Error::SecretStoreError(e) => e.into(), // All other hostcall errors map to a generic `ERROR` value. Error::AbiVersionMismatch diff --git a/lib/src/component/kv_store.rs b/lib/src/component/kv_store.rs index 28e8729c..b67e0020 100644 --- a/lib/src/component/kv_store.rs +++ b/lib/src/component/kv_store.rs @@ -1,103 +1,297 @@ use { - super::fastly::api::{http_body, kv_store, types}, - crate::linking::ComponentCtx, + super::{ + fastly::api::{ + http_body, + kv_store::{self, InsertMode}, + types, + }, + types::TrappableError, + }, + crate::{ + linking::ComponentCtx, + object_store::{ObjectKey, ObjectStoreError}, + session::{ + PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, + PendingKvLookupTask, + }, + wiggle_abi::types::KvInsertMode, + }, + wasmtime_wasi::WasiView, }; -pub struct LookupResult; +pub struct LookupResult { + body: http_body::BodyHandle, + metadata: Option>, + generation: u32, +} #[async_trait::async_trait] impl kv_store::HostLookupResult for ComponentCtx { async fn body( &mut self, - _self_: wasmtime::component::Resource, - ) -> http_body::BodyHandle { - todo!() + rep: wasmtime::component::Resource, + ) -> wasmtime::Result { + Ok(self.table().get(&rep)?.body) } async fn metadata( &mut self, - _self_: wasmtime::component::Resource, - _max_len: u64, - ) -> Result>, types::Error> { - todo!() + rep: wasmtime::component::Resource, + max_len: u64, + ) -> Result>, TrappableError> { + let res = self.table().get(&rep)?; + let Some(md) = res.metadata.as_ref() else { + return Ok(None); + }; + + if md.len() > max_len as usize { + return Err(types::Error::BufferLen(md.len() as u64).into()); + } + + Ok(self.table().get_mut(&rep)?.metadata.take()) } async fn generation( &mut self, - _self_: wasmtime::component::Resource, - ) -> u32 { - todo!() + rep: wasmtime::component::Resource, + ) -> wasmtime::Result { + Ok(self.table().get(&rep)?.generation) } async fn drop( &mut self, - _rep: wasmtime::component::Resource, + rep: wasmtime::component::Resource, ) -> wasmtime::Result<()> { - todo!() + self.table().delete(rep)?; + Ok(()) } } #[async_trait::async_trait] impl kv_store::Host for ComponentCtx { - async fn open(&mut self, _name: String) -> Result, types::Error> { - todo!() + async fn open(&mut self, name: Vec) -> Result, types::Error> { + let name = String::from_utf8(name)?; + if self.session.kv_store.store_exists(&name)? { + // todo (byoung), handle optional/none/error case + let h = self.session.kv_store_handle(&name)?; + Ok(Some(h.into())) + } else { + Err(ObjectStoreError::UnknownObjectStore(name.to_owned()).into()) + } } async fn lookup( &mut self, - _store: kv_store::Handle, - _key: String, - ) -> Result { - todo!() + store: kv_store::Handle, + key: Vec, + ) -> Result { + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + // just create a future that's already ready + let fut = futures::future::ok(self.session.obj_lookup(store.clone(), ObjectKey::new(key)?)); + let task = PeekableTask::spawn(fut).await; + let lh = self + .session + .insert_pending_kv_lookup(PendingKvLookupTask::new(task)); + Ok(lh.into()) } async fn lookup_wait( &mut self, - _handle: kv_store::LookupHandle, - ) -> Result>, types::Error> { - todo!() + handle: kv_store::LookupHandle, + ) -> Result< + ( + Option>, + kv_store::KvStatus, + ), + types::Error, + > { + let resp = self + .session + .take_pending_kv_lookup(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => { + let lr = kv_store::LookupResult { + body: self.session.insert_body(value.body.into()).into(), + metadata: match value.metadata_len { + 0 => None, + _ => Some(value.metadata), + }, + generation: value.generation, + }; + + let res = self.table().push(lr)?; + + Ok((Some(res), kv_store::KvStatus::Ok)) + } + Err(e) => Ok((None, e.into())), + } } async fn insert( &mut self, - _store: kv_store::Handle, - _key: String, - _body_handle: kv_store::BodyHandle, - _mask: kv_store::InsertConfigOptions, - _config: kv_store::InsertConfig, + store: kv_store::Handle, + key: Vec, + body_handle: kv_store::BodyHandle, + mask: kv_store::InsertConfigOptions, + config: kv_store::InsertConfig, ) -> Result { - todo!() + let body = self + .session + .take_body(body_handle.into())? + .read_into_vec() + .await?; + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + + let mode = match config.mode { + InsertMode::Overwrite => KvInsertMode::Overwrite, + InsertMode::Add => KvInsertMode::Add, + InsertMode::Append => KvInsertMode::Append, + InsertMode::Prepend => KvInsertMode::Prepend, + }; + + let meta = if mask.contains(kv_store::InsertConfigOptions::METADATA) { + Some(config.metadata) + } else { + None + }; + + let igm = if mask.contains(kv_store::InsertConfigOptions::IF_GENERATION_MATCH) { + Some(config.if_generation_match) + } else { + None + }; + + let ttl = if mask.contains(kv_store::InsertConfigOptions::TIME_TO_LIVE_SEC) { + Some(std::time::Duration::from_secs( + config.time_to_live_sec as u64, + )) + } else { + None + }; + + let fut = futures::future::ok(self.session.kv_insert( + store.clone(), + ObjectKey::new(key)?, + body, + Some(mode), + igm, + meta, + ttl, + )); + let task = PeekableTask::spawn(fut).await; + let handle = self + .session + .insert_pending_kv_insert(PendingKvInsertTask::new(task)); + Ok(handle.into()) } - async fn insert_wait(&mut self, _handle: kv_store::InsertHandle) -> Result<(), types::Error> { - todo!() + async fn insert_wait( + &mut self, + handle: kv_store::InsertHandle, + ) -> Result { + let resp = self + .session + .take_pending_kv_insert(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(()) => Ok(kv_store::KvStatus::Ok), + Err(e) => Ok(e.into()), + } } async fn delete( &mut self, - _store: kv_store::Handle, - _key: String, + store: kv_store::Handle, + key: Vec, ) -> Result { - todo!() + let store = self.session.get_kv_store_key(store.into()).unwrap(); + let key = String::from_utf8(key)?; + // just create a future that's already ready + let fut = futures::future::ok(self.session.kv_delete(store.clone(), ObjectKey::new(key)?)); + let task = PeekableTask::spawn(fut).await; + let lh = self + .session + .insert_pending_kv_delete(PendingKvDeleteTask::new(task)); + Ok(lh.into()) } - async fn delete_wait(&mut self, _handle: kv_store::DeleteHandle) -> Result<(), types::Error> { - todo!() + async fn delete_wait( + &mut self, + handle: kv_store::DeleteHandle, + ) -> Result { + let resp = self + .session + .take_pending_kv_delete(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(()) => Ok(kv_store::KvStatus::Ok), + Err(e) => Ok(e.into()), + } } async fn list( &mut self, - _store: kv_store::Handle, - _mask: kv_store::ListConfigOptions, - _options: kv_store::ListConfig, + store: kv_store::Handle, + mask: kv_store::ListConfigOptions, + options: kv_store::ListConfig, ) -> Result { - todo!() + let store = self.session.get_kv_store_key(store.into()).unwrap(); + + let cursor = if mask.contains(kv_store::ListConfigOptions::CURSOR) { + Some(String::from_utf8(options.cursor)?) + } else { + None + }; + + let prefix = if mask.contains(kv_store::ListConfigOptions::PREFIX) { + Some(String::from_utf8(options.prefix)?) + } else { + None + }; + + let limit = if mask.contains(kv_store::ListConfigOptions::LIMIT) { + Some(options.limit) + } else { + None + }; + + let fut = futures::future::ok(self.session.kv_list(store.clone(), cursor, prefix, limit)); + let task = PeekableTask::spawn(fut).await; + let handle = self + .session + .insert_pending_kv_list(PendingKvListTask::new(task)); + Ok(handle.into()) } async fn list_wait( &mut self, - _handle: kv_store::ListHandle, - ) -> Result { - todo!() + handle: kv_store::ListHandle, + ) -> Result<(Option, kv_store::KvStatus), types::Error> { + let resp = self + .session + .take_pending_kv_list(handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => Ok(( + Some(self.session.insert_body(value.into()).into()), + kv_store::KvStatus::Ok, + )), + Err(e) => Ok((None, e.into())), + } } } diff --git a/lib/src/component/mod.rs b/lib/src/component/mod.rs index 4dcac2c5..f137840a 100644 --- a/lib/src/component/mod.rs +++ b/lib/src/component/mod.rs @@ -6,6 +6,7 @@ component::bindgen!({ async: true, with: { "fastly:api/uap/user-agent": uap::UserAgent, + "fastly:api/kv-store/lookup-result": kv_store::LookupResult, "wasi:clocks": wasmtime_wasi::bindings::clocks, "wasi:random": wasmtime_wasi::bindings::random, @@ -18,7 +19,10 @@ component::bindgen!({ }, trappable_imports: [ - "header-values-get" + "header-values-get", + "[method]lookup-result.body", + "[method]lookup-result.metadata", + "[method]lookup-result.generation" ], }); diff --git a/lib/src/component/object_store.rs b/lib/src/component/object_store.rs index 95eb1177..79e48326 100644 --- a/lib/src/component/object_store.rs +++ b/lib/src/component/object_store.rs @@ -3,7 +3,7 @@ use { crate::{ body::Body, linking::ComponentCtx, - object_store::{ObjectKey, ObjectStoreError}, + object_store::{KvStoreError, ObjectKey}, session::{PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask}, }, }; @@ -11,8 +11,8 @@ use { #[async_trait::async_trait] impl object_store::Host for ComponentCtx { async fn open(&mut self, name: String) -> Result, types::Error> { - if self.session.object_store.store_exists(&name)? { - let handle = self.session.obj_store_handle(&name)?; + if self.session.kv_store.store_exists(&name)? { + let handle = self.session.kv_store_handle(&name)?; Ok(Some(handle.into())) } else { Ok(None) @@ -24,17 +24,17 @@ impl object_store::Host for ComponentCtx { store: object_store::Handle, key: String, ) -> Result, types::Error> { - let store = self.session.get_obj_store_key(store.into()).unwrap(); + let store = self.session.get_kv_store_key(store.into()).unwrap(); let key = ObjectKey::new(&key)?; - match self.session.obj_lookup(store, &key) { + match self.session.obj_lookup(store.clone(), key) { Ok(obj) => { - let new_handle = self.session.insert_body(Body::from(obj)); + let new_handle = self.session.insert_body(Body::from(obj.body)); Ok(Some(new_handle.into())) } // Don't write to the invalid handle as the SDK will return Ok(None) // if the object does not exist. We need to return `Ok(())` here to // make sure Viceroy does not crash - Err(ObjectStoreError::MissingObject) => Ok(None), + Err(KvStoreError::NotFound) => Ok(None), Err(err) => Err(err.into()), } } @@ -44,10 +44,10 @@ impl object_store::Host for ComponentCtx { store: object_store::Handle, key: String, ) -> Result { - let store = self.session.get_obj_store_key(store.into()).unwrap(); + let store = self.session.get_kv_store_key(store.into()).unwrap(); let key = ObjectKey::new(key)?; // just create a future that's already ready - let fut = futures::future::ok(self.session.obj_lookup(store, &key)); + let fut = futures::future::ok(self.session.obj_lookup(store.clone(), key)); let task = PendingKvLookupTask::new(PeekableTask::spawn(fut).await); Ok(self.session.insert_pending_kv_lookup(task).into()) } @@ -64,8 +64,8 @@ impl object_store::Host for ComponentCtx { .await?; // proceed with the normal match from lookup() match pending_obj { - Ok(obj) => Ok(Some(self.session.insert_body(Body::from(obj)).into())), - Err(ObjectStoreError::MissingObject) => Ok(None), + Ok(obj) => Ok(Some(self.session.insert_body(Body::from(obj.body)).into())), + Err(KvStoreError::NotFound) => Ok(None), Err(err) => Err(err.into()), } } @@ -76,18 +76,15 @@ impl object_store::Host for ComponentCtx { key: String, body_handle: http_types::BodyHandle, ) -> Result<(), types::Error> { - let store = self - .session - .get_obj_store_key(store.into()) - .unwrap() - .clone(); + let store = self.session.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(&key)?; let bytes = self .session .take_body(body_handle.into())? .read_into_vec() .await?; - self.session.obj_insert(store, key, bytes)?; + self.session + .kv_insert(store, key, bytes, None, None, None, None)?; Ok(()) } @@ -98,18 +95,17 @@ impl object_store::Host for ComponentCtx { key: String, body_handle: http_types::BodyHandle, ) -> Result { - let store = self - .session - .get_obj_store_key(store.into()) - .unwrap() - .clone(); + let store = self.session.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(&key)?; let bytes = self .session .take_body(body_handle.into())? .read_into_vec() .await?; - let fut = futures::future::ok(self.session.obj_insert(store, key, bytes)); + let fut = futures::future::ok( + self.session + .kv_insert(store, key, bytes, None, None, None, None), + ); let task = PeekableTask::spawn(fut).await; Ok(self @@ -135,13 +131,9 @@ impl object_store::Host for ComponentCtx { store: object_store::Handle, key: String, ) -> Result { - let store = self - .session - .get_obj_store_key(store.into()) - .unwrap() - .clone(); + let store = self.session.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(&key)?; - let fut = futures::future::ok(self.session.obj_delete(store, key)); + let fut = futures::future::ok(self.session.kv_delete(store, key)); let task = PeekableTask::spawn(fut).await; Ok(self diff --git a/lib/src/component/types.rs b/lib/src/component/types.rs index bb35934a..ed066a92 100644 --- a/lib/src/component/types.rs +++ b/lib/src/component/types.rs @@ -21,6 +21,12 @@ impl types::Host for ComponentCtx { } } +impl From for TrappableError { + fn from(e: wasmtime::component::ResourceTableError) -> Self { + Self::Trap(e.into()) + } +} + impl From for TrappableError { fn from(e: types::Error) -> Self { Self::Error(e) diff --git a/lib/src/config/object_store.rs b/lib/src/config/object_store.rs index 5f156b53..707245dc 100644 --- a/lib/src/config/object_store.rs +++ b/lib/src/config/object_store.rs @@ -5,6 +5,7 @@ use { crate::{ error::{FastlyConfigError, ObjectStoreConfigError}, object_store::{ObjectKey, ObjectStoreKey, ObjectStores}, + wiggle_abi::types::KvInsertMode, }, std::fs, toml::value::Table, @@ -167,6 +168,10 @@ impl TryFrom for ObjectStoreConfig { } })?, bytes, + KvInsertMode::Overwrite, + None, + None, + None, ) .expect("Lock was not poisoned"); } diff --git a/lib/src/error.rs b/lib/src/error.rs index 765399a8..a7cd2fc0 100644 --- a/lib/src/error.rs +++ b/lib/src/error.rs @@ -95,6 +95,9 @@ pub enum Error { #[error(transparent)] ObjectStoreError(#[from] crate::object_store::ObjectStoreError), + #[error(transparent)] + KvStoreError(#[from] crate::object_store::KvStoreError), + #[error(transparent)] SecretStoreError(#[from] crate::wiggle_abi::SecretStoreError), @@ -182,6 +185,7 @@ impl Error { Error::DictionaryError(e) => e.to_fastly_status(), Error::DeviceDetectionError(e) => e.to_fastly_status(), Error::ObjectStoreError(e) => e.into(), + Error::KvStoreError(e) => e.into(), Error::SecretStoreError(e) => e.into(), Error::Again => FastlyStatus::Again, // All other hostcall errors map to a generic `ERROR` value. @@ -272,6 +276,10 @@ pub enum HandleError { #[error("Invalid pending KV delete handle: {0}")] InvalidPendingKvDeleteHandle(crate::wiggle_abi::types::PendingKvDeleteHandle), + /// A list handle was not valid. + #[error("Invalid pending KV list handle: {0}")] + InvalidPendingKvListHandle(crate::wiggle_abi::types::PendingKvListHandle), + /// A dictionary handle was not valid. #[error("Invalid dictionary handle: {0}")] InvalidDictionaryHandle(crate::wiggle_abi::types::DictionaryHandle), @@ -645,6 +653,8 @@ pub enum ObjectStoreConfigError { NotATable, #[error("There was an error when manipulating the ObjectStore: {0}.")] ObjectStoreError(#[from] crate::object_store::ObjectStoreError), + #[error("There was an error when manipulating the KvStore: {0}.")] + KvStoreError(#[from] crate::object_store::KvStoreError), #[error("Invalid `key` value used: {0}.")] KeyValidationError(#[from] crate::object_store::KeyValidationError), #[error("'{0}' is not a valid format for the config store. Supported format(s) are: 'json'.")] diff --git a/lib/src/linking.rs b/lib/src/linking.rs index 22c7858c..41f01835 100644 --- a/lib/src/linking.rs +++ b/lib/src/linking.rs @@ -298,6 +298,7 @@ pub fn link_host_functions( wiggle_abi::fastly_http_resp::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_log::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_object_store::add_to_linker(linker, WasmCtx::session)?; + wiggle_abi::fastly_kv_store::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_purge::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_secret_store::add_to_linker(linker, WasmCtx::session)?; wiggle_abi::fastly_uap::add_to_linker(linker, WasmCtx::session)?; diff --git a/lib/src/object_store.rs b/lib/src/object_store.rs index 11a92125..9696567f 100644 --- a/lib/src/object_store.rs +++ b/lib/src/object_store.rs @@ -1,15 +1,27 @@ use { - crate::wiggle_abi::types::FastlyStatus, + crate::wiggle_abi::types::{FastlyStatus, KvError, KvInsertMode}, + base64::prelude::*, + serde::Serialize, std::{ collections::BTreeMap, sync::{Arc, RwLock}, + time::SystemTime, }, }; +#[derive(Debug, Clone)] +pub struct ObjectValue { + pub body: Vec, + pub metadata: Vec, + pub metadata_len: usize, + pub generation: u32, + pub expiration: Option, +} + #[derive(Clone, Debug, Default)] pub struct ObjectStores { #[allow(clippy::type_complexity)] - stores: Arc>>>>, + stores: Arc>>>, } impl ObjectStores { @@ -30,15 +42,32 @@ impl ObjectStores { pub fn lookup( &self, - obj_store_key: &ObjectStoreKey, - obj_key: &ObjectKey, - ) -> Result, ObjectStoreError> { + obj_store_key: ObjectStoreKey, + obj_key: ObjectKey, + ) -> Result { + let mut res = Err(KvStoreError::Uninitialized); + self.stores - .read() - .map_err(|_| ObjectStoreError::PoisonedLock)? - .get(obj_store_key) - .and_then(|map| map.get(obj_key).cloned()) - .ok_or(ObjectStoreError::MissingObject) + .write() + .map_err(|_| KvStoreError::InternalError)? + .entry(obj_store_key) + .and_modify(|store| match store.get(&obj_key) { + Some(val) => { + res = Ok(val.clone()); + // manages ttl + if let Some(exp) = val.expiration { + if SystemTime::now() >= exp { + store.remove(&obj_key); + res = Err(KvStoreError::NotFound); + } + } + } + None => { + res = Err(KvStoreError::NotFound); + } + }); + + res } pub(crate) fn insert_empty_store( @@ -60,17 +89,94 @@ impl ObjectStores { obj_store_key: ObjectStoreKey, obj_key: ObjectKey, obj: Vec, - ) -> Result<(), ObjectStoreError> { + mode: KvInsertMode, + generation: Option, + metadata: Option>, + ttl: Option, + ) -> Result<(), KvStoreError> { + // manages ttl + let existing = self.lookup(obj_store_key.clone(), obj_key.clone()); + + if let Some(g) = generation { + if let Ok(val) = &existing { + if val.generation != g { + return Err(KvStoreError::PreconditionFailed); + } + } + } + + let out_obj = match mode { + KvInsertMode::Overwrite => obj, + KvInsertMode::Add => { + if existing.is_ok() { + // key exists, add fails + return Err(KvStoreError::PreconditionFailed); + } + obj + } + KvInsertMode::Append => { + let mut out_obj; + match existing { + Err(KvStoreError::NotFound) => { + out_obj = obj; + } + Err(_) => return Err(KvStoreError::InternalError), + Ok(v) => { + out_obj = v.body; + out_obj.append(&mut obj.clone()); + } + } + out_obj + } + KvInsertMode::Prepend => { + let mut out_obj; + match existing { + Err(KvStoreError::NotFound) => { + out_obj = obj; + } + Err(_) => return Err(KvStoreError::InternalError), + Ok(mut v) => { + out_obj = obj; + out_obj.append(&mut v.body); + } + } + out_obj + } + }; + + let exp = ttl.map(|t| SystemTime::now() + t); + + let mut obj_val = ObjectValue { + body: out_obj, + metadata: vec![], + metadata_len: 0, + generation: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos() as u32, + expiration: exp, + }; + + // magic number hack to ensure a case for integration tests + if obj_val.generation == 1337 { + obj_val.generation = 1338; + } + + if let Some(m) = metadata { + obj_val.metadata_len = m.len(); + obj_val.metadata = m; + } + self.stores .write() - .map_err(|_| ObjectStoreError::PoisonedLock)? + .map_err(|_| KvStoreError::InternalError)? .entry(obj_store_key) .and_modify(|store| { - store.insert(obj_key.clone(), obj.clone()); + store.insert(obj_key.clone(), obj_val.clone()); }) .or_insert_with(|| { let mut store = BTreeMap::new(); - store.insert(obj_key, obj); + store.insert(obj_key, obj_val); store }); @@ -81,16 +187,131 @@ impl ObjectStores { &self, obj_store_key: ObjectStoreKey, obj_key: ObjectKey, - ) -> Result<(), ObjectStoreError> { + ) -> Result<(), KvStoreError> { + let mut res = Ok(()); + self.stores .write() - .map_err(|_| ObjectStoreError::PoisonedLock)? + .map_err(|_| KvStoreError::InternalError)? .entry(obj_store_key) - .and_modify(|store| { - store.remove(&obj_key); + .and_modify(|store| match store.get(&obj_key) { + // 404 if the key doesn't exist, otherwise delete + Some(val) => { + // manages ttl + if let Some(exp) = val.expiration { + if SystemTime::now() >= exp { + res = Err(KvStoreError::NotFound); + } + } + store.remove(&obj_key); + } + None => { + res = Err(KvStoreError::NotFound); + } }); - Ok(()) + res + } + + pub fn list( + &self, + obj_store_key: ObjectStoreKey, + cursor: Option, + prefix: Option, + limit: u32, + ) -> Result, KvStoreError> { + let mut res = Err(KvStoreError::InternalError); + + let cursor = match cursor { + Some(c) => { + let cursor_bytes = BASE64_STANDARD + .decode(c) + .map_err(|_| KvStoreError::BadRequest)?; + let decoded = + String::from_utf8(cursor_bytes).map_err(|_| KvStoreError::BadRequest)?; + Some(decoded) + } + None => None, + }; + + self.stores + .write() + .map_err(|_| KvStoreError::InternalError)? + .entry(obj_store_key.clone()) + .and_modify(|store| { + // manages ttl + // a bit wasteful to run this loop twice, but we need mutable access to store, + // and it's already claimed in the filters below + let ttl_list = store.iter_mut().map(|(k, _)| k.clone()).collect::>(); + ttl_list.into_iter().for_each(|k| { + let val = store.get(&k); + if let Some(v) = val { + if let Some(exp) = v.expiration { + if SystemTime::now() >= exp { + store.remove(&k); + } + } + } + }); + + let mut list = store + .iter_mut() + .filter(|(k, _)| { + if let Some(c) = &cursor { + &k.0 > c + } else { + true + } + }) + .filter(|(k, _)| { + if let Some(p) = &prefix { + k.0.starts_with(p) + } else { + true + } + }) + .map(|(k, _)| String::from_utf8(k.0.as_bytes().to_vec()).unwrap()) + .collect::>(); + + // limit + let old_len = list.len(); + list.truncate(limit as usize); + let new_len = list.len(); + + let next_cursor = match old_len != new_len { + true => Some(BASE64_STANDARD.encode(&list[new_len - 1])), + false => None, + }; + + #[derive(Serialize)] + struct Metadata { + limit: u32, + #[serde(skip_serializing_if = "Option::is_none")] + prefix: Option, + #[serde(skip_serializing_if = "Option::is_none")] + next_cursor: Option, + } + #[derive(Serialize)] + struct JsonOutput { + data: Vec, + meta: Metadata, + } + + let body = JsonOutput { + data: list, + meta: Metadata { + limit, + prefix, + next_cursor, + }, + }; + + match serde_json::to_string(&body).map_err(|_| KvStoreError::InternalError) { + Ok(s) => res = Ok(s.as_bytes().to_vec()), + Err(e) => res = Err(e), + }; + }); + res } } @@ -136,6 +357,81 @@ impl From<&ObjectStoreError> for FastlyStatus { } } +#[derive(Debug, PartialOrd, Ord, PartialEq, Eq, thiserror::Error)] +pub enum KvStoreError { + #[error("The error was not set")] + Uninitialized, + #[error("There was no error")] + Ok, + #[error("KV store cannot or will not process the request due to something that is perceived to be a client error")] + BadRequest, + #[error("KV store cannot find the requested resource")] + NotFound, + #[error("KV store cannot fulfill the request, as definied by the client's prerequisites (ie. if-generation-match)")] + PreconditionFailed, + #[error("The size limit for a KV store key was exceeded")] + PayloadTooLarge, + #[error("The system encountered an unexpected internal error")] + InternalError, + #[error("Too many requests have been made to the KV store")] + TooManyRequests, +} + +impl From<&KvError> for KvStoreError { + fn from(e: &KvError) -> Self { + match e { + KvError::Uninitialized => KvStoreError::Uninitialized, + KvError::Ok => KvStoreError::Ok, + KvError::BadRequest => KvStoreError::BadRequest, + KvError::NotFound => KvStoreError::NotFound, + KvError::PreconditionFailed => KvStoreError::PreconditionFailed, + KvError::PayloadTooLarge => KvStoreError::PayloadTooLarge, + KvError::InternalError => KvStoreError::InternalError, + KvError::TooManyRequests => KvStoreError::TooManyRequests, + } + } +} + +impl From<&KvStoreError> for KvError { + fn from(e: &KvStoreError) -> Self { + match e { + KvStoreError::Uninitialized => KvError::Uninitialized, + KvStoreError::Ok => KvError::Ok, + KvStoreError::BadRequest => KvError::BadRequest, + KvStoreError::NotFound => KvError::NotFound, + KvStoreError::PreconditionFailed => KvError::PreconditionFailed, + KvStoreError::PayloadTooLarge => KvError::PayloadTooLarge, + KvStoreError::InternalError => KvError::InternalError, + KvStoreError::TooManyRequests => KvError::TooManyRequests, + } + } +} + +impl From<&KvStoreError> for ObjectStoreError { + fn from(e: &KvStoreError) -> Self { + match e { + // the only real one + KvStoreError::NotFound => ObjectStoreError::MissingObject, + _ => ObjectStoreError::UnknownObjectStore("".to_string()), + } + } +} + +impl From<&KvStoreError> for FastlyStatus { + fn from(e: &KvStoreError) -> Self { + match e { + KvStoreError::Uninitialized => panic!("{}", e), + KvStoreError::Ok => FastlyStatus::Ok, + KvStoreError::BadRequest => FastlyStatus::Inval, + KvStoreError::NotFound => FastlyStatus::None, + KvStoreError::PreconditionFailed => FastlyStatus::Inval, + KvStoreError::PayloadTooLarge => FastlyStatus::Inval, + KvStoreError::InternalError => FastlyStatus::Inval, + KvStoreError::TooManyRequests => FastlyStatus::Inval, + } + } +} + /// Keys in the Object Store must follow the following rules: /// /// * Keys can contain any sequence of valid Unicode characters, of length 1-1024 bytes when @@ -193,3 +489,460 @@ pub enum KeyValidationError { #[error("Keys for objects cannot contain a `{0}`")] Contains(String), } + +#[cfg(test)] +mod tests { + use super::*; + + const STORE_NAME: &'static str = "test_store"; + + #[test] + fn test_kv_store_exists() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let res = stores.store_exists(STORE_NAME); + match res { + Ok(true) => {} + _ => panic!("should have been OK(true)"), + } + } + + #[test] + fn test_kv_store_basics() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let key = "insert_key".to_string(); + let val1 = "val1".to_string(); + + // insert + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + + // lookup + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(ov) => { + assert_eq!(ov.body, val1.as_bytes().to_vec()) + } + Err(_) => panic!("should have been OK"), + } + + // list + let limit = 1000; + let res = stores.list(ObjectStoreKey(STORE_NAME.to_string()), None, None, limit); + match res { + Ok(ov) => { + let val = format!(r#"{{"data":["{key}"],"meta":{{"limit":{limit}}}}}"#); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + + // delete + let res = stores.delete( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(_) => {} + Err(_) => panic!("should have been OK"), + } + } + + #[test] + fn test_kv_store_item_404s() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey("bad_key".to_string()), + ); + match res { + Ok(_) => panic!("should not have been OK"), + Err(e) => assert_eq!(e, KvStoreError::NotFound), + } + + let res = stores.delete( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey("bad_key".to_string()), + ); + match res { + Ok(_) => panic!("should not have been OK"), + Err(e) => assert_eq!(e, KvStoreError::NotFound), + } + } + + #[test] + fn test_kv_store_item_insert_modes() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let key = "insert_key".to_string(); + let val1 = "val1".to_string(); + let val2 = "val2".to_string(); + let val3 = "val3".to_string(); + + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Add, + None, + None, + None, + ); + assert!(res.is_ok()); + // fail on Add, because key already exists + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Add, + None, + None, + None, + ); + match res { + Ok(_) => panic!("should not have been OK"), + Err(e) => assert_eq!(e, KvStoreError::PreconditionFailed), + } + // prepend val2 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val2.clone().into(), + KvInsertMode::Prepend, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + // append val3 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val3.clone().into(), + KvInsertMode::Append, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(ov) => { + let val = format!("{val2}{val1}{val3}"); + assert_eq!(ov.body, val.as_bytes().to_vec()) + } + Err(_) => panic!("should have been OK"), + } + + // overwrite val3 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val3.clone().into(), + KvInsertMode::Overwrite, + None, + Some(val2.as_bytes().to_vec()), + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + + // test overwrite + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(ov) => { + assert_eq!(ov.body, val3.as_bytes().to_vec()); + assert_eq!(ov.metadata, val2.as_bytes().to_vec()); + } + Err(_) => panic!("should have been OK"), + } + } + + #[test] + fn test_kv_store_item_insert_generation() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let key = "insert_key".to_string(); + let val1 = "val1".to_string(); + + // insert val1 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + + // test overwrite, get gen + let generation; + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(ov) => { + assert_eq!(ov.body, val1.as_bytes().to_vec()); + generation = ov.generation; + } + Err(_) => panic!("should have been OK"), + } + + // test generation match failure + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + Some(1337), + None, + None, + ); + match res { + Err(KvStoreError::PreconditionFailed) => {} + _ => panic!("should have been Err(KvStoreError::PreconditionFailed)"), + } + + // test generation match positive + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + Some(generation), + None, + None, + ); + match res { + Ok(_) => {} + _ => panic!("should have been OK"), + } + + // check result + let res = stores.lookup( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + ); + match res { + Ok(ov) => { + assert_eq!(ov.body, val1.as_bytes().to_vec()); + } + Err(_) => panic!("should have been OK"), + } + } + + #[test] + fn test_kv_store_item_list_advanced() { + let stores = ObjectStores::default(); + stores + .insert_empty_store(ObjectStoreKey(STORE_NAME.to_string())) + .unwrap(); + + let key = "insert_key".to_string(); + let prefix = "key".to_string(); + let key1 = format!("{prefix}1").to_string(); + let key2 = format!("{prefix}2").to_string(); + let key3 = format!("{prefix}3").to_string(); + let val1 = "val1".to_string(); + let val2 = "val2".to_string(); + let val3 = "val3".to_string(); + + // insert insert_key + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + + // insert val1 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key1.clone()), + val1.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + // insert val2 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key2.clone()), + val2.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + // insert val3 + let res = stores.insert( + ObjectStoreKey(STORE_NAME.to_string()), + ObjectKey(key3.clone()), + val3.clone().into(), + KvInsertMode::Overwrite, + None, + None, + None, + ); + match res { + Err(_) => panic!("should have been OK"), + _ => {} + } + + // list + let limit = 1000; + let res = stores.list(ObjectStoreKey(STORE_NAME.to_string()), None, None, limit); + match res { + Ok(ov) => { + let val = format!( + r#"{{"data":["{key}","{key1}","{key2}","{key3}"],"meta":{{"limit":{limit}}}}}"# + ); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + + // list w/prefix + let limit = 1000; + let res = stores.list( + ObjectStoreKey(STORE_NAME.to_string()), + None, + Some(prefix.clone()), + limit, + ); + match res { + Ok(ov) => { + let val = format!( + r#"{{"data":["{key1}","{key2}","{key3}"],"meta":{{"limit":{limit},"prefix":"{prefix}"}}}}"# + ); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + + // list w/prefix&limit + let limit = 1; + let res = stores.list( + ObjectStoreKey(STORE_NAME.to_string()), + None, + Some(prefix.clone()), + limit, + ); + match res { + Ok(ov) => { + let next_cursor = BASE64_STANDARD.encode(key1.clone()); + let val = format!( + r#"{{"data":["{key1}"],"meta":{{"limit":{limit},"prefix":"{prefix}","next_cursor":"{next_cursor}"}}}}"# + ); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + + // list w/prefix&limit&cursor + let limit = 1; + let last_cursor = BASE64_STANDARD.encode(key1.clone()); + let res = stores.list( + ObjectStoreKey(STORE_NAME.to_string()), + Some(last_cursor), + Some(prefix.clone()), + limit, + ); + match res { + Ok(ov) => { + let next_cursor = BASE64_STANDARD.encode(key2.clone()); + let val = format!( + r#"{{"data":["{key2}"],"meta":{{"limit":{limit},"prefix":"{prefix}","next_cursor":"{next_cursor}"}}}}"# + ); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + + // list w/prefix&limit&cursor + let limit = 1; + let last_cursor = BASE64_STANDARD.encode(key2.clone()); + let res = stores.list( + ObjectStoreKey(STORE_NAME.to_string()), + Some(last_cursor), + Some(prefix.clone()), + limit, + ); + match res { + Ok(ov) => { + let val = format!( + r#"{{"data":["{key3}"],"meta":{{"limit":{limit},"prefix":"{prefix}"}}}}"# + ); + assert_eq!(std::str::from_utf8(&ov).unwrap(), val) + } + Err(_) => panic!("should have been OK"), + } + } +} diff --git a/lib/src/session.rs b/lib/src/session.rs index b6378c04..edade7fd 100644 --- a/lib/src/session.rs +++ b/lib/src/session.rs @@ -4,7 +4,8 @@ mod async_item; mod downstream; pub use async_item::{ - AsyncItem, PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvLookupTask, + AsyncItem, PeekableTask, PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, + PendingKvLookupTask, }; use std::collections::HashMap; @@ -14,6 +15,9 @@ use std::net::{IpAddr, SocketAddr}; use std::path::PathBuf; use std::sync::atomic::AtomicU64; use std::sync::{Arc, Mutex}; +use std::time::Duration; + +use crate::object_store::KvStoreError; use { self::downstream::DownstreamResponse, @@ -22,14 +26,16 @@ use { config::{Backend, Backends, DeviceDetection, Dictionaries, Geolocation, LoadedDictionary}, error::{Error, HandleError}, logging::LogEndpoint, - object_store::{ObjectKey, ObjectStoreError, ObjectStoreKey, ObjectStores}, + object_store::{ObjectKey, ObjectStoreKey, ObjectStores, ObjectValue}, secret_store::{SecretLookup, SecretStores}, streaming_body::StreamingBody, upstream::{SelectTarget, TlsConfig}, wiggle_abi::types::{ - self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle, - ObjectStoreHandle, PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvLookupHandle, - PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle, + self, BodyHandle, ContentEncodings, DictionaryHandle, EndpointHandle, KvInsertMode, + KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle, KvStoreListHandle, + KvStoreLookupHandle, PendingKvDeleteHandle, PendingKvInsertHandle, PendingKvListHandle, + PendingKvLookupHandle, PendingRequestHandle, RequestHandle, ResponseHandle, + SecretHandle, SecretStoreHandle, }, ExecuteCtx, }, @@ -122,11 +128,11 @@ pub struct Session { /// The ObjectStore configured for this execution. /// /// Populated prior to guest execution and can be modified during requests. - pub(crate) object_store: ObjectStores, + pub(crate) kv_store: ObjectStores, /// The object stores configured for this execution. /// /// Populated prior to guest execution. - object_store_by_name: PrimaryMap, + kv_store_by_name: PrimaryMap, /// The secret stores configured for this execution. /// /// Populated prior to guest execution, and never modified. @@ -164,7 +170,7 @@ impl Session { tls_config: TlsConfig, dictionaries: Arc, config_path: Arc>, - object_store: ObjectStores, + kv_store: ObjectStores, secret_stores: Arc, ) -> Session { let (parts, body) = req.into_parts(); @@ -199,8 +205,8 @@ impl Session { tls_config, dictionaries, loaded_dictionaries: PrimaryMap::new(), - object_store, - object_store_by_name: PrimaryMap::new(), + kv_store, + kv_store_by_name: PrimaryMap::new(), secret_stores, secret_stores_by_name: PrimaryMap::new(), secrets_by_name: PrimaryMap::new(), @@ -678,23 +684,33 @@ impl Session { ) } - // ----- Object Store API ----- - pub fn obj_store_handle(&mut self, key: &str) -> Result { + // ----- KV Store API ----- + pub fn kv_store_handle(&mut self, key: &str) -> Result { let obj_key = ObjectStoreKey::new(key); - Ok(self.object_store_by_name.push(obj_key)) + Ok(self.kv_store_by_name.push(obj_key)) } - pub fn get_obj_store_key(&self, handle: ObjectStoreHandle) -> Option<&ObjectStoreKey> { - self.object_store_by_name.get(handle) + pub fn get_kv_store_key(&self, handle: KvStoreHandle) -> Option<&ObjectStoreKey> { + self.kv_store_by_name.get(handle) } - pub fn obj_insert( + pub fn kv_insert( &self, obj_store_key: ObjectStoreKey, obj_key: ObjectKey, obj: Vec, - ) -> Result<(), ObjectStoreError> { - self.object_store.insert(obj_store_key, obj_key, obj) + mode: Option, + generation: Option, + metadata: Option>, + ttl: Option, + ) -> Result<(), KvStoreError> { + let mode = match mode { + None => KvInsertMode::Overwrite, + Some(m) => m, + }; + + self.kv_store + .insert(obj_store_key, obj_key, obj, mode, generation, metadata, ttl) } /// Insert a [`PendingKvInsert`] into the session. @@ -704,7 +720,7 @@ impl Session { pub fn insert_pending_kv_insert( &mut self, pending: PendingKvInsertTask, - ) -> PendingKvInsertHandle { + ) -> KvStoreInsertHandle { self.async_items .push(Some(AsyncItem::PendingKvInsert(pending))) .into() @@ -743,12 +759,12 @@ impl Session { .ok_or(HandleError::InvalidPendingKvInsertHandle(handle)) } - pub fn obj_delete( + pub fn kv_delete( &self, obj_store_key: ObjectStoreKey, obj_key: ObjectKey, - ) -> Result<(), ObjectStoreError> { - self.object_store.delete(obj_store_key, obj_key) + ) -> Result<(), KvStoreError> { + self.kv_store.delete(obj_store_key, obj_key) } /// Insert a [`PendingKvDelete`] into the session. @@ -799,10 +815,10 @@ impl Session { pub fn obj_lookup( &self, - obj_store_key: &ObjectStoreKey, - obj_key: &ObjectKey, - ) -> Result, ObjectStoreError> { - self.object_store.lookup(obj_store_key, obj_key) + obj_store_key: ObjectStoreKey, + obj_key: ObjectKey, + ) -> Result { + self.kv_store.lookup(obj_store_key, obj_key) } /// Insert a [`PendingLookup`] into the session. @@ -851,6 +867,61 @@ impl Session { .ok_or(HandleError::InvalidPendingKvLookupHandle(handle)) } + pub fn kv_list( + &self, + obj_store_key: ObjectStoreKey, + cursor: Option, + prefix: Option, + limit: Option, + ) -> Result, KvStoreError> { + let limit = limit.unwrap_or(1000); + + self.kv_store.list(obj_store_key, cursor, prefix, limit) + } + + /// Insert a [`PendingList`] into the session. + /// + /// This method returns a new [`PendingKvListHandle`], which can then be used to access + /// and mutate the pending list. + pub fn insert_pending_kv_list(&mut self, pending: PendingKvListTask) -> PendingKvListHandle { + self.async_items + .push(Some(AsyncItem::PendingKvList(pending))) + .into() + } + + /// Take ownership of a [`PendingList`], given its [`PendingKvListHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a pending list in the + /// session. + pub fn take_pending_kv_list( + &mut self, + handle: PendingKvListHandle, + ) -> Result { + // check that this is a pending request before removing it + let _ = self.pending_kv_list(handle)?; + + self.async_items + .get_mut(handle.into()) + .and_then(Option::take) + .and_then(AsyncItem::into_pending_kv_list) + .ok_or(HandleError::InvalidPendingKvListHandle(handle)) + } + + /// Get a reference to a [`PendingList`], given its [`PendingKvListHandle`]. + /// + /// Returns a [`HandleError`] if the handle is not associated with a list in the + /// session. + pub fn pending_kv_list( + &self, + handle: PendingKvListHandle, + ) -> Result<&PendingKvListTask, HandleError> { + self.async_items + .get(handle.into()) + .and_then(Option::as_ref) + .and_then(AsyncItem::as_pending_kv_list) + .ok_or(HandleError::InvalidPendingKvListHandle(handle)) + } + // ----- Secret Store API ----- pub fn secret_store_handle(&mut self, name: &str) -> Option { @@ -1165,3 +1236,63 @@ impl From for PendingKvDeleteHandle { PendingKvDeleteHandle::from(h.as_u32()) } } + +impl From for AsyncItemHandle { + fn from(h: PendingKvListHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for PendingKvListHandle { + fn from(h: AsyncItemHandle) -> PendingKvListHandle { + PendingKvListHandle::from(h.as_u32()) + } +} + +impl From for AsyncItemHandle { + fn from(h: KvStoreLookupHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for KvStoreLookupHandle { + fn from(h: AsyncItemHandle) -> KvStoreLookupHandle { + KvStoreLookupHandle::from(h.as_u32()) + } +} + +impl From for AsyncItemHandle { + fn from(h: KvStoreInsertHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for KvStoreInsertHandle { + fn from(h: AsyncItemHandle) -> KvStoreInsertHandle { + KvStoreInsertHandle::from(h.as_u32()) + } +} + +impl From for AsyncItemHandle { + fn from(h: KvStoreDeleteHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for KvStoreDeleteHandle { + fn from(h: AsyncItemHandle) -> KvStoreDeleteHandle { + KvStoreDeleteHandle::from(h.as_u32()) + } +} + +impl From for AsyncItemHandle { + fn from(h: KvStoreListHandle) -> AsyncItemHandle { + AsyncItemHandle::from_u32(h.into()) + } +} + +impl From for KvStoreListHandle { + fn from(h: AsyncItemHandle) -> KvStoreListHandle { + KvStoreListHandle::from(h.as_u32()) + } +} diff --git a/lib/src/session/async_item.rs b/lib/src/session/async_item.rs index c6600049..000ee100 100644 --- a/lib/src/session/async_item.rs +++ b/lib/src/session/async_item.rs @@ -1,4 +1,4 @@ -use crate::object_store::ObjectStoreError; +use crate::object_store::{KvStoreError, ObjectValue}; use crate::{body::Body, error::Error, streaming_body::StreamingBody}; use anyhow::anyhow; use futures::Future; @@ -7,34 +7,45 @@ use http::Response; use tokio::sync::oneshot; #[derive(Debug)] -pub struct PendingKvLookupTask(PeekableTask, ObjectStoreError>>); +pub struct PendingKvLookupTask(PeekableTask>); impl PendingKvLookupTask { - pub fn new(t: PeekableTask, ObjectStoreError>>) -> PendingKvLookupTask { + pub fn new(t: PeekableTask>) -> PendingKvLookupTask { PendingKvLookupTask(t) } - pub fn task(self) -> PeekableTask, ObjectStoreError>> { + pub fn task(self) -> PeekableTask> { self.0 } } #[derive(Debug)] -pub struct PendingKvInsertTask(PeekableTask>); +pub struct PendingKvInsertTask(PeekableTask>); impl PendingKvInsertTask { - pub fn new(t: PeekableTask>) -> PendingKvInsertTask { + pub fn new(t: PeekableTask>) -> PendingKvInsertTask { PendingKvInsertTask(t) } - pub fn task(self) -> PeekableTask> { + pub fn task(self) -> PeekableTask> { self.0 } } #[derive(Debug)] -pub struct PendingKvDeleteTask(PeekableTask>); +pub struct PendingKvDeleteTask(PeekableTask>); impl PendingKvDeleteTask { - pub fn new(t: PeekableTask>) -> PendingKvDeleteTask { + pub fn new(t: PeekableTask>) -> PendingKvDeleteTask { PendingKvDeleteTask(t) } - pub fn task(self) -> PeekableTask> { + pub fn task(self) -> PeekableTask> { + self.0 + } +} + +#[derive(Debug)] +pub struct PendingKvListTask(PeekableTask, KvStoreError>>); +impl PendingKvListTask { + pub fn new(t: PeekableTask, KvStoreError>>) -> PendingKvListTask { + PendingKvListTask(t) + } + pub fn task(self) -> PeekableTask, KvStoreError>> { self.0 } } @@ -51,6 +62,7 @@ pub enum AsyncItem { PendingKvLookup(PendingKvLookupTask), PendingKvInsert(PendingKvInsertTask), PendingKvDelete(PendingKvDeleteTask), + PendingKvList(PendingKvListTask), } impl AsyncItem { @@ -149,6 +161,20 @@ impl AsyncItem { } } + pub fn as_pending_kv_list(&self) -> Option<&PendingKvListTask> { + match self { + Self::PendingKvList(req) => Some(req), + _ => None, + } + } + + pub fn into_pending_kv_list(self) -> Option { + match self { + Self::PendingKvList(req) => Some(req), + _ => None, + } + } + pub fn as_pending_req(&self) -> Option<&PeekableTask>> { match self { Self::PendingReq(req) => Some(req), @@ -178,6 +204,7 @@ impl AsyncItem { Self::PendingKvLookup(req) => req.0.await_ready().await, Self::PendingKvInsert(req) => req.0.await_ready().await, Self::PendingKvDelete(req) => req.0.await_ready().await, + Self::PendingKvList(req) => req.0.await_ready().await, } } @@ -210,6 +237,12 @@ impl From for AsyncItem { } } +impl From for AsyncItem { + fn from(task: PendingKvListTask) -> Self { + Self::PendingKvList(task) + } +} + #[derive(Debug)] pub enum PeekableTask { Waiting(oneshot::Receiver>), diff --git a/lib/src/wiggle_abi.rs b/lib/src/wiggle_abi.rs index bd76b3b0..bcae2721 100644 --- a/lib/src/wiggle_abi.rs +++ b/lib/src/wiggle_abi.rs @@ -60,6 +60,7 @@ mod erl_impl; mod fastly_purge_impl; mod geo_impl; mod headers; +mod kv_store_impl; mod log_impl; mod obj_store_impl; mod req_impl; @@ -75,7 +76,8 @@ wiggle::from_witx!({ errors: { fastly_status => Error }, async: { fastly_async_io::{select}, - fastly_object_store::{delete_async, pending_delete_wait, insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait}, + fastly_object_store::{delete_async, pending_delete_wait, insert, insert_async, pending_insert_wait, lookup_async, pending_lookup_wait, list}, + fastly_kv_store::{lookup, lookup_wait, insert, insert_wait, delete, delete_wait, list, list_wait}, fastly_http_body::{append, read, write}, fastly_http_req::{ pending_req_select, pending_req_select_v2, pending_req_poll, pending_req_poll_v2, @@ -84,6 +86,76 @@ wiggle::from_witx!({ } }); +impl From for types::KvStoreHandle { + fn from(h: types::ObjectStoreHandle) -> types::KvStoreHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::ObjectStoreHandle { + fn from(h: types::KvStoreHandle) -> types::ObjectStoreHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::PendingKvLookupHandle { + fn from(h: types::KvStoreLookupHandle) -> types::PendingKvLookupHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::KvStoreLookupHandle { + fn from(h: types::PendingKvLookupHandle) -> types::KvStoreLookupHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::PendingKvInsertHandle { + fn from(h: types::KvStoreInsertHandle) -> types::PendingKvInsertHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::KvStoreInsertHandle { + fn from(h: types::PendingKvInsertHandle) -> types::KvStoreInsertHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::PendingKvDeleteHandle { + fn from(h: types::KvStoreDeleteHandle) -> types::PendingKvDeleteHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::KvStoreDeleteHandle { + fn from(h: types::PendingKvDeleteHandle) -> types::KvStoreDeleteHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::PendingKvListHandle { + fn from(h: types::KvStoreListHandle) -> types::PendingKvListHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + +impl From for types::KvStoreListHandle { + fn from(h: types::PendingKvListHandle) -> types::KvStoreListHandle { + let s = unsafe { h.inner() }; + s.into() + } +} + impl From for http::version::Version { fn from(v: types::HttpVersion) -> http::version::Version { match v { diff --git a/lib/src/wiggle_abi/entity.rs b/lib/src/wiggle_abi/entity.rs index 1641ade8..5b11940d 100644 --- a/lib/src/wiggle_abi/entity.rs +++ b/lib/src/wiggle_abi/entity.rs @@ -3,8 +3,9 @@ //! [ref]: https://docs.rs/cranelift-entity/latest/cranelift_entity/trait.EntityRef.html use super::types::{ - AsyncItemHandle, BodyHandle, DictionaryHandle, EndpointHandle, ObjectStoreHandle, - PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, SecretStoreHandle, + AsyncItemHandle, BodyHandle, DictionaryHandle, EndpointHandle, KvStoreHandle, + ObjectStoreHandle, PendingRequestHandle, RequestHandle, ResponseHandle, SecretHandle, + SecretStoreHandle, }; /// Macro which implements a 32-bit entity reference for handles generated by Wiggle. @@ -46,6 +47,7 @@ wiggle_entity!(EndpointHandle); wiggle_entity!(PendingRequestHandle); wiggle_entity!(DictionaryHandle); wiggle_entity!(ObjectStoreHandle); +wiggle_entity!(KvStoreHandle); wiggle_entity!(SecretStoreHandle); wiggle_entity!(SecretHandle); wiggle_entity!(AsyncItemHandle); diff --git a/lib/src/wiggle_abi/kv_store_impl.rs b/lib/src/wiggle_abi/kv_store_impl.rs new file mode 100644 index 00000000..96f3ca0d --- /dev/null +++ b/lib/src/wiggle_abi/kv_store_impl.rs @@ -0,0 +1,335 @@ +//! fastly_obj_store` hostcall implementations. + +use crate::object_store::KvStoreError; +use crate::session::PeekableTask; +use crate::session::{ + PendingKvDeleteTask, PendingKvInsertTask, PendingKvListTask, PendingKvLookupTask, +}; + +use { + crate::{ + error::Error, + object_store::{ObjectKey, ObjectStoreError}, + session::Session, + wiggle_abi::{ + fastly_kv_store::FastlyKvStore, + types::{ + BodyHandle, KvDeleteConfig, KvDeleteConfigOptions, KvError, KvInsertConfig, + KvInsertConfigOptions, KvListConfig, KvListConfigOptions, KvLookupConfig, + KvLookupConfigOptions, KvStoreDeleteHandle, KvStoreHandle, KvStoreInsertHandle, + KvStoreListHandle, KvStoreLookupHandle, + }, + }, + }, + wiggle::{GuestMemory, GuestPtr}, +}; + +#[wiggle::async_trait] +impl FastlyKvStore for Session { + fn open( + &mut self, + memory: &mut GuestMemory<'_>, + name: GuestPtr, + ) -> Result { + let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?; + if self.kv_store.store_exists(&name)? { + self.kv_store_handle(&name) + } else { + Err(Error::ObjectStoreError( + ObjectStoreError::UnknownObjectStore(name.to_owned()), + )) + } + } + + async fn lookup( + &mut self, + memory: &mut GuestMemory<'_>, + store: KvStoreHandle, + key: GuestPtr, + _lookup_config_mask: KvLookupConfigOptions, + _lookup_configuration: GuestPtr, + handle_out: GuestPtr, + ) -> Result<(), Error> { + let store = self.get_kv_store_key(store).unwrap(); + let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string()) + .map_err(|_| KvStoreError::BadRequest)?; + // just create a future that's already ready + let fut = futures::future::ok(self.obj_lookup(store.clone(), key)); + let task = PeekableTask::spawn(fut).await; + memory.write( + handle_out, + self.insert_pending_kv_lookup(PendingKvLookupTask::new(task)) + .into(), + )?; + Ok(()) + } + + async fn lookup_wait( + &mut self, + memory: &mut GuestMemory<'_>, + pending_kv_lookup_handle: KvStoreLookupHandle, + body_handle_out: GuestPtr, + metadata_buf: GuestPtr, + metadata_buf_len: u32, + nwritten_out: GuestPtr, + generation_out: GuestPtr, + kv_error_out: GuestPtr, + ) -> Result<(), Error> { + let resp = self + .take_pending_kv_lookup(pending_kv_lookup_handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => { + let body_handle = self.insert_body(value.body.into()); + + memory.write(body_handle_out, body_handle)?; + match value.metadata_len { + 0 => memory.write(nwritten_out, 0)?, + len => { + let meta_len_u32 = + u32::try_from(len).expect("metadata len is outside the bounds of u32"); + memory.write(nwritten_out, meta_len_u32)?; + if meta_len_u32 > metadata_buf_len { + return Err(Error::BufferLengthError { + buf: "metadata", + len: "specified length", + }); + } + memory.copy_from_slice( + &value.metadata, + metadata_buf.as_array(meta_len_u32), + )?; + } + } + memory.write(generation_out, value.generation)?; + memory.write(kv_error_out, KvError::Ok)?; + Ok(()) + } + Err(e) => { + memory.write(kv_error_out, (&e).into())?; + Ok(()) + } + } + } + + async fn insert( + &mut self, + memory: &mut GuestMemory<'_>, + store: KvStoreHandle, + key: GuestPtr, + body_handle: BodyHandle, + insert_config_mask: KvInsertConfigOptions, + insert_configuration: GuestPtr, + pending_handle_out: GuestPtr, + ) -> Result<(), Error> { + let store = self.get_kv_store_key(store).unwrap().clone(); + let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string()) + .map_err(|_| KvStoreError::BadRequest)?; + let body = self.take_body(body_handle)?.read_into_vec().await?; + + let config = memory.read(insert_configuration)?; + + let config_str_or_none = |flag, str_field: GuestPtr, len_field| { + if insert_config_mask.contains(flag) { + if len_field == 0 { + return Err(Error::InvalidArgument); + } + + Ok(Some(memory.to_vec(str_field.as_array(len_field))?)) + } else { + Ok(None) + } + }; + + let mode = config.mode; + + // won't actually do anything in viceroy + // let bgf = insert_config_mask.contains(KvInsertConfigOptions::BACKGROUND_FETCH); + + let igm = if insert_config_mask.contains(KvInsertConfigOptions::IF_GENERATION_MATCH) { + Some(config.if_generation_match) + } else { + None + }; + + let meta = config_str_or_none( + KvInsertConfigOptions::METADATA, + config.metadata, + config.metadata_len, + )?; + + let ttl = if insert_config_mask.contains(KvInsertConfigOptions::TIME_TO_LIVE_SEC) { + Some(std::time::Duration::from_secs( + config.time_to_live_sec as u64, + )) + } else { + None + }; + + let fut = futures::future::ok(self.kv_insert(store, key, body, Some(mode), igm, meta, ttl)); + let task = PeekableTask::spawn(fut).await; + memory.write( + pending_handle_out, + self.insert_pending_kv_insert(PendingKvInsertTask::new(task)), + )?; + + Ok(()) + } + + async fn insert_wait( + &mut self, + memory: &mut GuestMemory<'_>, + pending_insert_handle: KvStoreInsertHandle, + kv_error_out: GuestPtr, + ) -> Result<(), Error> { + let resp = self + .take_pending_kv_insert(pending_insert_handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(_) => { + memory.write(kv_error_out, KvError::Ok)?; + Ok(()) + } + Err(e) => { + memory.write(kv_error_out, (&e).into())?; + Ok(()) + } + } + } + + async fn delete( + &mut self, + memory: &mut GuestMemory<'_>, + store: KvStoreHandle, + key: GuestPtr, + _delete_config_mask: KvDeleteConfigOptions, + _delete_configuration: GuestPtr, + pending_handle_out: GuestPtr, + ) -> Result<(), Error> { + let store = self.get_kv_store_key(store).unwrap().clone(); + let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string()) + .map_err(|_| KvStoreError::BadRequest)?; + let fut = futures::future::ok(self.kv_delete(store, key)); + let task = PeekableTask::spawn(fut).await; + memory.write( + pending_handle_out, + self.insert_pending_kv_delete(PendingKvDeleteTask::new(task)) + .into(), + )?; + Ok(()) + } + + async fn delete_wait( + &mut self, + memory: &mut GuestMemory<'_>, + pending_delete_handle: KvStoreDeleteHandle, + kv_error_out: GuestPtr, + ) -> Result<(), Error> { + let resp = self + .take_pending_kv_delete(pending_delete_handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(_) => { + memory.write(kv_error_out, KvError::Ok)?; + Ok(()) + } + Err(e) => { + memory.write(kv_error_out, (&e).into())?; + Ok(()) + } + } + } + + async fn list( + &mut self, + memory: &mut GuestMemory<'_>, + store: KvStoreHandle, + list_config_mask: KvListConfigOptions, + list_configuration: GuestPtr, + pending_handle_out: GuestPtr, + ) -> Result<(), Error> { + let store = self.get_kv_store_key(store).unwrap().clone(); + + let config = memory.read(list_configuration)?; + + let config_string_or_none = |flag, str_field: GuestPtr, len_field| { + if list_config_mask.contains(flag) { + if len_field == 0 { + return Err(Error::InvalidArgument); + } + + let byte_vec = memory.to_vec(str_field.as_array(len_field))?; + + Ok(Some( + String::from_utf8(byte_vec).map_err(|_| Error::InvalidArgument)?, + )) + } else { + Ok(None) + } + }; + + let cursor = config_string_or_none( + KvListConfigOptions::CURSOR, + config.cursor, + config.cursor_len, + )?; + + let prefix = config_string_or_none( + KvListConfigOptions::PREFIX, + config.prefix, + config.prefix_len, + )?; + + let limit = match list_config_mask.contains(KvListConfigOptions::LIMIT) { + true => Some(config.limit), + false => None, + }; + + let fut = futures::future::ok(self.kv_list(store, cursor, prefix, limit)); + let task = PeekableTask::spawn(fut).await; + memory.write( + pending_handle_out, + self.insert_pending_kv_list(PendingKvListTask::new(task)) + .into(), + )?; + Ok(()) + } + + async fn list_wait( + &mut self, + memory: &mut GuestMemory<'_>, + pending_kv_list_handle: KvStoreListHandle, + body_handle_out: GuestPtr, + kv_error_out: GuestPtr, + ) -> Result<(), Error> { + let resp = self + .take_pending_kv_list(pending_kv_list_handle.into())? + .task() + .recv() + .await?; + + match resp { + Ok(value) => { + let body_handle = self.insert_body(value.into()).into(); + + memory.write(body_handle_out, body_handle)?; + + memory.write(kv_error_out, KvError::Ok)?; + Ok(()) + } + Err(e) => { + memory.write(kv_error_out, (&e).into())?; + Ok(()) + } + } + } +} diff --git a/lib/src/wiggle_abi/obj_store_impl.rs b/lib/src/wiggle_abi/obj_store_impl.rs index ce8211d3..493f0c94 100644 --- a/lib/src/wiggle_abi/obj_store_impl.rs +++ b/lib/src/wiggle_abi/obj_store_impl.rs @@ -8,7 +8,7 @@ use { crate::{ body::Body, error::Error, - object_store::{ObjectKey, ObjectStoreError}, + object_store::{KvStoreError, ObjectKey, ObjectStoreError}, session::Session, wiggle_abi::{ fastly_object_store::FastlyObjectStore, @@ -26,8 +26,8 @@ impl FastlyObjectStore for Session { name: GuestPtr, ) -> Result { let name = memory.as_str(name)?.ok_or(Error::SharedMemory)?; - if self.object_store.store_exists(&name)? { - self.obj_store_handle(&name) + if self.kv_store.store_exists(name)? { + Ok(self.kv_store_handle(name)?.into()) } else { Err(Error::ObjectStoreError( ObjectStoreError::UnknownObjectStore(name.to_owned()), @@ -42,18 +42,18 @@ impl FastlyObjectStore for Session { key: GuestPtr, opt_body_handle_out: GuestPtr, ) -> Result<(), Error> { - let store = self.get_obj_store_key(store).unwrap(); + let store = self.get_kv_store_key(store.into()).unwrap(); let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?; - match self.obj_lookup(store, &key) { + match self.obj_lookup(store.clone(), key) { Ok(obj) => { - let new_handle = self.insert_body(Body::from(obj)); + let new_handle = self.insert_body(Body::from(obj.body)); memory.write(opt_body_handle_out, new_handle)?; Ok(()) } // Don't write to the invalid handle as the SDK will return Ok(None) // if the object does not exist. We need to return `Ok(())` here to // make sure Viceroy does not crash - Err(ObjectStoreError::MissingObject) => Ok(()), + Err(KvStoreError::NotFound) => Ok(()), Err(err) => Err(err.into()), } } @@ -65,10 +65,10 @@ impl FastlyObjectStore for Session { key: GuestPtr, opt_pending_body_handle_out: GuestPtr, ) -> Result<(), Error> { - let store = self.get_obj_store_key(store).unwrap(); + let store = self.get_kv_store_key(store.into()).unwrap(); let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?; // just create a future that's already ready - let fut = futures::future::ok(self.obj_lookup(store, &key)); + let fut = futures::future::ok(self.obj_lookup(store.clone(), key)); let task = PeekableTask::spawn(fut).await; memory.write( opt_pending_body_handle_out, @@ -91,11 +91,11 @@ impl FastlyObjectStore for Session { // proceed with the normal match from lookup() match pending_obj { Ok(obj) => { - let new_handle = self.insert_body(Body::from(obj)); + let new_handle = self.insert_body(Body::from(obj.body)); memory.write(opt_body_handle_out, new_handle)?; Ok(()) } - Err(ObjectStoreError::MissingObject) => Ok(()), + Err(KvStoreError::NotFound) => Ok(()), Err(err) => Err(err.into()), } } @@ -107,10 +107,10 @@ impl FastlyObjectStore for Session { key: GuestPtr, body_handle: BodyHandle, ) -> Result<(), Error> { - let store = self.get_obj_store_key(store).unwrap().clone(); + let store = self.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?; let bytes = self.take_body(body_handle)?.read_into_vec().await?; - self.obj_insert(store, key, bytes)?; + self.kv_insert(store, key, bytes, None, None, None, None)?; Ok(()) } @@ -123,14 +123,15 @@ impl FastlyObjectStore for Session { body_handle: BodyHandle, opt_pending_body_handle_out: GuestPtr, ) -> Result<(), Error> { - let store = self.get_obj_store_key(store).unwrap().clone(); + let store = self.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?; let bytes = self.take_body(body_handle)?.read_into_vec().await?; - let fut = futures::future::ok(self.obj_insert(store, key, bytes)); + let fut = futures::future::ok(self.kv_insert(store, key, bytes, None, None, None, None)); let task = PeekableTask::spawn(fut).await; memory.write( opt_pending_body_handle_out, - self.insert_pending_kv_insert(PendingKvInsertTask::new(task)), + self.insert_pending_kv_insert(PendingKvInsertTask::new(task)) + .into(), )?; Ok(()) } @@ -154,9 +155,9 @@ impl FastlyObjectStore for Session { key: GuestPtr, opt_pending_delete_handle_out: GuestPtr, ) -> Result<(), Error> { - let store = self.get_obj_store_key(store).unwrap().clone(); + let store = self.get_kv_store_key(store.into()).unwrap().clone(); let key = ObjectKey::new(memory.as_str(key)?.ok_or(Error::SharedMemory)?.to_string())?; - let fut = futures::future::ok(self.obj_delete(store, key)); + let fut = futures::future::ok(self.kv_delete(store, key)); let task = PeekableTask::spawn(fut).await; memory.write( opt_pending_delete_handle_out, diff --git a/lib/wit/deps/fastly/compute.wit b/lib/wit/deps/fastly/compute.wit index 1006d781..cc1076dc 100644 --- a/lib/wit/deps/fastly/compute.wit +++ b/lib/wit/deps/fastly/compute.wit @@ -726,12 +726,22 @@ interface kv-store { type delete-handle = u32; type list-handle = u32; - open: func(name: string) -> result, error>; + enum kv-status { + ok, + bad-request, + not-found, + precondition-failed, + payload-too-large, + internal-error, + too-many-requests, + } + + open: func(name: list) -> result, error>; lookup: func( store: handle, - key: string, - ) -> result; + key: list, + ) -> result; resource lookup-result { body: func() -> body-handle; @@ -741,7 +751,7 @@ interface kv-store { lookup-wait: func( handle: lookup-handle, - ) -> result, error>; + ) -> result, kv-status>, error>; enum insert-mode { overwrite, @@ -767,7 +777,7 @@ interface kv-store { insert: func( store: handle, - key: string, + key: list, body-handle: body-handle, mask: insert-config-options, config: insert-config, @@ -775,16 +785,16 @@ interface kv-store { insert-wait: func( handle: insert-handle, - ) -> result<_, error>; + ) -> result; delete: func( store: handle, - key: string, + key: list, ) -> result; delete-wait: func( handle: delete-handle, - ) -> result<_, error>; + ) -> result; enum list-mode { strong, @@ -813,7 +823,7 @@ interface kv-store { list-wait: func( handle: list-handle, - ) -> result; + ) -> result, kv-status>, error>; } /*