Ertuğrul Çetin edited this page Apr 26, 2020 · 2 revisions
(require '[clojure.core.async :refer [chan <!! >!! <! >! go put! close!] :as async])

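pipeline and pipeline-blocking take the same arguments and look the same to the caller. The name indicates the kind of work the transducer does: pipeline is for CPU-bound computation, pipeline-blocking is for operations that block, such as IO.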

(let [c (chan)
      out (chan)]
  (async/onto-chan c (range 10))
  
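  ;; Thread/sleep blocks, so pipeline-blocking is used here;
  ;; async/pipeline takes the same arguments and suits CPU-bound work.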
  (async/pipeline-blocking 5 out (map (fn [x]
                               (Thread/sleep 500)
                               (inc x))) c)

  (<!! (async/into [] out)))
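
The example above can be wrapped in a function to see two properties of pipeline-blocking: results come out in input order even though up to n items are processed in parallel, and the output channel is closed once the input channel is closed and drained. This is only a sketch; slow-inc and run-pipeline are hypothetical names introduced for illustration, and the sleep stands in for any blocking call.

```clojure
(require '[clojure.core.async :as async :refer [chan <!!]])

(defn slow-inc
  "Hypothetical stand-in for a blocking call: sleeps, then increments."
  [x]
  (Thread/sleep 100)
  (inc x))

(defn run-pipeline
  "Runs slow-inc over (range 10) with parallelism n and returns a vector."
  [n]
  (let [in  (chan)
        out (chan)]
    ;; onto-chan closes `in` after the last item, which in turn
    ;; lets the pipeline close `out` when all work is done.
    (async/onto-chan in (range 10))
    (async/pipeline-blocking n out (map slow-inc) in)
    ;; async/into collects everything from `out` until it closes.
    (<!! (async/into [] out))))

(run-pipeline 5)
;; => [1 2 3 4 5 6 7 8 9 10]
```

Raising n shortens the total run time because more sleeps overlap, but the order of the results stays the same.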

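pipeline-async is for operations that are already asynchronous, such as callback-based HTTP calls. Instead of a transducer it takes a function of two arguments, the input value and a result channel. That function must return immediately, put its result(s) on the result channel from the asynchronous callback, and close the channel when done: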

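;; Assumed dependencies: http-kit (its http/get accepts a callback) and Cheshire.
(require '[org.httpkit.client :as http]
         '[cheshire.core :as cheshire])

;; Async function for pipeline-async: returns immediately and delivers the
;; response on c from the callback, then closes c to signal completion.
(defn http-get [url c]
  (println url)
  (http/get url (fn [v]
                  (put! c v)
                  (close! c)))
  c)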

(let [from (chan 10 (map (fn [name]
                           (str "http://imdbapi.poromenos.org/js/?name=%25"
                                name
                                "%25"))))

      to (chan 10 (map (fn [response]
                         (-> response
                             :body
                             (cheshire/parse-string true)))))]
  (async/pipeline-async 10 to http-get from)
  (>!! from "Home")
  (println (<!! to)))
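
The same pattern can be tried without a network connection. The sketch below replaces the HTTP call with a go block that waits on a timeout; fake-fetch and fetch-all are hypothetical names used only for illustration. It shows the contract described above: the async function returns at once, puts its result on the result channel later, and closes that channel.

```clojure
(require '[clojure.core.async :as async :refer [chan go <! >! <!! close!]])

(defn fake-fetch
  "Hypothetical async function: simulates a non-blocking request."
  [title result]
  (go
    (<! (async/timeout 50))               ; parks instead of blocking a thread
    (>! result (str "response for " title))
    (close! result)))                     ; tells pipeline-async this input is done

(defn fetch-all
  "Feeds titles through pipeline-async and collects the responses."
  [titles]
  (let [from (chan)
        to   (chan)]
    (async/pipeline-async 3 to fake-fetch from)
    ;; Closing `from` (done by onto-chan) lets the pipeline close `to`.
    (async/onto-chan from titles)
    (<!! (async/into [] to))))

(fetch-all ["Home" "Alien" "Heat"])
;; => ["response for Home" "response for Alien" "response for Heat"]
```

As with the other pipeline functions, outputs are delivered in the order of their inputs, regardless of which simulated request finishes first.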