+Note - Alpha, work-in-progress, names and other details are in flux
+
+A library for building concurrent, event driven data processing
+flows out of communication-free functions, while centralizing
+control, reporting, execution and error handling. Built on core.async.
+
+The top-level construct is the flow, comprising:
+a set of processes (generally, threads) - concurrent activities
+a set of channels flowing data into and out of the processes
+a set of channels for centralized control, reporting, error-handling,
+ and execution of the processes
+
+The flow library itself constructs processes, channels and flows. The
+user provides configuration data and process logic (step-fns) that
+specify how the flow should work.
+
+A flow is constructed from flow configuration data which defines a
+directed graph of processes and the connections between
+them. Processes describe their I/O requirements and the
+flow (library) itself creates channels and passes them to the
+processes that requested them. See 'create-flow' for the
+details. The flow configuration provides a centralized place for
+policy decisions regarding process settings, threading, buffering etc.
+
+It is expected that applications will rarely define instances of the
+process protocol but instead use the API function 'process' that
+implements the process protocol in terms of calls to ordinary
+functions (step-fns) that might include no communication or
+core.async code. In this way the library helps you achieve a strict
+separation of your application logic from its execution,
+communication, lifecycle, error handling and monitoring.
+
+Note that at several points the library calls upon the user to
+supply ids for processes, inputs, outputs etc. These should be
+keywords. When a namespaced keyword is required it is explicitly
+stated. This documentation refers to various keywords utilized by
+the library itself as ::flow/xyz, where ::flow is an alias for
+clojure.core.async.flow
+
+Flows support the Clojure 'datafy' protocol to support
+observability. See also the 'ping' and 'ping-proc' fns for a live
+view of processes.
+
+A process is represented in the flow definition by an implementation
+of spi/ProcLauncher that starts it. See the spi docs for
+details.
create-flow
(create-flow config)
Creates a flow from the supplied configuration: a map containing the
+keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec
+
+:procs - a map of pid->proc-def
+where proc-def is a map with keys :proc, :args, :chan-opts
+
+:proc - a function that starts a process
+:args - a map of param->val which will be passed to the process ctor
+:chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
+ and xform have their meanings per core.async/chan
+ the default is {:buf-or-n 10}
+
+:conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
+
+Inputs and outputs support multiple connections. When an output is
+connected multiple times every connection will get every message,
+as per a core.async/mult.
+
+:mixed-exec/:io-exec/:compute-exec -> ExecutorService
+These can be used to specify the ExecutorService to use for the
+corresponding workload, in lieu of the lib defaults.
+
+N.B. The flow is not started. See 'start'
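+
+For example, a minimal config might look like this (the pids, step-fn
+vars and options shown here are illustrative, not prescribed):
+
+  (require '[clojure.core.async.flow :as flow])
+
+  (def g
+    (flow/create-flow
+     {:procs {:generator {:proc (flow/process #'gen-step)
+                          :args {:limit 10}}
+              :printer   {:proc (flow/process #'print-step)
+                          :chan-opts {:in {:buf-or-n 100}}}}
+      :conns [[[:generator :out] [:printer :in]]]}))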
futurize
(futurize f & {:keys [exec], :or {exec :mixed}, :as opts})
Takes a fn f and returns a fn that takes the same arguments as f
+and immediately returns a future, having started a thread for the
+indicated workload, or via the supplied executor, that invokes f
+with those args and completes that future with its return.
+
+futurize accepts kwarg options:
+:exec - one of the workloads :mixed, :io, :compute
+ or a j.u.c.ExecutorService object,
+ default :mixed
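+
+For example (a sketch; write-row!, conn and row are hypothetical):
+
+  (def write-row-async (futurize write-row! :exec :io))
+  @(write-row-async conn row) ;; deref the future for write-row!'s return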
inject
(inject g [pid io-id :as coord] msgs)
asynchronously puts the messages on the channel corresponding to the
+input or output of the process, returning a future that will
+complete when done.
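+
+For example, assuming a running flow g with a proc :parser that has an
+input :in (the names are hypothetical):
+
+  @(inject g [:parser :in] [{:line "hello"} {:line "world"}])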
lift*->step
(lift*->step f)
given a fn f taking one arg and returning a collection of non-nil
+values, creates a step fn as needed by process, with one input
+and one output (named :in and :out), and no state.
lift1->step
(lift1->step f)
like lift*->step except taking a fn returning one value, which when
+nil will yield no output.
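+
+For example (sketches; the fns being lifted are placeholders):
+
+  ;; one msg in -> a collection of msgs out
+  (def explode-step (lift*->step (fn [n] (range n))))
+  ;; one msg in -> at most one msg out; a nil return yields no output
+  (def parse-step (lift1->step parse-long))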
map->step
(map->step {:keys [describe init transition transform]})
given a map of functions corresponding to step fn arities (see
+'process'), returns a step fn suitable for passing to 'process'. You
+can use this map form to compose the proc logic from disparate
+functions or to leverage the optionality of some of the entry
+points.
+
+The keys in the map are:
+:describe, arity 0 - required
+:init, arity 1 - optional, but should be provided if 'describe' returns :params.
+:transition, arity 2 - optional
+:transform, arity 3 - required
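+
+For example, a sketch of a counting step-fn assembled from a map of fns:
+
+  (def count-step
+    (map->step
+     {:describe (fn [] {:params {:init-count "starting count"}
+                        :ins    {:in "values to count"}
+                        :outs   {:out "running count"}})
+      :init      (fn [{:keys [init-count]}] {:count (or init-count 0)})
+      :transform (fn [state _in _msg]
+                   (let [state' (update state :count inc)]
+                     [state' {:out [(:count state')]}]))}))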
pause
(pause g)
pauses a running flow
+
pause-proc
(pause-proc g pid)
pauses a process
+
ping
(ping g & {:keys [timeout-ms], :or {timeout-ms 1000}})
pings all processes, returning a map of pid -> proc status and
+state, for those procs that reply within timeout-ms (default 1000)
ping-proc
(ping-proc g pid & {:keys [timeout-ms], :or {timeout-ms 1000}})
pings the indicated process, returning its status and state as per
+'ping', if it replies within timeout-ms (default 1000)
process
(process step-fn)(process step-fn opts)
Given a function of four arities (0-3), aka the 'step-fn',
+returns a launcher that creates a process compliant with the process
+protocol (see the spi/ProcLauncher doc).
+
+The possible arities for the step-fn are
+
+0 - 'describe', () -> description
+1 - 'init', (arg-map) -> initial-state
+2 - 'transition', (state transition) -> state'
+3 - 'transform', (state input msg) -> [state' output-map]
+
+This is the core facility for defining the logic for processes via
+ordinary functions. Using a var holding a fn as the 'step-fn' is the
+preferred method for defining a proc, as it enables
+hot-code-reloading of the proc logic in a flow, and better names in
+datafy.
+
+arity 0 - 'describe', () -> description
+where description is a map with keys :params :ins and :outs, each of which
+in turn is a map of keyword to doc string, and :workload with
+possible values of :mixed :io :compute. All entries in the describe
+return map are optional.
+
+:params describes the initial arguments to setup the state for the function.
+:ins enumerates the input[s], for which the flow will create channels
+:outs enumerates the output[s], for which the flow may create channels.
+:workload - describes the nature of the workload, one of :mixed :io or :compute
+ an :io workload should not do extended computation
+ a :compute workload should never block
+
+No key may be present in both :ins and :outs, allowing for a uniform
+channel coordinate system of [:process-id :channel-id]. The
+ins/outs/params returned will be the ins/outs/params of the
+process. describe may be called by users to understand how to use
+the proc. It will also be called by the impl in order to discover
+what channels are needed.
+
+arity 1 - 'init', (arg-map) -> initial-state
+
+The init arity will be called once by the process to establish any initial
+state. The arg-map will be a map of param->val, as supplied in the
+flow def. The key ::flow/pid will be added, mapped to the pid
+associated with the process (useful e.g. if the process wants to
+refer to itself in reply-to coordinates).
+
+Optionally, a returned init state may contain the
+keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
+of cid -> a core.async.channel. The cids must not conflict with the
+in/out ids. These channels will become part of the input/output set
+of the process, but are not otherwise visible/resolvable within the
+flow. Ports are a way to allow data to enter or exit the flow from
+outside of it. Use :transition to coordinate the lifecycle of these
+external channels.
+
+Optionally, _any_ returned state, whether from init, transition
+or transform, may contain the key ::flow/input-filter, a predicate
+of cid. Only inputs (including in-ports) satisfying the predicate
+will be part of the next channel read set. In the absence of this
+predicate all inputs are read.
+
+arity 2 - 'transition', (state transition) -> state'
+
+The transition arity will be called when the process makes a state
+transition, transition being one of ::flow/resume, ::flow/pause
+or ::flow/stop
+
+With this a process impl can track changes and coordinate
+resources, especially cleaning up any resources on :stop, since the
+process will no longer be used following that. See the SPI for
+details. state' will be the state supplied to subsequent calls.
+
+arity 3 - 'transform', (state in-name msg) -> [state' output]
+where output is a map of outid->[msgs*]
+
+The transform arity will be called every time a message arrives at any
+of the inputs. Output can be sent to none, any or all of the :outs
+enumerated, and/or an input named by a [pid inid] tuple (e.g. for
+reply-to), and/or to the ::flow/report output. A step need not
+output at all (output or msgs can be empty/nil), however an output _message_
+may never be nil (per core.async channels). state' will be the state
+supplied to subsequent calls.
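+
+For example, an illustrative transform arity (assuming ::flow aliases
+clojure.core.async.flow) that forwards each msg and also reports it:
+
+  (defn echo-transform [state _in msg]
+    [state {:out [msg]
+            ::flow/report [{:echoed msg}]}])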
+
+process also accepts an option map with keys:
+:workload - one of :mixed, :io or :compute
+:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
+ will be used when getting the return from the future - see below
+
+A :workload supplied as an option to process will override
+any :workload returned by the :describe fn of the process. If neither
+are provided the default is :mixed.
+
+In the :workload context of :mixed or :io, this dictates the type of
+thread in which the process loop will run, _including its calls to
+transform_.
+
+When :io is specified, transform should not do extensive computation.
+
+When :compute is specified, each call to transform will be run in a
+separate thread. The process loop will run in an :io context (since
+it no longer directly calls transform, all it does is I/O) and it
+will submit transform to the :compute executor then await (blocking,
+for compute-timeout-ms) the completion of the future returned by the
+executor. If the future times out it will be reported
+on ::flow/error.
+
+When :compute is specified transform must not block!
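+
+For example (a sketch; #'my-step-fn is a hypothetical step-fn var):
+
+  (def launcher (process #'my-step-fn {:workload :compute
+                                       :compute-timeout-ms 10000}))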
resume
(resume g)
resumes a paused flow
+
resume-proc
(resume-proc g pid)
resumes a process
+
start
(start g)
starts the entire flow from init values. The processes start paused.
+Call 'resume' or 'resume-proc' to start the flow. Returns a map with keys:
+
+:report-chan - a core.async chan for reading. 'ping' responses
+will show up here, as will any explicit ::flow/report outputs
+from :transform
+
+:error-chan - a core.async chan for reading. Any (and only)
+exceptions thrown anywhere on any thread inside a flow will appear
+in maps sent here. There will at least be a ::flow/ex entry with the
+exception, and may be additional keys for pid, state, status etc
+depending on the context of the error.
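+
+For example (a sketch, where g was built with 'create-flow'):
+
+  (def chs (start g)) ;; procs start paused
+  (resume g)
+  (clojure.core.async/<!! (:report-chan chs)) ;; read one report message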
stop
(stop g)
shuts down the flow, stopping all processes and closing the error
+and report channels. The flow can be started again.
\ No newline at end of file
diff --git a/docs/clojure.core.async.flow.spi.html b/docs/clojure.core.async.flow.spi.html
new file mode 100644
index 00000000..12eeba3f
--- /dev/null
+++ b/docs/clojure.core.async.flow.spi.html
@@ -0,0 +1,74 @@
+
+clojure.core.async.flow.spi documentation
Note - defining a ProcLauncher is an advanced feature and should not
+be needed for ordinary use of the library. This protocol is for
+creating new types of Processes that are not possible to create
+with ::flow/process.
+
+A ProcLauncher is a constructor for a process, a thread of activity.
+It has two functions - to describe the parameters and input/output
+requirements of the process and to start it. The launcher should
+acquire no resources, nor retain any connection to the started
+process. A launcher may be called upon to start a process more than
+once, and should start a new process each time start is called.
+
+The launched process must obey the following:
+
+It must have 2 logical statuses, :paused and :running. In
+the :paused status operation is suspended and no output is
+produced.
+
+When the process starts it must be :paused
+
+Whenever it is reading or writing to any channel a process must use
+alts!! and include a read of the ::flow/control channel, giving it
+priority.
+
+Command messages sent over the ::flow/control channel have the keys:
+::flow/to - either ::flow/all or a process id
+::flow/command - ::flow/stop|pause|resume|ping or process-specific
+
+It must act upon any, and only, control messages whose ::flow/to key is its pid or ::flow/all
+It must act upon the following values of ::flow/command:
+
+::flow/stop - all resources should be cleaned up and any thread(s)
+ should exit ordinarily - there will be no more subsequent use
+ of the process.
+::flow/pause - enter the :paused status
+::flow/resume - enter the :running status and resume processing
+::flow/ping - emit a ping message (format TBD) to
+ the ::flow/report channel containing at least its pid and status
+
+A process can define and respond to other commands in its own namespace.
+
+A process should not transmit channel objects (use [pid io-id] data
+coordinates instead). A process should not close channels.
+
+Finally, if a process encounters an error it must report it on the
+::flow/error channel (format TBD) and attempt to continue, though it
+may subsequently get a ::flow/stop command, which it must respect
members
describe
(describe p)
returns a map with keys - :params, :ins and :outs,
+each of which in turn is a map of keyword to docstring
+
+:params describes the initial arguments to setup the state for the process
+:ins enumerates the input[s], for which the graph will create channels
+:outs enumerates the output[s], for which the graph may create channels.
+
+describe may be called by users to understand how to use the
+proc. It will also be called by the impl in order to discover what
+channels are needed.
start
(start p {:keys [pid args ins outs resolver]})
return ignored, called for the
+effect of starting the process (typically, starting its thread)
+
+where:
+
+:pid - the id of the process in the graph, so that e.g. it can refer to itself in control, reporting etc
+:args - a map of param->val, as supplied in the graph def
+:ins - a map of in-id->readable-channel, plus the ::flow/control channel
+:outs - a map of out-id->writeable-channel, plus the ::flow/error and ::flow/report channels
+ N.B. outputs may be nil if not connected
+:resolver - an impl of spi/Resolver, which can be used to find
+ channels given their logical [pid cid] coordinates, as well as to
+ obtain ExecutorServices corresponding to the
+ logical :mixed/:io/:compute contexts
Resolver
protocol
members
get-exec
(get-exec _ context)
returns the ExecutorService for the given context, one
+of :mixed, :io, :compute
get-write-chan
(get-write-chan _ coord)
Given a tuple of [pid cid], returns a core.async chan to
+write to or nil (in which case the output should be dropped,
+e.g. nothing is connected).
\ No newline at end of file
diff --git a/docs/clojure.core.async.html b/docs/clojure.core.async.html
new file mode 100644
index 00000000..f74d1d74
--- /dev/null
+++ b/docs/clojure.core.async.html
@@ -0,0 +1,303 @@
+
+clojure.core.async documentation
Facilities for async programming and communication.
+
+Set Java system property `clojure.core.async.go-checking` to true
+to validate go blocks do not invoke core.async blocking operations.
+Property is read once, at namespace load time. Recommended for use
+primarily during development. Invalid blocking calls will throw in
+go block threads - use Thread.setDefaultUncaughtExceptionHandler()
+to catch and handle.
+
+Use the Java system property `clojure.core.async.executor-factory`
+to specify a function that will provide ExecutorServices for
+application-wide use by core.async in lieu of its defaults. The
+property value should name a fully qualified var. The function
+will be passed a keyword indicating the context of use of the
+executor, and should return either an ExecutorService, or nil to
+use the default. Results per keyword will be cached and used for
+the remainder of the application. Possible context arguments are:
+
+:io - used in async/io-thread, for :io workloads in flow/process,
+and for dispatch handling if no explicit dispatch handler is
+provided (see below)
+
+:mixed - used by async/thread and for :mixed workloads in
+flow/process
+
+:compute - used for :compute workloads in flow/process
+
+:core-async-dispatch - used for completion fn handling (e.g. in put!
+and take!, as well as go block IOC thunk processing) throughout
+core.async. If not supplied the ExecutorService for :io will be
+used instead.
+
+The set of contexts may grow in the future so the function should
+return nil for unexpected contexts.
<!
(<! port)
takes a val from port. Must be called inside a (go ...) block. Will
+return nil if closed. Will park if nothing is available.
<!!
(<!! port)
takes a val from port. Will return nil if closed. Will block
+if nothing is available.
+Not intended for use in direct or transitive calls from (go ...) blocks.
+Use the clojure.core.async.go-checking flag to detect invalid use (see
+namespace docs).
>!
(>! port val)
puts a val into port. nil values are not allowed. Must be called
+inside a (go ...) block. Will park if no buffer space is available.
+Returns true unless port is already closed.
>!!
(>!! port val)
puts a val into port. nil values are not allowed. Will block if no
+buffer space is available. Returns true unless port is already closed.
+Not intended for use in direct or transitive calls from (go ...) blocks.
+Use the clojure.core.async.go-checking flag to detect invalid use (see
+namespace docs).
admix
(admix mix ch)
Adds ch as an input to the mix
+
alt!
macro
(alt! & clauses)
Makes a single choice between one of several channel operations,
+as if by alts!, returning the value of the result expr corresponding
+to the operation completed. Must be called inside a (go ...) block.
+
+Each clause takes the form of:
+
+channel-op[s] result-expr
+
+where channel-ops is one of:
+
+take-port - a single port to take
+[take-port | [put-port put-val] ...] - a vector of ports as per alts!
+:default | :priority - an option for alts!
+
+and result-expr is either a list beginning with a vector, whereupon that
+vector will be treated as a binding for the [val port] return of the
+operation, else any other expression.
+
+(alt!
+ [c t] ([val ch] (foo ch val))
+ x ([v] v)
+ [[out val]] :wrote
+ :default 42)
+
+Each option may appear at most once. The choice and parking
+characteristics are those of alts!.
alt!!
macro
(alt!! & clauses)
Like alt!, except as if by alts!!, will block until completed, and
+not intended for use in (go ...) blocks.
alts!
(alts! ports & {:as opts})
Completes at most one of several channel operations. Must be called
+inside a (go ...) block. ports is a vector of channel endpoints,
+which can be either a channel to take from or a vector of
+[channel-to-put-to val-to-put], in any combination. Takes will be
+made as if by <!, and puts will be made as if by >!. Unless
+the :priority option is true, if more than one port operation is
+ready a non-deterministic choice will be made. If no operation is
+ready and a :default value is supplied, [default-val :default] will
+be returned, otherwise alts! will park until the first operation to
+become ready completes. Returns [val port] of the completed
+operation, where val is the value taken for takes, and a
+boolean (true unless already closed, as per put!) for puts.
+
+opts are passed as :key val ... Supported options:
+
+:default val - the value to use if none of the operations are immediately ready
+:priority true - (default nil) when true, the operations will be tried in order.
+
+Note: there is no guarantee that the port exprs or val exprs will be
+used, nor in what order, so they should not be
+depended upon for side effects.
alts!!
(alts!! ports & opts)
Like alts!, except takes will be made as if by <!!, and puts will
+be made as if by >!!, will block until completed.
+Not intended for use in direct or transitive calls from (go ...) blocks.
+Use the clojure.core.async.go-checking flag to detect invalid use (see
+namespace docs).
buffer
(buffer n)
Returns a fixed buffer of size n. When full, puts will block/park.
+
chan
(chan)(chan buf-or-n)(chan buf-or-n xform)(chan buf-or-n xform ex-handler)
Creates a channel with an optional buffer, an optional transducer
+(like (map f), (filter p) etc or a composition thereof), and an
+optional exception-handler. If buf-or-n is a number, will create
+and use a fixed buffer of that size. If a transducer is supplied a
+buffer must be specified. ex-handler must be a fn of one argument -
+if an exception occurs during transformation it will be called with
+the Throwable as an argument, and any non-nil return value will be
+placed in the channel. Channels implement Datafiable; use datafy
+to obtain the state of the channel and its buffer.
close!
(close! chan)
Closes a channel. The channel will no longer accept any puts (they
+will be ignored). Data in the channel remains available for taking, until
+exhausted, after which takes will return nil. If there are any
+pending takes, they will be dispatched with nil. Closing a closed
+channel is a no-op. Returns nil.
+
+Logically closing happens after all puts have been delivered. Therefore, any
+blocked or parked puts will remain blocked/parked until a taker releases them.
defblockingop
macro
(defblockingop op doc arglist & body)
do-alt
(do-alt alts clauses)
do-alts
(do-alts fret ports opts)
returns derefable [val port] if immediate, nil if enqueued
+
dropping-buffer
(dropping-buffer n)
Returns a buffer of size n. When full, puts will complete but
+val will be dropped (no transfer).
fn-handler
(fn-handler f)(fn-handler f blockable)
go
macro
(go & body)
Asynchronously executes the body, returning immediately to the
+calling thread. Additionally, any visible calls to <!, >! and alt!/alts!
+channel operations within the body will block (if necessary) by
+'parking' the calling thread rather than tying up an OS thread (or
+the only JS thread when in ClojureScript). Upon completion of the
+operation, the body will be resumed.
+
+go blocks should not (either directly or indirectly) perform operations
+that may block indefinitely. Doing so risks depleting the fixed pool of
+go block threads, causing all go block processing to stop. This includes
+core.async blocking ops (those ending in !!) and other blocking IO.
+
+Returns a channel which will receive the result of the body when
+completed
go-loop
macro
(go-loop bindings & body)
Like (go (loop ...))
+
into
(into coll ch)
Returns a channel containing the single (collection) result of the
+items taken from the channel conjoined to the supplied
+collection. ch must close before into produces a result.
io-thread
macro
(io-thread & body)
Executes the body in a thread, returning immediately to the calling
+thread. The body may do blocking I/O but must not do extended computation.
+Returns a channel which will receive the result of the body when completed,
+then close.
ioc-alts!
(ioc-alts! state cont-block ports & {:as opts})
map
(map f chs)(map f chs buf-or-n)
Takes a function and a collection of source channels, and returns a
+channel which contains the values produced by applying f to the set
+of first items taken from each source channel, followed by applying
+f to the set of second items from each channel, until any one of the
+channels is closed, at which point the output channel will be
+closed. The returned channel will be unbuffered by default, or a
+buf-or-n can be supplied
merge
(merge chs)(merge chs buf-or-n)
Takes a collection of source channels and returns a channel which
+contains all values taken from them. The returned channel will be
+unbuffered by default, or a buf-or-n can be supplied. The channel
+will close after all the source channels have closed.
Mix
protocol
members
admix*
(admix* m ch)
solo-mode*
(solo-mode* m mode)
toggle*
(toggle* m state-map)
unmix*
(unmix* m ch)
unmix-all*
(unmix-all* m)
mix
(mix out)
Creates and returns a mix of one or more input channels which will
+be put on the supplied out channel. Input sources can be added to
+the mix with 'admix', and removed with 'unmix'. A mix supports
+soloing, muting and pausing multiple inputs atomically using
+'toggle', and can solo using either muting or pausing as determined
+by 'solo-mode'.
+
+Each channel can have zero or more boolean modes set via 'toggle':
+
+:solo - when true, only this (and other soloed) channel(s) will appear
+ in the mix output channel. :mute and :pause states of soloed
+ channels are ignored. If solo-mode is :mute, non-soloed
+ channels are muted, if :pause, non-soloed channels are
+ paused.
+
+:mute - muted channels will have their contents consumed but not included in the mix
+:pause - paused channels will not have their contents consumed (and thus also not included in the mix)
Mult
protocol
members
tap*
(tap* m ch close?)
untap*
(untap* m ch)
untap-all*
(untap-all* m)
mult
(mult ch)
Creates and returns a mult(iple) of the supplied channel. Channels
+containing copies of the channel can be created with 'tap', and
+detached with 'untap'.
+
+Each item is distributed to all taps in parallel and synchronously,
+i.e. each tap must accept before the next item is distributed. Use
+buffering/windowing to prevent slow taps from holding up the mult.
+
+Items received when there are no taps get dropped.
+
+If a tap puts to a closed channel, it will be removed from the mult.
Mux
protocol
members
muxch*
(muxch* _)
offer!
(offer! port val)
Puts a val into port if it's possible to do so immediately.
+nil values are not allowed. Never blocks. Returns true if offer succeeds.
onto-chan
deprecated in 1.2
(onto-chan ch coll)(onto-chan ch coll close?)
Deprecated - use onto-chan! or onto-chan!!
+
onto-chan!
(onto-chan! ch coll)(onto-chan! ch coll close?)
Puts the contents of coll into the supplied channel.
+
+By default the channel will be closed after the items are copied,
+but can be determined by the close? parameter.
+
+Returns a channel which will close after the items are copied.
+
+If accessing coll might block, use onto-chan!! instead
onto-chan!!
(onto-chan!! ch coll)(onto-chan!! ch coll close?)
Like onto-chan! for use when accessing coll might block,
+e.g. a lazy seq of blocking operations
pipe
(pipe from to)(pipe from to close?)
Takes elements from the from channel and supplies them to the to
+channel. By default, the to channel will be closed when the from
+channel closes, but can be determined by the close? parameter. Will
+stop consuming the from channel if the to channel closes
pipeline
(pipeline n to xf from)(pipeline n to xf from close?)(pipeline n to xf from close? ex-handler)
Takes elements from the from channel and supplies them to the to
+channel, subject to the transducer xf, with parallelism n. Because
+it is parallel, the transducer will be applied independently to each
+element, not across elements, and may produce zero or more outputs
+per input. Outputs will be returned in order relative to the
+inputs. By default, the to channel will be closed when the from
+channel closes, but can be determined by the close? parameter. Will
+stop consuming the from channel if the to channel closes. Note this
+should be used for computational parallelism. If you have multiple
+blocking operations to put in flight, use pipeline-blocking instead.
+If you have multiple asynchronous operations to put in flight, use
+pipeline-async instead. See chan for semantics of ex-handler.
pipeline-async
(pipeline-async n to af from)(pipeline-async n to af from close?)
Takes elements from the from channel and supplies them to the to
+channel, subject to the async function af, with parallelism n. af
+must be a function of two arguments, the first an input value and
+the second a channel on which to place the result(s). The
+presumption is that af will return immediately, having launched some
+asynchronous operation whose completion/callback will put results on
+the channel, then close! it. Outputs will be returned in order
+relative to the inputs. By default, the to channel will be closed
+when the from channel closes, but can be determined by the close?
+parameter. Will stop consuming the from channel if the to channel
+closes. See also pipeline, pipeline-blocking.
pipeline-blocking
(pipeline-blocking n to xf from)(pipeline-blocking n to xf from close?)(pipeline-blocking n to xf from close? ex-handler)
Like pipeline, for blocking operations.
+
poll!
(poll! port)
Takes a val from port if it's possible to do so immediately.
+Never blocks. Returns value if successful, nil otherwise.
promise-chan
(promise-chan)(promise-chan xform)(promise-chan xform ex-handler)
Creates a promise channel with an optional transducer, and an optional
+exception-handler. A promise channel can take exactly one value that consumers
+will receive. Once full, puts complete but val is dropped (no transfer).
+Consumers will block until either a value is placed in the channel or the
+channel is closed, then return the value (or nil) forever. See chan for the
+semantics of xform and ex-handler.
Pub
protocol
members
sub*
(sub* p v ch close?)
unsub*
(unsub* p v ch)
unsub-all*
(unsub-all* p)(unsub-all* p v)
pub
(pub ch topic-fn)(pub ch topic-fn buf-fn)
Creates and returns a pub(lication) of the supplied channel,
+partitioned into topics by the topic-fn. topic-fn will be applied to
+each value on the channel and the result will determine the 'topic'
+on which that value will be put. Channels can be subscribed to
+receive copies of topics using 'sub', and unsubscribed using
+'unsub'. Each topic will be handled by an internal mult on a
+dedicated channel. By default these internal channels are
+unbuffered, but a buf-fn can be supplied which, given a topic,
+creates a buffer with desired properties.
+
+Each item is distributed to all subs in parallel and synchronously,
+i.e. each sub must accept before the next item is distributed. Use
+buffering/windowing to prevent slow subs from holding up the pub.
+
+Items received when there are no matching subs get dropped.
+
+Note that if buf-fns are used then each topic is handled
+asynchronously, i.e. if a channel is subscribed to more than one
+topic it should not expect them to be interleaved identically with
+the source.
put!
(put! port val)(put! port val fn1)(put! port val fn1 on-caller?)
Asynchronously puts a val into port, calling fn1 (if supplied) when
+complete, passing false iff port is already closed. nil values are
+not allowed. If on-caller? (default true) is true, and the put is
+immediately accepted, will call fn1 on calling thread.
+
+fn1 may be run in a fixed-size dispatch thread pool and should not
+perform blocking IO, including core.async blocking ops (those that
+end in !!).
+
+Returns true unless port is already closed.
reduce
(reduce f init ch)
f should be a function of 2 arguments. Returns a channel containing
+the single result of applying f to init and the first item from the
+channel, then applying f to that result and the 2nd item, etc. If
+the channel closes without yielding items, returns init and f is not
+called. ch must close before reduce produces a result.
sliding-buffer
(sliding-buffer n)
Returns a buffer of size n. When full, puts will complete, and be
+buffered, but oldest elements in buffer will be dropped (not
+transferred).
solo-mode
(solo-mode mix mode)
Sets the solo mode of the mix. mode must be one of :mute or :pause
+
split
(split p ch)(split p ch t-buf-or-n f-buf-or-n)
Takes a predicate and a source channel and returns a vector of two
+channels, the first of which will contain the values for which the
+predicate returned true, the second those for which it returned
+false.
+
+The out channels will be unbuffered by default, or two buf-or-ns can
+be supplied. The channels will close after the source channel has
+closed.
sub
(sub p topic ch)(sub p topic ch close?)
Subscribes a channel to a topic of a pub.
+
+By default the channel will be closed when the source closes,
+but can be determined by the close? parameter.
take
(take n ch)(take n ch buf-or-n)
Returns a channel that will return, at most, n items from ch. After n items
+ have been returned, or ch has been closed, the return channel will close.
+
+The output channel is unbuffered by default, unless buf-or-n is given.
take!
(take! port fn1)(take! port fn1 on-caller?)
Asynchronously takes a val from port, passing to fn1. Will pass nil
+if closed. If on-caller? (default true) is true, and value is
+immediately available, will call fn1 on calling thread.
+
+fn1 may be run in a fixed-size dispatch thread pool and should not
+perform blocking IO, including core.async blocking ops (those that
+end in !!).
+
+Returns nil.
tap
(tap mult ch)(tap mult ch close?)
Copies the mult source onto the supplied channel.
+
+By default the channel will be closed when the source closes,
+but can be determined by the close? parameter.
thread
macro
(thread & body)
Executes the body in another thread, returning immediately to the
+calling thread. Returns a channel which will receive the result of
+the body when completed, then close.
thread-call
(thread-call f)(thread-call f workload)
Executes f in another thread, returning immediately to the calling
+thread. Returns a channel which will receive the result of calling
+f when completed, then close. workload is a keyword that describes
+the work performed by f, where:
+
+:io - may do blocking I/O but must not do extended computation
+:compute - must not ever block
+:mixed - anything else (default)
+
+when workload not supplied, defaults to :mixed
timeout
(timeout msecs)
Returns a channel that will close after msecs
+
to-chan
deprecated in 1.2
(to-chan coll)
Deprecated - use to-chan! or to-chan!!
+
to-chan!
(to-chan! coll)
Creates and returns a channel which contains the contents of coll,
+closing when exhausted.
+
+If accessing coll might block, use to-chan!! instead
to-chan!!
(to-chan!! coll)
Like to-chan! for use when accessing coll might block,
+e.g. a lazy seq of blocking operations
toggle
(toggle mix state-map)
Atomically sets the state(s) of one or more channels in a mix. The
+state map is a map of channels -> channel-state-map. A
+channel-state-map is a map of attrs -> boolean, where attr is one or
+more of :mute, :pause or :solo. Any states supplied are merged with
+the current state.
+
+Note that channels can be added to a mix via toggle, which can be
+used to add channels in a particular (e.g. paused) state.
transduce
(transduce xform f init ch)
async/reduces a channel with a transformation (xform f).
+Returns a channel containing the result. ch must close before
+transduce produces a result.
unblocking-buffer?
(unblocking-buffer? buff)
Returns true if a channel created with buff will never block. That is to say,
+puts into this buffer will never cause the buffer to be full.
unmix
(unmix mix ch)
Removes ch as an input to the mix
+
unmix-all
(unmix-all mix)
removes all inputs from the mix
+
unsub
(unsub p topic ch)
Unsubscribes a channel from a topic of a pub
+
unsub-all
(unsub-all p)(unsub-all p topic)
Unsubscribes all channels from a pub, or a topic of a pub
+
The flow library enables a strict separation of application logic from the deployment concerns of topology, execution, communication, lifecycle, monitoring and error handling.
+
Step fns and process launchers
+
You provide logic to flow in the form of step-fns, which are wrapped into running processes, executing in a loop. Flow manages the life cycle of the process and handles incoming and outgoing messages by putting or taking them on channels. Step-fns do not access channels directly or hold state, making them easy to test in isolation and reuse.
+
Step functions have four arities:
+
+
describe: (step-fn) -> descriptor
+
The describe arity must return a static description of the step-fn’s :params, :ins, and :outs. Each of these is a map of name (a keyword) to docstring.
+
For example, the describe arity might return this description for a simple step-fn:
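(an illustrative description map; the names and docstrings are placeholders)
+
  {:params {:size "Size of the thing"}
   :ins    {:in "Channel of things to process"}
   :outs   {:out "Channel of processed things"}}
+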
The names used for input and output channels should be distinct (no overlap).
+
init: (step-fn arg-map) -> init-state
+
The init arity is called once by the process. It takes a set of args from the flow def (corresponding to the params returned from the describe arity) and returns the init state of the process.
+
transition: (step-fn state transition) -> state'
+
The transition arity is called any time the flow or process undergoes a lifecycle transition (::flow/start, ::flow/stop, ::flow/pause, ::flow/resume). It takes the current state and returns an updated state to be used for subsequent calls.
+
The step-fn should use the transition arity to coordinate the creation, pausing, and shutdown of external resources in a process.
+
transform: (step-fn state input msg) -> [state' {out-id [msgs]}]
+
The transform arity is called in a loop by the process for every message received on an input channel and returns a new state and a map of output cids to messages to return. The process will take care of sending these messages to the output channels. Output can be sent to none, any or all of the :outs enumerated, and/or an input named by a [pid inid] tuple (e.g. for reply-to), and/or to the ::flow/report output. A step need not output at all (output or msgs can be empty/nil), however an output message may never be nil (per core.async channels).
+
The step-fn may throw exceptions from any arity and they will be handled by the flow. Exceptions thrown from the transition or transform arities will be logged on the flow’s :error-chan.
+
Process state
+
The process state is a map. It can contain any keys needed by the step-fn transition and transform arities. In addition, there are some flow-specific keys, described here.
+
::flow/pid is added to the state by the process based on the name supplied in the flow def.
+
::flow/in-ports and ::flow/out-ports are maps of cid to external channel, optionally returned in the initial state from the init arity. The in-ports and out-ports are used to connect source and sink processes to external channels. These channels must be provided by the step-fn and returned in the init arity map, either by creating the channel or using a channel passed in via the flow def init args for the process. The flow does not manage the lifecycle of these channels.
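+
For example, a source proc's init arity might return an external channel as an in-port (a sketch; upstream-chan and :data-in are hypothetical, and ::flow aliases clojure.core.async.flow):
+
  (defn source-init [{:keys [upstream-chan]}]
    {::flow/in-ports {:data-in upstream-chan}})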
+
::flow/input-filter, a predicate of cid, can be returned in the state from any arity to indicate a filter on the process input channel read set. For example, a step-fn that is waiting for a response from multiple inputs might remove the channels that have already responded from the read-set until responses have been received from all.
+
step-fn helpers
+
Some additional helpers exist to create step-fns from other forms:
+
+
lift*->step - given a fn f taking one arg and returning a collection of non-nil values, creates a step-fn as needed by a process launcher, with one input and one output (named :in and :out), and no state
+
lift1->step - like lift*->step but for functions that return a single value (when nil, yield no output)
+
map->step - given a map with keys :describe, :init, :transition, :transform corresponding to the arities above, create a step-fn.
+
+
Creating a process launcher
+
Process launchers can be created using the process function, which takes a step-fn, and an option map with keys:
+
+
:workload - one of :mixed, :io or :compute
+
:compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec) will be used when getting the return from the future - see below
+
+
A :workload supplied as an option to process will override any :workload returned by the :describe fn of the process launcher. If neither are provided the default is :mixed.
+
In the :workload context of :mixed or :io, this dictates the type of thread in which the process loop will run, including its calls to transform.
+
When :io is specified, transform should not do extensive computation.
+
When :compute is specified, each call to transform will be run in a separate thread. The process loop will run in an :io context (since it no longer directly calls transform, all it does is I/O) and it will submit transform to the :compute executor then await (blocking, for compute-timeout-ms) the completion of the future returned by the executor. If the future times out it will be reported on ::flow/error.
+
When :compute is specified transform must not block!
+
Note that process launchers are defined by the ProcLauncher protocol. While you will typically use process to create a process launcher, advanced uses may also implement the protocol directly.
+
Reloading
+
Because the step-fn is called in a loop, it is a good practice to define the step-fn in a var and use the var (#'the-fn) instead of the function value itself (the-fn). This practice supports interactive development by allowing the var to be rebound from the repl while the flow is running.
+
Flow def
+
The step-fns are how you supply code for each process in the flow. The other thing you must supply is the flow configuration that ties together the proc launchers and the connections between them.
+
This flow definition is supplied to the create-flow function and consists of a map with :procs, :conns, and optionally some workflow executors.
+
The :procs is a map of pid -> proc-def. The proc-def is a map with :proc (the process launcher), the :args (passed to the init arity of the step-fn), and the :chan-opts which can be used to specify channel properties.
+
The :conns is a collection of [[from-pid outid] [to-pid inid]] tuples. Inputs and outputs support multiple connections. When an output is connected multiple times, every connection will get every message, per core.async/mult.
+
An example flow definition might look like this for a flow with two procs where the in-chan and out-chan are being passed through the source and sink args:
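(a sketch; the step-fn vars and the in-chan/out-chan channels are placeholders, and flow aliases clojure.core.async.flow)
+
  (def flow-def
    {:procs {:source {:proc (flow/process #'source-step)
                      :args {:in-chan in-chan}}
             :sink   {:proc (flow/process #'sink-step)
                      :args {:out-chan out-chan}}}
     :conns [[[:source :out] [:sink :in]]]})
+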
The flow is created by passing the flow definition to create-flow.
+
The returned flow object can be passed to the lifecycle methods (see next). In addition the flow can be used with datafy to get a datafied description of the flow. This is a static view - see ping described later for a dynamic view.
+
Flow lifecycle
+
When a flow is created, it starts in the resumed state. The following flow functions can be used to change the flow lifecycle state:
+
+
start - Starts all procs in the flow, returns a map with :report-chan and :error-chan
The map returned from start has the flow’s report and error channels. Procs can output messages to the :report-chan for unified logging across the flow. Exceptions thrown by a step-fn or procs in the flow are all logged to the :error-chan.
\ No newline at end of file
diff --git a/docs/flow.html b/docs/flow.html
new file mode 100644
index 00000000..085227ca
--- /dev/null
+++ b/docs/flow.html
@@ -0,0 +1,25 @@
+
+Flow
The rationale for core.async says “There comes a time in all good programs when components or subsystems must stop communicating directly with one another.” And core.async provides fundamental tools (channels) for doing that.
+
But using core.async well, e.g. keeping your I/O out of your computational logic, requires discipline and architectural savvy, and to do so consistently throughout an application or organization, conventions. Given channels, many architectural decisions remain regarding thread execution, backpressure, error handling etc. And often the topology of your network of communicating processes emerges out of the flow of control of your program as various pieces of code create threads and wire channels together, interleaved with computation, making it difficult to see the topology or administer it in one place.
+
The fundamental objective of core.async.flow is to enable a strict separation of your application logic from its topology, execution, communication, lifecycle, monitoring and error handling, all of which are provided by, and centralized in, c.a.flow, yielding more consistent, robust, testable, observable and operable systems.
+
Overview
+
core.async.flow provides concrete implementations of two more abstractions - the ‘process’ - a thread of activity, and the ‘flow’ - a directed graph of processes communicating via channels. A single data structure describes your flow topology, and has all of the settings for threads, channels etc. A process accepts data from and provides data to channels. The process implementation in c.a.flow handles all channel I/O, thread lifecycle and coordination with the flow graph.
+
All you need to do in your application is:
+
+
+
Define ordinary, often pure, data->data functions that the processes will run in their inner loop to do the computational part of processing messages (aka ‘step’ functions). These functions do not handle channels or threads or lifecycle, and do not even know they are running in a flow. They can be tested in isolation, and hot-reloaded. If they encounter a problem they can, and should, just throw an exception. The process will take care of it from there.
+
+
+
Define a flow by creating a data structure that enumerates the processes and the connections between their inputs and outputs, as well as various configuration settings for both.
+
+
+
+
With these application inputs, c.a.flow does the rest. It inquires of the processes what channels they require, creates those channels, then instantiates the processes making all of the channel connections between them. The processes in turn start threads (in fully user-configurable thread pools), await inputs, monitor the admin control channel, and when inputs arrive make data->data calls to your application logic, taking the return from that and sending it to the designated output channels. The processes follow a protocol used by the flow to do lifecycle management and error handling.
+
Once you’ve created a flow, the API provides functions to start/stop(shutdown) the flow, and to pause/resume both the flow and individual processes, to ping processes to get their state and that of their connected channels, to inject data into any point in the graph etc. The flow provides channels containing the ordinary monitoring/reporting stream and, separately, the error stream.
+
The library provides many more details and features, including the ability to create, via ordinary functions, processes that act as sources (of data from outside the flow) or sinks (to recipients outside the flow) so you can situate your flow in a broader context while still coordinating resource management with the flow lifecycle.
+
I hope core.async.flow enables you to write more robust and smaller applications, with more focus on your domain logic and less on plumbing.
+
Rich Hickey 1/2025
+
\ No newline at end of file
diff --git a/docs/index.html b/docs/index.html
new file mode 100644
index 00000000..e678e34b
--- /dev/null
+++ b/docs/index.html
@@ -0,0 +1,3 @@
+
+
\ No newline at end of file
diff --git a/docs/intro.html b/docs/intro.html
new file mode 100644
index 00000000..290d669d
--- /dev/null
+++ b/docs/intro.html
@@ -0,0 +1,4 @@
+
+Introduction to core.async
core.async is a new contrib library for Clojure that adds support for asynchronous programming using channels.
+
There comes a time in all good programs when components or subsystems must stop communicating directly with one another. This is often achieved via the introduction of queues between the producers of data and the consumers/processors of that data. This architectural indirection ensures that important decisions can be made with some degree of independence, and leads to systems that are easier to understand, manage, monitor and change, and make better use of computational resources, etc.
+
On the JVM, the java.util.concurrent package provides some good concurrent blocking queues, and they are a viable and popular choice for Clojure programs. However, in order to use the queues one must dedicate one or more actual threads to their consumption. Per-thread stack allocation and task-switching overheads limit the number of threads that can be used in practice. Another limitation of j.u.c. queues is there is no way to block waiting on a set of alternatives.
+
On JavaScript engines, there are no threads and no queues.
+
Thread overheads or lack of threads often cause people to move to systems based upon events/callbacks, in the pursuit of greater efficiency (often under the misnomer ‘scalability’, which doesn’t apply since you can’t scale a single machine). Events complect communication and flow of control. While there are various mechanisms to make events/callbacks cleaner (FRP, Rx/Observables) they don’t change their fundamental nature, which is that upon an event an arbitrary amount of other code is run, possibly on the same thread, leading to admonitions such as “don’t do too much work in your handler”, and phrases like “callback hell”.
+
The objectives of core.async are:
+
+
To provide facilities for independent threads of activity, communicating via queue-like channels
+
To support both real threads and shared use of thread pools (in any combination), as well as ClojureScript on JS engines
+
To build upon the work done on CSP and its derivatives
+
+
It is our hope that async channels will greatly simplify efficient server-side Clojure programs, and offer simpler and more robust techniques for front-end programming in ClojureScript.
In modern incarnations, the notion of a channel becomes first class, and in doing so provides us the indirection and independence we seek.
+
A key characteristic of channels is that they are blocking. In the most primitive form, an unbuffered channel acts as a rendezvous, any reader will await a writer and vice-versa. Buffering can be introduced, but unbounded buffering is discouraged, as bounded buffering with blocking can be an important tool coordinating pacing and back pressure, ensuring a system doesn’t take on more work than it can achieve.
+
Details
+
Just a library
+
core.async is a library. It doesn’t modify Clojure. It is designed to support Clojure 1.5+.
+
Creating channels
+
You can create a channel with the chan function. This will return a channel that supports multiple writers and readers. By default, the channel is unbuffered, but you can supply a number to indicate a buffer size, or supply a buffer object created via buffer, dropping-buffer or sliding-buffer.
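+
For example:
+
  (require '[clojure.core.async :refer [chan dropping-buffer sliding-buffer]])
+
  (chan)                      ;; unbuffered
  (chan 10)                   ;; fixed buffer of size 10
  (chan (dropping-buffer 10)) ;; when full, new values are dropped
  (chan (sliding-buffer 10))  ;; when full, oldest values are dropped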
+
The fundamental operations on channels are putting and taking values. Both of those operations potentially block, but the nature of the blocking depends on the nature of the thread of control in which the operation is performed. core.async supports two kinds of threads of control - ordinary threads and IOC (inversion of control) ‘threads’. Ordinary threads can be created in any manner, but IOC threads are created via go blocks. Because JS does not have threads, only go blocks and IOC threads are supported in ClojureScript.
+
go blocks and IOC ‘threads’
+
go is a macro that takes its body and examines it for any channel operations. It will turn the body into a state machine. Upon reaching any blocking operation, the state machine will be ‘parked’ and the actual thread of control will be released. This approach is similar to that used in C# async. When the blocking operation completes, the code will be resumed (on a thread-pool thread, or the sole thread in a JS VM). In this way the inversion of control that normally leaks into the program itself with event/callback systems is encapsulated by the mechanism, and you are left with straightforward sequential code. It will also provide the illusion of threads, and more important, separable sequential subsystems, to ClojureScript.
+
The primary channel operations within go blocks are >! (put) and <! (take). The go block itself immediately returns a channel, on which it will eventually put the value of the last expression of the body (if non-nil), and then close.
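+
For example, at a REPL:
+
  (require '[clojure.core.async :refer [chan go >! <! <!!]])
+
  (def c (chan))
  (go (>! c "hello"))
  (<!! (go (<! c)))
  ;;=> "hello"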
+
Channel on ordinary threads
+
There are analogous operations for use on ordinary threads - >!! (put blocking) and <!! (take blocking), which will block the thread on which they are called, until complete. While you can use these operations on threads created with e.g. future, there is also a macro, thread, analogous to go, that will launch a first-class thread and similarly return a channel, and should be preferred over future for channel work.
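+
For example:
+
  (require '[clojure.core.async :refer [thread <!!]])
+
  (<!! (thread
         (Thread/sleep 100) ;; blocking is fine on a real thread
         42))
  ;;=> 42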
+
Mixing modes
+
You can put on a channel from either flavor of >!/>!! and similarly take with either of <!/<!! in any combination, i.e. the channel is oblivious to the nature of the threads which use it.
+
alt
+
It is often desirable to be able to wait for any one (and only one) of a set of channel operations to complete. This powerful facility is made available through the alts! function (for use in go blocks), and the analogous alts!! (alts blocking). If more than one operation is available to complete, one can be chosen at random or by priority (i.e. in the order they are supplied). There are corresponding alt! and alt!! macros that combine the choice with conditional evaluation of expressions.
+
Timeouts
+
Timeouts are just channels that automatically close after a period of time. You can create one with the timeout function, then just include the timeout in an alt variant. A nice aspect of this is that timeouts can be shared between threads of control, e.g. in order to have a set of activities share a bound.
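+
For example, bounding a take with a timeout:
+
  (require '[clojure.core.async :refer [chan timeout alts!!]])
+
  (let [c (chan)
        t (timeout 100)
        [v port] (alts!! [c t])]
    (if (= port t) :timed-out v))
  ;;=> :timed-out (nothing was put on c within 100 msecs)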
+
The value of values
+
As with STM, the pervasive use of persistent data structures offers particular benefits for CSP-style channels. In particular, it is always safe and efficient to put a Clojure data structure on a channel, without fear of its subsequent use by either the producer or consumer.
+
Contrasting Go language channels
+
core.async has obvious similarities to Go channels. Some differences with Go are:
+
+
All of the operations are expressions (not statements)
+
This is a library, not syntax
+
alts! is a function (and supports a runtime-variable number of operations)
+
Priority is supported in alt
+
+
Finally, Clojure is hosted, i.e. we are bringing these facilities to existing platforms, not needing a custom runtime. The flip-side is we don’t have the underpinnings we would with a custom runtime. Reaching existing platforms remains a core Clojure value proposition.
+
Whither actors?
+
I remain unenthusiastic about actors. They still couple the producer with the consumer. Yes, one can emulate or implement certain kinds of queues with actors (and, notably, people often do), but since any actor mechanism already incorporates a queue, it seems evident that queues are more primitive. It should be noted that Clojure’s mechanisms for concurrent use of state remain viable, and channels are oriented towards the flow aspects of a system.
+
Deadlocks
+
Note that, unlike other Clojure concurrency constructs, channels, like all communications, are subject to deadlocks, the simplest being waiting for a message that will never arrive, which must be dealt with manually via timeouts etc. CSP proper is amenable to certain kinds of automated correctness analysis. No work has been done on that front for core.async as yet.
+
Also note that async channels are not intended for fine-grained computational parallelism, though you might see examples in that vein.
+
Future directions
+
Network channels and distribution are interesting areas for attention. We will also be doing performance tuning and refining the APIs.
+
Team
+
I’d like to thank the original team that helped bring core.async to life:
+
+
Timothy Baldridge
+
Ghadi Shayban
+
Alex Miller
+
Alex Redington
+
Sam Umbach
+
+
I hope that these async channels will help you build simpler and more robust programs.
+
Rich
+
\ No newline at end of file
diff --git a/docs/reference.html b/docs/reference.html
new file mode 100644
index 00000000..ff64e8db
--- /dev/null
+++ b/docs/reference.html
@@ -0,0 +1,101 @@
+
+Reference
Channels are queues that carry values and support multiple writers and readers. Channels are created with chan. Values in a channel are stored in a buffer. Buffers are never unbounded and there are several provided buffer types:
+
+
Unbuffered - (chan) - no buffer is used, and a rendezvous is required to pass a value through the channel from writer to reader
+
Fixed size - (chan 10)
+
Dropping - (chan (dropping-buffer 10)) - fixed size, and when full drop newest value
+
Sliding - (chan (sliding-buffer 10)) - fixed size, and when full drop oldest value
+
+
Channels are first-class values that can be passed around like any other value.
+
Channels may optionally be supplied with a transducer and an exception handler. The transducer will be applied to values that pass through the channel. If a transducer is supplied, the channel must be buffered (transducers can create intermediate values that must be stored somewhere). Channel transducers must not block, whether by issuing i/o operations or by externally synchronizing, else risk impeding or deadlocking go blocks.
+
The ex-handler is a function of one argument (a Throwable). If an exception occurs while applying the transducer, the ex-handler will be invoked, and any non-nil return value will be placed in the channel. If no ex-handler is supplied, exceptions will flow and be handled where they occur (note that this may occur in either the writer or reader thread depending on the operation and the state of the buffer).
NOTE: As a mnemonic, the < or > points in the direction the value travels relative to the channel arg. For example, in (>!! chan val) the > points into the channel (put), and in (<!! chan) the < points out of the channel (take).
+
The use case dictates the variant to use. Parking operations are only valid in go blocks (see below for more) and never valid outside the lexical scope of a go. Conversely, blocking operations should only be used outside go blocks.
+
The async and non-blocking forms are less common but may be used in either context. Use the async variants to specify a channel and a function that is called when the take or put succeeds. The take! and put! functions also take an optional flag on-caller? to indicate whether the fn can be called on the current thread. The non-blocking offer! and poll! will either complete or return immediately.
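
A rough sketch of the four flavors (assuming the a alias for clojure.core.async):

(require '[clojure.core.async :as a])

(def c (a/chan 1))

;; blocking - for ordinary threads
(a/>!! c :v)
(a/<!! c)                               ;; => :v

;; parking - only valid inside go blocks
(a/go (a/>! c :v))
(a/<!! (a/go (a/<! c)))                 ;; => :v

;; async - a callback fires when the put/take completes
(a/put! c :v (fn [ok?] (println "put completed?" ok?)))
(a/take! c (fn [v] (println "took" v)))

;; non-blocking - complete immediately or not at all
(a/offer! c :v)                         ;; true when accepted immediately
(a/poll! c)                             ;; the value if immediately available, else nil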
+
Channels are closed with close!. When a channel is closed, no values may be added, but values already in the channel may be taken. When all values are drained from a closed channel, take operations will return nil (nil is not a valid channel value and serves as a marker).
alts! (parking) and alts!! (blocking) can be used to wait on a set of channel operations until one succeeds. Channel operations can be either a put (with a value) or a take. By default, if more than one operation becomes available, one is chosen at random; set :priority true to try them in the order given. Only one of the operations will occur. If no operation is available and a :default val is specified, the default value will be returned instead.
+
Since it is common to combine an alts with a conditional return based on the action chosen, alt! (parking) and alt!! (blocking) combine an alts! select with destructuring of the channel and value and a result expression.
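
For illustration, a sketch of alts!! and alt!! (assuming the a alias for clojure.core.async):

(require '[clojure.core.async :as a])

(let [c1 (a/chan 1)
      c2 (a/chan 1)]
  (a/>!! c1 :from-c1)
  ;; prefer c1 when several operations are ready; return :nothing when
  ;; no operation is immediately available
  (a/alts!! [c1 c2] :priority true :default :nothing))
;; => [:from-c1 <the c1 channel>]

;; alt!! pairs each operation with a result expression
(let [c1 (a/chan 1)
      c2 (a/chan 1)]
  (a/>!! c2 42)
  (a/alt!!
    c1 ([v] [:from-c1 v])
    c2 ([v] [:from-c2 v])
    :default :nothing))
;; => [:from-c2 42]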
Promise channels are special channels that will accept only a single value. Once a value is put to a promise channel, all pending and future consumers will receive only that value. Future puts complete but drop the value. When the channel is closed, consumers will receive either the value (if a put occurred) or nil (if no put occurred) forever.
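
A small sketch of promise channel behavior (assuming the a alias for clojure.core.async):

(require '[clojure.core.async :as a])

(let [p (a/promise-chan)]
  (a/>!! p :ready)       ;; the first put sets the value
  (a/>!! p :ignored)     ;; later puts complete but their values are dropped
  [(a/<!! p) (a/<!! p)]) ;; every take (now and later) sees the same value
;; => [:ready :ready]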
“Processes”, in the most general sense, are represented either as go blocks or threads. Go blocks model a lightweight computation that can be “parked” (paused) without consuming a thread. Go blocks communicate externally via channels. Any core.async parking operation (>!, <!, alt!, alts!) that cannot be immediately completed will cause the block to park and it will be automatically resumed when the operation can complete (when data has arrived on a channel to allow it).
+
Note that go blocks are multiplexed over a finite number of threads and should never be blocked, either by the use of a core.async blocking operation (like <!!) or by calling a blockable I/O operation like a network call. Doing so may effectively block all of the threads in the go block pool and prevent further progress.
+
core.async provides the helper functions thread and thread-call (analogous to future and future-call) to execute a process asynchronously in a separate thread. As these threads are not limited, they are suitable for blocking operations and can communicate with other processes via channels. However, note that these threads are not special - you can create and manage your own threads in any way you like and use core.async channels from those threads to communicate.
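
For example, a minimal sketch using thread for blocking work (assuming the a alias for clojure.core.async):

(require '[clojure.core.async :as a])

;; thread runs its body on a separate thread and returns a channel that
;; will receive the body's result, so blocking calls are acceptable here
(let [result (a/thread
               (Thread/sleep 100) ;; stand-in for blocking I/O
               :done)]
  (a/<!! result))
;; => :done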
The pipeline functions are designed for modeling your work as a pipeline of multi-threaded processing stages. The stages are connected by channels and each stage has N threads performing the transducer xf as values flow from the from channel to the to channel. The variants are (a sketch of pipeline-blocking follows the list):
+
+
pipeline - the work performed in the xf must not block (designed for computational parallelism). The transducer will be applied independently to each value, in parallel, so stateful transducer functions will likely not be useful.
+
pipeline-blocking - the work performed in the xf may block, for example on network operations.
+
pipeline-async - this variant triggers asynchronous work in another system or thread and expects another thread to place the results on a return channel.
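
As mentioned above, here is a minimal sketch of pipeline-blocking; slow-lookup is a hypothetical stand-in for blocking work (assuming the a alias for clojure.core.async):

(require '[clojure.core.async :as a])

(defn slow-lookup [x] (Thread/sleep 50) (* x x))

;; 4 threads apply the (blocking) transducer as values flow from `from` to `to`
(let [from (a/to-chan! (range 10))
      to   (a/chan 10)]
  (a/pipeline-blocking 4 to (map slow-lookup) from)
  (a/<!! (a/into [] to)))
;; => [0 1 4 9 16 25 36 49 64 81] (pipeline preserves order)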
Because the core.async go block thread pool is fixed size, blocking IO operations should never be done in go blocks. If all go threads are blocked on blocking operations, you may experience either deadlock or lack of progress.
+
One common issue is the use of core.async blocking operations inside go blocks. core.async includes a debugging facility to detect this situation (other kinds of blocking operation cannot be detected so this covers only part of the problem). To enable go checking, set the Java system property clojure.core.async.go-checking=true. This property is read once, at namespace load time, and should be used in development or testing, not in production.
+
When go checking is active, invalid blocking calls in a go block will throw in go block threads. By default, these will likely throw to the go block thread’s uncaught exception handler and be printed, but you can use Thread/setDefaultUncaughtExceptionHandler to change the default behavior (or depending on your system, you may have one already that routes to logging).
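
For example (a sketch; your tooling may differ), the property can be passed as a JVM option or set before core.async is loaded:

;; via the Clojure CLI:  clj -J-Dclojure.core.async.go-checking=true
;; or programmatically, before requiring clojure.core.async:
(System/setProperty "clojure.core.async.go-checking" "true")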
+
\ No newline at end of file
diff --git a/docs/walkthrough.html b/docs/walkthrough.html
new file mode 100644
index 00000000..45cc9a39
--- /dev/null
+++ b/docs/walkthrough.html
@@ -0,0 +1,105 @@
+
+Walkthrough
Values are conveyed on queue-like channels. By default channels are unbuffered (0-length) - they require producer and consumer to rendezvous for the transfer of a value through the channel.
+
Use chan to make an unbuffered channel:
+
(a/chan)
+
+
Pass a number to create a channel with a fixed buffer size:
+
(a/chan 10)
+
+
close! a channel to stop accepting puts. Remaining values are still available to take. Drained channels return nil on take. Nils may not be sent over a channel explicitly!
+
(let [c (a/chan)]
+ (a/close! c))
+
+
Channels can also use custom buffers that have different policies for the “full” case. Two useful examples are provided in the API.
+
;; Use `dropping-buffer` to drop newest values when the buffer is full:
+(a/chan (a/dropping-buffer 10))
+
+;; Use `sliding-buffer` to drop oldest values when the buffer is full:
+(a/chan (a/sliding-buffer 10))
+
+
Threads
+
In ordinary threads, we use >!! (blocking put) and <!! (blocking take) to communicate via channels.
Because these are blocking calls, if we try to put on an unbuffered channel, we will block the main thread. We can use thread (like future) to execute a body in a pool thread and return a channel with the result. Here we launch a background task to put “hello” on a channel, then read that value in the current thread.
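
A minimal sketch of that example (using the a alias from above):

(let [c (a/chan)]
  (a/thread (a/>!! c "hello"))   ;; background task puts "hello"
  (assert (= "hello" (a/<!! c))) ;; blocking take in the current thread
  (a/close! c))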
The go macro asynchronously executes its body in a special pool of threads. Channel operations that would block will pause execution instead, blocking no threads. This mechanism encapsulates the inversion of control that is external in event/callback systems. Inside go blocks, we use >! (put) and <! (take).
+
Here we convert our prior channel example to use go blocks:
Instead of the explicit thread and blocking call, we use a go block for the producer. The consumer uses a go block to take, then returns a result channel, from which we do a blocking take.
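
A sketch of the go-based version:

(let [c (a/chan)]
  (a/go (a/>! c "hello"))                      ;; producer go block, parking put
  (assert (= "hello" (a/<!! (a/go (a/<! c))))) ;; consumer go block returns a result channel
  (a/close! c))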
+
Alts
+
One killer feature for channels over queues is the ability to wait on many channels at the same time (like a socket select). This is done with alts!! (ordinary threads) or alts! in go blocks.
+
We can create a background thread with alts that combines inputs on either of two channels. alts!! takes a set of operations to perform - either a channel to take from or a [channel value] pair to put - and returns the value (nil for a put) and the channel on which the operation succeeded:
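
A sketch of such a combining thread (the two blocking puts feed it):

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/thread
    (while true
      (let [[v ch] (a/alts!! [c1 c2])]
        (println "Read" v "from" ch))))
  (a/>!! c1 "hi")
  (a/>!! c2 "there"))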
Prints (on stdout, possibly not visible at your repl):
+
Read hi from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
+Read there from #object[clojure.core.async.impl.channels.ManyToManyChannel ...]
+
+
We can use alts! to do the same thing with go blocks:
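
For example (a sketch):

(let [c1 (a/chan)
      c2 (a/chan)]
  (a/go (while true
          (let [[v ch] (a/alts! [c1 c2])]
            (println "Read" v "from" ch))))
  (a/go (a/>! c1 "hi"))
  (a/go (a/>! c2 "there")))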
Since go blocks are lightweight processes not bound to threads, we can have LOTS of them! Here we create 1000 go blocks that say hi on 1000 channels. We use alts!! to read them as they’re ready.
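
A sketch of that experiment:

(let [n 1000
      cs (repeatedly n a/chan)
      begin (System/currentTimeMillis)]
  (doseq [c cs] (a/go (a/>! c "hi")))
  (dotimes [_ n]
    (let [[v _ch] (a/alts!! cs)]
      (assert (= "hi" v))))
  (println "Read" n "msgs in" (- (System/currentTimeMillis) begin) "ms"))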
We can combine timeout with alts! to do timed channel waits. Here we wait for 100 ms for a value to arrive on the channel, then give up:
+
(let [c (a/chan)
+ begin (System/currentTimeMillis)]
+ (a/alts!! [c (a/timeout 100)])
+ (println "Gave up after" (- (System/currentTimeMillis) begin)))
+
+
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 06d9a11b..d93b5b0a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <groupId>org.clojure</groupId>
   <artifactId>core.async</artifactId>
-  <version>1.8.742-SNAPSHOT</version>
+  <version>1.9.809-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>core.async</name>
   <description>Facilities for async programming and communication in Clojure</description>
@@ -20,7 +20,7 @@
     <groupId>org.clojure</groupId>
     <artifactId>pom.contrib</artifactId>
-    <version>1.2.0</version>
+    <version>1.3.0</version>
diff --git a/src/main/clojure/clojure/core/async.clj b/src/main/clojure/clojure/core/async.clj
index fab5b690..6028481f 100644
--- a/src/main/clojure/clojure/core/async.clj
+++ b/src/main/clojure/clojure/core/async.clj
@@ -112,7 +112,8 @@ return nil for unexpected contexts."
buffer must be specified. ex-handler must be a fn of one argument -
if an exception occurs during transformation it will be called with
the Throwable as an argument, and any non-nil return value will be
- placed in the channel."
+ placed in the channel. Channels implement Datafiable; use datafy
+ to obtain the state of the channel and its buffer."
([] (chan nil))
([buf-or-n] (chan buf-or-n nil))
([buf-or-n xform] (chan buf-or-n xform nil))
diff --git a/src/main/clojure/clojure/core/async/flow.clj b/src/main/clojure/clojure/core/async/flow.clj
new file mode 100644
index 00000000..4810bb93
--- /dev/null
+++ b/src/main/clojure/clojure/core/async/flow.clj
@@ -0,0 +1,310 @@
+;; Copyright (c) Rich Hickey and contributors. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns ^{:author "Rich Hickey"}
+ clojure.core.async.flow
+ "
+ Note - Alpha, work-in-progress, names and other details are in flux
+
+ A library for building concurrent, event driven data processing
+ flows out of communication-free functions, while centralizing
+ control, reporting, execution and error handling. Built on core.async.
+
+ The top-level construct is the flow, comprising:
+ a set of processes (generally, threads) - concurrent activities
+ a set of channels flowing data into and out of the processes
+ a set of channels for centralized control, reporting, error-handling,
+ and execution of the processes
+
+ The flow library itself constructs processes, channels and flows. The
+ user provides configuration data and process logic (step-fns) that
+ specify how the flow should work.
+
+ A flow is constructed from flow configuration data which defines a
+ directed graph of processes and the connections between
+ them. Processes describe their I/O requirements and the
+ flow (library) itself creates channels and passes them to the
+ processes that requested them. See 'create-flow' for the
+ details. The flow configuration provides a centralized place for
+ policy decisions regarding process settings, threading, buffering etc.
+
+ It is expected that applications will rarely define instances of the
+ process protocol but instead use the API function 'process' that
+ implements the process protocol in terms of calls to ordinary
+ functions (step-fns) that might include no communication or
+ core.async code. In this way the library helps you achieve a strict
+ separation of your application logic from its execution,
+ communication, lifecycle, error handling and monitoring.
+
+ Note that at several points the library calls upon the user to
+ supply ids for processes, inputs, outputs etc. These should be
+ keywords. When a namespaced keyword is required it is explicitly
+ stated. This documentation refers to various keywords utilized by
+ the library itself as ::flow/xyz, where ::flow is an alias for
+ clojure.core.async.flow
+
+ Flows support the Clojure 'datafy' protocol to support
+ observability. See also the 'ping' and 'ping-proc' fns for a live
+ view of processes.
+
+ A process is represented in the flow definition by an implementation
+ of spi/ProcLauncher that starts it. See the spi docs for
+ details."
+
+ (:require
+ [clojure.core.async.flow.impl :as impl]
+ [clojure.core.async.flow.impl.graph :as g]))
+
+(set! *warn-on-reflection* true)
+
+(defn create-flow
+ "Creates a flow from the supplied configuration: a map containing the
+ keys :procs and :conns, and optionally :mixed-exec/:io-exec/:compute-exec
+
+ :procs - a map of pid->proc-def
+ where proc-def is a map with keys :proc, :args, :chan-opts
+
+ :proc - a function that starts a process
+ :args - a map of param->val which will be passed to the process ctor
+ :chan-opts - a map of in-or-out-id->{:keys [buf-or-n xform]}, where buf-or-n
+ and xform have their meanings per core.async/chan
+ the default is {:buf-or-n 10}
+
+ :conns - a collection of [[from-pid outid] [to-pid inid]] tuples.
+
+ Inputs and outputs support multiple connections. When an output is
+ connected multiple times every connection will get every message,
+ as per a core.async/mult.
+
+ :mixed-exec/:io-exec/:compute-exec -> ExecutorService
+ These can be used to specify the ExecutorService to use for the
+ corresponding workload, in lieu of the lib defaults.
+
+ N.B. The flow is not started. See 'start'"
+ [config] (impl/create-flow config))
+
+(defn start
+ "starts the entire flow from init values. The processes start paused.
+ Call 'resume' or 'resume-proc' to start flow. Returns a map with keys:
+
+ :report-chan - a core.async chan for reading. 'ping' responses
+ will show up here, as will any explicit ::flow/report outputs
+ from :transform
+
+ :error-chan - a core.async chan for reading. Any (and only)
+ exceptions thrown anywhere on any thread inside a flow will appear
+ in maps sent here. There will at least be a ::flow/ex entry with the
+ exception, and may be additional keys for pid, state, status etc
+ depending on the context of the error."
+ [g] (g/start g))
+
+(defn stop
+ "shuts down the flow, stopping all procsesses and closing the error
+ and report channels. The flow can be started again"
+ [g] (g/stop g))
+
+(defn pause
+ "pauses a running flow"
+ [g] (g/pause g))
+
+(defn resume
+ "resumes a paused flow"
+ [g] (g/resume g))
+
+(defn ping
+ "pings all processes, returning a map of pid -> proc status and
+ state, for those procs that reply within timeout-ms (default 1000)"
+ [g & {:keys [timeout-ms] :or {timeout-ms 1000}}]
+ (g/ping g timeout-ms))
+
+(defn pause-proc
+ "pauses a process"
+ [g pid] (g/pause-proc g pid))
+
+(defn resume-proc
+ "resumes a process"
+ [g pid] (g/resume-proc g pid))
+
+(defn ping-proc
+ "like ping, but just pings the specified process"
+ [g pid & {:keys [timeout-ms] :or {timeout-ms 1000}}]
+ (g/ping-proc g pid timeout-ms))
+
+(defn inject
+ "asynchronously puts the messages on the channel corresponding to the
+ input or output of the process, returning a future that will
+ complete when done."
+ [g [pid io-id :as coord] msgs] (g/inject g coord msgs))
+
+(defn process
+ "Given a function of four arities (0-3), aka the 'step-fn',
+ returns a launcher that creates a process compliant with the process
+ protocol (see the spi/ProcLauncher doc).
+
+ The possible arities for the step-fn are
+
+ 0 - 'describe', () -> description
+ 1 - 'init', (arg-map) -> initial-state
+ 2 - 'transition', (state transition) -> state'
+ 3 - 'transform', (state input msg) -> [state' output-map]
+
+ This is the core facility for defining the logic for processes via
+ ordinary functions. Using a var holding a fn as the 'step-fn' is the
+ preferred method for defining a proc, as it enables
+ hot-code-reloading of the proc logic in a flow, and better names in
+ datafy.
+
+ arity 0 - 'describe', () -> description
+ where description is a map with keys :params :ins and :outs, each of which
+ in turn is a map of keyword to doc string, and :workload with
+ possible values of :mixed :io :compute. All entries in the describe
+ return map are optional.
+
+ :params describes the initial arguments to setup the state for the function.
+ :ins enumerates the input[s], for which the flow will create channels
+ :outs enumerates the output[s], for which the flow may create channels.
+ :workload - describes the nature of the workload, one of :mixed :io or :compute
+ an :io workload should not do extended computation
+ a :compute workload should never block
+
+ No key may be present in both :ins and :outs, allowing for a uniform
+ channel coordinate system of [:process-id :channel-id]. The
+ ins/outs/params returned will be the ins/outs/params of the
+ process. describe may be called by users to understand how to use
+ the proc. It will also be called by the impl in order to discover
+ what channels are needed.
+
+ arity 1 - 'init', (arg-map) -> initial-state
+
+ The init arity will be called once by the process to establish any initial
+ state. The arg-map will be a map of param->val, as supplied in the
+ flow def. The key ::flow/pid will be added, mapped to the pid
+ associated with the process (useful e.g. if the process wants to
+ refer to itself in reply-to coordinates).
+
+ Optionally, a returned init state may contain the
+ keys ::flow/in-ports and/or ::flow/out-ports. These should be maps
+ of cid -> a core.async.channel. The cids must not conflict with the
+ in/out ids. These channels will become part of the input/output set
+ of the process, but are not otherwise visible/resolvable within the
+ flow. Ports are a way to allow data to enter or exit the flow from
+ outside of it. Use :transition to coordinate the lifecycle of these
+ external channels.
+
+ Optionally, _any_ returned state, whether from init, transition
+ or transform, may contain the key ::flow/input-filter, a predicate
+ of cid. Only inputs (including in-ports) satisfying the predicate
+ will be part of the next channel read set. In the absence of this
+ predicate all inputs are read.
+
+ arity 2 - 'transition', (state transition) -> state'
+
+ The transition arity will be called when the process makes a state
+ transition, transition being one of ::flow/resume, ::flow/pause
+ or ::flow/stop
+
+ With this a process impl can track changes and coordinate
+ resources, especially cleaning up any resources on :stop, since the
+ process will no longer be used following that. See the SPI for
+ details. state' will be the state supplied to subsequent calls.
+
+ arity 3 - 'transform', (state in-name msg) -> [state' output]
+ where output is a map of outid->[msgs*]
+
+ The transform arity will be called every time a message arrives at any
+ of the inputs. Output can be sent to none, any or all of the :outs
+ enumerated, and/or an input named by a [pid inid] tuple (e.g. for
+ reply-to), and/or to the ::flow/report output. A step need not
+ output at all (output or msgs can be empty/nil), however an output _message_
+ may never be nil (per core.async channels). state' will be the state
+ supplied to subsequent calls.
+
+ process also accepts an option map with keys:
+ :workload - one of :mixed, :io or :compute
+ :compute-timeout-ms - if :workload is :compute, this timeout (default 5000 msec)
+ will be used when getting the return from the future - see below
+
+ A :workload supplied as an option to process will override
+ any :workload returned by the :describe fn of the process. If neither
+ are provided the default is :mixed.
+
+ In the :workload context of :mixed or :io, this dictates the type of
+ thread in which the process loop will run, _including its calls to
+ transform_.
+
+ When :io is specified, transform should not do extensive computation.
+
+ When :compute is specified, each call to transform will be run in a
+ separate thread. The process loop will run in an :io context (since
+ it no longer directly calls transform, all it does is I/O) and it
+ will submit transform to the :compute executor then await (blocking,
+ for compute-timeout-ms) the completion of the future returned by the
+ executor. If the future times out it will be reported
+ on ::flow/error.
+
+ When :compute is specified transform must not block!"
+ ([step-fn] (process step-fn nil))
+ ([step-fn {:keys [workload compute-timeout-ms] :as opts}]
+ (impl/proc step-fn opts)))
+
+(defn map->step
+ "given a map of functions corresponding to step fn arities (see
+ 'process'), returns a step fn suitable for passing to 'process'. You
+ can use this map form to compose the proc logic from disparate
+ functions or to leverage the optionality of some of the entry
+ points.
+
+ The keys in the map are:
+ :describe, arity 0 - required
+ :init, arity 1 - optional, but should be provided if 'describe' returns :params.
+ :transition, arity 2 - optional
+ :transform, arity 3 - required"
+ [{:keys [describe init transition transform]}]
+ (assert (and describe transform) "must provide :describe and :transform")
+ (fn
+ ([] (describe))
+ ([arg-map] (when init (init arg-map)))
+ ([state trans] (if transition (transition state trans) state))
+ ([state input msg] (transform state input msg))))
+
+(defn lift*->step
+ "given a fn f taking one arg and returning a collection of non-nil
+ values, creates a step fn as needed by process, with one input
+ and one output (named :in and :out), and no state."
+ [f]
+ (fn
+ ([] {:ins {:in (str "the argument to " f)}
+ :outs {:out (str "the return of " f)}})
+ ([arg-map] nil)
+ ([state transition] nil)
+ ([state input msg] [nil {:out (f msg)}])))
+
+(defn lift1->step
+ "like lift*->step except taking a fn returning one value, which when
+ nil will yield no output."
+ [f]
+ (fn
+ ([] {:ins {:in (str "the argument to " f)}
+ :outs {:out (str "the return of " f)}})
+ ([arg-map] nil)
+ ([state transition] nil)
+ ([state input msg] [nil (when-some [m (f msg)] {:out (vector m)})])))
+
+(defn futurize
+ "Takes a fn f and returns a fn that takes the same arguments as f
+ and immediately returns a future, having started a thread for the
+ indicated workload, or via the supplied executor, that invokes f
+ with those args and completes that future with its return.
+
+ futurize accepts kwarg options:
+ :exec - one of the workloads :mixed, :io, :compute
+ or a j.u.c.ExecutorService object,
+ default :mixed"
+ [f & {:keys [exec]
+ :or {exec :mixed} :as opts}]
+ (impl/futurize f opts))
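
Not part of the patch - a minimal usage sketch based on the docstrings above; the step-fn, pids and messages here are hypothetical:

(require '[clojure.core.async.flow :as flow]
         '[clojure.string :as str])

;; a step-fn with the documented arities: describe, init, transition, transform
(defn upper-step
  ([] {:ins  {:in "strings to upcase"}
       :outs {:out "upcased strings"}})
  ([arg-map] nil)
  ([state _transition] state)
  ([state _in msg] [state {:out [(str/upper-case msg)]}]))

(def f
  (flow/create-flow
   {:procs {:up   {:proc (flow/process #'upper-step)}
            :sink {:proc (flow/process #'upper-step)}}
    :conns [[[:up :out] [:sink :in]]]}))

(def chans (flow/start f))          ;; => {:report-chan ..., :error-chan ...}
(flow/resume f)                     ;; processes start paused
(flow/inject f [:up :in] ["hello"]) ;; feed data in from outside the flow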
diff --git a/src/main/clojure/clojure/core/async/flow/impl.clj b/src/main/clojure/clojure/core/async/flow/impl.clj
new file mode 100644
index 00000000..74fa6519
--- /dev/null
+++ b/src/main/clojure/clojure/core/async/flow/impl.clj
@@ -0,0 +1,298 @@
+;; Copyright (c) Rich Hickey and contributors. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns clojure.core.async.flow.impl
+ (:require [clojure.core.async :as async]
+ [clojure.core.async.flow :as-alias flow]
+ [clojure.core.async.flow.spi :as spi]
+ [clojure.core.async.flow.impl.graph :as graph]
+ [clojure.core.async.impl.dispatch :as disp]
+ [clojure.walk :as walk]
+ [clojure.datafy :as datafy])
+ (:import [java.util.concurrent Future Executors ExecutorService TimeUnit]
+ [java.util.concurrent.locks ReentrantLock]))
+
+(set! *warn-on-reflection* true)
+
+(defn datafy [x]
+ (condp instance? x
+ clojure.lang.Fn (-> x str symbol)
+ ExecutorService (str x)
+ clojure.lang.Var (symbol x)
+ (datafy/datafy x)))
+
+(defn futurize [f {:keys [exec]}]
+ (fn [& args]
+ (let [^ExecutorService e (if (instance? ExecutorService exec)
+ exec
+ (disp/executor-for exec))]
+ (.submit e ^Callable #(apply f args)))))
+
+(defn prep-proc [ret pid {:keys [proc, args, chan-opts] :or {chan-opts {}}}]
+ (let [{:keys [ins outs]} (spi/describe proc)
+ copts (fn [cs]
+ (zipmap (keys cs) (map #(chan-opts %) (keys cs))))
+ inopts (copts ins)
+ outopts (copts outs)]
+ (when (or (some (partial contains? inopts) (keys outopts))
+ (some (partial contains? outopts) (keys inopts)))
+ (throw (ex-info ":ins and :outs cannot share ids within a process"
+ {:pid pid :ins (keys inopts) :outs (keys outopts)})))
+ (assoc ret pid {:pid pid :proc proc :ins inopts :outs outopts :args args})))
+
+(defn create-flow
+ "see lib ns for docs"
+ [{:keys [procs conns mixed-exec io-exec compute-exec]}]
+ (let [lock (ReentrantLock.)
+ chans (atom nil)
+ execs {:mixed mixed-exec :io io-exec :compute compute-exec}
+ _ (assert (every? #(or (nil? %) (instance? ExecutorService %)) (vals execs))
+ "mixed-exe, io-exec and compute-exec must be ExecutorServices")
+ pdescs (reduce-kv prep-proc {} procs)
+ allopts (fn [iok] (into {} (mapcat #(map (fn [[k opts]] [[(:pid %) k] opts]) (iok %)) (vals pdescs))))
+ inopts (allopts :ins)
+ outopts (allopts :outs)
+ set-conj (fnil conj #{})
+ ;;out-coord->#{in-coords}
+ conn-map (reduce (fn [ret [out in :as conn]]
+ (if (and (contains? outopts out)
+ (contains? inopts in))
+ (update ret out set-conj in)
+ (throw (ex-info "invalid connection" {:conn conn}))))
+ {} conns)
+ running-chans #(or (deref chans) (throw (Exception. "flow not running")))
+ send-command (fn sc
+ ([cmap]
+ (let [{:keys [control]} (running-chans)]
+ (async/>!! control cmap)))
+ ([command to] (sc #::flow{:command command :to to})))
+ handle-ping (fn [to timeout-ms]
+ (let [reply-chan (async/chan (count procs))
+ ret-chan (async/take (if (= to ::flow/all) (count procs) 1) reply-chan)
+ timeout (async/timeout timeout-ms)
+ _ (send-command #::flow{:command ::flow/ping, :to to, :reply-chan reply-chan})
+ ret (loop [ret nil]
+ (let [[{::flow/keys [pid] :as m} c] (async/alts!! [ret-chan timeout])]
+ (if (some? m)
+ (recur (assoc ret pid m))
+ ret)))]
+ (if (= to ::flow/all) ret (-> ret vals first))))]
+ (reify
+ clojure.core.protocols/Datafiable
+ (datafy [_]
+ (walk/postwalk datafy {:procs procs, :conns conns, :execs execs
+ :chans (select-keys @chans [:ins :outs :error :report])}))
+
+ clojure.core.async.flow.impl.graph.Graph
+ (start [_]
+ (.lock lock)
+ (try
+ (if-let [{:keys [report error]} @chans]
+ {:report-chan report :error-chan error :already-running true}
+ (let [control-chan (async/chan 10)
+ control-mult (async/mult control-chan)
+ report-chan (async/chan (async/sliding-buffer 100))
+ error-chan (async/chan (async/sliding-buffer 100))
+ make-chan (fn [[[pid cid]{:keys [buf-or-n xform]}]]
+ (if xform
+ (async/chan
+ buf-or-n xform
+ (fn [ex]
+ (async/put! error-chan
+ #::flow{:ex ex, :pid pid, :cid cid, :xform xform})
+ nil))
+ (async/chan (or buf-or-n 10))))
+ in-chans (zipmap (keys inopts) (map make-chan inopts))
+ needs-mult? (fn [out ins]
+ (or (< 1 (count ins))
+ (= (first out) (ffirst ins))))
+ out-chans (zipmap (keys outopts)
+ (map (fn [[coord opts :as co]]
+ (let [conns (conn-map coord)]
+ (cond
+ (empty? conns) nil
+ (needs-mult? coord conns) (make-chan co)
+ ;;direct connect 1:1
+ :else (in-chans (first conns)))))
+ outopts))
+ ;;mults
+ _ (doseq [[out ins] conn-map]
+ (when (needs-mult? out ins)
+ (let [m (async/mult (out-chans out))]
+ (doseq [in ins]
+ (async/tap m (in-chans in))))))
+ write-chan #(if-let [[_ c] (or (find in-chans %) (find out-chans %))]
+ c
+ (throw (ex-info "can't resolve channel with coord" {:coord %})))
+ resolver (reify spi/Resolver
+ (get-write-chan [_ coord]
+ (write-chan coord))
+ (get-exec [_ context] (or (execs context) (disp/executor-for context))))
+ start-proc
+ (fn [{:keys [pid proc args ins outs]}]
+ (try
+ (let [chan-map (fn [ks coll] (zipmap (keys ks) (map #(coll [pid %]) (keys ks))))
+ control-tap (async/chan 10)]
+ (async/tap control-mult control-tap)
+ (spi/start proc {:pid pid :args (assoc args ::flow/pid pid) :resolver resolver
+ :ins (assoc (chan-map ins in-chans)
+ ::flow/control control-tap)
+ :outs (assoc (chan-map outs out-chans)
+ ::flow/error error-chan
+ ::flow/report report-chan)}))
+ (catch Throwable ex
+ (async/>!! control-chan #::flow{:command ::flow/stop :to ::flow/all})
+ (throw ex))))]
+ (doseq [p (vals pdescs)]
+ (start-proc p))
+ ;;the only connection to a running flow is via channels
+ (reset! chans {:control control-chan :resolver resolver
+ :report report-chan, :error error-chan
+ :ins in-chans, :outs out-chans})
+ {:report-chan report-chan :error-chan error-chan}))
+ (finally (.unlock lock))))
+ (stop [_]
+ (.lock lock)
+ (try
+ (when-let [{:keys [report error]} @chans]
+ (send-command ::flow/stop ::flow/all)
+ (async/close! error)
+ (async/close! report)
+ (reset! chans nil)
+ true)
+ (finally (.unlock lock))))
+ (pause [_] (send-command ::flow/pause ::flow/all))
+ (resume [_] (send-command ::flow/resume ::flow/all))
+ (ping [_ timeout-ms] (handle-ping ::flow/all timeout-ms))
+ (pause-proc [_ pid] (send-command ::flow/pause pid))
+ (resume-proc [_ pid] (send-command ::flow/resume pid))
+ (ping-proc [_ pid timeout-ms] (handle-ping pid timeout-ms))
+ (inject [_ coord msgs]
+ (let [{:keys [resolver]} (running-chans)
+ chan (spi/get-write-chan resolver coord)]
+ ((futurize #(doseq [m msgs]
+ (async/>!! chan m))
+ {:exec :io})))))))
+
+(defn handle-command
+ [pid pong status cmd]
+ (let [transition #::flow{:stop :exit, :resume :running, :pause :paused}
+ {::flow/keys [to command reply-chan]} cmd]
+ (if (#{::flow/all pid} to)
+ (do
+ (when (= command ::flow/ping) (pong reply-chan))
+ (or (transition command) status))
+ status)))
+
+(defn handle-transition
+ "when transition, returns maybe new state"
+ [transition status nstatus state]
+ (if (not= status nstatus)
+ (transition state (case nstatus
+ :exit ::flow/stop
+ :running ::flow/resume
+ :paused ::flow/pause))
+ state))
+
+(defn send-outputs [status state outputs outs resolver control handle-command transition]
+ (loop [nstatus status, nstate state, outputs (seq outputs)]
+ (if (or (nil? outputs) (= nstatus :exit))
+ [nstatus nstate]
+ (let [[output msgs] (first outputs)]
+ (if-let [outc (or (outs output) (spi/get-write-chan resolver output))]
+ (let [[nstatus nstate]
+ (loop [nstatus nstatus, nstate nstate, msgs (seq msgs)]
+ (if (or (nil? msgs) (= nstatus :exit))
+ [nstatus nstate]
+ (let [[v c] (async/alts!!
+ [control [outc (first msgs)]]
+ :priority true)]
+ (if (= c control)
+ (let [nnstatus (handle-command nstatus v)
+ nnstate (handle-transition transition nstatus nnstatus nstate)]
+ (recur nnstatus nnstate msgs))
+ (recur nstatus nstate (next msgs))))))]
+ (recur nstatus nstate (next outputs)))
+ (recur nstatus nstate (next outputs)))))))
+
+(defn proc
+ "see lib ns for docs"
+ [step {:keys [workload compute-timeout-ms] :or {compute-timeout-ms 5000}}]
+ (let [{:keys [params ins] :as desc} (step)
+ workload (or workload (:workload desc) :mixed)]
+ ;;(assert (or (not params) init) "must have :init if :params")
+ (reify
+ clojure.core.protocols/Datafiable
+ (datafy [_]
+ (let [{:keys [params ins outs]} desc]
+ (walk/postwalk datafy {:step step :desc desc})))
+ spi/ProcLauncher
+ (describe [_] desc)
+ (start [_ {:keys [pid args ins outs resolver]}]
+ (assert (or (not params) args) "must provide :args if :params")
+ (let [transform (if (= workload :compute)
+ #(.get ^Future ((futurize step {:exec (spi/get-exec resolver :compute)}) %1 %2 %3)
+ compute-timeout-ms TimeUnit/MILLISECONDS)
+ step)
+ exs (spi/get-exec resolver (if (= workload :mixed) :mixed :io))
+ state (step args)
+ ins (into (or ins {}) (::flow/in-ports state))
+ outs (into (or outs {}) (::flow/out-ports state))
+ io-id (zipmap (concat (vals ins) (vals outs)) (concat (keys ins) (keys outs)))
+ control (::flow/control ins)
+ read-ins (dissoc ins ::flow/control)
+ run
+ #(loop [status :paused, state state, count 0, read-ins read-ins]
+ (let [pong (fn [c]
+ (let [pins (dissoc ins ::flow/control)
+ pouts (dissoc outs ::flow/error ::flow/report)]
+ (async/>!! c (walk/postwalk datafy
+ #::flow{:pid pid, :status status
+ :state state, :count count
+ :ins pins :outs pouts}))))
+ handle-command (partial handle-command pid pong)
+ [nstatus nstate count read-ins]
+ (try
+ (if (= status :paused)
+ (let [nstatus (handle-command status (async/!! (outs ::flow/error)
+ #::flow{:pid pid, :status status, :state state,
+ :count (inc count), :cid cid, :msg msg :op :step, :ex ex})
+ [status state count read-ins])))))
+ (catch Throwable ex
+ (async/>!! (outs ::flow/error)
+ #::flow{:pid pid, :status status, :state state, :count (inc count), :ex ex})
+ [status state count read-ins]))]
+ (when-not (= nstatus :exit) ;;fall out
+ (recur nstatus nstate (long count) read-ins))))]
+ ((futurize run {:exec exs})))))))
diff --git a/src/main/clojure/clojure/core/async/flow/impl/graph.clj b/src/main/clojure/clojure/core/async/flow/impl/graph.clj
new file mode 100644
index 00000000..063661f8
--- /dev/null
+++ b/src/main/clojure/clojure/core/async/flow/impl/graph.clj
@@ -0,0 +1,28 @@
+;; Copyright (c) Rich Hickey and contributors. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns clojure.core.async.flow.impl.graph)
+
+(defprotocol Graph
+ (start [g] "starts the entire graph from init values. The processes start paused.
+ Call resume-all or resume-proc to start flow.
+ returns {:report-chan - a core.async chan for reading
+ :error-chan - a core.async chan for reading}")
+ (stop [g] "shuts down the graph, stopping all procs, can be started again")
+ (pause [g] "pauses a running graph")
+ (resume [g] "resumes a paused graph")
+ (ping [g timeout-ms] "pings all processes, which will put their status and state on the report channel")
+
+ (pause-proc [g pid] "pauses a process")
+ (resume-proc [g pid] "resumes a process")
+ (ping-proc [g pid timeout-ms] "pings the process, which will put its status and state on the report channel")
+ (command-proc [g pid cmd-id more-kvs] "synchronously sends a process-specific command with the given id
+ and additional kvs to the process")
+
+ (inject [g [pid io-id] msgs] "synchronously puts the messages on the channel
+ corresponding to the input or output of the process"))
diff --git a/src/main/clojure/clojure/core/async/flow/spi.clj b/src/main/clojure/clojure/core/async/flow/spi.clj
new file mode 100644
index 00000000..e86a66a1
--- /dev/null
+++ b/src/main/clojure/clojure/core/async/flow/spi.clj
@@ -0,0 +1,95 @@
+;; Copyright (c) Rich Hickey and contributors. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns clojure.core.async.flow.spi)
+
+(defprotocol ProcLauncher
+ "Note - defining a ProcLauncher is an advanced feature and should not
+ be needed for ordinary use of the library. This protocol is for
+ creating new types of Processes that are not possible to create
+ with ::flow/process.
+
+ A ProcLauncher is a constructor for a process, a thread of activity.
+ It has two functions - to describe the parameters and input/output
+ requirements of the process and to start it. The launcher should
+ acquire no resources, nor retain any connection to the started
+ process. A launcher may be called upon to start a process more than
+ once, and should start a new process each time start is called.
+
+ The launched process must obey the following:
+
+ It must have 2 logical statuses, :paused and :running. In
+ the :paused status operation is suspended and no output is
+ produced.
+
+ When the process starts it must be :paused
+
+ Whenever it is reading or writing to any channel a process must use
+ alts!! and include a read of the ::flow/control channel, giving it
+ priority.
+
+ Command messages sent over the ::flow/control channel have the keys:
+ ::flow/to - either ::flow/all or a process id
+ ::flow/command - ::flow/stop|pause|resume|ping or process-specific
+
+ It must act upon any, and only, control messages whose ::flow/to key is its pid or ::flow/all
+ It must act upon the following values of ::flow/command:
+
+ ::flow/stop - all resources should be cleaned up and any thread(s)
+ should exit ordinarily - there will be no more subsequent use
+ of the process.
+ ::flow/pause - enter the :paused status
+ ::flow/resume - enter the :running status and resume processing
+ ::flow/ping - emit a ping message (format TBD) to
+ the ::flow/report channel containing at least its pid and status
+
+ A process can define and respond to other commands in its own namespace.
+
+ A process should not transmit channel objects (use [pid io-id] data
+ coordinates instead). A process should not close channels.
+
+ Finally, if a process encounters an error it must report it on the
+ ::flow/error channel (format TBD) and attempt to continue, though it
+ may subsequently get a ::flow/stop command it must respect"
+
+ (describe [p]
+ "returns a map with keys - :params, :ins and :outs,
+ each of which in turn is a map of keyword to docstring
+
+ :params describes the initial arguments to setup the state for the process
+ :ins enumerates the input[s], for which the graph will create channels
+ :outs enumerates the output[s], for which the graph may create channels.
+
+ describe may be called by users to understand how to use the
+ proc. It will also be called by the impl in order to discover what
+ channels are needed.")
+
+ (start [p {:keys [pid args ins outs resolver]}]
+ "return ignored, called for the
+ effect of starting the process (typically, starting its thread)
+
+ where:
+
+ :pid - the id of the process in the graph, so that e.g. it can refer to itself in control, reporting etc
+ :args - a map of param->val, as supplied in the graph def
+ :ins - a map of in-id->readable-channel, plus the ::flow/control channel
+ :outs - a map of out-id->writeable-channel, plus the ::flow/error and ::flow/report channels
+ N.B. outputs may be nil if not connected
+ :resolver - an impl of spi/Resolver, which can be used to find
+ channels given their logical [pid cid] coordinates, as well as to
+ obtain ExecutorServices corresponding to the
+ logical :mixed/:io/:compute contexts"))
+
+(defprotocol Resolver
+ (get-write-chan [_ coord]
+ "Given a tuple of [pid cid], returns a core.async chan to
+ write to or nil (in which case the output should be dropped,
+ e.g. nothing is connected).")
+ (get-exec [_ context]
+ "returns the ExecutorService for the given context, one
+ of :mixed, :io, :compute"))
diff --git a/src/main/clojure/clojure/core/async/impl/buffers.clj b/src/main/clojure/clojure/core/async/impl/buffers.clj
index 1f4948d0..ab273c7b 100644
--- a/src/main/clojure/clojure/core/async/impl/buffers.clj
+++ b/src/main/clojure/clojure/core/async/impl/buffers.clj
@@ -14,6 +14,7 @@
(set! *warn-on-reflection* true)
+
(deftype FixedBuffer [^LinkedList buf ^long n]
impl/Buffer
(full? [_this]
@@ -26,12 +27,14 @@
(close-buf! [_this])
Counted
(count [_this]
- (.size buf)))
+ (.size buf))
+ impl/Capacity
+ (capacity [_this]
+ n))
(defn fixed-buffer [^long n]
(FixedBuffer. (LinkedList.) n))
-
(deftype DroppingBuffer [^LinkedList buf ^long n]
impl/UnblockingBuffer
impl/Buffer
@@ -46,7 +49,10 @@
(close-buf! [_this])
Counted
(count [_this]
- (.size buf)))
+ (.size buf))
+ impl/Capacity
+ (capacity [_this]
+ n))
(defn dropping-buffer [n]
(DroppingBuffer. (LinkedList.) n))
@@ -66,7 +72,10 @@
(close-buf! [_this])
Counted
(count [_this]
- (.size buf)))
+ (.size buf))
+ impl/Capacity
+ (capacity [_this]
+ n))
(defn sliding-buffer [n]
(SlidingBuffer. (LinkedList.) n))
@@ -91,7 +100,25 @@
(set! val nil)))
Counted
(count [_]
- (if (undelivered? val) 0 1)))
+ (if (undelivered? val) 0 1))
+ impl/Capacity
+ (capacity [_this]
+ 1))
(defn promise-buffer []
- (PromiseBuffer. NO-VAL))
\ No newline at end of file
+ (PromiseBuffer. NO-VAL))
+
+(defn datafy-buffer [buffer]
+ {:type (-> buffer class .getSimpleName symbol)
+ :count (count buffer)
+ :capacity (impl/capacity buffer)})
+
+(extend-protocol clojure.core.protocols/Datafiable
+ FixedBuffer
+ (datafy [b] (datafy-buffer b))
+ DroppingBuffer
+ (datafy [b] (datafy-buffer b))
+ SlidingBuffer
+ (datafy [b] (datafy-buffer b))
+ PromiseBuffer
+ (datafy [b] (datafy-buffer b)))
diff --git a/src/main/clojure/clojure/core/async/impl/channels.clj b/src/main/clojure/clojure/core/async/impl/channels.clj
index b3f08cad..0769532d 100644
--- a/src/main/clojure/clojure/core/async/impl/channels.clj
+++ b/src/main/clojure/clojure/core/async/impl/channels.clj
@@ -8,7 +8,8 @@
(ns ^{:skip-wiki true}
clojure.core.async.impl.channels
- (:require [clojure.core.async.impl.protocols :as impl]
+ (:require [clojure.datafy :as datafy]
+ [clojure.core.async.impl.protocols :as impl]
[clojure.core.async.impl.dispatch :as dispatch]
[clojure.core.async.impl.mutex :as mutex])
(:import [java.util LinkedList Queue]
@@ -303,3 +304,15 @@
(add! buf val)
(catch Throwable t
(handle buf exh t)))))))))
+
+(extend-protocol clojure.core.protocols/Datafiable
+ ManyToManyChannel
+ (datafy [c]
+ (let [b (.buf c)]
+ (with-meta
+ (cond->
+ {:put-count (count (.puts c))
+ :take-count (count (.takes c))
+ :closed? (impl/closed? c)}
+ b (assoc :buffer (datafy/datafy b)))
+ {::datafy/obj c}))))
\ No newline at end of file
diff --git a/src/main/clojure/clojure/core/async/impl/protocols.clj b/src/main/clojure/clojure/core/async/impl/protocols.clj
index f27cddf4..e9e88c56 100644
--- a/src/main/clojure/clojure/core/async/impl/protocols.clj
+++ b/src/main/clojure/clojure/core/async/impl/protocols.clj
@@ -34,6 +34,9 @@
(add!* [b itm] "if room, add item to the buffer, returns b, called under chan mutex")
(close-buf! [b] "called on chan closed under chan mutex, return ignored"))
+(defprotocol Capacity
+ (capacity [b] "The max capacity of the buffer, nil if unknown"))
+
(defn add!
([b] b)
([b itm]