Skip to content

Commit

Permalink
WIP: async
Browse files Browse the repository at this point in the history
Fixes #115
  • Loading branch information
puzza007 committed Feb 22, 2022
1 parent fb9b832 commit 7114888
Showing 1 changed file with 43 additions and 8 deletions.
51 changes: 43 additions & 8 deletions src/katipo.erl
Original file line number Diff line number Diff line change
Expand Up @@ -320,9 +320,18 @@
sslkey = undefined :: undefined | binary() | file:name_all(),
sslkey_blob = undefined :: undefined | binary(),
keypasswd = undefined :: undefined | binary(),
userpwd = undefined :: undefined | binary()
userpwd = undefined :: undefined | binary(),
async = false :: boolean(),
reply_to = undefined :: pid() | atom()
}).

-record(reply_to, {
async = false :: boolean(),
from :: {pid(), any()} | pid(),
tref = undefined :: undefined | reference(),
response_ref = undefined :: undefined | reference()
}).

tcp_fastopen_available() ->
?TCP_FASTOPEN_AVAILABLE.

Expand Down Expand Up @@ -399,12 +408,16 @@ req(PoolName, Opts)
case process_opts(Opts) of
{ok, #req{url=undefined}} ->
{error, error_map(bad_opts, <<"[{url,undefined}]">>)};
{ok, Req} ->
{ok, Req=#req{async=Async, reply_to=ReplyTo}} ->
Timeout = ?MODULE:get_timeout(Req),
Req2 = Req#req{timeout=Timeout},
Req3 = case {Async, ReplyTo} of
{true, undefined} -> Req2#req{reply_to=self()};
{_, _} -> Req2
end,
Ts = os:timestamp(),
{Result, {Response, Metrics}} =
wpool:call(PoolName, Req2, random_worker, infinity),
wpool:call(PoolName, Req3, random_worker, infinity),
TotalUs = timer:now_diff(os:timestamp(), Ts),
Metrics2 = katipo_metrics:notify({Result, Response}, Metrics, TotalUs),
Response2 = maybe_return_metrics(Req2, Metrics2, Response),
Expand Down Expand Up @@ -460,7 +473,9 @@ handle_call(#req{method = Method,
sslkey = SSLKey,
sslkey_blob = SSLKeyBlob,
keypasswd = KeyPasswd,
userpwd = UserPwd},
userpwd = UserPwd,
async = Async,
reply_to = PidOrAtom},
From,
State=#state{port=Port, reqs=Reqs}) ->
{Self, Ref} = From,
Expand Down Expand Up @@ -490,9 +505,22 @@ handle_call(#req{method = Method,
{?userpwd, UserPwd}],
Command = {Self, Ref, Method, Url, Headers, CookieJar, Body, Opts},
true = port_command(Port, term_to_binary(Command)),
Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}),
Reqs2 = maps:put(From, Tref, Reqs),
{noreply, State#state{reqs=Reqs2}}.
ReplyTo =
case Async of
false ->
Tref = erlang:start_timer(Timeout, self(), {req_timeout, From}),
#reply_to{async=Async, from=From, tref=Tref};
true ->
#reply_to{async=Async, from=PidOrAtom, response_ref=make_ref()}
end,
Reqs2 = maps:put(From, ReplyTo, Reqs),
State2 = State#state{reqs=Reqs2},
case Async of
false ->
{noreply, State2};
true ->
{reply, {ok, ReplyTo#reply_to.response_ref}, State2}
end.

handle_cast(Msg, State) ->
error_logger:error_msg("Unexpected cast: ~p", [Msg]),
Expand All @@ -512,9 +540,12 @@ handle_info({Port, {data, Data}}, State=#state{port=Port, reqs=Reqs}) ->
{error, {From0, {Error, Metrics}}}
end,
case maps:find(From, Reqs) of
{ok, Tref} ->
{ok, #reply_to{async=false, from=From, tref=Tref}} ->
_ = erlang:cancel_timer(Tref),
_ = gen_server:reply(From, {Result, Response});
{ok, #reply_to{async=true, from=PidOrAtom, response_ref=ResponseRef}} ->
PidOrAtom ! {katipo_response, ResponseRef, {Result, Response}},
ok;
error ->
ok
end,
Expand Down Expand Up @@ -716,6 +747,10 @@ opt(keypasswd, Pass, {Req, Errors}) when is_binary(Pass) ->
{Req#req{keypasswd=Pass}, Errors};
opt(userpwd, UserPwd, {Req, Errors}) when is_binary(UserPwd) ->
{Req#req{userpwd=UserPwd}, Errors};
opt(async, Async, {Req, Errors}) when is_boolean(Async) ->
{Req#req{async=Async}, Errors};
opt(reply_to, PidOrAtom, {Req, Errors}) when is_pid(PidOrAtom) orelse is_atom(PidOrAtom) ->
{Req#req{reply_to=PidOrAtom}, Errors};
opt(K, V, {Req, Errors}) ->
{Req, [{K, V} | Errors]}.

Expand Down

0 comments on commit 7114888

Please sign in to comment.