Skip to content

Commit

Permalink
more work on reusing work node level data, now when recovering from peers
Browse files Browse the repository at this point in the history
  • Loading branch information
kimchy committed Jun 24, 2010
1 parent 57169d4 commit edf0075
Show file tree
Hide file tree
Showing 7 changed files with 254 additions and 94 deletions.
9 changes: 8 additions & 1 deletion config/logging.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
rootLogger: INFO, console, file
logger:
# log action execution errors for easier debugging
action : DEBUG
action: DEBUG

# gateway
#gateway: DEBUG
#index.gateway: DEBUG

# peer shard recovery
#index.shard.recovery: DEBUG

appender:
console:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ public synchronized void recover() throws IndexShardGatewayRecoveryException, Ig
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("recovery completed from ").append(shardGateway).append(", took [").append(stopWatch.totalTime()).append("], throttling_wait [").append(throttlingWaitTime.totalTime()).append("]\n");
sb.append(" index : number_of_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" index : recovered_files [").append(recoveryStatus.index().numberOfFiles()).append("] with total_size [").append(recoveryStatus.index().totalSize()).append("], throttling_wait [").append(recoveryStatus.index().throttlingWaitTime()).append("]\n");
sb.append(" : reusing_files [").append(recoveryStatus.index().numberOfExistingFiles()).append("] with total_size [").append(recoveryStatus.index().existingTotalSize()).append("]\n");
sb.append(" translog : translog_id [").append(recoveryStatus.translog().translogId()).append("], number_of_operations [").append(recoveryStatus.translog().numberOfOperations()).append("] with total_size[").append(recoveryStatus.translog().totalSize()).append("]");
logger.debug(sb.toString());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,9 +381,11 @@ private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecove
break;
}
}
indexShard.performRecovery(operations);
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryOperations(operations);
indexShard.performRecoveryFinalization();

// clean all the other translogs
// clean all the other translog
for (Long translogIdToDelete : translogIds) {
if (!translogId.equals(translogIdToDelete)) {
try {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -400,35 +400,30 @@ private void innerDeleteByQuery(byte[] querySource, String queryParserName, Stri
}
}

public void performRecovery(Iterable<Translog.Operation> operations) throws ElasticSearchException {
/**
* After the store has been recovered, we need to start the engine in order to apply operations
*/
public void performRecoveryPrepareForTranslog() throws ElasticSearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
engine.start();
applyTranslogOperations(operations);
}

public void performRecoveryFinalization() throws ElasticSearchException {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from gateway)");
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
engine.refresh(new Engine.Refresh(true));
}

public void performRecovery(Translog.Snapshot snapshot, boolean phase3) throws ElasticSearchException {
public void performRecoveryOperations(Iterable<Translog.Operation> operations) throws ElasticSearchException {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
if (!phase3) {
// start the engine, but the shard is not started yet...
engine.start();
}
applyTranslogOperations(snapshot);
if (phase3) {
synchronized (mutex) {
logger.debug("Moved to state [STARTED] post recovery (from another shard)");
state = IndexShardState.STARTED;
}
scheduleRefresherIfNeeded();
}
applyTranslogOperations(operations);
}

private void applyTranslogOperations(Iterable<Translog.Operation> snapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.io.IOException;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class TranslogStreams {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,10 @@ private RecoveryStatus.Translog recoverTranslog() throws IndexShardGatewayRecove
}
currentTranslogPartToWrite = index;

indexShard.performRecovery(operations);
indexShard.performRecoveryPrepareForTranslog();
indexShard.performRecoveryOperations(operations);
indexShard.performRecoveryFinalization();

return new RecoveryStatus.Translog(latestTranslogId, operations.size(), new ByteSizeValue(size, ByteSizeUnit.BYTES));
} catch (Exception e) {
throw new IndexShardGatewayRecoveryException(shardId(), "Failed to perform recovery of translog", e);
Expand Down

0 comments on commit edf0075

Please sign in to comment.