Skip to content

Commit 0859070

Browse files
committed
Streams in progress - safe stream model
1 parent 0f38a4e commit 0859070

File tree

13 files changed

+372
-235
lines changed

13 files changed

+372
-235
lines changed

src/clj/clojure/core.clj

Lines changed: 179 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -448,6 +448,31 @@
448448
(recur (first zs) (rest zs)))))]
449449
(cat (concat x y) zs))))
450450

451+
;;;;;;;;;;;;;;;;;;;;;;;;;;;; streams ;;;;;;;;;;;;;;;;;;;;;;;;;;;;
452+
(defn stream
453+
"Creates a stream of the items in coll."
454+
{:tag clojure.lang.AStream}
455+
[coll] (clojure.lang.RT/stream coll))
456+
457+
(defn stream-iter
458+
"Returns an iter on (stream coll). Only one iter on a stream is
459+
supported at a time."
460+
{:tag clojure.lang.AStream$Iter}
461+
[coll] (.iter (stream coll)))
462+
463+
(defn next!
464+
"Takes a stream iter and an eos value, returns (and consumes) the next element in the stream, or eos."
465+
[#^clojure.lang.AStream$Iter iter eos] (.next iter eos))
466+
467+
(defn push-back!
468+
"Takes a stream iter and pushes x onto front of stream, returns iter."
469+
[#^clojure.lang.AStream$Iter iter x] (.pushBack iter x))
470+
471+
(defn detach!
472+
"Takes a stream iter and disconnects it from the underlying stream,
473+
returning the stream. All further operations on the iter will fail."
474+
[#^clojure.lang.AStream$Iter iter] (.detach iter))
475+
451476
;;;;;;;;;;;;;;;;at this point all the support for syntax-quote exists;;;;;;;;;;;;;;;;;;;;;;
452477
(defmacro if-not
453478
"Evaluates test. If logical false, evaluates and returns then expr, otherwise else expr, if supplied, else nil."
@@ -1274,30 +1299,24 @@
12741299
(. ref (touch))
12751300
(. ref (get)))
12761301

1302+
(def #^{:private true :tag clojure.lang.Closer} *io-context* nil)
1303+
12771304
(defmacro sync
12781305
"transaction-flags => TBD, pass nil for now
12791306
12801307
Runs the exprs (in an implicit do) in a transaction that encompasses
12811308
exprs and any nested calls. Starts a transaction if none is already
12821309
running on this thread. Any uncaught exception will abort the
12831310
transaction and flow out of sync. The exprs may be run more than
1284-
once, but any effects on Refs will be atomic."
1311+
once, but any effects on Refs will be atomic. Transactions are not
1312+
allowed in io! blocks - will throw IllegalStateException."
12851313
[flags-ignored-for-now & body]
1286-
`(. clojure.lang.LockingTransaction
1287-
(runInTransaction (fn [] ~@body))))
1314+
`(if *io-context*
1315+
(throw (IllegalStateException. "Transaction in io!"))
1316+
(. clojure.lang.LockingTransaction
1317+
(runInTransaction (fn [] ~@body)))))
12881318

12891319

1290-
(defmacro io!
1291-
"If an io! block occurs in a transaction, throws an
1292-
IllegalStateException, else runs body in an implicit do. If the
1293-
first expression in body is a literal string, will use that as the
1294-
exception message."
1295-
[& body]
1296-
(let [message (when (string? (first body)) (first body))
1297-
body (if message (rest body) body)]
1298-
`(if (clojure.lang.LockingTransaction/isRunning)
1299-
(throw (new IllegalStateException ~(or message "I/O in transaction")))
1300-
(do ~@body))))
13011320

13021321
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; fn stuff ;;;;;;;;;;;;;;;;
13031322

@@ -1380,24 +1399,70 @@
13801399
(map f (rest c1) (rest c2) (rest c3)))))
13811400
([f c1 c2 c3 & colls]
13821401
(let [step (fn step [cs]
1383-
(when (every? seq cs)
1402+
(when (every? seq cs)
13841403
(lazy-cons (map first cs) (step (map rest cs)))))]
13851404
(map #(apply f %) (step (conj colls c3 c2 c1))))))
13861405

1406+
(defn map-stream
1407+
"Returns a stream consisting of the result of applying f to the
1408+
set of first items of each coll, followed by applying f to the set
1409+
of second items in each coll, until any one of the colls is
1410+
exhausted. Any remaining items in other colls are ignored. Function
1411+
f should accept number-of-colls arguments."
1412+
([f coll]
1413+
(identity (let [iter (stream-iter coll)]
1414+
(stream
1415+
#(let [x (next! iter %)]
1416+
(if (= % x) x (f x)))))))
1417+
([f c1 c2]
1418+
(identity (let [s1 (stream-iter c1), s2 (stream-iter c2)]
1419+
(stream
1420+
#(let [x1 (next! s1 %), x2 (next! s2 %)]
1421+
(if (or (= % x1) (= % x2))
1422+
%
1423+
(f x1 x2)))))))
1424+
([f c1 c2 c3]
1425+
(identity (let [s1 (stream-iter c1), s2 (stream-iter c2), s3 (stream-iter c3)]
1426+
(stream
1427+
#(let [x1 (next! s1 %), x2 (next! s2 %), x3 (next! s3 %)]
1428+
(if (or (= % x1) (= % x2) (= % x3))
1429+
%
1430+
(f x1 x2 x3)))))))
1431+
([f c1 c2 c3 & colls]
1432+
(identity (let [iters (map stream-iter (list* c1 c2 c3 colls))]
1433+
(stream
1434+
(fn [eos]
1435+
(let [xs (seq (map #(next! % eos) iters))]
1436+
(if (some #{eos} xs)
1437+
eos
1438+
(apply f xs)))))))))
1439+
13871440
(defn mapcat
13881441
"Returns the result of applying concat to the result of applying map
13891442
to f and colls. Thus function f should return a collection."
13901443
[f & colls]
13911444
(apply concat (apply map f colls)))
13921445

13931446
(defn filter
1394-
"Returns a lazy seq of the items in coll for which
1447+
"Returns a stream of the items in coll for which
13951448
(pred item) returns true. pred must be free of side-effects."
13961449
[pred coll]
1397-
(when (seq coll)
1398-
(if (pred (first coll))
1399-
(lazy-cons (first coll) (filter pred (rest coll)))
1400-
(recur pred (rest coll)))))
1450+
(seq
1451+
(let [iter (stream-iter coll)]
1452+
(stream
1453+
#(let [x (next! iter %)]
1454+
(if (or (= % x) (pred x))
1455+
x
1456+
(recur %)))))))
1457+
1458+
;(defn filter
1459+
; "Returns a lazy seq of the items in coll for which
1460+
; (pred item) returns true. pred must be free of side-effects."
1461+
; [pred coll]
1462+
; (when (seq coll)
1463+
; (if (pred (first coll))
1464+
; (lazy-cons (first coll) (filter pred (rest coll)))
1465+
; (recur pred (rest coll)))))
14011466

14021467
(defn remove
14031468
"Returns a lazy seq of the items in coll for which
@@ -1633,39 +1698,6 @@
16331698
(dorun n coll)
16341699
coll))
16351700

1636-
(defn await
1637-
"Blocks the current thread (indefinitely!) until all actions
1638-
dispatched thus far, from this thread or agent, to the agent(s) have
1639-
occurred."
1640-
[& agents]
1641-
(io! "await in transaction"
1642-
(when *agent*
1643-
(throw (new Exception "Can't await in agent action")))
1644-
(let [latch (new java.util.concurrent.CountDownLatch (count agents))
1645-
count-down (fn [agent] (. latch (countDown)) agent)]
1646-
(doseq [agent agents]
1647-
(send agent count-down))
1648-
(. latch (await)))))
1649-
1650-
(defn await1 [#^clojure.lang.Agent a]
1651-
(when (pos? (.getQueueCount a))
1652-
(await a))
1653-
a)
1654-
1655-
(defn await-for
1656-
"Blocks the current thread until all actions dispatched thus
1657-
far (from this thread or agent) to the agents have occurred, or the
1658-
timeout (in milliseconds) has elapsed. Returns nil if returning due
1659-
to timeout, non-nil otherwise."
1660-
[timeout-ms & agents]
1661-
(io! "await-for in transaction"
1662-
(when *agent*
1663-
(throw (new Exception "Can't await in agent action")))
1664-
(let [latch (new java.util.concurrent.CountDownLatch (count agents))
1665-
count-down (fn [agent] (. latch (countDown)) agent)]
1666-
(doseq [agent agents]
1667-
(send agent count-down))
1668-
(. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
16691701

16701702
(defmacro dotimes
16711703
"bindings => name n
@@ -1947,6 +1979,97 @@
19471979
:else (throw (IllegalArgumentException.
19481980
"with-open only allows Symbols in bindings"))))
19491981

1982+
1983+
(defmacro io!
1984+
"If an io! block occurs in a transaction, throws an
1985+
IllegalStateException, else runs body in an implicit do. If the
1986+
first expression in body is a literal string, will use that as the
1987+
exception message. Establishes a dynamic io context for use with io-scope."
1988+
[& body]
1989+
(let [message (when (string? (first body)) (first body))
1990+
body (if message (rest body) body)]
1991+
`(if (clojure.lang.LockingTransaction/isRunning)
1992+
(throw (new IllegalStateException ~(or message "I/O in transaction")))
1993+
(binding [*io-context* (clojure.lang.Closer.)]
1994+
(try
1995+
~@body
1996+
(finally
1997+
(.close *io-context*)))))))
1998+
1999+
(def *scope* nil)
2000+
2001+
(defn run-scope-actions []
2002+
(let [failed (= (first *scope*) :failed)
2003+
entries (if failed (rest *scope*) *scope*)]
2004+
(doseq [e entries]
2005+
(let [cause (first e)
2006+
action (second e)]
2007+
(when (or (= cause :exits)
2008+
(and (= cause :fails) failed)
2009+
(and (= cause :succeeds) (not failed)))
2010+
(action))))))
2011+
2012+
(defmacro scope
2013+
"Creates a scope for use with when-scope."
2014+
[& body]
2015+
`(binding [*scope* (list)]
2016+
(try
2017+
~@body
2018+
(catch Throwable t#
2019+
(set! *scope* (conj *scope* :failed))
2020+
(throw t#))
2021+
(finally
2022+
(run-scope-actions)))))
2023+
2024+
(defmacro when-scope
2025+
"Causes a body of expressions to be executed at the termination of
2026+
the nearest dynamically enclosing scope (created with scope). If no
2027+
scope is in effect, throws IllegalStateException. Cause must be one of:
2028+
2029+
:exits - will run unconditionally on scope exit
2030+
:fails - will run only if scope exits due to an exception
2031+
:succeeds - will run only if scope exits normally"
2032+
2033+
[cause & body]
2034+
`(do
2035+
(when-not *scope*
2036+
(throw (IllegalStateException. "No scope in effect")))
2037+
(set! *scope* (conj *scope* [~cause (fn [] ~@body)]))))
2038+
2039+
(defn await
2040+
"Blocks the current thread (indefinitely!) until all actions
2041+
dispatched thus far, from this thread or agent, to the agent(s) have
2042+
occurred."
2043+
[& agents]
2044+
(io! "await in transaction"
2045+
(when *agent*
2046+
(throw (new Exception "Can't await in agent action")))
2047+
(let [latch (new java.util.concurrent.CountDownLatch (count agents))
2048+
count-down (fn [agent] (. latch (countDown)) agent)]
2049+
(doseq [agent agents]
2050+
(send agent count-down))
2051+
(. latch (await)))))
2052+
2053+
(defn await1 [#^clojure.lang.Agent a]
2054+
(when (pos? (.getQueueCount a))
2055+
(await a))
2056+
a)
2057+
2058+
(defn await-for
2059+
"Blocks the current thread until all actions dispatched thus
2060+
far (from this thread or agent) to the agents have occurred, or the
2061+
timeout (in milliseconds) has elapsed. Returns nil if returning due
2062+
to timeout, non-nil otherwise."
2063+
[timeout-ms & agents]
2064+
(io! "await-for in transaction"
2065+
(when *agent*
2066+
(throw (new Exception "Can't await in agent action")))
2067+
(let [latch (new java.util.concurrent.CountDownLatch (count agents))
2068+
count-down (fn [agent] (. latch (countDown)) agent)]
2069+
(doseq [agent agents]
2070+
(send agent count-down))
2071+
(. latch (await timeout-ms (. java.util.concurrent.TimeUnit MILLISECONDS))))))
2072+
19502073
(defmacro doto
19512074
"Evaluates x then calls all of the methods and functions with the
19522075
value of x supplied at the from of the given arguments. The forms
@@ -2883,7 +3006,8 @@
28833006
exprs and any nested calls. Starts a transaction if none is already
28843007
running on this thread. Any uncaught exception will abort the
28853008
transaction and flow out of dosync. The exprs may be run more than
2886-
once, but any effects on Refs will be atomic."
3009+
once, but any effects on Refs will be atomic. Transactions are not
3010+
allowed in io! blocks - will throw IllegalStateException."
28873011
[& exprs]
28883012
`(sync nil ~@exprs))
28893013

@@ -3772,3 +3896,4 @@
37723896
(load "genclass")
37733897

37743898

3899+

src/jvm/clojure/lang/APersistentVector.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@
1313
package clojure.lang;
1414

1515
import java.util.*;
16-
import java.util.concurrent.atomic.AtomicInteger;
17-
import java.util.concurrent.Callable;
1816

1917
public abstract class APersistentVector extends AFn implements IPersistentVector, Iterable,
2018
List,
@@ -363,18 +361,18 @@ public AStream stream() throws Exception {
363361
return new AStream(new Src(this));
364362
}
365363

366-
static class Src implements Callable{
364+
static class Src extends AFn{
367365
final IPersistentVector v;
368366
int i = 0;
369367

370368
Src(IPersistentVector v) {
371369
this.v = v;
372370
}
373371

374-
public Object call() throws Exception {
372+
public Object invoke(Object eos) throws Exception {
375373
if (i < v.count())
376374
return v.nth(i++);
377-
return RT.eos();
375+
return eos;
378376
}
379377
}
380378

src/jvm/clojure/lang/ASeq.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212

1313
import java.util.Collection;
1414
import java.util.Iterator;
15-
import java.util.concurrent.Callable;
1615

1716
public abstract class ASeq extends Obj implements ISeq, Collection, Streamable{
1817
transient int _hash = -1;
@@ -181,21 +180,21 @@ public AStream stream() throws Exception {
181180
return new AStream(new Src(this));
182181
}
183182

184-
static class Src implements Callable{
183+
static class Src extends AFn{
185184
ISeq s;
186185

187186
public Src(ISeq s) {
188187
this.s = s;
189188
}
190189

191-
public Object call() throws Exception {
190+
public Object invoke(Object eos) throws Exception {
192191
if(s != null)
193192
{
194193
Object ret = s.first();
195194
s = s.rest();
196195
return ret;
197196
}
198-
return RT.eos();
197+
return eos;
199198
}
200199
}
201200
}

0 commit comments

Comments
 (0)