Skip to content

Commit

Permalink
bobo-contrib: Fix for occasionally corrupt geo index; Catch the erro…
Browse files Browse the repository at this point in the history
…r when writing geo segment, attempt to repair, and log additional information if we cannot
  • Loading branch information
gcooney committed Aug 28, 2013
1 parent 08981ff commit c750405
Show file tree
Hide file tree
Showing 8 changed files with 221 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,12 @@ public void flush(Directory directory, String segmentName) throws IOException {
boolean success = false;
try {
String fileName = config.getGeoFileName(segmentName);
geoRecordBTree = new GeoSegmentWriter<CartesianGeoRecord>(treeToFlush, directory,
fileName, geoSegmentInfo, geoRecordSerializer);
try {
geoRecordBTree = new GeoSegmentWriter<CartesianGeoRecord>(treeToFlush, directory,
fileName, geoSegmentInfo, geoRecordSerializer);
} catch (InvalidTreeSizeException e) {
throw new IOException(e);
}

success = true;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@ public class GeoSegmentWriter<G extends IGeoRecord> extends BTree<G> implements
IGeoRecordSerializer<G> geoRecordSerializer;
IndexOutput indexOutput;
GeoSegmentInfo geoSegmentInfo;
int maxIndex;

int arrayLength;
long treeDataStart;

public GeoSegmentWriter(Set<G> tree, Directory directory, String fileName,
GeoSegmentInfo geoSegmentInfo, IGeoRecordSerializer<G> geoRecordSerializer)
throws IOException {
throws IOException, InvalidTreeSizeException {
super(tree.size(), false);
this.arrayLength = tree.size();
this.geoSegmentInfo = geoSegmentInfo;
Expand All @@ -44,12 +43,13 @@ public GeoSegmentWriter(Set<G> tree, Directory directory, String fileName,
try {
buildBTreeFromSet(tree);
} catch (IOException e) {
close();
indexOutput.close();
throw e;
}
}

public GeoSegmentWriter(int treeSize, Iterator<G> inputIterator, Directory directory, String fileName,
GeoSegmentInfo geoSegmentInfo, IGeoRecordSerializer<G> geoRecordSerializer) throws IOException {
GeoSegmentInfo geoSegmentInfo, IGeoRecordSerializer<G> geoRecordSerializer) throws IOException, InvalidTreeSizeException {
super(treeSize, false);
this.arrayLength = treeSize;
this.geoSegmentInfo = geoSegmentInfo;
Expand All @@ -60,28 +60,33 @@ public GeoSegmentWriter(int treeSize, Iterator<G> inputIterator, Directory direc
try {
buildBTreeFromIterator(inputIterator);
} catch (IOException e) {
close();
indexOutput.close();
throw e;
}
}

private void buildBTreeFromIterator(Iterator<G> geoIter) throws IOException {
private void buildBTreeFromIterator(Iterator<G> geoIter) throws IOException, InvalidTreeSizeException {
writeGeoInfo();

int recordCount = 0;
int index = getLeftMostLeafIndex();
ensureNotWritingPastEndOfFile(index);
while (geoIter.hasNext()) {
setValueAtIndex(index, geoIter.next());
index = getNextIndex(index);

if(index >= this.arrayLength) {
throw new IllegalArgumentException("Tree only created for " + arrayLength + " nodes but iterator contains more than that");
G nextValue = geoIter.next();
if (index != -1) {
setValueAtIndex(index, nextValue);
index = getNextIndex(index);
}

recordCount++;
}

maxIndex = index;
if (arrayLength != recordCount) {
throw new InvalidTreeSizeException(arrayLength, recordCount);
}
}

private void buildBTreeFromSet(Set<G> geoSet) throws IOException {
private void buildBTreeFromSet(Set<G> geoSet) throws IOException, InvalidTreeSizeException {
buildBTreeFromIterator(geoSet.iterator());
}

Expand Down Expand Up @@ -171,9 +176,4 @@ public void close() throws IOException {
indexOutput.close();
}

// Test if full binary tree.
public int getMaxIndex() {
return maxIndex;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.browseengine.bobo.geosearch.index.impl;

/**
 * Signals that the size declared for a compact binary tree differs from the
 * number of records that were actually attempted to be written to it.
 *
 * <p>Both numbers are retained so that code catching this exception can
 * inspect them.
 *
 * @author Geoff Cooney
 */
public class InvalidTreeSizeException extends Exception {
    private static final long serialVersionUID = -586521581062650233L;

    /** Size the tree was explicitly created with. */
    private final int treeSize;

    /** Number of records that were actually supplied for writing. */
    private final int recordSize;

    /**
     * Creates the exception and derives its message from the two sizes.
     *
     * @param treeSize   the explicit size the tree was created with
     * @param recordSize the number of records attempted to be written to the tree
     */
    public InvalidTreeSizeException(int treeSize, int recordSize) {
        super(describeMismatch(treeSize, recordSize));
        this.treeSize = treeSize;
        this.recordSize = recordSize;
    }

    /**
     * Assembles the human-readable description of the size mismatch.
     * Kept static because it runs before the instance fields are assigned.
     */
    private static String describeMismatch(int declaredSize, int writtenRecords) {
        StringBuilder text = new StringBuilder();
        text.append("Explicit tree size(").append(declaredSize);
        text.append(") does not match the number of records attempted to be written to the tree(");
        text.append(writtenRecords).append(")");
        return text.toString();
    }

    /**
     * @return The explicit size of the tree that this exception was generated for
     */
    public int getTreeSize() {
        return this.treeSize;
    }

    /**
     * @return The actual number of records in the tree this exception was generated for
     */
    public int getRecordSize() {
        return this.recordSize;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.browseengine.bobo.geosearch.impl.CartesianGeoRecordSerializer;
import com.browseengine.bobo.geosearch.index.impl.GeoSegmentReader;
import com.browseengine.bobo.geosearch.index.impl.GeoSegmentWriter;
import com.browseengine.bobo.geosearch.index.impl.InvalidTreeSizeException;
import com.browseengine.bobo.geosearch.merge.IGeoMergeInfo;
import com.browseengine.bobo.geosearch.merge.IGeoMerger;

Expand Down Expand Up @@ -96,8 +97,9 @@ public void merge(IGeoMergeInfo geoMergeInfo, GeoSearchConfig config) throws IOE
}

int newSegmentSize = calculateMergedSegmentSize(deletedDocsList, mergeInputBTrees, geoConverter);
buildMergedSegmentWithRetry(mergeInputBTrees, deletedDocsList, newSegmentSize,
geoMergeInfo, config, fieldNameFilterConverter);

buildMergedSegment(mergeInputBTrees, deletedDocsList, newSegmentSize, geoMergeInfo, config, fieldNameFilterConverter);
success = true;

} finally {
Expand All @@ -110,6 +112,23 @@ public void merge(IGeoMergeInfo geoMergeInfo, GeoSearchConfig config) throws IOE
}
}

/**
 * Builds the merged geo segment, making a single repair attempt when the
 * expected segment size turns out to be wrong.
 *
 * <p>The first attempt uses {@code newSegmentSize}. If that attempt fails with an
 * {@link InvalidTreeSizeException}, a warning is logged and the build is run once
 * more using the record count reported by the exception. If the second attempt
 * fails the same way, an error is logged and the failure is rethrown wrapped in
 * an {@link IOException}.
 *
 * @param mergeInputBTrees         the input trees being merged
 * @param deletedDocsList          deleted-document bit vectors passed through to the build
 * @param newSegmentSize           the expected number of records in the merged segment
 * @param geoMergeInfo             merge information passed through to the build
 * @param config                   geo search configuration passed through to the build
 * @param fieldNameFilterConverter converter passed through to the build
 * @throws IOException if the build fails with an I/O error, or if the repair
 *                     attempt also reports a size mismatch
 */
protected void buildMergedSegmentWithRetry(List<BTree<CartesianGeoRecord>> mergeInputBTrees, List<BitVector> deletedDocsList,
        int newSegmentSize, IGeoMergeInfo geoMergeInfo, GeoSearchConfig config, IFieldNameFilterConverter fieldNameFilterConverter) throws IOException {
    int repairedSegmentSize;

    // First attempt: trust the pre-computed segment size.
    try {
        buildMergedSegment(mergeInputBTrees, deletedDocsList, newSegmentSize, geoMergeInfo, config, fieldNameFilterConverter);
        return;
    } catch (InvalidTreeSizeException sizeMismatch) {
        LOGGER.warn("Number of records does not match expected number of merged records. Attempting to repair.", sizeMismatch);
        repairedSegmentSize = sizeMismatch.getRecordSize();
    }

    // Second and last attempt: size the segment by the record count seen during the first attempt.
    try {
        buildMergedSegment(mergeInputBTrees, deletedDocsList, repairedSegmentSize, geoMergeInfo, config, fieldNameFilterConverter);
    } catch (InvalidTreeSizeException repairFailure) {
        LOGGER.error("Unable to merge geo segments", repairFailure);
        throw new IOException(repairFailure);
    }
}

/**
*
* @param directory
Expand Down Expand Up @@ -139,7 +158,7 @@ protected boolean loadFieldNameFilterConverter(Directory directory, String geoFi
private void buildMergedSegment(List<BTree<CartesianGeoRecord>> mergeInputBTrees,
List<BitVector> deletedDocsList, int newSegmentSize,
IGeoMergeInfo geoMergeInfo, GeoSearchConfig config,
IFieldNameFilterConverter fieldNameFilterConverter) throws IOException {
IFieldNameFilterConverter fieldNameFilterConverter) throws IOException, InvalidTreeSizeException {
Directory directory = geoMergeInfo.getDirectory();
IGeoConverter geoConverter = config.getGeoConverter();

Expand Down Expand Up @@ -176,7 +195,7 @@ private GeoSegmentInfo buildGeoSegmentInfo(String segmentName, IFieldNameFilterC
}

protected BTree<CartesianGeoRecord> getOutputBTree(int newSegmentSize, Iterator<CartesianGeoRecord> inputIterator,
Directory directory, String outputFileName, GeoSegmentInfo geoSegmentInfo) throws IOException {
Directory directory, String outputFileName, GeoSegmentInfo geoSegmentInfo) throws IOException, InvalidTreeSizeException {
return new GeoSegmentWriter<CartesianGeoRecord>(newSegmentSize, inputIterator,
directory, outputFileName, geoSegmentInfo, geoRecordSerializer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.browseengine.bobo.geosearch.index.bo.GeoCoordinateField;
import com.browseengine.bobo.geosearch.index.impl.GeoSegmentReader;
import com.browseengine.bobo.geosearch.index.impl.GeoSegmentWriter;
import com.browseengine.bobo.geosearch.index.impl.InvalidTreeSizeException;
import com.browseengine.bobo.geosearch.solo.bo.IDGeoRecord;
import com.browseengine.bobo.geosearch.solo.bo.IndexTooLargeException;
import com.browseengine.bobo.geosearch.solo.impl.IDGeoRecordComparator;
Expand Down Expand Up @@ -123,9 +124,13 @@ private void flushInMemoryIndex() throws IOException {
GeoSegmentWriter<IDGeoRecord> getGeoSegmentWriter(Set<IDGeoRecord> dataToFlush) throws IOException {
String fileName = indexName + "." + config.getGeoFileExtension();

return new GeoSegmentWriter<IDGeoRecord>(
dataToFlush, directory, fileName,
buildGeoSegmentInfo(indexName), geoRecordSerializer);
try {
return new GeoSegmentWriter<IDGeoRecord>(
dataToFlush, directory, fileName,
buildGeoSegmentInfo(indexName), geoRecordSerializer);
} catch (InvalidTreeSizeException e) {
throw new IOException(e);
}
}

private void loadCurrentIndex() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void test_fileNotFoundGivesZeroGeoRecords() throws Exception {
}

@Test
public void test_WriteThenRead() {
public void test_WriteThenRead() throws InvalidTreeSizeException {

for(int i = 0; i < 100; i++) {
try {
Expand All @@ -60,8 +60,6 @@ public void test_WriteThenRead() {
String fileName = geoSegmentInfo.getSegmentName() + "." + geoConf.getGeoFileExtension();
GeoSegmentWriter<CartesianGeoRecord> geoOut = new GeoSegmentWriter<CartesianGeoRecord>(
tree, dir, fileName, geoSegmentInfo, geoRecordSerializer);
assertTrue("Not a full binary tree. ",
geoOut.getMaxIndex() < geoOut.getArrayLength());
geoOut.close();

GeoSegmentReader<CartesianGeoRecord> geoRand =
Expand All @@ -77,7 +75,7 @@ public void test_WriteThenRead() {
}

@Test
public void test_WriteThenRead_V1() throws IOException {
public void test_WriteThenRead_V1() throws IOException, InvalidTreeSizeException {
int len = 100;
int idBytes = 16;

Expand Down Expand Up @@ -105,8 +103,6 @@ public void test_WriteThenRead_V1() throws IOException {
//write data
GeoSegmentWriter<IDGeoRecord> geoOut = new GeoSegmentWriter<IDGeoRecord>(
tree, dir, fileName, geoSegmentInfo, geoRecordSerializer);
assertTrue("Not a full binary tree. ",
geoOut.getMaxIndex() < geoOut.getArrayLength());
geoOut.close();

//read and verify data
Expand Down
Loading

0 comments on commit c750405

Please sign in to comment.