diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index da9683f9..22e3a898 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -364,6 +364,7 @@ jobs: - 'http_body' - 'http_config' - 'http_headers' + - 'http_parallel_call' - 'grpc_auth_random' defaults: @@ -444,6 +445,7 @@ jobs: - 'http_body' - 'http_config' - 'http_headers' + - 'http_parallel_call' - 'grpc_auth_random' defaults: diff --git a/BUILD b/BUILD index 7f611e4f..1f5f914e 100644 --- a/BUILD +++ b/BUILD @@ -29,7 +29,10 @@ cargo_build_script( rust_library( name = "proxy_wasm", - srcs = glob(["src/*.rs"]), + srcs = glob([ + "src/*.rs", + "src/callout/*.rs", + ]), edition = "2018", visibility = ["//visibility:public"], deps = [ diff --git a/README.md b/README.md index 7db3cdc2..65f3f0f8 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,7 @@ - [HTTP Headers](./examples/http_headers/) - [HTTP Response body](./examples/http_body/) - [HTTP Configuration](./examples/http_config/) +- [HTTP Parallel Call](./examples/http_parallel_call/) - [gRPC Auth (random)](./examples/grpc_auth_random/) ## Articles & blog posts from the community diff --git a/examples/http_parallel_call/Cargo.toml b/examples/http_parallel_call/Cargo.toml new file mode 100644 index 00000000..29899f51 --- /dev/null +++ b/examples/http_parallel_call/Cargo.toml @@ -0,0 +1,22 @@ +[package] +publish = false +name = "proxy-wasm-example-http-parallel-call" +version = "0.0.1" +authors = ["Zhuozhi Ji "] +description = "Proxy-Wasm plugin example: HTTP parallel call" +license = "Apache-2.0" +edition = "2018" + +[lib] +crate-type = ["cdylib"] + +[dependencies] +log = "0.4" +proxy-wasm = { path = "../../" } + +[profile.release] +lto = true +opt-level = 3 +codegen-units = 1 +panic = "abort" +strip = "debuginfo" diff --git a/examples/http_parallel_call/README.md b/examples/http_parallel_call/README.md new file mode 100644 index 00000000..a837b2c3 --- /dev/null +++ b/examples/http_parallel_call/README.md @@ -0,0 +1,27 @@ +## Proxy-Wasm plugin example: HTTP parallel call + +Proxy-Wasm plugin that makes multiply HTTP callout and combine responses as final response . + +### Building + +```sh +$ cargo build --target wasm32-wasi --release +``` + +### Using in Envoy + +This example can be run with [`docker compose`](https://docs.docker.com/compose/install/) +and has a matching Envoy configuration. + +```sh +$ docker compose up +``` + +#### Access granted. + +Send HTTP request to `localhost:10000/`: + +```sh +$ curl localhost:10000/ +Hello, World!\n +``` diff --git a/examples/http_parallel_call/docker-compose.yaml b/examples/http_parallel_call/docker-compose.yaml new file mode 100644 index 00000000..6a188511 --- /dev/null +++ b/examples/http_parallel_call/docker-compose.yaml @@ -0,0 +1,36 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +services: + envoy: + image: envoyproxy/envoy:v1.31-latest + hostname: envoy + ports: + - "10000:10000" + volumes: + - ./envoy.yaml:/etc/envoy/envoy.yaml + - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins + networks: + - envoymesh + depends_on: + - httpbin + httpbin: + image: mccutchen/go-httpbin + hostname: httpbin + ports: + - "8080:8080" + networks: + - envoymesh +networks: + envoymesh: {} diff --git a/examples/http_parallel_call/envoy.yaml b/examples/http_parallel_call/envoy.yaml new file mode 100644 index 00000000..61fdf8da --- /dev/null +++ b/examples/http_parallel_call/envoy.yaml @@ -0,0 +1,68 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +static_resources: + listeners: + address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: ingress_http + codec_type: AUTO + route_config: + name: local_routes + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + - match: + prefix: "/" + route: + cluster: httpbin + http_filters: + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + value: + config: + name: "http_parallel_call" + vm_config: + runtime: "envoy.wasm.runtime.v8" + code: + local: + filename: "/etc/envoy/proxy-wasm-plugins/proxy_wasm_example_http_parallel_call.wasm" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + clusters: + - name: httpbin + connect_timeout: 5s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: httpbin + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: httpbin + port_value: 8080 diff --git a/examples/http_parallel_call/src/lib.rs b/examples/http_parallel_call/src/lib.rs new file mode 100644 index 00000000..68c2529f --- /dev/null +++ b/examples/http_parallel_call/src/lib.rs @@ -0,0 +1,112 @@ +// Copyright 2020 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use proxy_wasm::callout::http::HttpClient; +use proxy_wasm::callout::promise::Promise; +use proxy_wasm::traits::*; +use proxy_wasm::types::*; +use std::cell::RefCell; +use std::rc::Rc; +use std::time::Duration; + +proxy_wasm::main! {{ + proxy_wasm::set_log_level(LogLevel::Trace); + proxy_wasm::set_http_context(|_, _| -> Box { Box::new(HttpParallelCall::default()) }); +}} + +#[derive(Default, Clone)] +struct HttpParallelCall { + client: Rc>, +} + +impl HttpContext for HttpParallelCall { + fn on_http_request_headers(&mut self, _: usize, _: bool) -> Action { + let self_clone_for_promise1 = self.clone(); + let self_clone_for_promise2 = self.clone(); + let self_clone_for_join = self.clone(); + + // "Hello, " + let promise1 = self + .client + .borrow_mut() + .dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/SGVsbG8sIA=="), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .then(move |(_, _, body_size, _)| { + match self_clone_for_promise1.get_http_call_response_body(0, body_size) { + None => "".to_owned(), + Some(bytes) => String::from_utf8(bytes.to_vec()).unwrap(), + } + }); + + // "World!" + let promise2 = self + .client + .borrow_mut() + .dispatch( + "httpbin", + vec![ + (":method", "GET"), + (":path", "/base64/V29ybGQh"), + (":authority", "httpbin.org"), + ], + None, + vec![], + Duration::from_secs(1), + ) + .then(move |(_, _, body_size, _)| { + match self_clone_for_promise2.get_http_call_response_body(0, body_size) { + None => "".to_owned(), + Some(bytes) => String::from_utf8(bytes.to_vec()).unwrap(), + } + }); + + Promise::all_of(vec![promise1, promise2]).then(move |results| { + self_clone_for_join.send_http_response( + 200, + vec![], + Some(format!("{}{}\n", results[0], results[1]).as_bytes()), + ); + }); + + Action::Pause + } + + fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action { + self.set_http_response_header("Powered-By", Some("proxy-wasm")); + Action::Continue + } +} + +impl Context for HttpParallelCall { + fn on_http_call_response( + &mut self, + token_id: u32, + num_headers: usize, + body_size: usize, + num_trailers: usize, + ) { + self.client + .borrow_mut() + .callback(token_id, num_headers, body_size, num_trailers) + } +} diff --git a/src/callout/http.rs b/src/callout/http.rs new file mode 100644 index 00000000..90ebdb2a --- /dev/null +++ b/src/callout/http.rs @@ -0,0 +1,56 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::callout::promise::Promise; +use crate::hostcalls; +use std::collections::HashMap; +use std::rc::Rc; +use std::time::Duration; + +type OnHttpResponseArgs = (u32, usize, usize, usize); + +#[derive(Default)] +pub struct HttpClient { + m: HashMap>>, +} + +impl HttpClient { + pub fn dispatch( + &mut self, + upstream: &str, + headers: Vec<(&str, &str)>, + body: Option<&[u8]>, + trailers: Vec<(&str, &str)>, + timeout: Duration, + ) -> Rc> { + let token = + hostcalls::dispatch_http_call(upstream, headers, body, trailers, timeout).unwrap(); + let promise = Promise::new(); + self.m.insert(token, promise.clone()); + promise + } + + pub fn callback( + &mut self, + token_id: u32, + num_headers: usize, + body_size: usize, + num_trailers: usize, + ) { + let promise = self.m.remove(&token_id); + promise + .unwrap() + .fulfill((token_id, num_headers, body_size, num_trailers)) + } +} diff --git a/src/callout/mod.rs b/src/callout/mod.rs new file mode 100644 index 00000000..b67a869d --- /dev/null +++ b/src/callout/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod http; +pub mod promise; diff --git a/src/callout/promise.rs b/src/callout/promise.rs new file mode 100644 index 00000000..873940ca --- /dev/null +++ b/src/callout/promise.rs @@ -0,0 +1,341 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cell::RefCell; +use std::rc::Rc; + +enum PromiseState { + Pending, + Fulfilled(T), + Rejected(String), +} + +type ThenCallbackRef = RefCell>>; +type CatchCallbackRef = RefCell>>; + +pub struct Promise { + state: RefCell>, + then_callback: ThenCallbackRef, + catch_callback: CatchCallbackRef, +} + +impl Promise +where + T: 'static + Clone, +{ + pub fn new() -> Rc { + Rc::new(Self { + state: RefCell::new(PromiseState::Pending), + then_callback: RefCell::new(None), + catch_callback: RefCell::new(None), + }) + } + + pub fn fulfill(self: &Rc, value: T) { + *self.state.borrow_mut() = PromiseState::Fulfilled(value.clone()); + if let Some(callback) = self.then_callback.borrow_mut().take() { + callback(value); + } + } + + pub fn reject(self: &Rc, reason: String) { + *self.state.borrow_mut() = PromiseState::Rejected(reason.clone()); + if let Some(callback) = self.catch_callback.borrow_mut().take() { + callback(reason); + } + } + + pub fn then(self: &Rc, f: F) -> Rc> + where + F: FnOnce(T) -> R + 'static, + R: 'static + Clone, + { + let new_promise = Promise::new(); + let new_promise_clone = new_promise.clone(); + match &*self.state.borrow() { + PromiseState::Pending => { + *self.then_callback.borrow_mut() = Some(Box::new(move |value| { + let result = f(value); + new_promise_clone.fulfill(result); + })); + let new_promise_for_catch = new_promise.clone(); + *self.catch_callback.borrow_mut() = Some(Box::new(move |reason| { + new_promise_for_catch.reject(reason); + })); + } + PromiseState::Fulfilled(value) => { + let result = f(value.clone()); + new_promise.fulfill(result); + } + PromiseState::Rejected(reason) => new_promise.reject(reason.clone()), + } + new_promise + } + + pub fn catch(self: &Rc, f: F) -> Rc + where + F: FnOnce(String) + 'static, + { + match &*self.state.borrow() { + PromiseState::Pending => *self.catch_callback.borrow_mut() = Some(Box::new(f)), + PromiseState::Fulfilled(_) => {} + PromiseState::Rejected(reason) => f(reason.clone()), + } + self.clone() + } + + pub fn all_of(promises: Vec>) -> Rc>> { + let next_promise = Promise::new(); + + if promises.is_empty() { + next_promise.fulfill(vec![]); + return next_promise; + } + + let total = promises.len(); + let results = Rc::new(RefCell::new(vec![None; total])); + let remaining = Rc::new(RefCell::new(total)); + let rejected = Rc::new(RefCell::new(false)); + + for (i, promise) in promises.iter().enumerate() { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let results_clone = results.clone(); + let remaining_clone = remaining.clone(); + let rejected_clone = rejected.clone(); + let rejected_clone_for_catch = rejected.clone(); + promise + .then(move |result| { + if *rejected_clone.borrow() { + return; + } + results_clone.borrow_mut()[i] = Some(result); + *remaining_clone.borrow_mut() -= 1; + + if *remaining_clone.borrow() == 0 { + let final_results: Vec = results_clone + .borrow_mut() + .iter_mut() + .map(|res| res.take().unwrap()) + .collect(); + next_promise_clone.fulfill(final_results); + } + }) + .catch(move |reason| { + if !*rejected_clone_for_catch.borrow() { + *rejected_clone_for_catch.borrow_mut() = true; + next_promise_clone_for_catch.reject(reason); + } + }); + } + next_promise + } + + pub fn any_of(promises: Vec>) -> Rc> { + let next_promise = Promise::new(); + let total = promises.len(); + let remaining = Rc::new(RefCell::new(total)); + let first_error = Rc::new(RefCell::new(None)); + + for promise in promises { + let next_promise_clone = next_promise.clone(); + let next_promise_clone_for_catch = next_promise.clone(); + let remaining_clone = remaining.clone(); + let remaining_clone_for_catch = remaining.clone(); + let first_error_clone = first_error.clone(); + + promise + .then(move |result| { + if *remaining_clone.borrow() > 0 { + next_promise_clone.fulfill(result); + *remaining_clone.borrow_mut() = 0; + } + }) + .catch(move |err| { + if first_error_clone.borrow().is_none() { + *first_error_clone.borrow_mut() = Some(err); + } + + *remaining_clone_for_catch.borrow_mut() -= 1; + + if *remaining_clone_for_catch.borrow() == 0 { + if let Some(first_err) = first_error_clone.borrow().clone() { + next_promise_clone_for_catch.reject(first_err); + } + } + }); + } + + next_promise + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_promise_new() { + let promise = Promise::::new(); + assert!(matches!(*promise.state.borrow(), PromiseState::Pending)); + assert!(promise.then_callback.borrow().is_none()); + assert!(promise.catch_callback.borrow().is_none()); + } + + #[test] + fn test_promise_fulfill() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise = Promise::::new(); + let _next_promise = promise.then(move |result| { + assert_eq!(result, 42); + *touched_clone.borrow_mut() = true; + }); + + promise.fulfill(42); + assert!(touched.take()) + } + + #[test] + fn test_promise_reject() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise = Promise::::new(); + let _next_promise = promise.catch(move |err| { + assert_eq!(err, "Error"); + *touched_clone.borrow_mut() = true; + }); + + promise.reject("Error".to_string()); + assert!(touched.take()) + } + + #[test] + fn test_promise_chain() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise = Promise::::new(); + let next_promise = promise.then(|result| { + assert_eq!(result, 10); + 20 + }); + + next_promise.then(move |result| { + assert_eq!(result, 20); + *touched_clone.borrow_mut() = true; + }); + + promise.fulfill(10); + assert!(touched.take()) + } + + #[test] + fn test_all_of_success() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.fulfill(42); + promise2.fulfill(100); + + all_promise + .then(move |results| { + assert_eq!(results.len(), 2); + assert_eq!(results[0], 42); + assert_eq!(results[1], 100); + *touched_clone.borrow_mut() = true; + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + + assert!(touched.take()) + } + + #[test] + fn test_all_of_failure() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error 1".to_string()); + promise2.reject("Error 2".to_string()); + + all_promise + .then(|_results| { + panic!("Should not reach here"); + }) + .catch(move |err| { + assert_eq!(err, "Error 1"); + *touched_clone.borrow_mut() = true; + }); + + assert!(touched.take()) + } + + #[test] + fn test_all_of_mixed_results() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let promise1 = Promise::::new(); + let promise2 = Promise::::new(); + + let all_promise = Promise::all_of(vec![promise1.clone(), promise2.clone()]); + + promise1.reject("Error".to_string()); + promise2.fulfill(100); + + all_promise + .then(|_| { + panic!("Should not reach here"); + }) + .catch(move |reason| { + assert_eq!(reason, "Error".to_string()); + *touched_clone.borrow_mut() = true; + }); + + assert!(touched.take()) + } + + #[test] + fn test_all_of_empty() { + let touched = Rc::new(RefCell::new(false)); + let touched_clone = touched.clone(); + + let all_promise = Promise::::all_of(vec![]); + + all_promise + .then(move |results| { + assert!(results.is_empty()); + *touched_clone.borrow_mut() = true; + }) + .catch(|_err| { + panic!("Should not reach here"); + }); + + assert!(touched.take()) + } +} diff --git a/src/lib.rs b/src/lib.rs index a8f42651..14f37989 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod callout; pub mod hostcalls; pub mod traits; pub mod types;