Use unique segment paths for Kafka indexing (apache#5692)
* support unique segment file paths

* forbiddenapis

* code review changes

* code review changes

* code review changes

* checkstyle fix
dclim authored and gianm committed Apr 30, 2018
1 parent 762f882 commit 8ec2d2f
Showing 59 changed files with 966 additions and 536 deletions.
27 changes: 27 additions & 0 deletions api/src/main/java/io/druid/segment/loading/DataSegmentFinder.java
@@ -20,8 +20,11 @@
package io.druid.segment.loading;

import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;

import java.util.Map;
import java.util.Set;

/**
@@ -31,6 +34,8 @@
@ExtensionPoint
public interface DataSegmentFinder
{
Logger log = new Logger(DataSegmentFinder.class);

/**
* This method should first recursively look for descriptor.json (partitionNum_descriptor.json for HDFS data storage) underneath
* workingDirPath and then verify that index.zip (partitionNum_index.zip for HDFS data storage) exists in the same folder.
@@ -46,4 +51,26 @@ public interface DataSegmentFinder
* @return a set of segments that were found underneath workingDirPath
*/
Set<DataSegment> findSegments(String workingDirPath, boolean updateDescriptor) throws SegmentLoadingException;

/**
* Adds dataSegment if it does not exist in timestampedSegments. If it exists, replaces entry if segmentModifiedAt is
* newer than stored timestamp.
*
* @param timestampedSegments map of <segmentID, Pair<segment, modifiedAt>> containing segments with modified time
* @param dataSegment segment to add
* @param segmentModifiedAt segment modified timestamp
*/
static void putInMapRetainingNewest(
Map<String, Pair<DataSegment, Long>> timestampedSegments, DataSegment dataSegment, long segmentModifiedAt
)
{
timestampedSegments.merge(
dataSegment.getIdentifier(),
Pair.of(dataSegment, segmentModifiedAt),
(previous, current) -> {
log.warn("Multiple copies of segmentId [%s] found, using newest version", current.lhs.getIdentifier());
return previous.rhs > current.rhs ? previous : current;
}
);
}
}
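
To show how this helper is meant to be used, here is a hedged sketch of a finder implementation deduplicating the segments it discovers; the `FinderSketch` class and its `discovered` input are illustrative assumptions, not code from this commit.

```java
// Hypothetical sketch: deduplicate discovered segments by segment ID, keeping the
// copy with the newest modified timestamp via DataSegmentFinder.putInMapRetainingNewest.
import io.druid.java.util.common.Pair;
import io.druid.segment.loading.DataSegmentFinder;
import io.druid.timeline.DataSegment;

import java.util.HashMap;
import java.util.Map;

class FinderSketch
{
  Map<String, Pair<DataSegment, Long>> dedupe(Iterable<Pair<DataSegment, Long>> discovered)
  {
    final Map<String, Pair<DataSegment, Long>> timestampedSegments = new HashMap<>();
    for (Pair<DataSegment, Long> found : discovered) {
      // Older copies of the same segment ID are dropped; newer ones replace the entry.
      DataSegmentFinder.putInMapRetainingNewest(timestampedSegments, found.lhs, found.rhs);
    }
    return timestampedSegments;
  }
}
```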
33 changes: 29 additions & 4 deletions api/src/main/java/io/druid/segment/loading/DataSegmentKiller.java
@@ -20,16 +20,41 @@
package io.druid.segment.loading;

import io.druid.guice.annotations.ExtensionPoint;
import io.druid.java.util.common.logger.Logger;
import io.druid.timeline.DataSegment;

import java.io.IOException;

/**
*/
@ExtensionPoint
public interface DataSegmentKiller
{
void kill(DataSegment segments) throws SegmentLoadingException;
void killAll() throws IOException;
Logger log = new Logger(DataSegmentKiller.class);

/**
* Removes segment files (index and metadata) from deep storage.
* @param segment the segment to kill
* @throws SegmentLoadingException if the segment could not be completely removed
*/
void kill(DataSegment segment) throws SegmentLoadingException;

/**
* A more stoic killer who doesn't throw a tantrum if things get messy. Use when killing segments for best-effort
* cleanup.
* @param segment the segment to kill
*/
default void killQuietly(DataSegment segment)
{
try {
kill(segment);
}
catch (Exception e) {
log.debug(e, "Failed to kill segment %s", segment);
}
}

/**
* Like a nuke. Use wisely. Used by the 'reset-cluster' command, and of the built-in deep storage implementations, it
* is only implemented by local and HDFS.
*/
void killAll() throws IOException;
}
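
For context, a brief hypothetical sketch of the intended best-effort usage: cleaning up uniquely-pathed segments that were pushed to deep storage but never committed to the metadata store. The `CleanupSketch` class and its arguments are assumptions, not part of this change.

```java
// Hypothetical sketch: best-effort cleanup of segments that were pushed but never
// published. killQuietly never throws, so one failure does not abort the rest;
// failures are only logged at debug level.
import io.druid.segment.loading.DataSegmentKiller;
import io.druid.timeline.DataSegment;

import java.util.Set;

class CleanupSketch
{
  void cleanUpUncommitted(DataSegmentKiller killer, Set<DataSegment> pushedButNotPublished)
  {
    for (DataSegment segment : pushedButNotPublished) {
      killer.killQuietly(segment);
    }
  }
}
```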
55 changes: 37 additions & 18 deletions api/src/main/java/io/druid/segment/loading/DataSegmentPusher.java
@@ -30,6 +30,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

@ExtensionPoint
public interface DataSegmentPusher
@@ -44,36 +45,48 @@ public interface DataSegmentPusher
* Pushes index files and segment descriptor to deep storage.
* @param file directory containing index files
* @param segment segment descriptor
* @param replaceExisting overwrites existing objects if true, else leaves existing objects unchanged on conflict.
* The behavior of the indexer determines whether this should be true or false. For example,
* since Tranquility does not guarantee that replica tasks will generate indexes with the same
* data, the first segment pushed should be favored since otherwise multiple historicals may
* load segments with the same identifier but different contents which is a bad situation. On
* the other hand, indexers that maintain exactly-once semantics by storing checkpoint data can
* lose or repeat data if it fails to write a segment because it already exists and overwriting
* is not permitted. This situation can occur if a task fails after pushing to deep storage but
* before writing to the metadata storage, see: https://github.com/druid-io/druid/issues/5161.
* @param useUniquePath if true, pushes to a unique file path. This prevents situations where task failures or replica
* tasks can either overwrite or fail to overwrite existing segments leading to the possibility
* of different versions of the same segment ID containing different data. As an example, a Kafka
* indexing task starting at offset A and ending at offset B may push a segment to deep storage
* and then fail before writing the loadSpec to the metadata table, resulting in a replacement
* task being spawned. This replacement will also start at offset A but will read to offset C and
* will then push a segment to deep storage and write the loadSpec metadata. Without unique file
* paths, this can only work correctly if new segments overwrite existing segments. Suppose that
* at this point the task then fails so that the supervisor retries again from offset A. This 3rd
* attempt will overwrite the segments in deep storage before failing to write the loadSpec
* metadata, resulting in inconsistencies in the segment data now in deep storage and copies of
* the segment already loaded by historicals.
*
* If replaceExisting is true, existing objects MUST be overwritten, since failure to do so
* will break exactly-once semantics. If replaceExisting is false, existing objects SHOULD be
* prioritized but it is acceptable if they are overwritten (deep storages may be eventually
* consistent or otherwise unable to support transactional writes).
* If unique paths are used, caller is responsible for cleaning up segments that were pushed but
* were not written to the metadata table (for example when using replica tasks).
* @return segment descriptor
* @throws IOException
*/
DataSegment push(File file, DataSegment segment, boolean replaceExisting) throws IOException;
DataSegment push(File file, DataSegment segment, boolean useUniquePath) throws IOException;

//use map instead of LoadSpec class to avoid dependency pollution.
Map<String, Object> makeLoadSpec(URI finalIndexZipFilePath);

/**
* @deprecated backward-compatibility shim that should be removed on next major release;
* use {@link #getStorageDir(DataSegment, boolean)} instead.
*/
@Deprecated
default String getStorageDir(DataSegment dataSegment)
{
return getDefaultStorageDir(dataSegment);
return getStorageDir(dataSegment, false);
}

default String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
return getDefaultStorageDir(dataSegment, useUniquePath);
}

default String makeIndexPathName(DataSegment dataSegment, String indexName)
{
return StringUtils.format("./%s/%s", getStorageDir(dataSegment), indexName);
// This is only called from Hadoop batch which doesn't require unique segment paths so set useUniquePath=false
return StringUtils.format("./%s/%s", getStorageDir(dataSegment, false), indexName);
}

/**
@@ -89,13 +102,19 @@ default List<String> getAllowedPropertyPrefixesForHadoop()
// If above format is ever changed, make sure to change it appropriately in other places
// e.g. HDFSDataSegmentKiller uses this information to clean the version, interval and dataSource directories
// on segment deletion if segment being deleted was the only segment
static String getDefaultStorageDir(DataSegment segment)
static String getDefaultStorageDir(DataSegment segment, boolean useUniquePath)
{
return JOINER.join(
segment.getDataSource(),
StringUtils.format("%s_%s", segment.getInterval().getStart(), segment.getInterval().getEnd()),
segment.getVersion(),
segment.getShardSpec().getPartitionNum()
segment.getShardSpec().getPartitionNum(),
useUniquePath ? generateUniquePath() : null
);
}

static String generateUniquePath()
{
return UUID.randomUUID().toString();
}
}
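
To make the resulting layout concrete, here is a minimal sketch of the paths `getDefaultStorageDir` produces, assuming JOINER is a '/'-joiner that skips nulls; the datasource, interval, version and partition values below are made up for illustration.

```java
// Illustrative sketch (assumed values): mirrors getDefaultStorageDir's join logic.
import com.google.common.base.Joiner;

import java.util.UUID;

class StorageDirSketch
{
  private static final Joiner JOINER = Joiner.on("/").skipNulls();

  static String storageDir(boolean useUniquePath)
  {
    return JOINER.join(
        "wiki",                                              // segment.getDataSource()
        "2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z", // interval start_end
        "v1",                                                // segment.getVersion()
        0,                                                   // shard spec partition number
        useUniquePath ? UUID.randomUUID().toString() : null  // unique suffix, skipped when null
    );
  }
  // storageDir(false) -> "wiki/2018-01-01T00:00:00.000Z_2018-01-02T00:00:00.000Z/v1/0"
  // storageDir(true)  -> the same path with a random UUID appended as a final directory
}
```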
45 changes: 32 additions & 13 deletions docs/content/operations/insert-segment-to-db.md
@@ -5,24 +5,43 @@ layout: doc_page

`insert-segment-to-db` is a tool that can insert segments into Druid metadata storage. It is intended to be used
to update the segment table in metadata storage after people manually migrate segments from one place to another.
It can also be used to insert missing segment into Druid, or even recover metadata storage by telling it where the
It can also be used to insert missing segments into Druid, or even recover metadata storage by telling it where the
segments are stored.

Note: This tool expects users to have Druid cluster running in a "safe" mode, where there are no active tasks to interfere
the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.
**Note:** This tool simply scans the deep storage directory to reconstruct the metadata entries used to locate and
identify each segment. It does not have any understanding about whether those segments _should actually_ be written to
the metadata storage. In certain cases, this can lead to undesired or inconsistent results. Some examples of things to
watch out for:
- Dropped datasources will be re-enabled.
- The latest version of each segment set will be loaded by Druid, which in some cases may not be the version you
actually want. An example of this is a bad compaction job that generates segments which need to be manually rolled
back by removing that version from the metadata table. If these segments are not also removed from deep storage,
they will be imported back into the metadata table and overshadow the correct version.
- Some indexers such as the Kafka indexing service have the potential to generate more than one set of segments that
have the same segment ID but different contents. When the metadata is first written, the correct set of segments is
referenced and the other set is normally deleted from deep storage. It is possible however that an unhandled
exception could result in multiple sets of segments with the same segment ID remaining in deep storage. Since this
tool does not know which one is the 'correct' one to use, it will simply select the newest segment set and ignore
the other versions. If the wrong segment set is picked, the exactly-once semantics of the Kafka indexing service
will no longer hold true and you may get duplicated or dropped events.

With these considerations in mind, it is recommended that data migrations be done by exporting the original metadata
storage directly, since that is the definitive cluster state. This tool should be used as a last resort when a direct
export is not possible.

**Note:** This tool expects users to have the Druid cluster running in a "safe" mode, where there are no active tasks to interfere
with the segments being inserted. Users can optionally bring down the cluster to make 100% sure nothing is interfering.

In order to make it work, user will have to provide metadata storage credentials and deep storage type through Java JVM argument
or runtime.properties file. Specifically, this tool needs to know
or runtime.properties file. Specifically, this tool needs to know:

`druid.metadata.storage.type`

`druid.metadata.storage.connector.connectURI`

`druid.metadata.storage.connector.user`

`druid.metadata.storage.connector.password`

`druid.storage.type`
```
druid.metadata.storage.type
druid.metadata.storage.connector.connectURI
druid.metadata.storage.connector.user
druid.metadata.storage.connector.password
druid.storage.type
```

Besides the properties above, you also need to specify the location where the segments are stored and whether you want to
update descriptor.json (`partitionNum_descriptor.json` for HDFS data storage). These two can be provided through command line arguments.
@@ -85,7 +85,7 @@ public String getPathForHadoop()
}

@Override
public String getStorageDir(DataSegment dataSegment)
public String getStorageDir(DataSegment dataSegment, boolean useUniquePath)
{
String seg = JOINER.join(
dataSegment.getDataSource(),
@@ -96,7 +96,8 @@ public String getStorageDir(DataSegment dataSegment)
dataSegment.getInterval().getEnd().toString(ISODateTimeFormat.basicDateTime())
),
dataSegment.getVersion().replace(":", "_"),
dataSegment.getShardSpec().getPartitionNum()
dataSegment.getShardSpec().getPartitionNum(),
useUniquePath ? DataSegmentPusher.generateUniquePath() : null
);

log.info("DataSegment: [%s]", seg);
@@ -122,9 +123,9 @@ public File createSegmentDescriptorFile(final ObjectMapper jsonMapper, final Dat
return descriptorFile;
}

public Map<String, String> getAzurePaths(final DataSegment segment)
public Map<String, String> getAzurePaths(final DataSegment segment, final boolean useUniquePath)
{
final String storageDir = this.getStorageDir(segment);
final String storageDir = this.getStorageDir(segment, useUniquePath);

return ImmutableMap.of(
"index", StringUtils.format("%s/%s", storageDir, AzureStorageDruidModule.INDEX_ZIP_FILE_NAME),
@@ -139,13 +140,12 @@ public DataSegment uploadDataSegment(
final long size,
final File compressedSegmentData,
final File descriptorFile,
final Map<String, String> azurePaths,
final boolean replaceExisting
final Map<String, String> azurePaths
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"), replaceExisting);
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"), replaceExisting);
azureStorage.uploadBlob(compressedSegmentData, config.getContainer(), azurePaths.get("index"));
azureStorage.uploadBlob(descriptorFile, config.getContainer(), azurePaths.get("descriptor"));

final DataSegment outSegment = segment
.withSize(size)
@@ -162,7 +162,7 @@ public DataSegment uploadDataSegment(
}

@Override
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean replaceExisting)
public DataSegment push(final File indexFilesDir, final DataSegment segment, final boolean useUniquePath)
throws IOException
{
log.info("Uploading [%s] to Azure.", indexFilesDir);
@@ -176,10 +176,10 @@ public DataSegment push(final File indexFilesDir, final DataSegment segment, fin
final long size = CompressionUtils.zip(indexFilesDir, zipOutFile);

final File descFile = descriptorFile = createSegmentDescriptorFile(jsonMapper, segment);
final Map<String, String> azurePaths = getAzurePaths(segment);
final Map<String, String> azurePaths = getAzurePaths(segment, useUniquePath);

return AzureUtils.retryAzureOperation(
() -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths, replaceExisting),
() -> uploadDataSegment(segment, binaryVersion, size, outFile, descFile, azurePaths),
config.getMaxTries()
);
}
@@ -23,7 +23,6 @@
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.ListBlobItem;
import io.druid.java.util.common.logger.Logger;

@@ -77,27 +76,14 @@ public List<String> emptyCloudBlobDirectory(final String containerName, final St
}

return deletedFiles;

}

public void uploadBlob(
final File file,
final String containerName,
final String blobPath,
final boolean replaceExisting
)
public void uploadBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException

{
CloudBlobContainer container = getCloudBlobContainer(containerName);
try (FileInputStream stream = new FileInputStream(file)) {
CloudBlockBlob blob = container.getBlockBlobReference(blobPath);

if (!replaceExisting && blob.exists()) {
log.info("Skipping push because blob [%s] exists && replaceExisting == false", blobPath);
} else {
blob.upload(stream, file.length());
}
container.getBlockBlobReference(blobPath).upload(stream, file.length());
}
}

@@ -70,7 +70,7 @@ private void pushTaskFile(final String taskId, final File logFile, String taskKe
try {
AzureUtils.retryAzureOperation(
() -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey, true);
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()