diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index 55ae450..0dba2e1 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -26,7 +26,7 @@ to catch and handle." [clojure.core.async.impl.buffers :as buffers] [clojure.core.async.impl.timers :as timers] [clojure.core.async.impl.dispatch :as dispatch] - [clojure.core.async.impl.runtime :as ioc] + [clojure.core.async.impl.ioc-macros :as ioc] [clojure.core.async.impl.mutex :as mutex] [clojure.core.async.impl.concurrent :as conc] ) @@ -417,7 +417,7 @@ to catch and handle." (defn ioc-alts! [state cont-block ports & {:as opts}] (ioc/aset-all! state ioc/STATE-IDX cont-block) - (when-let [cb (do-alts + (when-let [cb (clojure.core.async/do-alts (fn [val] (ioc/aset-all! state ioc/VALUE-IDX val) (ioc/run-state-machine-wrapped state)) @@ -462,8 +462,7 @@ to catch and handle." (dispatch/run (^:once fn* [] (let [~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env) - f# ~((requiring-resolve 'clojure.core.async.impl.ioc-macros/state-machine) - `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators) + f# ~(ioc/state-machine `(do ~@body) 1 [crossing-env &env] ioc/async-custom-terminators) state# (-> (f#) (ioc/aset-all! ioc/USER-START-IDX c# ioc/BINDINGS-IDX captured-bindings#))] diff --git a/src/main/clojure/clojure/core/async/impl/ioc_macros.clj b/src/main/clojure/clojure/core/async/impl/ioc_macros.clj index e6ea3bf..206b252 100644 --- a/src/main/clojure/clojure/core/async/impl/ioc_macros.clj +++ b/src/main/clojure/clojure/core/async/impl/ioc_macros.clj @@ -19,14 +19,40 @@ [clojure.tools.analyzer.passes.jvm.annotate-loops :refer [annotate-loops]] [clojure.tools.analyzer.passes.jvm.warn-on-reflection :refer [warn-on-reflection]] [clojure.tools.analyzer.jvm :as an-jvm] - [clojure.core.async.impl.runtime :as rt] + [clojure.core.async.impl.protocols :as impl] [clojure.set :as set]) - (:import [java.util.concurrent.atomic AtomicReferenceArray])) + (:import [java.util.concurrent.locks Lock] + [java.util.concurrent.atomic AtomicReferenceArray])) (defn debug [x] (pprint x) x) +(def ^{:const true :tag 'long} FN-IDX 0) +(def ^{:const true :tag 'long} STATE-IDX 1) +(def ^{:const true :tag 'long} VALUE-IDX 2) +(def ^{:const true :tag 'long} BINDINGS-IDX 3) +(def ^{:const true :tag 'long} EXCEPTION-FRAMES 4) +(def ^{:const true :tag 'long} USER-START-IDX 5) + +(defn aset-object [^AtomicReferenceArray arr ^long idx o] + (.set arr idx o)) + +(defn aget-object [^AtomicReferenceArray arr ^long idx] + (.get arr idx)) + +(defmacro aset-all! + [arr & more] + (assert (even? (count more)) "Must give an even number of args to aset-all!") + (let [bindings (partition 2 more) + arr-sym (gensym "statearr-")] + `(let [~arr-sym ~arr] + ~@(map + (fn [[idx val]] + `(aset-object ~arr-sym ~idx ~val)) + bindings) + ~arr-sym))) + ;; State monad stuff, used only in SSA construction (defmacro gen-plan @@ -191,7 +217,7 @@ IEmittableInstruction (emit-instruction [this state-sym] (if (= value ::value) - `[~(:id this) (rt/aget-object ~state-sym ~rt/VALUE-IDX)] + `[~(:id this) (aget-object ~state-sym ~VALUE-IDX)] `[~(:id this) ~value]))) (defrecord RawCode [ast locals] @@ -291,10 +317,10 @@ (terminate-block [_this state-sym _] `(do (case ~val-id ~@(concat (mapcat (fn [test blk] - `[~test (rt/aset-all! ~state-sym ~rt/STATE-IDX ~blk)]) + `[~test (aset-all! ~state-sym ~STATE-IDX ~blk)]) test-vals jmp-blocks) (when default-block - `[(do (rt/aset-all! ~state-sym ~rt/STATE-IDX ~default-block) + `[(do (aset-all! ~state-sym ~STATE-IDX ~default-block) :recur)]))) :recur))) @@ -325,7 +351,7 @@ (block-references [_this] [block]) ITerminator (terminate-block [_this state-sym _] - `(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ~block) + `(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ~block) :recur))) (defrecord Return [value] @@ -338,7 +364,7 @@ (terminate-block [this state-sym custom-terminators] (if-let [f (get custom-terminators (terminator-code this))] `(~f ~state-sym ~value) - `(do (rt/aset-all! ~state-sym ~rt/VALUE-IDX ~value ~rt/STATE-IDX ::finished) + `(do (aset-all! ~state-sym ~VALUE-IDX ~value ~STATE-IDX ::finished) nil)))) (defrecord CondBr [test then-block else-block] @@ -349,8 +375,8 @@ ITerminator (terminate-block [_this state-sym _] `(do (if ~test - (rt/aset-all! ~state-sym ~rt/STATE-IDX ~then-block) - (rt/aset-all! ~state-sym ~rt/STATE-IDX ~else-block)) + (aset-all! ~state-sym ~STATE-IDX ~then-block) + (aset-all! ~state-sym ~STATE-IDX ~else-block)) :recur))) (defrecord PushTry [catch-block] @@ -360,7 +386,7 @@ (block-references [_this] [catch-block]) IEmittableInstruction (emit-instruction [_this state-sym] - `[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (cons ~catch-block (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))])) + `[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (cons ~catch-block (aget-object ~state-sym ~EXCEPTION-FRAMES)))])) (defrecord PopTry [] IInstruction @@ -369,7 +395,7 @@ (block-references [_this] []) IEmittableInstruction (emit-instruction [_this state-sym] - `[~'_ (rt/aset-all! ~state-sym ~rt/EXCEPTION-FRAMES (rest (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)))])) + `[~'_ (aset-all! ~state-sym ~EXCEPTION-FRAMES (rest (aget-object ~state-sym ~EXCEPTION-FRAMES)))])) (defrecord CatchHandler [catches] IInstruction @@ -379,10 +405,10 @@ ITerminator (terminate-block [_this state-sym _] (let [ex (gensym 'ex)] - `(let [~ex (rt/aget-object ~state-sym ~rt/VALUE-IDX)] + `(let [~ex (aget-object ~state-sym ~VALUE-IDX)] (cond ~@(for [[handler-idx type] catches - i [`(instance? ~type ~ex) `(rt/aset-all! ~state-sym ~rt/STATE-IDX ~handler-idx)]] + i [`(instance? ~type ~ex) `(aset-all! ~state-sym ~STATE-IDX ~handler-idx)]] i) :else (throw ~ex)) :recur)))) @@ -862,7 +888,7 @@ (if (empty? args) [] (mapcat (fn [sym] - `[~sym (rt/aget-object ~state-sym ~(id-for-inst local-map sym))]) + `[~sym (aget-object ~state-sym ~(id-for-inst local-map sym))]) args)))) (defn- build-block-body [state-sym blk] @@ -879,27 +905,27 @@ blk) results (interleave (map (partial id-for-inst local-map) results) results)] (if-not (empty? results) - [state-sym `(rt/aset-all! ~state-sym ~@results)] + [state-sym `(aset-all! ~state-sym ~@results)] []))) (defn- emit-state-machine [machine num-user-params custom-terminators] (let [index (index-state-machine machine) state-sym (with-meta (gensym "state_") {:tag 'objects}) - local-start-idx (+ num-user-params rt/USER-START-IDX) + local-start-idx (+ num-user-params USER-START-IDX) state-arr-size (+ local-start-idx (count-persistent-values index)) local-map (atom {::next-idx local-start-idx}) block-catches (:block-catches machine)] `(fn state-machine# - ([] (rt/aset-all! (AtomicReferenceArray. ~state-arr-size) - ~rt/FN-IDX state-machine# - ~rt/STATE-IDX ~(:start-block machine))) + ([] (aset-all! (AtomicReferenceArray. ~state-arr-size) + ~FN-IDX state-machine# + ~STATE-IDX ~(:start-block machine))) ([~state-sym] (let [old-frame# (clojure.lang.Var/getThreadBindingFrame) ret-value# (try - (clojure.lang.Var/resetThreadBindingFrame (rt/aget-object ~state-sym ~rt/BINDINGS-IDX)) + (clojure.lang.Var/resetThreadBindingFrame (aget-object ~state-sym ~BINDINGS-IDX)) (loop [] - (let [result# (case (int (rt/aget-object ~state-sym ~rt/STATE-IDX)) + (let [result# (case (int (aget-object ~state-sym ~STATE-IDX)) ~@(mapcat (fn [[id blk]] [id `(let [~@(concat (build-block-preamble local-map index state-sym blk) @@ -911,18 +937,77 @@ (recur) result#))) (catch Throwable ex# - (rt/aset-all! ~state-sym ~rt/VALUE-IDX ex#) - (if (seq (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES)) - (rt/aset-all! ~state-sym ~rt/STATE-IDX (first (rt/aget-object ~state-sym ~rt/EXCEPTION-FRAMES))) + (aset-all! ~state-sym ~VALUE-IDX ex#) + (if (seq (aget-object ~state-sym ~EXCEPTION-FRAMES)) + (aset-all! ~state-sym ~STATE-IDX (first (aget-object ~state-sym ~EXCEPTION-FRAMES))) (throw ex#)) :recur) (finally - (rt/aset-object ~state-sym ~rt/BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame)) + (aset-object ~state-sym ~BINDINGS-IDX (clojure.lang.Var/getThreadBindingFrame)) (clojure.lang.Var/resetThreadBindingFrame old-frame#)))] (if (identical? ret-value# :recur) (recur ~state-sym) ret-value#)))))) +(defn finished? + "Returns true if the machine is in a finished state" + [state-array] + (identical? (aget-object state-array STATE-IDX) ::finished)) + +(defn- fn-handler + [f] + (reify + Lock + (lock [_]) + (unlock [_]) + + impl/Handler + (active? [_] true) + (blockable? [_] true) + (lock-id [_] 0) + (commit [_] f))) + + +(defn run-state-machine [state] + ((aget-object state FN-IDX) state)) + +(defn run-state-machine-wrapped [state] + (try + (run-state-machine state) + (catch Throwable ex + (impl/close! (aget-object state USER-START-IDX)) + (throw ex)))) + +(defn take! [state blk c] + (if-let [cb (impl/take! c (fn-handler + (fn [x] + (aset-all! state VALUE-IDX x STATE-IDX blk) + (run-state-machine-wrapped state))))] + (do (aset-all! state VALUE-IDX @cb STATE-IDX blk) + :recur) + nil)) + +(defn put! [state blk c val] + (if-let [cb (impl/put! c val (fn-handler (fn [ret-val] + (aset-all! state VALUE-IDX ret-val STATE-IDX blk) + (run-state-machine-wrapped state))))] + (do (aset-all! state VALUE-IDX @cb STATE-IDX blk) + :recur) + nil)) + +(defn return-chan [state value] + (let [c (aget-object state USER-START-IDX)] + (when-not (nil? value) + (impl/put! c value (fn-handler (fn [_] nil)))) + (impl/close! c) + c)) + +(def async-custom-terminators + {'clojure.core.async/! `put! + 'clojure.core.async/alts! 'clojure.core.async/ioc-alts! + :Return `return-chan}) + (defn mark-transitions {:pass-info {:walk :post :depends #{} :after an-jvm/default-passes}} [{:keys [op fn] :as ast}] diff --git a/src/test/clojure/clojure/core/async/ioc_macros_test.clj b/src/test/clojure/clojure/core/async/ioc_macros_test.clj index 5436719..269a37e 100644 --- a/src/test/clojure/clojure/core/async/ioc_macros_test.clj +++ b/src/test/clojure/clojure/core/async/ioc_macros_test.clj @@ -1,7 +1,7 @@ (ns clojure.core.async.ioc-macros-test (:refer-clojure :exclude [map into reduce transduce merge take partition partition-by]) - (:require [clojure.core.async.impl.runtime :as ioc] + (:require [clojure.core.async.impl.ioc-macros :as ioc] [clojure.core.async :refer :all :as async] [clojure.set :as set] [clojure.test :refer :all]) @@ -24,7 +24,7 @@ crossing-env (zipmap (keys &env) (repeatedly gensym))] `(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame) ~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~l)]) crossing-env) - state# (~(clojure.core.async.impl.ioc-macros/state-machine `(do ~@body) 0 [crossing-env &env] terminators))] + state# (~(ioc/state-machine `(do ~@body) 0 [crossing-env &env] terminators))] (ioc/aset-all! state# ~ioc/BINDINGS-IDX captured-bindings#) (ioc/run-state-machine state#) (ioc/aget-object state# ioc/VALUE-IDX)))) @@ -553,7 +553,7 @@ crossing-env (zipmap (keys &env) (repeatedly gensym))] `(let [captured-bindings# (clojure.lang.Var/getThreadBindingFrame) ~@(mapcat (fn [[l sym]] [sym `(^:once fn* [] ~(vary-meta l dissoc :tag))]) crossing-env) - state# (~(clojure.core.async.impl.ioc-macros/state-machine + state# (~(ioc/state-machine `(do ~@body) 0 [crossing-env &env]