From 23c878ac2f145f4f2a7400cf390cb95f59767f68 Mon Sep 17 00:00:00 2001 From: Fogus Date: Tue, 15 Jul 2025 12:38:37 -0400 Subject: [PATCH] ASYNC-263: Throwing when :chan-opts on out chan. --- src/main/clojure/clojure/core/async/flow.clj | 4 +++- .../clojure/clojure/core/async/flow/impl.clj | 6 +++++- src/test/clojure/clojure/core/flow_test.clj | 21 +++++++++++++++++++ 3 files changed, 29 insertions(+), 2 deletions(-) create mode 100644 src/test/clojure/clojure/core/flow_test.clj diff --git a/src/main/clojure/clojure/core/async/flow.clj b/src/main/clojure/clojure/core/async/flow.clj index 4810bb9..c3f66ee 100644 --- a/src/main/clojure/clojure/core/async/flow.clj +++ b/src/main/clojure/clojure/core/async/flow.clj @@ -73,7 +73,9 @@ :args - a map of param->val which will be passed to the process ctor :chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n and xform have their meanings per core.async/chan - the default is {:buf-or-n 10} + the default is {:buf-or-n 10}. For efficiency, Flow creates + only the in channel per in/out pair and so :chan-opts + should be associated with the in channel. :conns - a collection of [[from-pid outid] [to-pid inid]] tuples. diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj index 74fa651..4038203 100644 --- a/src/main/clojure/clojure/core/async/flow/impl.clj +++ b/src/main/clojure/clojure/core/async/flow/impl.clj @@ -62,7 +62,11 @@ conn-map (reduce (fn [ret [out in :as conn]] (if (and (contains? outopts out) (contains? inopts in)) - (update ret out set-conj in) + (if (seq (vals (get outopts out))) + (throw (ex-info (str "only one channel created for connection. " + ":chan-opts must be associated with input side.") + {:conn conn, :out-opts outopts})) + (update ret out set-conj in)) (throw (ex-info "invalid connection" {:conn conn})))) {} conns) running-chans #(or (deref chans) (throw (Exception. "flow not running"))) diff --git a/src/test/clojure/clojure/core/flow_test.clj b/src/test/clojure/clojure/core/flow_test.clj new file mode 100644 index 0000000..1f87caf --- /dev/null +++ b/src/test/clojure/clojure/core/flow_test.clj @@ -0,0 +1,21 @@ +(ns clojure.core.flow-test + (:require [clojure.core.async.flow :as flow] + [clojure.test :refer :all])) + +(defn tap-n-drop [x] (tap> x) nil) + +(deftest chan-opts-tests + (testing ":chan-opts only specified on in side of connected pair" + (is (thrown? clojure.lang.ExceptionInfo + (flow/create-flow + {:procs {:source {:proc (-> identity flow/lift1->step flow/process) + :chan-opts {:out {:buf-or-n 11 + :xform (map (fn [x] (str "Saw " x)))}}} + :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process)}} + :conns [[[:source :out] [:sink :in]]]}))) + (is (flow/create-flow + {:procs {:source {:proc (-> identity flow/lift1->step flow/process)} + :sink {:proc (-> #'tap-n-drop flow/lift1->step flow/process) + :chan-opts {:in {:buf-or-n 11 + :xform (map (fn [x] (str "Saw " x)))}}}} + :conns [[[:source :out] [:sink :in]]]}))))