Skip to content

Commit e26c247

Browse files
committed
Added watcher support for agents/atoms/refs/vars
Watchers must be agents (add-watcher reference :send/:send-off an-agent an-action)
1 parent 8a6c525 commit e26c247

File tree

8 files changed

+150
-118
lines changed

8 files changed

+150
-118
lines changed

src/clj/clojure/core.clj

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1100,24 +1100,22 @@
11001100
occurring, does nothing. Returns the number of actions dispatched."
11011101
[] (clojure.lang.Agent/releasePendingSends))
11021102

1103-
(defn add-watch
1103+
(defn add-watcher
11041104
"Experimental.
1105-
Adds a watcher to an agent. Whenever the agent runs an action, any
1106-
registered watchers will have their callback function called. The
1107-
callback fn will be passed 3 args, the watcher, the agent and a boolean
1108-
which will be true if the agent's state was (potentially) changed by
1109-
the action. The callback fn is run synchronously with the action,
1110-
and thus derefs of the agent in the callback will see the value set
1111-
during that action. Because it is run on the action thread, the
1112-
callback should not block, but can send messages."
1113-
[#^clojure.lang.Agent a watcher callback]
1114-
(.addWatch a watcher callback))
1115-
1116-
(defn remove-watch
1105+
Adds a watcher to an agent/atom/var/ref reference. The watcher must
1106+
be an Agent, and the action a function of the agent's state and one
1107+
additional arg, the reference. Whenever the reference's state
1108+
changes, any registered watchers will have their actions
1109+
sent. send-type must be one of :send or :send-off. The actions will
1110+
be sent afer the reference's state is changed."
1111+
[#^clojure.lang.IRef reference send-type watcher-agent action-fn]
1112+
(.addWatch reference watcher-agent action-fn (= send-type :send-off)))
1113+
1114+
(defn remove-watcher
11171115
"Experimental.
1118-
Removes a watcher (set by add-watch) from an agent"
1119-
[#^clojure.lang.Agent a watcher]
1120-
(.removeWatch a watcher))
1116+
Removes a watcher (set by add-watcher) from a reference"
1117+
[#^clojure.lang.IRef reference watcher-agent]
1118+
(.removeWatch reference watcher-agent))
11211119

11221120
(defn agent-errors
11231121
"Returns a sequence of the exceptions thrown during asynchronous

src/jvm/clojure/lang/ARef.java

Lines changed: 62 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212

1313
package clojure.lang;
1414

15+
import java.util.concurrent.atomic.AtomicReference;
16+
import java.util.Map;
17+
1518
public abstract class ARef extends AReference implements IRef {
16-
private volatile IFn validator = null;
19+
protected volatile IFn validator = null;
20+
private AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
1721

1822
public ARef() {
1923
super();
@@ -57,4 +61,61 @@ public void setValidator(IFn vf){
5761
public IFn getValidator(){
5862
return validator;
5963
}
64+
65+
public IPersistentMap getWatches(){
66+
return watchers.get();
67+
}
68+
69+
public IRef addWatch(Agent watcher, IFn action, boolean sendOff){
70+
boolean added = false;
71+
IPersistentMap prior = null;
72+
while(!added)
73+
{
74+
prior = watchers.get();
75+
added = watchers.compareAndSet(prior, prior.assoc(watcher,new Object[]{action,sendOff}));
76+
}
77+
78+
return this;
79+
}
80+
81+
public IRef removeWatch(Agent watcher){
82+
boolean removed = false;
83+
IPersistentMap prior = null;
84+
while(!removed)
85+
{
86+
prior = watchers.get();
87+
try
88+
{
89+
removed = watchers.compareAndSet(prior, prior.without(watcher));
90+
}
91+
catch (Exception e)
92+
{
93+
throw new RuntimeException(e);
94+
}
95+
}
96+
97+
return this;
98+
}
99+
100+
public void notifyWatches() {
101+
IPersistentMap ws = watchers.get();
102+
if (ws != null)
103+
{
104+
ISeq args = new Cons(this, null);
105+
for (ISeq s = RT.seq(ws); s != null; s = s.rest())
106+
{
107+
Map.Entry e = (Map.Entry) s.first();
108+
Object[] a = (Object[]) e.getValue();
109+
Agent agent = (Agent) e.getKey();
110+
try
111+
{
112+
agent.dispatch((IFn) a[0], args, (Boolean)a[1]);
113+
}
114+
catch (Exception e1)
115+
{
116+
//eat dispatching exceptions and continue
117+
}
118+
}
119+
}
120+
}
60121
}

src/jvm/clojure/lang/Agent.java

Lines changed: 6 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,8 @@
1919
public class Agent extends ARef {
2020
volatile Object state;
2121
AtomicReference<IPersistentStack> q = new AtomicReference(PersistentQueue.EMPTY);
22-
AtomicReference<IPersistentMap> watchers = new AtomicReference(PersistentHashMap.EMPTY);
2322

24-
volatile ISeq errors = null;
23+
volatile ISeq errors = null;
2524

2625
final public static ExecutorService pooledExecutor =
2726
Executors.newFixedThreadPool(2 + Runtime.getRuntime().availableProcessors());
@@ -68,11 +67,8 @@ static void doRun(Action action){
6867
try
6968
{
7069
changed = action.agent.setState(action.fn.applyTo(RT.cons(action.agent.state, action.args)));
71-
for(Object o : action.agent.watchers.get())
72-
{
73-
Map.Entry e = (Map.Entry) o;
74-
((IFn) e.getValue()).invoke(e.getKey(), action.agent, RT.box(changed));
75-
}
70+
if(changed)
71+
action.agent.notifyWatches();
7672
}
7773
catch(Exception e)
7874
{
@@ -143,10 +139,10 @@ public void clearErrors(){
143139
errors = null;
144140
}
145141

146-
public Object dispatch(IFn fn, ISeq args, boolean solo) throws Exception{
142+
public Object dispatch(IFn fn, ISeq args, boolean solo) {
147143
if(errors != null)
148144
{
149-
throw new Exception("Agent has errors", (Exception) RT.first(errors));
145+
throw new RuntimeException("Agent has errors", (Exception) RT.first(errors));
150146
}
151147
Action action = new Action(this, fn, args, solo);
152148
dispatchAction(action);
@@ -183,31 +179,7 @@ public int getQueueCount(){
183179
return q.get().count();
184180
}
185181

186-
public Agent addWatch(Object watcher, IFn callback){
187-
boolean added = false;
188-
IPersistentMap prior = null;
189-
while(!added)
190-
{
191-
prior = watchers.get();
192-
added = watchers.compareAndSet(prior, prior.assoc(watcher,callback));
193-
}
194-
195-
return this;
196-
}
197-
198-
public Agent removeWatch(Object watcher) throws Exception{
199-
boolean removed = false;
200-
IPersistentMap prior = null;
201-
while(!removed)
202-
{
203-
prior = watchers.get();
204-
removed = watchers.compareAndSet(prior, prior.without(watcher));
205-
}
206-
207-
return this;
208-
}
209-
210-
static public int releasePendingSends(){
182+
static public int releasePendingSends(){
211183
IPersistentVector sends = nested.get();
212184
if(sends == null)
213185
return 0;

src/jvm/clojure/lang/Atom.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,26 @@ public Object swap(IFn f, ISeq args) throws Exception {
3737
Object newv = f.applyTo(new Cons(v, args));
3838
validate(newv);
3939
if(state.compareAndSet(v,newv))
40+
{
41+
if(v != newv)
42+
notifyWatches();
4043
return newv;
44+
}
4145
}
4246
}
4347

4448
public boolean compareAndSet(Object oldv, Object newv){
4549
validate(newv);
46-
return state.compareAndSet(oldv, newv);
50+
boolean ret = state.compareAndSet(oldv, newv);
51+
if (ret && oldv != newv)
52+
notifyWatches();
53+
return ret;
4754
}
4855

4956
public Object reset(Object newval){
5057
validate(newval);
5158
state.set(newval);
59+
notifyWatches();
5260
return newval;
5361
}
5462
}

src/jvm/clojure/lang/IRef.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,17 @@
1414

1515
public interface IRef{
1616

17-
Object get() throws Exception;
17+
Object get() throws Exception;
1818

19-
void setValidator(IFn vf);
19+
void setValidator(IFn vf);
2020

21-
IFn getValidator();
21+
IFn getValidator();
22+
23+
IPersistentMap getWatches();
24+
25+
IRef addWatch(Agent watcher, IFn action, boolean sendOff);
26+
27+
IRef removeWatch(Agent watcher);
28+
29+
void notifyWatches();
2230
}

src/jvm/clojure/lang/LockingTransaction.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -252,11 +252,12 @@ Object run(Callable fn) throws Exception{
252252
}
253253
}
254254

255-
//validate
255+
//validate and enqueue notifications
256256
for(Map.Entry<Ref, Object> e : vals.entrySet())
257257
{
258258
Ref ref = e.getKey();
259259
ref.validate(ref.getValidator(), e.getValue());
260+
ref.notifyWatches();
260261
}
261262

262263
//at this point, all values calced, all refs to be written locked

src/jvm/clojure/lang/Ref.java

Lines changed: 41 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import java.util.concurrent.locks.ReentrantReadWriteLock;
1818
import java.util.UUID;
1919

20-
public class Ref extends AReference implements IFn, Comparable<Ref>, IRef{
20+
public class Ref extends ARef implements IFn, Comparable<Ref>, IRef{
2121
public int compareTo(Ref ref) {
2222
if(this.id == ref.id)
2323
return 0;
@@ -58,7 +58,7 @@ public static class TVal{
5858
final AtomicInteger faults;
5959
final ReentrantReadWriteLock lock;
6060
LockingTransaction.Info tinfo;
61-
IFn validator;
61+
//IFn validator;
6262
final long id;
6363

6464
static final AtomicLong ids = new AtomicLong();
@@ -101,45 +101,45 @@ public Object get(){
101101
return t.doGet(this);
102102
}
103103

104-
void validate(IFn vf, Object val){
105-
try{
106-
if(vf != null && !RT.booleanCast(vf.invoke(val)))
107-
throw new IllegalStateException("Invalid ref state");
108-
}
109-
catch(RuntimeException re)
110-
{
111-
throw re;
112-
}
113-
catch(Exception e)
114-
{
115-
throw new IllegalStateException("Invalid ref state", e);
116-
}
117-
}
118-
119-
public void setValidator(IFn vf){
120-
try
121-
{
122-
lock.writeLock().lock();
123-
validate(vf,currentVal());
124-
validator = vf;
125-
}
126-
finally
127-
{
128-
lock.writeLock().unlock();
129-
}
130-
}
131-
132-
public IFn getValidator(){
133-
try
134-
{
135-
lock.readLock().lock();
136-
return validator;
137-
}
138-
finally
139-
{
140-
lock.readLock().unlock();
141-
}
142-
}
104+
//void validate(IFn vf, Object val){
105+
// try{
106+
// if(vf != null && !RT.booleanCast(vf.invoke(val)))
107+
// throw new IllegalStateException("Invalid ref state");
108+
// }
109+
// catch(RuntimeException re)
110+
// {
111+
// throw re;
112+
// }
113+
// catch(Exception e)
114+
// {
115+
// throw new IllegalStateException("Invalid ref state", e);
116+
// }
117+
//}
118+
//
119+
//public void setValidator(IFn vf){
120+
// try
121+
// {
122+
// lock.writeLock().lock();
123+
// validate(vf,currentVal());
124+
// validator = vf;
125+
// }
126+
// finally
127+
// {
128+
// lock.writeLock().unlock();
129+
// }
130+
//}
131+
//
132+
//public IFn getValidator(){
133+
// try
134+
// {
135+
// lock.readLock().lock();
136+
// return validator;
137+
// }
138+
// finally
139+
// {
140+
// lock.readLock().unlock();
141+
// }
142+
//}
143143

144144
public Object set(Object val){
145145
return LockingTransaction.getEx().doSet(this, val);

0 commit comments

Comments
 (0)