Skip to content

Commit

Permalink
Merge pull request searchbox-io#38 from tootedom/master
Browse files Browse the repository at this point in the history
When Discovery is enabled, the list of servers passed to the client are not published in a thread safe manner
  • Loading branch information
ferhatsb committed May 7, 2013
2 parents 47ce90e + 74f2dd1 commit 7921a39
Show file tree
Hide file tree
Showing 12 changed files with 459 additions and 19 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ target/
.project
.settings
.classpath
.DS_Store
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>4.11</junit.version>
<gson.version>2.2.3</gson.version>
<elasticsearch.version>0.90.0.RC2</elasticsearch.version>
<elasticsearch.version>0.90.0</elasticsearch.version>
<log4j.version>1.2.16</log4j.version>
<httpComponent.version>4.2.3</httpComponent.version>
<httpClient.version>4.2.3</httpClient.version>
Expand Down
40 changes: 26 additions & 14 deletions src/main/java/io/searchbox/client/AbstractJestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@


import io.searchbox.Action;
import io.searchbox.client.config.RoundRobinServerList;
import io.searchbox.client.config.ServerList;
import io.searchbox.client.config.discovery.NodeChecker;

import java.util.Iterator;
import java.util.LinkedHashSet;

import io.searchbox.client.config.exception.NoServerConfiguredException;
import io.searchbox.client.util.PaddedAtomicReference;
import org.apache.http.StatusLine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -24,9 +28,7 @@ public abstract class AbstractJestClient implements JestClient {

final static Logger log = LoggerFactory.getLogger(AbstractJestClient.class);

public LinkedHashSet<String> servers;

private Iterator<String> roundRobinIterator;
private final PaddedAtomicReference<ServerList> listOfServers = new PaddedAtomicReference<ServerList>();

private NodeChecker nodeChecker;

Expand All @@ -35,23 +37,34 @@ public void setNodeChecker(NodeChecker nodeChecker) {
}

public LinkedHashSet<String> getServers() {
return servers;
ServerList server = listOfServers.get();
if(server!=null) return new LinkedHashSet<String>(server.getServers());
else return null;
}

public void setServers(LinkedHashSet<String> servers) {
this.servers = servers;
this.roundRobinIterator = Iterators.cycle(servers);
try {
RoundRobinServerList serverList = new RoundRobinServerList(servers);
listOfServers.set(serverList);
} catch(NoServerConfiguredException noServers) {
listOfServers.set(null);
log.warn("No servers are currently available for the client to talk to.");
}
}

public void setServers(ServerList list) {
listOfServers.set(list);
}

public void shutdownClient() {
if (null != nodeChecker)
nodeChecker.stop();
}

protected synchronized String getElasticSearchServer() {
if (roundRobinIterator.hasNext())
return roundRobinIterator.next();
throw new RuntimeException("No Server is assigned to client to connect");
protected String getElasticSearchServer() {
ServerList serverList = listOfServers.get();
if(serverList!=null) return serverList.getServer();
else throw new NoServerConfiguredException("No Server is assigned to client to connect");
}

protected JestResult createNewElasticSearchResult(String json, StatusLine statusLine, Action clientRequest) {
Expand Down Expand Up @@ -87,12 +100,11 @@ protected JsonObject convertJsonStringToMapObject(String jsonTxt) {
}

protected String getRequestURL(String elasticSearchServer, String uri) {
String serverUrl = elasticSearchServer.endsWith("/") ?
elasticSearchServer.substring(0, elasticSearchServer.length() - 1) : elasticSearchServer;
StringBuilder sb = new StringBuilder(elasticSearchServer);

StringBuilder sb = new StringBuilder(serverUrl);
if(uri.length()>0 && uri.charAt(0)=='/') sb.append(uri);
else sb.append('/').append(uri);

sb.append(uri.startsWith("/") ? uri : "/" + uri);
return sb.toString();
}
}
162 changes: 162 additions & 0 deletions src/main/java/io/searchbox/client/config/RoundRobinServerList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
package io.searchbox.client.config;


import io.searchbox.client.config.exception.NoServerConfiguredException;
import io.searchbox.client.util.PaddedAtomicInteger;

import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
*
* Uses an String[] Array to loop through the given list of servers in a round robin fashion
*
*/
public class RoundRobinServerList implements ServerList {

private final Set<String> servers;
private final String[] serverList;
private final int wrapPoint;
private final CircularIncrement incrementer;


public RoundRobinServerList(Set<String> servers) {
this(servers,-1,true);
}

public RoundRobinServerList(Set<String> servers,boolean strictOrdering) {
this(servers,-1, strictOrdering);
}

public RoundRobinServerList(Set<String> servers,int startAt) {
this(servers,startAt, true);
}

public RoundRobinServerList(Set<String> servers,int startAt,boolean strictOrdering) throws NoServerConfiguredException {
if(servers.size()==0) throw new NoServerConfiguredException("No Server is assigned to client to connect");
this.servers = servers;
wrapPoint = servers.size();
int i =0;
serverList = new String[wrapPoint];
for(String elasticSearchServer: servers) {
serverList[i++] = elasticSearchServer.endsWith("/") ?
elasticSearchServer.substring(0, elasticSearchServer.length() - 1) : elasticSearchServer;
}

int nextPowerOfTwo = ceilingNextPowerOfTwo(wrapPoint);

if(nextPowerOfTwo == wrapPoint) {
incrementer = new PowerOfTwoIncrement(nextPowerOfTwo,startAt);
} else if(strictOrdering) {
incrementer = new StrictOrderModulusIncrement(wrapPoint,startAt);
} else {
incrementer = new ModulusIncrement(wrapPoint,startAt);
}
}

@Override
public Set getServers() {
return servers;
}

@Override
public String getServer() {
return serverList[incrementer.nextVal()];
}


/**
* Calculate the next power of 2, greater than or equal to x.<p>
* From Hacker's Delight, Chapter 3, Harry S. Warren Jr.
*
* @param x Value to round up
* @return The next power of 2 from x inclusive
*/
public static int ceilingNextPowerOfTwo(final int x)
{
return 1 << (32 - Integer.numberOfLeadingZeros(x - 1));
}

private interface CircularIncrement {
public int nextVal();
}

/**
* Uses power of two bit masking to rotate around the ring buffer of the ServerList
*/
private class PowerOfTwoIncrement implements CircularIncrement {

private final AtomicInteger nextPointer;

private final int mask;
public PowerOfTwoIncrement(int sizeOfArray, int startPosition) {
nextPointer = new PaddedAtomicInteger(startPosition);
mask = sizeOfArray-1;
}

@Override
public int nextVal() {
return nextPointer.incrementAndGet()&mask;
}
}

/**
* Uses modulus operator "%" to loop around the array.
* When the MAX_VALUE of int has been hit, the rotation around the array will loop
* until it goes back into +p've values; when it will be in order again.. i.e.
*
* i.e. lets say MAX_VALUE will return element 2 in the array.
* MAX_VALUE+1 will return element 3
* MAX_VALUE+2 will return element 2 in the array.
* MAX_VALUE+3 will return element 1 in the array
*/
private class ModulusIncrement implements CircularIncrement {
private final int mask;
private final AtomicInteger nextPointer;


public ModulusIncrement(int sizeOfArray,int startPosition) {
nextPointer = new PaddedAtomicInteger(startPosition);
mask = sizeOfArray;
}

@Override
public int nextVal() {
return Math.abs(nextPointer.incrementAndGet()%mask);
}
}

/**
* Uses modulus operator "%" to loop around the array.
* When the MAX_VALUE of int has been hit, the rotation around the array will loop
* continue in the same direction. This is done by using a CAS LOOP to set the next value
* if MAX_VALUE has been reached.
*
*/
private class StrictOrderModulusIncrement implements CircularIncrement {
private final int mask;
private final AtomicInteger nextPointer;


public StrictOrderModulusIncrement(int sizeOfArray,int startPosition) {
nextPointer = new PaddedAtomicInteger(startPosition);
mask = sizeOfArray;
}

@Override
public int nextVal() {
int currentVal;
int nextVal;
do {
currentVal = nextPointer.get();
nextVal = currentVal+1;
if(currentVal==Integer.MAX_VALUE) {
int prev = ((currentVal%mask)+1);
nextVal= (prev > mask) ? 0 : prev;
}
} while(!nextPointer.compareAndSet(currentVal,nextVal));

return nextVal%mask;
}
}
}
25 changes: 25 additions & 0 deletions src/main/java/io/searchbox/client/config/ServerList.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.searchbox.client.config;

import java.util.Set;

/**
* Provides the interface to iterating over a set of
* server addresses.
*/
public interface ServerList {

/**
* Returns the "next" server the client should talk to. The next item is
* determined by the implementation.
* @return
*/
public String getServer();

/**
* Returns the set of servers from which the ServerList implementation
* was generated. This method is only here to satisfy unit test
* @return
*/
public Set getServers();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.searchbox.client.config.exception;

/**
* Exception that specified that the client has no
* knowledge of an elasticsearch node to communicate with.
*
*/
public class NoServerConfiguredException extends RuntimeException {

static final long serialVersionUID = -7034897190745766912L;

/**
* Constructs a new runtime exception with the specified detail message.
*
* @param message the detail message. The detail message is saved for
* later retrieval by the {@link #getMessage()} method.
*/
public NoServerConfiguredException(String message) {
super(message);
}

}
36 changes: 36 additions & 0 deletions src/main/java/io/searchbox/client/util/PaddedAtomicInteger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2012 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.searchbox.client.util;

import java.util.concurrent.atomic.AtomicInteger;

public class PaddedAtomicInteger extends AtomicInteger
{
public PaddedAtomicInteger()
{
}

public PaddedAtomicInteger(final int initialValue)
{
super(initialValue);
}

public long $sum$() {
return p2 + p3 + p4 + p5 + p6 + p7;
}

public volatile long p1, p2, p3, p4, p5, p6, p7 = 7;
}
28 changes: 28 additions & 0 deletions src/main/java/io/searchbox/client/util/PaddedAtomicReference.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright (c) 2011. Peter Lawrey
*
* "THE BEER-WARE LICENSE" (Revision 128)
* As long as you retain this notice you can do whatever you want with this stuff.
* If we meet some day, and you think this stuff is worth it, you can buy me a beer in return
* There is no warranty.
*/

package io.searchbox.client.util;

import java.util.concurrent.atomic.AtomicReference;

public class PaddedAtomicReference<T> extends AtomicReference<T> {
public long p2, p3, p4, p5, p6, p7;

public PaddedAtomicReference() {
super();
}

public PaddedAtomicReference(T t) {
super(t);
}

public long sumPadded() {
return p2 + p3 + p4 + p5 + p6 + p7;
}
}
Loading

0 comments on commit 7921a39

Please sign in to comment.