Skip to content

Commit

Permalink
Merge branch 'mrniko/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
jackygurui committed Sep 24, 2015
2 parents e31ac9c + 4c483c6 commit 8b587cf
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 184 deletions.
95 changes: 37 additions & 58 deletions src/main/java/org/redisson/RedissonCountDownLatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,32 @@ protected RedissonCountDownLatch(CommandExecutor commandExecutor, String name, U
this.id = id;
}

private Future<Boolean> subscribe() {
Promise<Boolean> promise = aquire();
if (promise != null) {
return promise;
}
private Future<RedissonCountDownLatchEntry> subscribe() {
synchronized (ENTRIES) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
entry.aquire();
return entry.getPromise();
}

Promise<RedissonCountDownLatchEntry> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();

Promise<Boolean> newPromise = newPromise();
final RedissonCountDownLatchEntry value = new RedissonCountDownLatchEntry(newPromise);
value.aquire();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
RedissonCountDownLatchEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
return oldPromise;

RedisPubSubListener<Integer> listener = createListener(value);

commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
return newPromise;
}
}

private RedisPubSubListener<Integer> createListener(final RedissonCountDownLatchEntry value) {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {

@Override
Expand All @@ -89,61 +97,32 @@ public void onMessage(String channel, Integer message) {

@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().setSuccess(value);
return true;
}
return false;
}

};

synchronized (ENTRIES) {
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
}
return newPromise;
}

private void unsubscribe() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
synchronized (ENTRIES) {
// maybe added during subscription
if (!ENTRIES.containsKey(getEntryName())) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
}
return;
}
}
return listener;
}

private Promise<Boolean> aquire() {
while (true) {
RedissonCountDownLatchEntry entry = ENTRIES.get(getEntryName());
if (entry != null) {
RedissonCountDownLatchEntry newEntry = new RedissonCountDownLatchEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
private void unsubscribe(RedissonCountDownLatchEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
} else {
return null;
}
}
}

public void await() throws InterruptedException {
Future<Boolean> promise = subscribe();
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
promise.await();

Expand All @@ -155,14 +134,14 @@ public void await() throws InterruptedException {
}
}
} finally {
unsubscribe();
unsubscribe(promise.getNow());
}
}


@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
Future<Boolean> promise = subscribe();
Future<RedissonCountDownLatchEntry> promise = subscribe();
try {
if (!promise.await(time, unit)) {
return false;
Expand All @@ -186,7 +165,7 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {

return true;
} finally {
unsubscribe();
unsubscribe(promise.getNow());
}
}

Expand Down
56 changes: 12 additions & 44 deletions src/main/java/org/redisson/RedissonCountDownLatchEntry.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,69 +15,37 @@
*/
package org.redisson;

import io.netty.util.concurrent.Promise;

import org.redisson.misc.ReclosableLatch;

import io.netty.util.concurrent.Promise;

public class RedissonCountDownLatchEntry {

private int counter;

private final ReclosableLatch latch;
private final Promise<Boolean> promise;

public RedissonCountDownLatchEntry(RedissonCountDownLatchEntry source) {
counter = source.counter;
latch = source.latch;
promise = source.promise;
}

public RedissonCountDownLatchEntry(Promise<Boolean> promise) {
private final Promise<RedissonCountDownLatchEntry> promise;

public RedissonCountDownLatchEntry(Promise<RedissonCountDownLatchEntry> promise) {
super();
this.latch = new ReclosableLatch();
this.promise = promise;
}

public boolean isFree() {
return counter == 0;
}


public void aquire() {
counter++;
}
public void release() {
counter--;

public int release() {
return --counter;
}
public Promise<Boolean> getPromise() {

public Promise<RedissonCountDownLatchEntry> getPromise() {
return promise;
}

public ReclosableLatch getLatch() {
return latch;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + counter;
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
RedissonCountDownLatchEntry other = (RedissonCountDownLatchEntry) obj;
if (counter != other.counter)
return false;
return true;
}

}
108 changes: 42 additions & 66 deletions src/main/java/org/redisson/RedissonLock.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,14 @@ protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
this.id = id;
}

private void unsubscribe() {
while (true) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return;
}

RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.release();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
if (newEntry.isFree()
&& ENTRIES.remove(getEntryName(), newEntry)) {
synchronized (ENTRIES) {
// maybe added during subscription
if (!ENTRIES.containsKey(getEntryName())) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
}
private void unsubscribe(RedissonLockEntry entry) {
synchronized (ENTRIES) {
if (entry.release() == 0) {
// just an assertion
boolean removed = ENTRIES.remove(getEntryName()) == entry;
if (removed) {
commandExecutor.getConnectionManager().unsubscribe(getChannelName());
}
return;
}
}
}
Expand All @@ -86,63 +74,49 @@ private String getEntryName() {
return id + ":" + getName();
}

private Promise<Boolean> aquire() {
while (true) {
private Future<RedissonLockEntry> subscribe() {
synchronized (ENTRIES) {
RedissonLockEntry entry = ENTRIES.get(getEntryName());
if (entry == null) {
return null;
}

RedissonLockEntry newEntry = new RedissonLockEntry(entry);
newEntry.aquire();
if (ENTRIES.replace(getEntryName(), entry, newEntry)) {
return newEntry.getPromise();
if (entry != null) {
entry.aquire();
return entry.getPromise();
}
}
}

private Future<Boolean> subscribe() {
Promise<Boolean> promise = aquire();
if (promise != null) {
return promise;
}
Promise<RedissonLockEntry> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();

Promise<Boolean> newPromise = newPromise();
final RedissonLockEntry value = new RedissonLockEntry(newPromise);
value.aquire();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
Promise<Boolean> oldPromise = aquire();
if (oldPromise == null) {
return subscribe();
RedissonLockEntry oldValue = ENTRIES.putIfAbsent(getEntryName(), value);
if (oldValue != null) {
oldValue.aquire();
return oldValue.getPromise();
}
return oldPromise;
}

RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {
RedisPubSubListener<Integer> listener = new BaseRedisPubSubListener<Integer>() {

@Override
public void onMessage(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
@Override
public void onMessage(String channel, Integer message) {
if (message.equals(unlockMessage) && getChannelName().equals(channel)) {
value.getLatch().release();
}
}
}

@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName()) && !value.getPromise().isSuccess()) {
value.getPromise().setSuccess(true);
return true;
@Override
public boolean onStatus(PubSubType type, String channel) {
if (channel.equals(getChannelName())
&& !value.getPromise().isSuccess()
&& type == PubSubType.SUBSCRIBE) {
value.getPromise().setSuccess(value);
return true;
}
return false;
}
return false;
}

};
};

synchronized (ENTRIES) {
commandExecutor.getConnectionManager().subscribe(listener, getChannelName());
return newPromise;
}
return newPromise;
}

private String getChannelName() {
Expand Down Expand Up @@ -186,7 +160,8 @@ public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedE
return;
}

subscribe().awaitUninterruptibly();
Future<RedissonLockEntry> future = subscribe();
future.awaitUninterruptibly();

try {
while (true) {
Expand All @@ -209,7 +184,7 @@ public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedE
}
}
} finally {
unsubscribe();
unsubscribe(future.getNow());
}
}

Expand Down Expand Up @@ -290,7 +265,8 @@ public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws Inte
return true;
}

if (!subscribe().awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
Future<RedissonLockEntry> future = subscribe();
if (!future.awaitUninterruptibly(time, TimeUnit.MILLISECONDS)) {
return false;
}

Expand Down Expand Up @@ -325,7 +301,7 @@ public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws Inte
}
return true;
} finally {
unsubscribe();
unsubscribe(future.getNow());
}
}

Expand Down
Loading

0 comments on commit 8b587cf

Please sign in to comment.