Skip to content

Commit

Permalink
camellia-redis-proxy support multi-read-resources while rw_separate
Browse files Browse the repository at this point in the history
  • Loading branch information
nxttl-ucas committed Oct 30, 2020
1 parent a2d2b40 commit f2f5352
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.netease.nim.camellia.core.util;

import java.util.Objects;

/**
*
* Created by caojiajun on 2020/4/10.
*/
public class CamelliaPair<T, W> {
private T first;
private W second;

public CamelliaPair() {
}

public CamelliaPair(T first, W second) {
this.first = first;
this.second = second;
}

public T getFirst() {
return first;
}

public void setFirst(T first) {
this.first = first;
}

public W getSecond() {
return second;
}

public void setSecond(W second) {
this.second = second;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
CamelliaPair<?, ?> camelliaPair = (CamelliaPair<?, ?>) o;
return Objects.equals(first, camelliaPair.first) &&
Objects.equals(second, camelliaPair.second);
}

@Override
public int hashCode() {
return Objects.hash(first, second);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ public class ResourceChooser {
private final ResourceTable resourceTable;
private final ProxyEnv proxyEnv;

private List<Resource> readResources = null;
private Resource firstReadResource = null;
private CamelliaPair<Boolean, List<Resource>> readResources = null;
private Resource readResource = null;
private List<Resource> writeResources = null;

private boolean bucketSizeIs2Power = false;
Expand All @@ -45,37 +45,43 @@ public Set<Resource> getAllResources() {
return allResources;
}

public Resource getFirstReadResource(byte[]... shadingParam) {
if (firstReadResource != null) return firstReadResource;
return getReadResources(shadingParam).get(0);
public Resource getReadResource(byte[]... shadingParam) {
if (readResource != null) return readResource;
ResourceTable.Type type = resourceTable.getType();
if (type == ResourceTable.Type.SIMPLE) {
CamelliaPair<Boolean, List<Resource>> readResources = _getReadResources(shadingParam);
if (readResources == null) return null;
List<Resource> list = readResources.getSecond();
if (readResources.getFirst() && list.size() > 1) {
int index = ThreadLocalRandom.current().nextInt(list.size());
return list.get(index);
} else {
readResource = list.get(0);
return readResource;
}
} else {
CamelliaPair<Boolean, List<Resource>> readResources = _getReadResources(shadingParam);
if (readResources == null) return null;
List<Resource> list = readResources.getSecond();
if (readResources.getFirst() && list.size() > 1) {
int index = ThreadLocalRandom.current().nextInt(list.size());
return list.get(index);
} else {
return list.get(0);
}
}
}

public List<Resource> getReadResources(byte[]... shadingParam) {
private CamelliaPair<Boolean, List<Resource>> _getReadResources(byte[]... shadingParam) {
if (readResources != null) {
firstReadResource = readResources.get(0);
return readResources;
}
ResourceTable.Type type = resourceTable.getType();
if (type == ResourceTable.Type.SIMPLE) {
ResourceTable.SimpleTable simpleTable = resourceTable.getSimpleTable();
ResourceOperation resourceOperation = simpleTable.getResourceOperation();
List<Resource> readResources = getReadResourcesFromOperation(resourceOperation);
if (simpleTable.getResourceOperation().getType() == ResourceOperation.Type.SIMPLE) {
this.readResources = readResources;
}
if (simpleTable.getResourceOperation().getType() == ResourceOperation.Type.RW_SEPARATE) {
ResourceReadOperation readOperation = simpleTable.getResourceOperation().getReadOperation();
if (readOperation.getType() == ResourceReadOperation.Type.SIMPLE) {
this.readResources = readResources;
} else if (readOperation.getType() == ResourceReadOperation.Type.RANDOM) {
if (readOperation.getReadResources().size() == 1) {
this.readResources = readResources;
}
} else if (readOperation.getType() == ResourceReadOperation.Type.ORDER) {
this.readResources = readResources;
}
}
return readResources;
this.readResources = getReadResourcesFromOperation(resourceOperation);
return this.readResources;
} else {
int shadingCode = proxyEnv.getShadingFunc().shadingCode(shadingParam);
ResourceTable.ShadingTable shadingTable = resourceTable.getShadingTable();
Expand Down Expand Up @@ -106,25 +112,21 @@ public List<Resource> getWriteResources(byte[]... shadingParam) {
}
}

private List<Resource> getReadResourcesFromOperation(ResourceOperation resourceOperation) {
private CamelliaPair<Boolean, List<Resource>> getReadResourcesFromOperation(ResourceOperation resourceOperation) {
ResourceOperation.Type resourceOperationType = resourceOperation.getType();
if (resourceOperationType == ResourceOperation.Type.SIMPLE) {
return Collections.singletonList(resourceOperation.getResource());
return new CamelliaPair<>(false, Collections.singletonList(resourceOperation.getResource()));
} else if (resourceOperationType == ResourceOperation.Type.RW_SEPARATE) {
ResourceReadOperation readOperation = resourceOperation.getReadOperation();
ResourceReadOperation.Type readOperationType = readOperation.getType();
if (readOperationType == ResourceReadOperation.Type.SIMPLE) {
return Collections.singletonList(readOperation.getReadResource());
} else if (readOperationType == ResourceReadOperation.Type.RANDOM) {
int size = readOperation.getReadResources().size();
if (size == 1) {
return Collections.singletonList(readOperation.getReadResources().get(0));
}
int index = ThreadLocalRandom.current().nextInt(size);
return Collections.singletonList(readOperation.getReadResources().get(index));
return new CamelliaPair<>(false, Collections.singletonList(readOperation.getReadResource()));
} else if (readOperationType == ResourceReadOperation.Type.ORDER) {
List<Resource> readResources = readOperation.getReadResources();
return new ArrayList<>(readResources);
return new CamelliaPair<>(false, readResources);
} else if (readOperationType == ResourceReadOperation.Type.RANDOM) {
List<Resource> readResources = readOperation.getReadResources();
return new CamelliaPair<>(true, readResources);
}
}
throw new IllegalArgumentException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public List<CompletableFuture<Reply>> sendCommand(List<Command> commands) {
futureList.add(future);
continue;
}
Resource resource = resourceChooser.getFirstReadResource(Utils.EMPTY_ARRAY);
Resource resource = resourceChooser.getReadResource(Utils.EMPTY_ARRAY);
AsyncClient client = factory.get(resource.getUrl());
CompletableFuture<Reply> future;
switch (redisCommand) {
Expand Down Expand Up @@ -152,7 +152,7 @@ public List<CompletableFuture<Reply>> sendCommand(List<Command> commands) {
switch (redisCommand) {
case KEYS:
case SCAN:
Resource resource = resourceChooser.getFirstReadResource(Utils.EMPTY_ARRAY);
Resource resource = resourceChooser.getReadResource(Utils.EMPTY_ARRAY);
String url = resource.getUrl();
AsyncClient client = factory.get(url);
future = commandFlusher.sendCommand(client, command);
Expand Down Expand Up @@ -370,7 +370,7 @@ private CompletableFuture<Reply> xinfo(Command command, CommandFlusher commandFl
return future;
}
byte[] key = objects[2];
Resource resource = resourceChooser.getFirstReadResource(key);
Resource resource = resourceChooser.getReadResource(key);
AsyncClient client = factory.get(resource.getUrl());
CompletableFuture<Reply> future = commandFlusher.sendCommand(client, command);
incrRead(resource, command);
Expand Down Expand Up @@ -676,7 +676,7 @@ private CompletableFuture<Reply> readCommandWithDynamicKeyCount(Command command,
String url = null;
for (int i=start; i<=end; i++) {
byte[] key = objects[i];
Resource resource = resourceChooser.getFirstReadResource(key);
Resource resource = resourceChooser.getReadResource(key);
if (url != null && !url.equals(resource.getUrl())) {
CompletableFuture<Reply> completableFuture = new CompletableFuture<>();
completableFuture.complete(new ErrorReply("ERR keys in request not in same resources"));
Expand All @@ -688,7 +688,7 @@ private CompletableFuture<Reply> readCommandWithDynamicKeyCount(Command command,
incrRead(url, command);
return commandFlusher.sendCommand(client, command);
} else {
Resource resource = resourceChooser.getFirstReadResource(Utils.EMPTY_ARRAY);
Resource resource = resourceChooser.getReadResource(Utils.EMPTY_ARRAY);
AsyncClient client = factory.get(resource.getUrl());
incrRead(resource.getUrl(), command);
return commandFlusher.sendCommand(client, command);
Expand Down Expand Up @@ -898,7 +898,7 @@ public void run() {
}

private Resource getReadResource(byte[] key) {
return resourceChooser.getFirstReadResource(key);
return resourceChooser.getReadResource(key);
}

private List<Resource> getWriteResources(byte[] key) {
Expand All @@ -907,7 +907,7 @@ private List<Resource> getWriteResources(byte[] key) {

private Resource getReadResource(Command command) {
byte[] key = command.getObjects()[1];
return resourceChooser.getFirstReadResource(key);
return resourceChooser.getReadResource(key);
}

private List<Resource> getWriteResources(Command command) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1661,14 +1661,11 @@ public Jedis getReadJedis(byte[]... keys) {
ResourceChooser chooser = factory.getResourceChooser();
String url = null;
for (byte[] key : keys) {
List<Resource> readResources = chooser.getReadResources(key);
if (readResources == null || readResources.size() > 1) {
throw new CamelliaRedisException("not support while in multi-write mode");
}
if (url != null && !url.equalsIgnoreCase(readResources.get(0).getUrl())) {
Resource readResource = chooser.getReadResource(key);
if (url != null && !url.equalsIgnoreCase(readResource.getUrl())) {
throw new CamelliaRedisException("ERR keys in request not in same resources");
}
url = readResources.get(0).getUrl();
url = readResource.getUrl();
}
ICamelliaRedis redis = CamelliaRedisInitializr.init(new Resource(url), env);
return redis.getJedis(keys[0]);
Expand Down
1 change: 1 addition & 0 deletions update-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* camellia-redis-proxy支持热key回调HotKeyCallback
* camellia-redis-proxy支持热key在proxy层的本地缓存(TODO)
* camellia-redis-proxy支持key/value等的自定义转换,可以用于透明的数据加密(TODO)
* camellia-redis-proxy支持配置读写分离时设置多个读地址
* CamelliaRedisTemplate支持获取原始Jedis

### 更新
Expand Down
1 change: 1 addition & 0 deletions update.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* camellia-redis-proxy support HotKeyCallback
* camellia-redis-proxy support hot key cache in proxy(TODO)
* camellia-redis-proxy support key/value custom transfer, you can use this feature in data encryption(TODO)
* camellia-redis-proxy support multi-read-resources while rw_separate
* CamelliaRedisTemplate support get original Jedis

### update
Expand Down

0 comments on commit f2f5352

Please sign in to comment.