diff --git a/build.clj b/build.clj index 13db4be..e21a1ea 100644 --- a/build.clj +++ b/build.clj @@ -10,10 +10,8 @@ (b/delete {:path "target"}) (b/compile-clj {:basis basis, :src-dirs ["src/main/clojure"], :class-dir class-dir, :filter-nses '[clojure.core.async] - :ns-compile '[clojure.core.async.impl.exec.threadpool - clojure.core.async.impl.protocols + :ns-compile '[clojure.core.async.impl.protocols clojure.core.async.impl.mutex - clojure.core.async.impl.concurrent clojure.core.async.impl.dispatch clojure.core.async.impl.ioc-macros clojure.core.async.impl.buffers diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index 8286b9e..339f797 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -18,7 +18,33 @@ to validate go blocks do not invoke core.async blocking operations. Property is read once, at namespace load time. Recommended for use primarily during development. Invalid blocking calls will throw in go block threads - use Thread.setDefaultUncaughtExceptionHandler() -to catch and handle." +to catch and handle. + +Use the Java system property `clojure.core.async.executor-factory` +to specify a function that will provide ExecutorServices for +application-wide use by core.async in lieu of its defaults. The +property value should name a fully qualified var. The function +will be passed a keyword indicating the context of use of the +executor, and should return either an ExecutorService, or nil to +use the default. Results per keyword will be cached and used for +the remainder of the application. Possible context arguments are: + +:io - used in async/io-thread, for :io workloads in flow/process, +and for dispatch handling if no explicit dispatch handler is +provided (see below) + +:mixed - used by async/thread and for :mixed workloads in +flow/process + +:compute - used for :compute workloads in flow/process + +:core-async-dispatch - used for completion fn handling (e.g. in put! +and take!, as well as go block IOC thunk processing) throughout +core.async. If not supplied the ExecutorService for :io will be +used instead. + +The set of contexts may grow in the future so the function should +return nil for unexpected contexts." (:refer-clojure :exclude [reduce transduce into merge map take partition partition-by bounded-count]) (:require [clojure.core.async.impl.protocols :as impl] @@ -29,7 +55,6 @@ to catch and handle." [clojure.core.async.impl.ioc-macros :as ioc] clojure.core.async.impl.go ;; TODO: make conditional [clojure.core.async.impl.mutex :as mutex] - [clojure.core.async.impl.concurrent :as conc] ) (:import [java.util.concurrent.atomic AtomicLong] [java.util.concurrent.locks Lock] @@ -463,33 +488,42 @@ to catch and handle." [& body] (#'clojure.core.async.impl.go/go-impl &env body)) -(defonce ^:private ^Executor thread-macro-executor - (Executors/newCachedThreadPool (conc/counted-thread-factory "async-thread-macro-%d" true))) - (defn thread-call "Executes f in another thread, returning immediately to the calling thread. Returns a channel which will receive the result of calling - f when completed, then close." - [f] - (let [c (chan 1)] - (let [binds (Var/getThreadBindingFrame)] - (.execute thread-macro-executor - (fn [] - (Var/resetThreadBindingFrame binds) - (try - (let [ret (f)] - (when-not (nil? ret) - (>!! c ret))) - (finally - (close! c)))))) - c)) + f when completed, then close. workload is a keyword that describes + the work performed by f, where: + + :io - may do blocking I/O but must not do extended computation + :compute - must not ever block + :mixed - anything else (default) + + when workload not supplied, defaults to :mixed" + ([f] (thread-call f :mixed)) + ([f workload] + (let [c (chan 1) + returning-to-chan (fn [bf] + #(try + (when-some [ret (bf)] + (>!! c ret)) + (finally (close! c))))] + (-> f bound-fn* returning-to-chan (dispatch/exec workload)) + c))) (defmacro thread "Executes the body in another thread, returning immediately to the calling thread. Returns a channel which will receive the result of the body when completed, then close." [& body] - `(thread-call (^:once fn* [] ~@body))) + `(thread-call (^:once fn* [] ~@body) :mixed)) + +(defmacro io-thread + "Executes the body in a thread, returning immediately to the calling + thread. The body may do blocking I/O but must not do extended computation. + Returns a channel which will receive the result of the body when completed, + then close." + [& body] + `(thread-call (^:once fn* [] ~@body) :io)) ;;;;;;;;;;;;;;;;;;;; ops ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; diff --git a/src/main/clojure/clojure/core/async/impl/concurrent.clj b/src/main/clojure/clojure/core/async/impl/concurrent.clj deleted file mode 100644 index df08683..0000000 --- a/src/main/clojure/clojure/core/async/impl/concurrent.clj +++ /dev/null @@ -1,38 +0,0 @@ -;; Copyright (c) Rich Hickey and contributors. All rights reserved. -;; The use and distribution terms for this software are covered by the -;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -;; which can be found in the file epl-v10.html at the root of this distribution. -;; By using this software in any fashion, you are agreeing to be bound by -;; the terms of this license. -;; You must not remove this notice, or any other, from this software. - -(ns ^{:skip-wiki true} - clojure.core.async.impl.concurrent - (:import [java.util.concurrent ThreadFactory])) - -(set! *warn-on-reflection* true) - -(defn counted-thread-factory - "Create a ThreadFactory that maintains a counter for naming Threads. - name-format specifies thread names - use %d to include counter - daemon is a flag for whether threads are daemons or not - opts is an options map: - init-fn - function to run when thread is created" - ([name-format daemon] - (counted-thread-factory name-format daemon nil)) - ([name-format daemon {:keys [init-fn] :as opts}] - (let [counter (atom 0)] - (reify - ThreadFactory - (newThread [_this runnable] - (let [body (if init-fn - (fn [] (init-fn) (.run ^Runnable runnable)) - runnable) - t (Thread. ^Runnable body)] - (doto t - (.setName (format name-format (swap! counter inc))) - (.setDaemon daemon)))))))) - -(defonce - ^{:doc "Number of processors reported by the JVM"} - processors (.availableProcessors (Runtime/getRuntime))) diff --git a/src/main/clojure/clojure/core/async/impl/dispatch.clj b/src/main/clojure/clojure/core/async/impl/dispatch.clj index ae74ed8..45daf7e 100644 --- a/src/main/clojure/clojure/core/async/impl/dispatch.clj +++ b/src/main/clojure/clojure/core/async/impl/dispatch.clj @@ -8,15 +8,45 @@ (ns ^{:skip-wiki true} clojure.core.async.impl.dispatch - (:require [clojure.core.async.impl.protocols :as impl] - [clojure.core.async.impl.exec.threadpool :as tp])) + (:require [clojure.core.async.impl.protocols :as impl]) + (:import [java.util.concurrent Executors ExecutorService ThreadFactory])) (set! *warn-on-reflection* true) (defonce ^:private in-dispatch (ThreadLocal.)) -(defonce executor - (delay (tp/thread-pool-executor #(.set ^ThreadLocal in-dispatch true)))) +(defonce executor nil) + +(defn counted-thread-factory + "Create a ThreadFactory that maintains a counter for naming Threads. + name-format specifies thread names - use %d to include counter + daemon is a flag for whether threads are daemons or not + opts is an options map: + init-fn - function to run when thread is created" + ([name-format daemon] + (counted-thread-factory name-format daemon nil)) + ([name-format daemon {:keys [init-fn] :as opts}] + (let [counter (atom 0)] + (reify + ThreadFactory + (newThread [_this runnable] + (let [body (if init-fn + (fn [] (init-fn) (.run ^Runnable runnable)) + runnable) + t (Thread. ^Runnable body)] + (doto t + (.setName (format name-format (swap! counter inc))) + (.setDaemon daemon)))))))) + +(defonce + ^{:doc "Number of processors reported by the JVM"} + processors (.availableProcessors (Runtime/getRuntime))) + +(def ^:private pool-size + "Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a + delay so property can be set from code after core.async namespace is loaded but before + any use of the async thread pool." + (delay (or (Long/getLong "clojure.core.async.pool-size") 8))) (defn in-dispatch-thread? "Returns true if the current thread is a go block dispatch pool thread" @@ -37,9 +67,40 @@ (.uncaughtException (Thread/currentThread) ex)) nil) +(defn- make-ctp-named + [workflow] + (Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workflow) "-%d") true))) + +(defn ^:private create-default-executor + [workload] + (case workload + :compute (make-ctp-named :compute) + :io (make-ctp-named :io) + :mixed (make-ctp-named :mixed))) + +(def executor-for + "Given a workload tag, returns an ExecutorService instance and memoizes the result. By + default, core.async will defer to a user factory (if provided via sys prop) or construct + a specialized ExecutorService instance for each tag :io, :compute, and :mixed. When + given the tag :core-async-dispatch it will default to the executor service for :io." + (memoize + (fn ^ExecutorService [workload] + (let [sysprop-factory (when-let [esf (System/getProperty "clojure.core.async.executor-factory")] + (requiring-resolve (symbol esf))) + sp-exec (and sysprop-factory (sysprop-factory workload))] + (or sp-exec + (if (= workload :core-async-dispatch) + (executor-for :io) + (create-default-executor workload))))))) + +(defn exec + [^Runnable r workload] + (let [^ExecutorService e (executor-for workload)] + (.execute e r))) + (defn run "Runs Runnable r on current thread when :on-caller? meta true, else in a thread pool thread." [^Runnable r] (if (-> r meta :on-caller?) (try (.run r) (catch Throwable t (ex-handler t))) - (impl/exec @executor r))) + (exec r :core-async-dispatch))) diff --git a/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj b/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj deleted file mode 100644 index 8cb94da..0000000 --- a/src/main/clojure/clojure/core/async/impl/exec/threadpool.clj +++ /dev/null @@ -1,32 +0,0 @@ -;; Copyright (c) Rich Hickey and contributors. All rights reserved. -;; The use and distribution terms for this software are covered by the -;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php) -;; which can be found in the file epl-v10.html at the root of this distribution. -;; By using this software in any fashion, you are agreeing to be bound by -;; the terms of this license. -;; You must not remove this notice, or any other, from this software. - -(ns clojure.core.async.impl.exec.threadpool - (:require [clojure.core.async.impl.protocols :as impl] - [clojure.core.async.impl.concurrent :as conc]) - (:import [java.util.concurrent Executors])) - -(set! *warn-on-reflection* true) - -(def ^:private pool-size - "Value is set via clojure.core.async.pool-size system property; defaults to 8; uses a - delay so property can be set from code after core.async namespace is loaded but before - any use of the async thread pool." - (delay (or (Long/getLong "clojure.core.async.pool-size") 8))) - -(defn thread-pool-executor - ([] - (thread-pool-executor nil)) - ([init-fn] - (let [executor-svc (Executors/newFixedThreadPool - @pool-size - (conc/counted-thread-factory "async-dispatch-%d" true - {:init-fn init-fn}))] - (reify impl/Executor - (impl/exec [_ r] - (.execute executor-svc ^Runnable r)))))) diff --git a/src/test/clojure/clojure/core/async/concurrent_test.clj b/src/test/clojure/clojure/core/async/concurrent_test.clj index 5b71402..a9adf0d 100644 --- a/src/test/clojure/clojure/core/async/concurrent_test.clj +++ b/src/test/clojure/clojure/core/async/concurrent_test.clj @@ -8,12 +8,12 @@ (ns clojure.core.async.concurrent-test (:require [clojure.test :refer :all] - [clojure.core.async.impl.concurrent :as conc]) + [clojure.core.async.impl.dispatch :as dispatch]) (:import [java.util.concurrent ThreadFactory])) (deftest test-counted-thread-factory (testing "Creates numbered threads" - (let [^ThreadFactory factory (conc/counted-thread-factory "foo-%d" true) + (let [^ThreadFactory factory (dispatch/counted-thread-factory "foo-%d" true) threads (repeatedly 3 #(.newThread factory (constantly nil)))] (is (= ["foo-1" "foo-2" "foo-3"] (map #(.getName ^Thread %) threads)))))) diff --git a/src/test/clojure/clojure/core/async_test.clj b/src/test/clojure/clojure/core/async_test.clj index 017ba7d..835f3d4 100644 --- a/src/test/clojure/clojure/core/async_test.clj +++ b/src/test/clojure/clojure/core/async_test.clj @@ -185,6 +185,23 @@ (binding [test-dyn true] (is (!! c2 (clojure.string/upper-case (!! c3 (clojure.string/reverse (!! c1 "loop") + (is (= "POOL" (! c1 :no) + (catch AssertionError _ + (>!! c1 :yes)))) + (is (= :yes (