Skip to content

Commit

Permalink
Update memcached client for better concurrency in metrics. Also fixes…
Browse files Browse the repository at this point in the history
… another injection problem
  • Loading branch information
drcrallen committed Sep 22, 2015
1 parent 63a3a4a commit e6f07a8
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 61 deletions.
7 changes: 7 additions & 0 deletions server/src/main/java/io/druid/client/cache/Cache.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.base.Preconditions;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;
import com.metamx.emitter.service.ServiceEmitter;

import java.nio.ByteBuffer;
import java.util.Arrays;
Expand All @@ -39,6 +40,12 @@ public interface Cache

public boolean isLocal();

/**
* Custom metrics not covered by CacheStats may be emitted by this method.
* @param emitter The service emitter to emit on.
*/
public void doMonitor(ServiceEmitter emitter);

public class NamedKey
{
final public String namespace;
Expand Down
3 changes: 3 additions & 0 deletions server/src/main/java/io/druid/client/cache/CacheMonitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ public boolean doMonitor(ServiceEmitter emitter)
emitStats(emitter, "query/cache/total", currCacheStats, builder);

prevCacheStats = currCacheStats;

// Any custom cache statistics that need monitoring
cache.doMonitor(emitter);
return true;
}

Expand Down
9 changes: 8 additions & 1 deletion server/src/main/java/io/druid/client/cache/MapCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.emitter.service.ServiceEmitter;

import java.nio.ByteBuffer;
import java.util.Collections;
Expand Down Expand Up @@ -132,7 +133,7 @@ public void close(String namespace)
toRemove.add(next);
}
}
for(ByteBuffer key : toRemove) {
for (ByteBuffer key : toRemove) {
baseMap.remove(key);
}
}
Expand Down Expand Up @@ -163,4 +164,10 @@ public boolean isLocal()
{
return true;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
// No special monitoring
}
}
67 changes: 47 additions & 20 deletions server/src/main/java/io/druid/client/cache/MemcachedCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.common.base.Suppliers;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
Expand All @@ -33,7 +34,6 @@
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.LoadBalancingPool;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
Expand All @@ -58,6 +58,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -88,11 +89,11 @@ public String toString()
}
};

public static MemcachedCache create(final MemcachedCacheConfig config, MonitorScheduler emitter)
public static MemcachedCache create(final MemcachedCacheConfig config)
{
final ConcurrentMap<String, AtomicLong> counters = new ConcurrentHashMap<>();
final ConcurrentMap<String, AtomicLong> meters = new ConcurrentHashMap<>();
emitter.addMonitor(
final AbstractMonitor monitor =
new AbstractMonitor()
{
final AtomicReference<Map<String, Long>> priorValues = new AtomicReference<Map<String, Long>>(new HashMap<String, Long>());
Expand Down Expand Up @@ -134,8 +135,7 @@ private Map<String, Long> getCurrentValues()
}
return builder.build();
}
}
);
};
try {
LZ4Transcoder transcoder = new LZ4Transcoder(config.getMaxObjectSize());

Expand All @@ -152,12 +152,24 @@ private Map<String, Long> getCurrentValues()

final Predicate<String> interesting = new Predicate<String>()
{
// See net.spy.memcached.MemcachedConnection.registerMetrics()
private final Set<String> interestingMetrics = ImmutableSet.of(
"[MEM] Reconnecting Nodes (ReconnectQueue)",
//"[MEM] Shutting Down Nodes (NodesToShutdown)", // Busted
"[MEM] Request Rate: All",
"[MEM] Average Bytes written to OS per write",
"[MEM] Average Bytes read from OS per read",
"[MEM] Average Time on wire for operations (µs)",
"[MEM] Response Rate: All (Failure + Success + Retry)",
"[MEM] Response Rate: Retry",
"[MEM] Response Rate: Failure",
"[MEM] Response Rate: Success"
);

@Override
public boolean apply(@Nullable String input)
{
// See net.spy.memcached.MemcachedConnection.registerMetrics()
// in current version shutdown queue metric is borked
return input != null && !input.contains("Down");
return input != null && interestingMetrics.contains(input);
}
};

Expand All @@ -169,7 +181,7 @@ public void addCounter(String name)
if (!interesting.apply(name)) {
return;
}
counters.put(name, new AtomicLong(0L));
counters.putIfAbsent(name, new AtomicLong(0L));

if (log.isDebugEnabled()) {
log.debug("Add Counter [%s]", name);
Expand All @@ -179,13 +191,8 @@ public void addCounter(String name)
@Override
public void removeCounter(String name)
{
if (!interesting.apply(name)) {
return;
}
counters.remove(name);

if (log.isDebugEnabled()) {
log.debug("Remove Counter [%s]", name);
log.debug("Ignoring request to remove [%s]", name);
}
}

Expand Down Expand Up @@ -264,7 +271,10 @@ public void decrementCounter(String name, int amount)
@Override
public void addMeter(String name)
{
meters.put(name, new AtomicLong(0L));
if (!interesting.apply(name)) {
return;
}
meters.putIfAbsent(name, new AtomicLong(0L));
if (log.isDebugEnabled()) {
log.debug("Adding meter [%s]", name);
}
Expand All @@ -273,15 +283,20 @@ public void addMeter(String name)
@Override
public void removeMeter(String name)
{
meters.remove(name);
if (!interesting.apply(name)) {
return;
}
if (log.isDebugEnabled()) {
log.debug("Removing meter [%s]", name);
log.debug("Ignoring request to remove meter [%s]", name);
}
}

@Override
public void markMeter(String name)
{
if (!interesting.apply(name)) {
return;
}
AtomicLong meter = meters.get(name);
if (meter == null) {
meters.putIfAbsent(name, new AtomicLong(0L));
Expand Down Expand Up @@ -360,7 +375,7 @@ public MemcachedClientIF get()
);
}

return new MemcachedCache(clientSupplier, config);
return new MemcachedCache(clientSupplier, config, monitor);
}
catch (IOException e) {
throw Throwables.propagate(e);
Expand All @@ -377,16 +392,22 @@ public MemcachedClientIF get()
private final AtomicLong missCount = new AtomicLong(0);
private final AtomicLong timeoutCount = new AtomicLong(0);
private final AtomicLong errorCount = new AtomicLong(0);
private final AbstractMonitor monitor;


MemcachedCache(Supplier<ResourceHolder<MemcachedClientIF>> client, MemcachedCacheConfig config)
MemcachedCache(
Supplier<ResourceHolder<MemcachedClientIF>> client,
MemcachedCacheConfig config,
AbstractMonitor monitor
)
{
Preconditions.checkArgument(
config.getMemcachedPrefix().length() <= MAX_PREFIX_LENGTH,
"memcachedPrefix length [%d] exceeds maximum length [%d]",
config.getMemcachedPrefix().length(),
MAX_PREFIX_LENGTH
);
this.monitor = monitor;
this.timeout = config.getTimeout();
this.expiration = config.getExpiration();
this.client = client;
Expand Down Expand Up @@ -587,4 +608,10 @@ public boolean isLocal()
{
return false;
}

@Override
public void doMonitor(ServiceEmitter emitter)
{
monitor.doMonitor(emitter);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,11 @@

package io.druid.client.cache;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.metamx.metrics.MonitorScheduler;

public class MemcachedCacheProvider extends MemcachedCacheConfig implements CacheProvider
{
private final MonitorScheduler emitter;

@JsonCreator
public MemcachedCacheProvider(
@JacksonInject
MonitorScheduler emitter
)
{
this.emitter = emitter;
}

@Override
public Cache get()
{
return MemcachedCache.create(this, emitter);
return MemcachedCache.create(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public int getExpiration()
{
return 3600;
}
}
}, MemcachedCacheTest.NOOP_MONITOR
);

randBytes = new byte[objectSize * 1024];
Expand Down
39 changes: 16 additions & 23 deletions server/src/test/java/io/druid/client/cache/MemcachedCacheTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import com.metamx.emitter.core.Emitter;
import com.metamx.emitter.core.Event;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.metrics.Monitor;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.MonitorScheduler;
import io.druid.collections.ResourceHolder;
import io.druid.collections.StupidResourceHolder;
Expand All @@ -56,7 +56,6 @@
import net.spy.memcached.ops.OperationStatus;
import net.spy.memcached.transcoders.SerializingTranscoder;
import net.spy.memcached.transcoders.Transcoder;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -84,6 +83,14 @@ public class MemcachedCacheTest
private static final Logger log = new Logger(MemcachedCacheTest.class);
private static final byte[] HI = "hiiiiiiiiiiiiiiiiiii".getBytes();
private static final byte[] HO = "hooooooooooooooooooo".getBytes();
protected static final AbstractMonitor NOOP_MONITOR = new AbstractMonitor()
{
@Override
public boolean doMonitor(ServiceEmitter emitter)
{
return false;
}
};
private MemcachedCache cache;
private final MemcachedCacheConfig memcachedCacheConfig = new MemcachedCacheConfig()
{
Expand Down Expand Up @@ -119,7 +126,8 @@ public void setUp() throws Exception
Suppliers.<ResourceHolder<MemcachedClientIF>>ofInstance(
StupidResourceHolder.<MemcachedClientIF>create(new MockMemcachedClient())
),
memcachedCacheConfig
memcachedCacheConfig,
NOOP_MONITOR
);
}

Expand Down Expand Up @@ -191,20 +199,8 @@ public void configure(Binder binder)
@Test
public void testMonitor() throws Exception
{
MonitorScheduler monitorScheduler = EasyMock.createNiceMock(MonitorScheduler.class);
Capture<? extends Monitor> monitorCapture = Capture.newInstance();
monitorScheduler.addMonitor(EasyMock.capture(monitorCapture));
EasyMock.expectLastCall().once();
EasyMock.replay(monitorScheduler);
MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig, monitorScheduler);
EasyMock.verify(monitorScheduler);

Assert.assertTrue(monitorCapture.hasCaptured());
final Monitor monitor = monitorCapture.getValue();
monitor.start();
Assert.assertNotNull(monitor);

Emitter emitter = EasyMock.createNiceMock(Emitter.class);
final MemcachedCache cache = MemcachedCache.create(memcachedCacheConfig);
final Emitter emitter = EasyMock.createNiceMock(Emitter.class);
final Collection<Event> events = new ArrayList<>();
final ServiceEmitter serviceEmitter = new ServiceEmitter("service", "host", emitter)
{
Expand All @@ -217,7 +213,7 @@ public void emit(Event event)

while (events.isEmpty()) {
Thread.sleep(memcachedCacheConfig.getTimeout());
Assert.assertTrue(monitor.monitor(serviceEmitter));
cache.doMonitor(serviceEmitter);
}

Assert.assertFalse(events.isEmpty());
Expand Down Expand Up @@ -288,20 +284,17 @@ public int get(Cache cache, String namespace, byte[] key)
class MemcachedProviderWithConfig extends MemcachedCacheProvider
{
private final MemcachedCacheConfig config;
private final MonitorScheduler emitter;

@Inject
public MemcachedProviderWithConfig(MonitorScheduler emitter, MemcachedCacheConfig config)
public MemcachedProviderWithConfig(MemcachedCacheConfig config)
{
super(emitter);
this.emitter = emitter;
this.config = config;
}

@Override
public Cache get()
{
return MemcachedCache.create(config, emitter);
return MemcachedCache.create(config);
}
}

Expand Down

0 comments on commit e6f07a8

Please sign in to comment.