diff --git a/.github/workflows/vthreads.yml b/.github/workflows/vthreads.yml index 3f1d333..d6c537a 100644 --- a/.github/workflows/vthreads.yml +++ b/.github/workflows/vthreads.yml @@ -9,12 +9,13 @@ on: default: 'master' jobs: - test: + test-src: strategy: matrix: os: [ubuntu-latest] # macOS-latest, windows-latest] java-version: ["8", "11", "17", "21"] clojure-version: ["1.10.3", "1.11.4", "1.12.0"] + test-context: [":dev:test", ":dev:test:vthreads", ":dev:test:no-vthreads", ":dev:test:go-check"] runs-on: ${{ matrix.os }} steps: - name: Set up Clojure @@ -31,5 +32,33 @@ jobs: java-version: ${{ matrix.java-version }} distribution: 'temurin' cache: 'maven' - - name: test with test-runner - run: clj -X:dev:test -Sdeps '{:deps {org.clojure/clojure {:mvn/version ${{ matrix.clojure-version }} }}}' :dirs '["src/test/clojure"]' + - name: Test source with test-runner + run: clojure -X${{ matrix.test-context }} -Sdeps '{:deps {org.clojure/clojure {:mvn/version "${{ matrix.clojure-version }}"}}}' :dirs '["src/test/clojure"]' + test-aot: + strategy: + matrix: + os: [ubuntu-latest] # macOS-latest, windows-latest] + java-version: ["21", "8", "11", "17"] + clojure-version: ["1.10.3", "1.11.4", "1.12.0"] + test-context: [":aot:test", ":aot:test:vthreads", ":aot:test:no-vthreads", ":aot:test:go-check"] + compile-mode: ["compile-tests-vthreads", "compile-tests-no-vthreads"] + runs-on: ${{ matrix.os }} + steps: + - name: Set up Clojure + uses: DeLaGuardo/setup-clojure@13.1 + with: + cli: 'latest' + - name: Checkout Clojure Source Code + uses: actions/checkout@v4 + with: + ref: ${{ github.event.inputs.asyncRef }} + - name: Set up Java + uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java-version }} + distribution: 'temurin' + cache: 'maven' + - name: Compile tests + run: clojure -T:build ${{ matrix.compile-mode }} + - name: Test AOT with test-runner + run: clojure -X${{ matrix.test-context }} -Sdeps '{:deps {org.clojure/clojure {:mvn/version "${{ matrix.clojure-version }}"}}}' :dirs '["src/test/clojure"]' :nses '[clojure.core.async-test clojure.core.async.buffers-test clojure.core.async.concurrent-test clojure.core.async.exceptions-test clojure.core.async.timers-test]' diff --git a/build.clj b/build.clj index e21a1ea..f43ec39 100644 --- a/build.clj +++ b/build.clj @@ -3,12 +3,52 @@ (:require [clojure.tools.build.api :as b])) (def class-dir "target/classes") -(def basis (b/create-basis {:project "deps.edn"})) +(def basis (b/create-basis {:project "deps.edn" + :aliases [:dev]})) + +(def comp-test-nses '[clojure.core.async-test + clojure.core.pipeline-test + clojure.core.async.buffers-test + clojure.core.async.concurrent-test + clojure.core.async.exceptions-test + clojure.core.async.timers-test]) + +;; clj -T:build compile-tests +(defn compile-tests + [_] + (b/delete {:path "target"}) + (b/compile-clj {:basis basis + :src-dirs ["src/test/clojure"] + :class-dir class-dir, + :ns-compile comp-test-nses})) + +;; clj -T:build compile-tests-vthreads +(defn compile-tests-vthreads + [_] + (b/delete {:path "target"}) + (b/compile-clj {:basis (b/create-basis {:project "deps.edn" :aliases [:dev :vthreads]}) + :src-dirs ["src/test/clojure"] + :class-dir class-dir, + :filter-nses '[clojure.core.async] + :ns-compile comp-test-nses}) + (println "DONE " (-> "target/classes/clojure/core/" clojure.java.io/file .list vec))) + +;; clj -T:build compile-tests-no-vthreads +(defn compile-tests-no-vthreads + [_] + (b/delete {:path "target"}) + (b/compile-clj {:basis (b/create-basis {:project "deps.edn" :aliases [:dev :no-vthreads]}) + :src-dirs ["src/test/clojure"] + :class-dir class-dir, + :filter-nses '[clojure.core.async] + :ns-compile comp-test-nses})) (defn compile [_] (b/delete {:path "target"}) - (b/compile-clj {:basis basis, :src-dirs ["src/main/clojure"], :class-dir class-dir, + (b/compile-clj {:basis basis + :src-dirs ["src/main/clojure"] + :class-dir class-dir :filter-nses '[clojure.core.async] :ns-compile '[clojure.core.async.impl.protocols clojure.core.async.impl.mutex diff --git a/deps.edn b/deps.edn index fe03f46..d129e5f 100644 --- a/deps.edn +++ b/deps.edn @@ -32,5 +32,21 @@ :doc-files ["doc/rationale.md" "doc/reference.md" "doc/walkthrough.md" "doc/flow.md" "doc/flow-guide.md"] :output-path "docs" :html {:namespace-list :flat}}} + + :dev {:extra-paths ["src/test/clojure"]} + :aot {:extra-paths ["target/classes"]} + :vthreads {:jvm-opts ["-Dclojure.core.async.vthreads=target"]} + :no-vthreads {:jvm-opts ["-Dclojure.core.async.vthreads=avoid"]} + :go-check {:jvm-opts ["-Dclojure.core.async.go-checking=true"]} + + ;; clj -X:dev:test :dirs '["src/test/clojure"]' + ;; clj -X:dev:go-check:test :dirs '["src/test/clojure"]' + ;; clj -X:dev:vthreads:test :dirs '["src/test/clojure"]' + ;; clj -X:dev:no-vthreads:test :dirs '["src/test/clojure"]' + ;; clj -X:aot:test :dirs '["target/classes"]' ??? + :test {:extra-deps {io.github.cognitect-labs/test-runner + {:git/tag "v0.5.1" :git/sha "dfb30dd"}} + :main-opts ["-m" "cognitect.test-runner"] + :exec-fn cognitect.test-runner.api/test} }} diff --git a/doc/vthreads.md b/doc/vthreads.md new file mode 100644 index 0000000..03a6556 --- /dev/null +++ b/doc/vthreads.md @@ -0,0 +1,38 @@ +# core.async and virtual threads +## Rationale + +Java 21 saw the inclusion of [virtual threads](https://docs.oracle.com/en/java/javase/21/core/virtual-threads.html). Virtual threads were added to address the limitations of platform threads for handling highly-concurrent, I/O heavy workloads. While core.async provided mechanisms for mapping operations to platform threads, their limitations were absorbed by the library itself. That is, the granularity of workflow profiles that core.async could directly support were limited by those supported by platform threads and its internal IOC mechanisms. By enhancing core.async to use virtual threads for workloads in which they excel, we can open up new execution modes supported by the best-fit threading modes in the JVM. + +## Workload types + +Traditionally, core.async implicitly supported two workload types: + +- Channel dispatch workloads implicit to the `go` block IOC completion handling and thunk processing, backed by a user-configurable bounded thread-pool +- User-specific workloads backed by platform threads created with the `clojure.core.async/thread` macro + +As a first step toward integrating virtual threads, we changed the underlying workflow model within core.async to support a richer set of types, each described via related keywords: + +- `:io` - used in the new `clojure.core.async/io-thread` macro for `:io` workloads in flow/process, and for dispatch handling if no explicit dispatch handler is provided (see below) +- `:mixed` - used by `clojure.core.async/thread` and for `:mixed` workloads in flow/process +- `:compute` - used for `:compute` workloads in flow/process +- `:core-async-dispatch` - used for completion fn handling (e.g. in `clojure.core.aync/put!` and `take!`, as well as go block IOC thunk processing) throughout core.async. + +The defaults and mechanisms for customization around these workloads are described later. + +## core.async's use of virtual threads + +By default, whenever core.aync runs in an environment that supports virtual threads (i.e. a version 21+ JVM), it will use virtual threads to service all `:io` workloads executed with the `io-thread` macro. Additionally, all `go` IOC dispatching will use virtual threads under the hood. + +### Specifying a factory for custom thread `ExecutorService` instances + +However, to support a graceful upgrade path for core.async users, we've added mechanisms to customize if and how virtual threads are used. First, the Java system property `clojure.core.async.executor-factory` specifies a function that will provide `java.util.concurrent.ExecutorService`s for application-wide use by core.async in lieu of its defaults. The property value should name a fully qualified var. The function will be passed workflow type keywords `:io`, `:mixed`, `:compute`, or `:core-async-dispatch`, and should return either an ExecutorService, or nil to signal to core.async to use its default. Results per keyword will be cached and used for the remainder of the application. + +### Targeting or avoiding virtual threads + +Users can also set the Java system property `clojure.core.async.vthreads` to control how core.async uses JDK 21+ virtual threads. The property can be one of the following values: + +- unset - Always default to IOC when AOT compiling, and use virtual threads for `io-thread` blocks if available at runtime +- `"target"` - Always target virtual threads when compiling `go` blocks and require them at runtime in `io-thread` blocks +- `"avoid"` - Always use IOC when compiling `go` blocks (will work regardless), and do not use virtual threads for `io-thread` blocks + +There is one circumstance that needs special attention. That is, users can choose to AOT compile their applications/libraries and target virtual threads using the `"target"` flag. However, users may run that compiled code on a JVM without virtual threads support. By using "target" to compile code, you've fixed an expectation of that the runtime environment support virtual threads. When users run compiled code targeting virtual threads in a runtime environment without them then `go` blocks will not guarantee non-blocking semantics anymore. In this particular circumstance, core.async will throw an error when the compiled `"target"` expectation does not match the runtime capability. diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj index 6028481..665ec14 100644 --- a/src/main/clojure/clojure/core/async.clj +++ b/src/main/clojure/clojure/core/async.clj @@ -40,7 +40,34 @@ core.async. If not supplied the ExecutorService for :io will be used instead. The set of contexts may grow in the future so the function should -return nil for unexpected contexts." +return nil for unexpected contexts. + +Use the Java system property `clojure.core.async.vthreads` to control +how core.async uses JDK 21+ virtual threads. The property can be one of +the following values: + +unset - core.async will opportunistically use vthreads when available +(≥ Java 21) and will otherwise use the old IOC impl. io-thread and :io +thread pool will run on platform threads if vthreads are not available. +If AOT compiling, go blocks will always use IOC so that the resulting +bytecode works on all JVMs (so no change in compiled output) + +\"target\" - means that you are targeting virtual threads. At runtime +from source, go blocks will use vthreads if available, but will fall back +to IOC if not available. If AOT compiling, go blocks are always compiled +as normal Clojure code to be run on vthreads and will throw at runtime +if vthreads are not available (Java <21) + +\"avoid\" - means that vthreads will not be used by core.async - you can +use this to minimize impacts if you are not yet ready to utilize vthreads +in your app. If AOT compiling, go blocks will use IOC. At runtime, io-thread +and the :io thread pool use platform threads + +Note: existing IOC compiled go blocks from older core.async versions continue +to work (we retain and load the IOC state machine runtime - this does not +require the analyzer), and you can interact with the same channels from both +IOC and vthread code. +" (:refer-clojure :exclude [reduce transduce into merge map take partition partition-by bounded-count]) (:require [clojure.core.async.impl.protocols :as impl] @@ -49,7 +76,6 @@ return nil for unexpected contexts." [clojure.core.async.impl.timers :as timers] [clojure.core.async.impl.dispatch :as dispatch] [clojure.core.async.impl.ioc-macros :as ioc] - clojure.core.async.impl.go ;; TODO: make conditional [clojure.core.async.impl.mutex :as mutex] ) (:import [java.util.concurrent.atomic AtomicLong] @@ -138,6 +164,19 @@ return nil for unexpected contexts." [^long msecs] (timers/timeout msecs)) +(defmacro defparkingop + "Emits either parking op or reimplement as blocking op when vthreads + available." + [op doc arglist & body] + (let [as (mapv #(list 'quote %) arglist) + blockingop (-> op name (str "!") symbol)] + `(def ~(with-meta op {:arglists `(list ~as) :doc doc}) + (if (dispatch/vthreads-available-and-allowed?) + (fn [~'& ~'args] + ~(list* apply blockingop '[args])) + (fn ~arglist + ~@body))))) + (defmacro defblockingop [op doc arglist & body] (let [as (mapv #(list 'quote %) arglist)] @@ -162,9 +201,10 @@ return nil for unexpected contexts." @ret (deref p)))) -(defn ! +(defparkingop >! "puts a val into port. nil values are not allowed. Must be called - inside a (go ...) block. Will park if no buffer space is available. + inside a (go ...) block, or on a virtual thread. Will park if no buffer + space is available. Returns true unless port is already closed." [port val] (assert nil ">! used not in (go ...) block")) @@ -344,11 +385,11 @@ return nil for unexpected contexts." @ret (deref p)))) -(defn alts! +(defparkingop alts! "Completes at most one of several channel operations. Must be called - inside a (go ...) block. ports is a vector of channel endpoints, - which can be either a channel to take from or a vector of - [channel-to-put-to val-to-put], in any combination. Takes will be + inside a (go ...) block, or on a virtual thread. ports is a vector of + channel endpoints, which can be either a channel to take from or a vector + of [channel-to-put-to val-to-put], in any combination. Takes will be made as if by !. Unless the :priority option is true, if more than one port operation is ready a non-deterministic choice will be made. If no operation is @@ -471,6 +512,33 @@ return nil for unexpected contexts." (let [ret (impl/take! port (fn-handler nop false))] (when ret @ret))) +(defn- require-fresh + "Like require but takes only a single namespace symbol and attempts to + require the namespace on a separate thread. This is done to start + with a fresh dynamic environment augmented only with the vars + needed by require to perform its job. If the namespace is + found to have already been loaded then this function will return + immediately." + [nsym] + (when (not (contains? @@#'clojure.core/*loaded-libs* nsym)) + (let [p (promise) + n *ns* + ll @#'clojure.core/*loaded-libs*] + (dispatch/exec + (^:once fn* [] + (try + (let [result (binding [*ns* n + clojure.core/*loaded-libs* ll] + (#'clojure.core/serialized-require nsym))] + (deliver p result)) + (catch Throwable t + (deliver p t)))) + :io) + (let [res @p] + (if res + (throw res) + res))))) + (defmacro go "Asynchronously executes the body, returning immediately to the calling thread. Additionally, any visible calls to ! and alt!/alts! @@ -487,7 +555,12 @@ return nil for unexpected contexts." Returns a channel which will receive the result of the body when completed" [& body] - (#'clojure.core.async.impl.go/go-impl &env body)) + (if (not (dispatch/target-vthreads?)) + (do (require-fresh 'clojure.core.async.impl.go) + ((find-var 'clojure.core.async.impl.go/go-impl) &env body)) + `(do ~(when clojure.core/*compile-files* + `(dispatch/ensure-runtime-vthreads!)) + (thread-call (^:once fn* [] ~@body) :io)))) (defonce ^:private thread-macro-executor nil) diff --git a/src/main/clojure/clojure/core/async/impl/dispatch.clj b/src/main/clojure/clojure/core/async/impl/dispatch.clj index 542ed02..8495052 100644 --- a/src/main/clojure/clojure/core/async/impl/dispatch.clj +++ b/src/main/clojure/clojure/core/async/impl/dispatch.clj @@ -72,11 +72,44 @@ [workload] (Executors/newCachedThreadPool (counted-thread-factory (str "async-" (name workload) "-%d") true))) +(def ^:private virtual-threads-available? + (delay + (try + (Class/forName "java.lang.Thread$Builder$OfVirtual") + true + (catch ClassNotFoundException _ + false)))) + +(defn- vthreads-directive + "Retrieves the value of the sysprop clojure.core.async.vthreads." + [] + (System/getProperty "clojure.core.async.vthreads")) + +(defn target-vthreads? [] + (= (vthreads-directive) "target")) + +(defn vthreads-available-and-allowed? [] + (and @virtual-threads-available? + (not= (vthreads-directive) "avoid"))) + +(defn ensure-runtime-vthreads! [] + (when (not (vthreads-available-and-allowed?)) + (throw (ex-info "Code compiled to target virtual threads, but is running without vthread support." + {:runtime-jvm-version (System/getProperty "java.version") + :vthreads-directive (vthreads-directive)})))) + +(defn- make-io-executor + [] + (if (vthreads-available-and-allowed?) + (-> (.getDeclaredMethod Executors "newVirtualThreadPerTaskExecutor" (make-array Class 0)) + (.invoke nil (make-array Class 0))) + (make-ctp-named :io))) + (defn ^:private create-default-executor [workload] (case workload :compute (make-ctp-named :compute) - :io (make-ctp-named :io) + :io (make-io-executor) :mixed (make-ctp-named :mixed))) (def executor-for diff --git a/src/test/clojure/clojure/core/async_test.clj b/src/test/clojure/clojure/core/async_test.clj index 26499e5..79f9b71 100644 --- a/src/test/clojure/clojure/core/async_test.clj +++ b/src/test/clojure/clojure/core/async_test.clj @@ -193,15 +193,7 @@ (io-thread (>!! c2 (clojure.string/upper-case (!! c3 (clojure.string/reverse (!! c1 "loop") - (is (= "POOL" (! c1 :no) - (catch AssertionError _ - (>!! c1 :yes)))) - (is (= :yes (