Skip to content

Commit

Permalink
Fix duplicated locks after sync from storage (apache#4521)
Browse files Browse the repository at this point in the history
* Fix duplicated locks after sync from storage

* Remove unnecessary table creation
  • Loading branch information
jihoonson authored and fjy committed Jul 11, 2017
1 parent 98b1385 commit 6d2df2a
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package io.druid.indexing.overlord;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -138,30 +139,37 @@ public int compare(Pair<Task, TaskLock> left, Pair<Task, TaskLock> right)
log.warn("WTF?! Got lock with empty interval for task: %s", task.getId());
continue;
}
final Optional<TaskLock> acquiredTaskLock = tryLock(

final TaskLockPosse taskLockPosse = tryAddTaskToLockPosse(
task,
savedTaskLock.getInterval(),
Optional.of(savedTaskLock.getVersion())
);
if (acquiredTaskLock.isPresent() && savedTaskLock.getVersion().equals(acquiredTaskLock.get().getVersion())) {
taskLockCount ++;
log.info(
"Reacquired lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
task.getId()
);
} else if (acquiredTaskLock.isPresent()) {
taskLockCount ++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
acquiredTaskLock.get().getVersion(),
task.getId()
);
if (taskLockPosse != null) {
taskLockPosse.getTaskIds().add(task.getId());

final TaskLock taskLock = taskLockPosse.getTaskLock();

if (savedTaskLock.getVersion().equals(taskLock.getVersion())) {
taskLockCount ++;
log.info(
"Reacquired lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
task.getId()
);
} else {
taskLockCount ++;
log.info(
"Could not reacquire lock on interval[%s] version[%s] (got version[%s] instead) for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
taskLock.getVersion(),
task.getId()
);
}
} else {
log.info(
throw new ISE(
"Could not reacquire lock on interval[%s] version[%s] for task: %s",
savedTaskLock.getInterval(),
savedTaskLock.getVersion(),
Expand Down Expand Up @@ -264,14 +272,59 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin
throw new ISE("Unable to grant lock to inactive Task [%s]", task.getId());
}
Preconditions.checkArgument(interval.toDurationMillis() > 0, "interval empty");

final TaskLockPosse posseToUse = tryAddTaskToLockPosse(task, interval, preferredVersion);
if (posseToUse != null) {
// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());

// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
} catch(Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", posseToUse.getTaskLock().getDataSource())
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
}

} else {
return Optional.absent();
}
}
finally {
giant.unlock();
}

}

private TaskLockPosse tryAddTaskToLockPosse(
final Task task,
final Interval interval,
final Optional<String> preferredVersion
)
{
giant.lock();

try {
final String dataSource = task.getDataSource();
final List<TaskLockPosse> foundPosses = findLockPossesForInterval(dataSource, interval);
final TaskLockPosse posseToUse;

if (foundPosses.size() > 1) {

// Too many existing locks.
return Optional.absent();
return null;

} else if (foundPosses.size() == 1) {

Expand All @@ -287,7 +340,7 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin
.addData("task", task.getId())
.addData("interval", interval);
}
return Optional.absent();
return null;
}

} else {
Expand Down Expand Up @@ -321,33 +374,11 @@ private Optional<TaskLock> tryLock(final Task task, final Interval interval, fin
log.info("Created new TaskLockPosse: %s", posseToUse);
}

// Add to existing TaskLockPosse, if necessary
if (posseToUse.getTaskIds().add(task.getId())) {
log.info("Added task[%s] to TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());

// Update task storage facility. If it fails, revoke the lock.
try {
taskStorage.addLock(task.getId(), posseToUse.getTaskLock());
return Optional.of(posseToUse.getTaskLock());
} catch(Exception e) {
log.makeAlert("Failed to persist lock in storage")
.addData("task", task.getId())
.addData("dataSource", posseToUse.getTaskLock().getDataSource())
.addData("interval", posseToUse.getTaskLock().getInterval())
.addData("version", posseToUse.getTaskLock().getVersion())
.emit();
unlock(task, interval);
return Optional.absent();
}
} else {
log.info("Task[%s] already present in TaskLock[%s]", task.getId(), posseToUse.getTaskLock().getGroupId());
return Optional.of(posseToUse.getTaskLock());
}
return posseToUse;
}
finally {
giant.unlock();
}

}

/**
Expand Down Expand Up @@ -572,7 +603,19 @@ public void add(Task task)
}
}

private static class TaskLockPosse
/**
 * Test-only accessor for the {@code activeTasks} set.
 *
 * <p>The set is returned by reference (no copy is made), so callers see, and could modify, the
 * live collection.
 *
 * <p>NOTE(review): presumably these are the ids of tasks registered through {@code add(Task)};
 * confirm against the field declaration.
 *
 * @return the {@code activeTasks} set itself
 */
@VisibleForTesting
Set<String> getActiveTasks()
{
return activeTasks;
}

/**
 * Test-only accessor for the {@code running} map of lock posses.
 *
 * <p>The map is returned by reference (no copy is made), so callers see, and could modify, the
 * live structure.
 *
 * <p>NOTE(review): the outer key looks like a dataSource name and the inner map is keyed by lock
 * interval; verify against the declaration of {@code running}.
 *
 * @return the {@code running} map itself
 */
@VisibleForTesting
Map<String, NavigableMap<Interval, TaskLockPosse>> getAllLocks()
{
return running;
}

static class TaskLockPosse
{
final private TaskLock taskLock;
final private Set<String> taskIds;
Expand All @@ -593,6 +636,31 @@ public Set<String> getTaskIds()
return taskIds;
}

/**
 * Compares this posse with another object for equality.
 *
 * <p>Two posses are equal when they are instances of exactly the same class and both their
 * {@code taskLock} and their {@code taskIds} are equal.
 *
 * @param o the object to compare against; may be {@code null}
 * @return {@code true} if {@code o} is an equal posse; {@code false} otherwise, including when
 *         {@code o} is {@code null}
 */
@Override
public boolean equals(Object o)
{
  if (this == o) {
    return true;
  }

  // The Object.equals contract requires x.equals(null) to return false; without the null check
  // the getClass() call on the argument would throw a NullPointerException instead.
  if (o == null || !getClass().equals(o.getClass())) {
    return false;
  }

  final TaskLockPosse that = (TaskLockPosse) o;
  return taskLock.equals(that.taskLock) && taskIds.equals(that.taskIds);
}

/**
 * Computes a hash code from {@code taskLock} and {@code taskIds}, the same two fields that
 * {@code equals(Object)} compares, so equal posses produce equal hash codes.
 *
 * <p>NOTE(review): callers add ids to the set returned by {@code getTaskIds()}, so this value
 * can presumably change during a posse's lifetime; be careful about keeping posses in
 * hash-based collections while their task ids are being modified.
 *
 * @return the combined hash of {@code taskLock} and {@code taskIds}
 */
@Override
public int hashCode()
{
return Objects.hashCode(taskLock, taskIds);
}

@Override
public String toString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@
package io.druid.indexing.overlord;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import io.druid.data.input.FirehoseFactory;
import io.druid.indexing.common.TaskLock;
import io.druid.indexing.common.TaskStatus;
import io.druid.indexing.common.config.TaskStorageConfig;
import io.druid.indexing.common.task.NoopTask;
import io.druid.indexing.common.task.Task;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.java.util.common.ISE;
import io.druid.java.util.common.StringUtils;
import io.druid.metadata.EntryExistsException;
import io.druid.metadata.SQLMetadataStorageActionHandlerFactory;
import io.druid.metadata.TestDerbyConnector;
import io.druid.server.initialization.ServerConfig;
import org.easymock.EasyMock;
import org.joda.time.Interval;
Expand All @@ -40,23 +47,39 @@
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class TaskLockboxTest
{
private TaskStorage taskStorage;
@Rule
public final TestDerbyConnector.DerbyConnectorRule derby = new TestDerbyConnector.DerbyConnectorRule();

private final ObjectMapper objectMapper = new DefaultObjectMapper();
private ServerConfig serverConfig;
private TaskStorage taskStorage;
private TaskLockbox lockbox;

@Rule
public final ExpectedException exception = ExpectedException.none();

@Before
public void setUp()
public void setup()
{
taskStorage = new HeapMemoryTaskStorage(new TaskStorageConfig(null));
ServerConfig serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100));
final TestDerbyConnector derbyConnector = derby.getConnector();
derbyConnector.createTaskTables();
taskStorage = new MetadataTaskStorage(
derbyConnector,
new TaskStorageConfig(null),
new SQLMetadataStorageActionHandlerFactory(
derbyConnector,
derby.metadataTablesConfigSupplier().get(),
objectMapper
)
);
serverConfig = EasyMock.niceMock(ServerConfig.class);
EasyMock.expect(serverConfig.getMaxIdleTime()).andReturn(new Period(100)).anyTimes();
EasyMock.replay(serverConfig);

ServiceEmitter emitter = EasyMock.createMock(ServiceEmitter.class);
Expand Down Expand Up @@ -166,6 +189,37 @@ public void testTimeoutForLock() throws InterruptedException
lockbox.lock(task2, new Interval("2015-01-01/2015-01-15"));
}

/**
 * Verifies that a fresh {@link TaskLockbox} populated via {@code syncFromStorage()} ends up with the
 * same locks and active tasks as the lockbox that originally granted them, and that the sync
 * leaves the locks recorded in task storage unchanged.
 */
@Test
public void testSyncFromStorage() throws EntryExistsException
{
  final TaskLockbox originalBox = new TaskLockbox(taskStorage, serverConfig);

  // Five tasks, each locking its own one-day interval: Jan 1-2, Jan 2-3, ... Jan 5-6 of 2017.
  for (int day = 1; day <= 5; day++) {
    final Task task = NoopTask.create();
    taskStorage.insert(task, TaskStatus.running(task.getId()));
    originalBox.add(task);

    final String intervalString = StringUtils.format("2017-01-0%d/2017-01-0%d", day, day + 1);
    Assert.assertTrue(originalBox.tryLock(task, new Interval(intervalString)).isPresent());
  }

  final List<TaskLock> locksBeforeSync = collectLocksOfActiveTasks();

  // A second lockbox over the same storage must rebuild an identical in-memory state.
  final TaskLockbox newBox = new TaskLockbox(taskStorage, serverConfig);
  newBox.syncFromStorage();

  Assert.assertEquals(originalBox.getAllLocks(), newBox.getAllLocks());
  Assert.assertEquals(originalBox.getActiveTasks(), newBox.getActiveTasks());

  // Syncing must not have written additional (duplicate) locks back to storage.
  final List<TaskLock> locksAfterSync = collectLocksOfActiveTasks();
  Assert.assertEquals(locksBeforeSync, locksAfterSync);
}

/**
 * Gathers the locks that task storage currently holds for every active task, in the order the
 * storage returns them.
 */
private List<TaskLock> collectLocksOfActiveTasks()
{
  return taskStorage.getActiveTasks()
                    .stream()
                    .flatMap(task -> taskStorage.getLocks(task.getId()).stream())
                    .collect(Collectors.toList());
}

public static class SomeTask extends NoopTask {

public SomeTask(
Expand All @@ -190,5 +244,4 @@ public String getType()
public String getGroupId() { return "someGroupId";}

}

}

0 comments on commit 6d2df2a

Please sign in to comment.