22
22
import java .util .concurrent .CompletableFuture ;
23
23
import java .util .concurrent .CountDownLatch ;
24
24
import java .util .concurrent .ExecutionException ;
25
- import java .util .concurrent .Executor ;
26
25
import java .util .concurrent .Future ;
27
26
import java .util .concurrent .TimeUnit ;
28
27
import java .util .concurrent .TimeoutException ;
29
- import java .util .concurrent .atomic .AtomicBoolean ;
30
28
import java .util .concurrent .atomic .AtomicIntegerFieldUpdater ;
31
29
import java .util .concurrent .atomic .AtomicReferenceFieldUpdater ;
32
30
@@ -70,13 +68,28 @@ public final class NettyResponseFuture<V> extends AbstractListenableFuture<V> {
70
68
71
69
// state mutated from outside the event loop
72
70
// TODO check if they are indeed mutated outside the event loop
73
- private final AtomicBoolean isDone = new AtomicBoolean (false );
74
- private final AtomicBoolean isCancelled = new AtomicBoolean (false );
75
- private final AtomicBoolean inAuth = new AtomicBoolean (false );
76
- private final AtomicBoolean inProxyAuth = new AtomicBoolean (false );
77
- private final AtomicBoolean statusReceived = new AtomicBoolean (false );
78
- private final AtomicBoolean contentProcessed = new AtomicBoolean (false );
79
- private final AtomicBoolean onThrowableCalled = new AtomicBoolean (false );
71
+ private volatile int isDone = 0 ;
72
+ private volatile int isCancelled = 0 ;
73
+ private volatile int inAuth = 0 ;
74
+ private volatile int inProxyAuth = 0 ;
75
+ private volatile int statusReceived = 0 ;
76
+ private volatile int contentProcessed = 0 ;
77
+ private volatile int onThrowableCalled = 0 ;
78
+
79
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > isDoneField =
80
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "isDone" );
81
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > isCancelledField =
82
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "isCancelled" );
83
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > inAuthField =
84
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "inAuth" );
85
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > inProxyAuthField =
86
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "inProxyAuth" );
87
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > statusReceivedField =
88
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "statusReceived" );
89
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > contentProcessedField =
90
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "contentProcessed" );
91
+ private static final AtomicIntegerFieldUpdater <NettyResponseFuture > onThrowableCalledField =
92
+ AtomicIntegerFieldUpdater .newUpdater (NettyResponseFuture .class , "onThrowableCalled" );
80
93
81
94
// volatile where we need CAS ops
82
95
private volatile int redirectCount = 0 ;
@@ -124,19 +137,19 @@ public NettyResponseFuture(Request originalRequest,//
124
137
125
138
@ Override
126
139
public boolean isDone () {
127
- return isDone . get () || isCancelled ();
140
+ return isDone != 0 || isCancelled ();
128
141
}
129
142
130
143
@ Override
131
144
public boolean isCancelled () {
132
- return isCancelled . get () ;
145
+ return isCancelled != 0 ;
133
146
}
134
147
135
148
@ Override
136
149
public boolean cancel (boolean force ) {
137
150
cancelTimeouts ();
138
151
139
- if (isCancelled .getAndSet (true ) )
152
+ if (isCancelledField .getAndSet (this , 1 ) != 0 )
140
153
return false ;
141
154
142
155
// cancel could happen before channel was attached
@@ -145,7 +158,7 @@ public boolean cancel(boolean force) {
145
158
Channels .silentlyCloseChannel (channel );
146
159
}
147
160
148
- if (! onThrowableCalled .getAndSet (true ) ) {
161
+ if (onThrowableCalledField .getAndSet (this , 1 ) == 0 ) {
149
162
try {
150
163
asyncHandler .onThrowable (new CancellationException ());
151
164
} catch (Throwable t ) {
@@ -183,11 +196,11 @@ private V getContent() throws ExecutionException {
183
196
V update = (V ) CONTENT_UPDATER .get (this );
184
197
// No more retry
185
198
CURRENT_RETRY_UPDATER .set (this , maxRetry );
186
- if (! contentProcessed .getAndSet (true ) ) {
199
+ if (contentProcessedField .getAndSet (this , 1 ) == 0 ) {
187
200
try {
188
201
update = asyncHandler .onCompleted ();
189
202
} catch (Throwable ex ) {
190
- if (! onThrowableCalled .getAndSet (true ) ) {
203
+ if (onThrowableCalledField .getAndSet (this , 1 ) == 0 ) {
191
204
try {
192
205
try {
193
206
asyncHandler .onThrowable (ex );
@@ -211,7 +224,7 @@ private boolean terminateAndExit() {
211
224
cancelTimeouts ();
212
225
this .channel = null ;
213
226
this .reuseChannel = false ;
214
- return isDone .getAndSet (true ) || isCancelled . get () ;
227
+ return isDoneField .getAndSet (this , 1 ) != 0 || isCancelled != 0 ;
215
228
}
216
229
217
230
public final void done () {
@@ -241,7 +254,7 @@ public final void abort(final Throwable t) {
241
254
if (terminateAndExit ())
242
255
return ;
243
256
244
- if (onThrowableCalled .compareAndSet (false , true )) {
257
+ if (onThrowableCalledField .compareAndSet (this , 0 , 1 )) {
245
258
try {
246
259
asyncHandler .onThrowable (t );
247
260
} catch (Throwable te ) {
@@ -341,12 +354,28 @@ public TimeoutsHolder getTimeoutsHolder() {
341
354
return timeoutsHolder ;
342
355
}
343
356
344
- public AtomicBoolean getInAuth () {
345
- return inAuth ;
357
+ public boolean getInAuth () {
358
+ return inAuth != 0 ;
346
359
}
347
360
348
- public AtomicBoolean getInProxyAuth () {
349
- return inProxyAuth ;
361
+ public void setInAuth (boolean inAuth ) {
362
+ this .inAuth = inAuth ? 1 : 0 ;
363
+ }
364
+
365
+ public boolean getAndSetInAuth (boolean set ) {
366
+ return inAuthField .getAndSet (this , set ? 1 : 0 ) != 0 ;
367
+ }
368
+
369
+ public boolean getInProxyAuth () {
370
+ return inProxyAuth != 0 ;
371
+ }
372
+
373
+ public void setInProxyAuth (boolean inProxyAuth ) {
374
+ this .inProxyAuth = inProxyAuth ? 1 : 0 ;
375
+ }
376
+
377
+ public boolean getAndSetInProxyAuth (boolean inProxyAuth ) {
378
+ return inProxyAuthField .getAndSet (this , inProxyAuth ? 1 : 0 ) != 0 ;
350
379
}
351
380
352
381
public ChannelState getChannelState () {
@@ -358,7 +387,7 @@ public void setChannelState(ChannelState channelState) {
358
387
}
359
388
360
389
public boolean getAndSetStatusReceived (boolean sr ) {
361
- return statusReceived .getAndSet (sr ) ;
390
+ return statusReceivedField .getAndSet (this , sr ? 1 : 0 ) != 0 ;
362
391
}
363
392
364
393
public boolean isStreamWasAlreadyConsumed () {
@@ -439,7 +468,9 @@ public void setCurrentRequest(Request currentRequest) {
439
468
* @return true if that {@link Future} cannot be recovered.
440
469
*/
441
470
public boolean canBeReplayed () {
442
- return !isDone () && !(Channels .isChannelValid (channel ) && !getUri ().getScheme ().equalsIgnoreCase ("https" )) && !inAuth .get () && !inProxyAuth .get ();
471
+ return !isDone () && !(Channels .isChannelValid (channel ) && !getUri ().getScheme ().equalsIgnoreCase ("https" ))
472
+ && inAuth == 0
473
+ && inProxyAuth == 0 ;
443
474
}
444
475
445
476
public long getStart () {
0 commit comments