Skip to content

Commit c9539e0

Browse files
committed
Ensure nodesMap updates are safe when topology changes.
This change much more closely (perhaps too defensively) couples the Config and the list of nodes. It also ensures that the Map<String, MemcachedNode> is less prone to concurrency problems and fixes a concurrency problem which allowed nodes to be removed when receiving a not-my-vbucket response. This indicates we need to make some changes to how the entire object model and config list are handled, but just to quickly fix a race found, tie together the config and node list when locating the right node. Testing for this is rather manual at the moment. Change-Id: Ida002fd1d510d20c432e77e01eefbc530d3b34b1 Reviewed-on: http://review.couchbase.org/8195 Tested-by: Matt Ingenthron <[email protected]> Reviewed-by: Michael Wiederhold <[email protected]> Reviewed-by: Matt Ingenthron <[email protected]>
1 parent 96d1d2e commit c9539e0

File tree

1 file changed

+59
-18
lines changed

1 file changed

+59
-18
lines changed

src/main/java/net/spy/memcached/vbucket/VBucketNodeLocator.java

Lines changed: 59 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,27 @@
22

33
import java.net.InetSocketAddress;
44
import java.util.Collection;
5+
import java.util.Collections;
56
import java.util.HashMap;
67
import java.util.Iterator;
78
import java.util.List;
89
import java.util.Map;
10+
import java.util.NoSuchElementException;
11+
import java.util.Set;
12+
import java.util.concurrent.atomic.AtomicReference;
913

1014
import net.spy.memcached.MemcachedNode;
1115
import net.spy.memcached.NodeLocator;
1216
import net.spy.memcached.compat.SpyObject;
1317
import net.spy.memcached.vbucket.config.Config;
18+
import net.spy.memcached.vbucket.config.ConfigDifference;
1419

1520
/**
1621
* Implementation of the {@link NodeLocator} interface that contains vbucket hashing methods
1722
*/
1823
public class VBucketNodeLocator extends SpyObject implements NodeLocator {
1924

20-
private Map<String, MemcachedNode> nodesMap;
21-
22-
private Config config;
25+
private final AtomicReference<TotalConfig> fullConfig;
2326

2427
/**
2528
* Construct a VBucketNodeLocator over the given JSON configuration string.
@@ -29,23 +32,32 @@ public class VBucketNodeLocator extends SpyObject implements NodeLocator {
2932
*/
3033
public VBucketNodeLocator(List<MemcachedNode> nodes, Config jsonConfig) {
3134
super();
32-
setNodes(nodes);
33-
setConfig(jsonConfig);
35+
fullConfig = new AtomicReference<TotalConfig>();
36+
fullConfig.set(new TotalConfig(jsonConfig, fillNodesEntries(nodes)));
3437
}
3538

3639
/**
3740
* {@inheritDoc}
3841
*/
3942
public MemcachedNode getPrimary(String k) {
43+
TotalConfig totConfig = fullConfig.get();
44+
Config config = totConfig.getConfig();
45+
Map<String, MemcachedNode> nodesMap = totConfig.getNodesMap();
4046
int vbucket = config.getVbucketByKey(k);
4147
int serverNumber = config.getMaster(vbucket);
4248
String server = config.getServer(serverNumber);
43-
// choose appropriate MemecachedNode according to config data
49+
// choose appropriate MemcachedNode according to config data
4450
MemcachedNode pNode = nodesMap.get(server);
4551
if (pNode == null) {
46-
getLogger().error("The node locator does not have a primary for key %s.", k);
52+
getLogger().error("The node locator does not have a primary for key %s. Wanted vbucket %s which should be on server %s.", k, vbucket, server);
53+
getLogger().error("List of nodes has %s entries:", nodesMap.size());
54+
Set<String> keySet = nodesMap.keySet();
55+
Iterator<String> iterator = keySet.iterator();
56+
while (iterator.hasNext()) {
57+
String anode = iterator.next();
58+
getLogger().error("MemcachedNode for %s is %s", anode, nodesMap.get(anode));
59+
}
4760
Collection<MemcachedNode> nodes = nodesMap.values();
48-
getLogger().error("MemcachedNode has %s entries:", nodesMap.size());
4961
for (MemcachedNode node : nodes) {
5062
getLogger().error(node);
5163
}
@@ -58,14 +70,16 @@ public MemcachedNode getPrimary(String k) {
5870
* {@inheritDoc}
5971
*/
6072
public Iterator<MemcachedNode> getSequence(String k) {
73+
Map<String, MemcachedNode> nodesMap = fullConfig.get().getNodesMap();
6174
return nodesMap.values().iterator();
6275
}
6376

6477
/**
6578
* {@inheritDoc}
6679
*/
6780
public Collection<MemcachedNode> getAll() {
68-
return this.nodesMap.values();
81+
Map<String, MemcachedNode> nodesMap = fullConfig.get().getNodesMap();
82+
return nodesMap.values();
6983
}
7084

7185
/**
@@ -74,9 +88,17 @@ public Collection<MemcachedNode> getAll() {
7488
public NodeLocator getReadonlyCopy() {
7589
return this;
7690
}
77-
public void updateLocator(final List<MemcachedNode> nodes, final Config conf) {
78-
setNodes(nodes);
79-
setConfig(conf);
91+
92+
public void updateLocator(final List<MemcachedNode> nodes, final Config newconf) {
93+
// we'll get a new config for various reasons we don't care about, so check if we do care
94+
Config current = fullConfig.get().getConfig();
95+
ConfigDifference compareTo = current.compareTo(newconf);
96+
if(compareTo.isSequenceChanged() || compareTo.getVbucketsChanges() > 0) {
97+
getLogger().debug("Updating configuration, received updated configuration with significant changes.");
98+
fullConfig.set(new TotalConfig(newconf, fillNodesEntries(nodes)));
99+
} else {
100+
getLogger().debug("Received updated configuration with insignificant changes.");
101+
}
80102
}
81103

82104
/**
@@ -85,10 +107,11 @@ public void updateLocator(final List<MemcachedNode> nodes, final Config conf) {
85107
* @return vbucket index
86108
*/
87109
public int getVBucketIndex(String key) {
88-
return config.getVbucketByKey(key);
110+
Config config = fullConfig.get().getConfig();
111+
return config.getVbucketByKey(key);
89112
}
90113

91-
private void setNodes(Collection<MemcachedNode> nodes) {
114+
private Map<String, MemcachedNode> fillNodesEntries(Collection<MemcachedNode> nodes) {
92115
HashMap<String, MemcachedNode> vbnodesMap = new HashMap<String, MemcachedNode>();
93116
getLogger().debug("Updating nodesMap in VBucketNodeLocator.");
94117
for (MemcachedNode node : nodes) {
@@ -101,12 +124,9 @@ private void setNodes(Collection<MemcachedNode> nodes) {
101124
vbnodesMap.put(hostname, node);
102125
}
103126

104-
this.nodesMap = vbnodesMap;
127+
return Collections.unmodifiableMap(vbnodesMap);
105128
}
106129

107-
private void setConfig(final Config config) {
108-
this.config = config;
109-
}
110130

111131
/**
112132
* Method returns the node that is not contained in the specified collection of the failed nodes
@@ -115,6 +135,8 @@ private void setConfig(final Config config) {
115135
* @return The first MemcachedNode which meets requirements
116136
*/
117137
public MemcachedNode getAlternative(String k, Collection<MemcachedNode> notMyVbucketNodes) {
138+
// it's safe to only copy the map here, only removing references found to be incorrect, and trying remaining
139+
Map<String, MemcachedNode> nodesMap = new HashMap<String,MemcachedNode>(fullConfig.get().getNodesMap());
118140
Collection<MemcachedNode> nodes = nodesMap.values();
119141
nodes.removeAll(notMyVbucketNodes);
120142
if (nodes.isEmpty()) {
@@ -123,4 +145,23 @@ public MemcachedNode getAlternative(String k, Collection<MemcachedNode> notMyVbu
123145
return nodes.iterator().next();
124146
}
125147
}
148+
149+
private class TotalConfig {
150+
private Config config;
151+
private Map<String, MemcachedNode> nodesMap;
152+
153+
public TotalConfig(Config newConfig, Map<String, MemcachedNode> newMap) {
154+
config = newConfig;
155+
nodesMap = Collections.unmodifiableMap(newMap);
156+
}
157+
158+
protected Config getConfig() {
159+
return config;
160+
}
161+
162+
protected Map<String, MemcachedNode> getNodesMap() {
163+
return nodesMap;
164+
}
165+
}
166+
126167
}

0 commit comments

Comments
 (0)