forked from sofastack/sofa-jraft
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
(feat) refactor ThreadId and replicator (sofastack#169)
* (feat) refactor ThreadId and replicator * (feat) Adds javadoc
- Loading branch information
1 parent
b76e3e6
commit d6ac5c2
Showing
4 changed files
with
117 additions
and
108 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,7 +20,6 @@ | |
import java.util.Collections; | ||
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
import java.util.concurrent.locks.Lock; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -34,40 +33,49 @@ | |
*/ | ||
public class ThreadId { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(ThreadId.class); | ||
private static final int TRY_LOCK_TIMEOUT_MS = 10; | ||
|
||
private final Object data; | ||
private volatile NonReentrantLock lock = new NonReentrantLock(); | ||
private final List<Integer> pendingErrors = Collections.synchronizedList(new ArrayList<>()); | ||
private final OnError onError; | ||
private static final Logger LOG = LoggerFactory.getLogger(ThreadId.class); | ||
|
||
private final Object data; | ||
private final NonReentrantLock lock = new NonReentrantLock(); | ||
private final List<Integer> pendingErrors = Collections.synchronizedList(new ArrayList<>()); | ||
private final OnError onError; | ||
private volatile boolean destroyed; | ||
|
||
/** | ||
* @author boyan ([email protected]) | ||
* | ||
* 2018-Mar-29 11:01:54 AM | ||
*/ | ||
public interface OnError { | ||
/** | ||
* Error callback,it will be called in lock, but should take care of unlocking it. | ||
* @param id the thread id | ||
* @param data the data | ||
* @param errorCode the error code | ||
*/ | ||
void onError(ThreadId id, Object data, int errorCode); | ||
} | ||
|
||
public ThreadId(Object data, OnError onError) { | ||
public ThreadId(final Object data, final OnError onError) { | ||
super(); | ||
this.data = data; | ||
this.onError = onError; | ||
this.destroyed = false; | ||
} | ||
|
||
public Object getData() { | ||
return data; | ||
return this.data; | ||
} | ||
|
||
public Object lock() { | ||
final Lock theLock = this.lock; | ||
if (theLock == null) { | ||
if (this.destroyed) { | ||
return null; | ||
} | ||
try { | ||
while (!theLock.tryLock(1, TimeUnit.SECONDS)) { | ||
if (this.lock == null) { | ||
while (!this.lock.tryLock(TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) { | ||
if (this.destroyed) { | ||
return null; | ||
} | ||
} | ||
|
@@ -76,55 +84,45 @@ public Object lock() { | |
return null; | ||
} | ||
// Got the lock, double checking state. | ||
if (this.lock == null) { | ||
if (this.destroyed) { | ||
// should release lock | ||
theLock.unlock(); | ||
this.lock.unlock(); | ||
return null; | ||
} | ||
return this.data; | ||
} | ||
|
||
public void unlock() { | ||
final NonReentrantLock theLock = this.lock; | ||
if (theLock == null) { | ||
return; | ||
} | ||
if (!theLock.isHeldByCurrentThread()) { | ||
if (!this.lock.isHeldByCurrentThread()) { | ||
LOG.warn("Fail to unlock with {}, the lock is held by {} and current thread is {}.", this.data, | ||
theLock.getOwner(), Thread.currentThread()); | ||
this.lock.getOwner(), Thread.currentThread()); | ||
return; | ||
} | ||
// calls all pending errors before unlock | ||
boolean doUnlock = true; | ||
try { | ||
final List<Integer> errors; | ||
synchronized (this.pendingErrors) { | ||
errors = new ArrayList<>(this.pendingErrors); | ||
this.pendingErrors.clear(); | ||
} | ||
for (final Integer code : errors) { | ||
// The lock will be unlocked in onError. | ||
doUnlock = false; | ||
if (this.onError != null) { | ||
this.onError.onError(this, this.data, code); | ||
} | ||
} | ||
} finally { | ||
doUnlock(theLock); | ||
} | ||
} | ||
|
||
private void doUnlock(final NonReentrantLock theLock) { | ||
if (theLock != null) { | ||
try { | ||
theLock.unlock(); | ||
} catch (final Exception e) { | ||
LOG.warn("Fail to unlock with {}, the lock is held by {} and current thread is {}.", this.data, | ||
theLock.getOwner(), Thread.currentThread(), e); | ||
if (doUnlock) { | ||
this.lock.unlock(); | ||
} | ||
} | ||
} | ||
|
||
public void join() { | ||
while (this.lock != null) { | ||
Thread.yield(); | ||
while (!this.destroyed) { | ||
ThreadHelper.onSpinWait(); | ||
} | ||
} | ||
|
||
|
@@ -134,15 +132,16 @@ public String toString() { | |
} | ||
|
||
public void unlockAndDestroy() { | ||
final Lock theLock = this.lock; | ||
this.lock = null; | ||
if (theLock != null) { | ||
try { | ||
theLock.unlock(); | ||
} catch (final Exception ignored) { | ||
// ignored | ||
} | ||
if (this.destroyed) { | ||
return; | ||
} | ||
this.destroyed = true; | ||
if (!this.lock.isHeldByCurrentThread()) { | ||
LOG.warn("Fail to unlockAndDestroy with {}, the lock is held by {} and current thread is {}.", this.data, | ||
this.lock.getOwner(), Thread.currentThread()); | ||
return; | ||
} | ||
this.lock.unlock(); | ||
} | ||
|
||
/** | ||
|
@@ -153,17 +152,17 @@ public void unlockAndDestroy() { | |
* @param errorCode error code | ||
*/ | ||
public void setError(final int errorCode) { | ||
final NonReentrantLock theLock = this.lock; | ||
if (theLock == null) { | ||
if (this.destroyed) { | ||
return; | ||
} | ||
if (theLock.tryLock()) { | ||
try { | ||
if (this.onError != null) { | ||
this.onError.onError(this, data, errorCode); | ||
} | ||
} finally { | ||
doUnlock(theLock); | ||
if (this.lock.tryLock()) { | ||
if (this.destroyed) { | ||
this.lock.unlock(); | ||
return; | ||
} | ||
if (this.onError != null) { | ||
// The lock will be unlocked in onError. | ||
this.onError.onError(this, this.data, errorCode); | ||
} | ||
} else { | ||
this.pendingErrors.add(errorCode); | ||
|
Oops, something went wrong.