Skip to content

Commit

Permalink
Introduce StorageConnector for Azure (apache#14660)
Browse files Browse the repository at this point in the history
The Azure connector is introduced and MSQ's fault tolerance and durable storage can now be used with Microsoft Azure's blob storage. Also, the results of newly introduced queries from deep storage can now store and fetch the results from Azure's blob storage.
  • Loading branch information
LakshSingla authored Aug 9, 2023
1 parent a45b25f commit 8f102f9
Show file tree
Hide file tree
Showing 31 changed files with 2,295 additions and 235 deletions.
29 changes: 22 additions & 7 deletions docs/multi-stage-query/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,20 +349,35 @@ SQL-based ingestion supports using durable storage to store intermediate files t

### Durable storage configurations

The following common service properties control how durable storage behaves:
Durable storage is supported on Amazon S3 storage and Microsoft's Azure storage. There are a few common configurations that controls the behavior for both the services as documented below. Apart from the common configurations,
there are a few properties specific to each storage that must be set.

Common properties to configure the behavior of durable storage

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.enable` | true | Required. Whether to enable durable storage for the cluster. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
|`druid.msq.intermediate.storage.type` | `s3` for Amazon S3 | Required. The type of storage to use. `s3` is the only supported storage type. |
|`druid.msq.intermediate.storage.bucket` | n/a | The S3 bucket to store intermediate files. |
|`druid.msq.intermediate.storage.prefix` | n/a | S3 prefix to store intermediate stage results. Provide a unique value for the prefix. Don't share the same prefix between clusters. If the location includes other files or directories, then they will get cleaned up as well. |
|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to temporarily store intermediate stage results. |
|`druid.msq.intermediate.storage.enable` | false | Whether to enable durable storage for the cluster. Set it to true to enable durable storage. For more information about enabling durable storage, see [Durable storage](../operations/durable-storage.md).|
|`druid.msq.intermediate.storage.type` | n/a | Required. The type of storage to use. Set it to `s3` for S3 and `azure` for Azure |
|`druid.msq.intermediate.storage.tempDir`| n/a | Required. Directory path on the local disk to store temporary files required while uploading and downloading the data |
|`druid.msq.intermediate.storage.maxRetry` | 10 | Optional. Defines the max number times to attempt S3 API calls to avoid failures due to transient errors. |
|`druid.msq.intermediate.storage.chunkSize` | 100MiB | Optional. Defines the size of each chunk to temporarily store in `druid.msq.intermediate.storage.tempDir`. The chunk size must be between 5 MiB and 5 GiB. A large chunk size reduces the API calls made to the durable storage, however it requires more disk space to store the temporary chunks. Druid uses a default of 100MiB if the value is not provided.|

Following properties need to be set in addition to the common properties to enable durable storage on S3

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.bucket` | n/a | Required. The S3 bucket where the files are uploaded to and download from |
|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the bucket to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |

Following properties must be set in addition to the common properties to enable durable storage on Azure.

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
|`druid.msq.intermediate.storage.container` | n/a | Required. The Azure container where the files are uploaded to and downloaded from. |
|`druid.msq.intermediate.storage.prefix` | n/a | Required. Path prepended to all the paths uploaded to the container to namespace the connector's files. Provide a unique value for the prefix and do not share the same prefix between different clusters. If the location includes other files or directories, then they might get cleaned up as well. |

In addition to the common service properties, there are certain properties that you configure on the Overlord specifically to clean up intermediate files:
Durable storage creates files on the remote storage and is cleaned up once the job no longer requires those files. However, due to failures causing abrupt exit of the tasks, these files might not get cleaned up.
Therefore, there are certain properties that you configure on the Overlord specifically to clean up intermediate files for the tasks that have completed and would no longer require these files:

|Parameter |Default | Description |
|-------------------|----------------------------------------|----------------------|
Expand Down
13 changes: 12 additions & 1 deletion extensions-core/azure-extensions/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@
<dependency>
<groupId>com.google.inject.extensions</groupId>
<artifactId>guice-assistedinject</artifactId>
<version>${guice.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
Expand Down Expand Up @@ -152,6 +152,17 @@
<artifactId>equalsverifier</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ public Iterator<LocationWithSize> getDescriptorIteratorForPrefixes(List<URI> pre
public long getObjectSize(CloudObjectLocation location)
{
try {
final CloudBlob blobWithAttributes = storage.getBlobReferenceWithAttributes(
final CloudBlob blobWithAttributes = storage.getBlockBlobReferenceWithAttributes(
location.getBucket(),
location.getPath()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public InputStream openStream() throws IOException
public InputStream openStream(long offset) throws IOException
{
try {
return azureStorage.getBlobInputStream(offset, containerName, blobPath);
return azureStorage.getBlockBlobInputStream(offset, containerName, blobPath);
}
catch (StorageException | URISyntaxException e) {
if (AzureUtils.AZURE_RETRY.apply(e)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ DataSegment uploadDataSegment(
)
throws StorageException, IOException, URISyntaxException
{
azureStorage.uploadBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);
azureStorage.uploadBlockBlob(compressedSegmentData, segmentConfig.getContainer(), azurePath);

final DataSegment outSegment = segment
.withSize(size)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,26 @@
import com.google.common.base.Supplier;
import com.microsoft.azure.storage.ResultContinuation;
import com.microsoft.azure.storage.ResultSegment;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobRequestOptions;
import com.microsoft.azure.storage.blob.CloudBlob;
import com.microsoft.azure.storage.blob.CloudBlobClient;
import com.microsoft.azure.storage.blob.CloudBlobContainer;
import com.microsoft.azure.storage.blob.CloudBlockBlob;
import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
import com.microsoft.azure.storage.blob.ListBlobItem;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.logger.Logger;

import javax.annotation.Nullable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.EnumSet;
Expand All @@ -48,6 +55,9 @@ public class AzureStorage
{
private static final boolean USE_FLAT_BLOB_LISTING = true;

// Default value from Azure library
private static final int DELTA_BACKOFF_MS = 30_000;

private static final Logger log = new Logger(AzureStorage.class);

/**
Expand All @@ -70,14 +80,28 @@ public AzureStorage(

public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath)
throws StorageException, URISyntaxException
{
return emptyCloudBlobDirectory(containerName, virtualDirPath, null);
}

public List<String> emptyCloudBlobDirectory(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
{
List<String> deletedFiles = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);

for (ListBlobItem blobItem : container.listBlobs(virtualDirPath, true, null, null, null)) {
Iterable<ListBlobItem> blobItems = container.listBlobs(
virtualDirPath,
USE_FLAT_BLOB_LISTING,
null,
getRequestOptionsWithRetry(maxAttempts),
null
);

for (ListBlobItem blobItem : blobItems) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
log.info("Removing file[%s] from Azure.", cloudBlob.getName());
if (cloudBlob.deleteIfExists()) {
log.debug("Removing file[%s] from Azure.", cloudBlob.getName());
if (cloudBlob.deleteIfExists(DeleteSnapshotsOption.NONE, null, getRequestOptionsWithRetry(maxAttempts), null)) {
deletedFiles.add(cloudBlob.getName());
}
}
Expand All @@ -89,7 +113,7 @@ public List<String> emptyCloudBlobDirectory(final String containerName, final St
return deletedFiles;
}

public void uploadBlob(final File file, final String containerName, final String blobPath)
public void uploadBlockBlob(final File file, final String containerName, final String blobPath)
throws IOException, StorageException, URISyntaxException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
Expand All @@ -98,36 +122,127 @@ public void uploadBlob(final File file, final String containerName, final String
}
}

public CloudBlob getBlobReferenceWithAttributes(final String containerName, final String blobPath)
public OutputStream getBlockBlobOutputStream(
final String containerName,
final String blobPath,
@Nullable final Integer streamWriteSizeBytes,
Integer maxAttempts
) throws URISyntaxException, StorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
CloudBlockBlob blockBlobReference = container.getBlockBlobReference(blobPath);

if (blockBlobReference.exists()) {
throw new RE("Reference already exists");
}

if (streamWriteSizeBytes != null) {
blockBlobReference.setStreamWriteSizeInBytes(streamWriteSizeBytes);
}

return blockBlobReference.openOutputStream(null, getRequestOptionsWithRetry(maxAttempts), null);

}

public CloudBlob getBlockBlobReferenceWithAttributes(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
final CloudBlockBlob blobReference = getOrCreateCloudBlobContainer(containerName).getBlockBlobReference(blobPath);
blobReference.downloadAttributes();
return blobReference;
}

public long getBlobLength(final String containerName, final String blobPath)
public long getBlockBlobLength(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
}

public InputStream getBlockBlobInputStream(final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobInputStream(0L, containerName, blobPath);
}

public InputStream getBlockBlobInputStream(long offset, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlockBlobInputStream(offset, null, containerName, blobPath);
}

public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath)
throws URISyntaxException, StorageException
{
return getBlobReferenceWithAttributes(containerName, blobPath).getProperties().getLength();
return getBlockBlobInputStream(offset, length, containerName, blobPath, null);
}

public InputStream getBlockBlobInputStream(long offset, Long length, final String containerName, final String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
{
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
return container.getBlockBlobReference(blobPath)
.openInputStream(offset, length, null, getRequestOptionsWithRetry(maxAttempts), null);
}

public InputStream getBlobInputStream(final String containerName, final String blobPath)
public void batchDeleteFiles(String containerName, Iterable<String> paths, Integer maxAttempts)
throws URISyntaxException, StorageException
{
return getBlobInputStream(0L, containerName, blobPath);
CloudBlobContainer cloudBlobContainer = getOrCreateCloudBlobContainer(containerName);
BlobDeleteBatchOperation blobDeleteBatchOperation = new BlobDeleteBatchOperation();
for (String path : paths) {
CloudBlob blobReference = cloudBlobContainer.getBlockBlobReference(path);
blobDeleteBatchOperation.addSubOperation(blobReference);
}
cloudBlobClient.get().executeBatch(blobDeleteBatchOperation, getRequestOptionsWithRetry(maxAttempts), null);
}

public InputStream getBlobInputStream(long offset, final String containerName, final String blobPath)
public List<String> listDir(final String containerName, final String virtualDirPath)
throws URISyntaxException, StorageException
{
return listDir(containerName, virtualDirPath, null);
}

public List<String> listDir(final String containerName, final String virtualDirPath, final Integer maxAttempts)
throws StorageException, URISyntaxException
{
List<String> files = new ArrayList<>();
CloudBlobContainer container = getOrCreateCloudBlobContainer(containerName);
return container.getBlockBlobReference(blobPath).openInputStream(offset, null, null, null, null);

for (ListBlobItem blobItem :
container.listBlobs(virtualDirPath, USE_FLAT_BLOB_LISTING, null, getRequestOptionsWithRetry(maxAttempts), null)) {
CloudBlob cloudBlob = (CloudBlob) blobItem;
files.add(cloudBlob.getName());
}

return files;
}

public boolean getBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
public boolean getBlockBlobExists(String container, String blobPath) throws URISyntaxException, StorageException
{
return getBlockBlobExists(container, blobPath, null);
}


public boolean getBlockBlobExists(String container, String blobPath, Integer maxAttempts)
throws URISyntaxException, StorageException
{
return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath).exists();
return getOrCreateCloudBlobContainer(container).getBlockBlobReference(blobPath)
.exists(null, getRequestOptionsWithRetry(maxAttempts), null);
}

/**
* If maxAttempts is provided, this method returns request options with retry built in.
* Retry backoff is exponential backoff, with maxAttempts set to the one provided
*/
@Nullable
private BlobRequestOptions getRequestOptionsWithRetry(Integer maxAttempts)
{
if (maxAttempts == null) {
return null;
}
BlobRequestOptions requestOptions = new BlobRequestOptions();
requestOptions.setRetryPolicyFactory(new RetryExponentialRetry(DELTA_BACKOFF_MS, maxAttempts));
return requestOptions;
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
public class AzureStorageDruidModule implements DruidModule
{

static final String SCHEME = "azure";
public static final String SCHEME = "azure";
public static final String
STORAGE_CONNECTION_STRING_WITH_KEY = "DefaultEndpointsProtocol=%s;AccountName=%s;AccountKey=%s";
public static final String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ private void pushTaskFile(final File logFile, String taskKey)
try {
AzureUtils.retryAzureOperation(
() -> {
azureStorage.uploadBlob(logFile, config.getContainer(), taskKey);
azureStorage.uploadBlockBlob(logFile, config.getContainer(), taskKey);
return null;
},
config.getMaxTries()
Expand Down Expand Up @@ -129,12 +129,12 @@ private Optional<InputStream> streamTaskFile(final String taskid, final long off
{
final String container = config.getContainer();
try {
if (!azureStorage.getBlobExists(container, taskKey)) {
if (!azureStorage.getBlockBlobExists(container, taskKey)) {
return Optional.absent();
}
try {
final long start;
final long length = azureStorage.getBlobLength(container, taskKey);
final long length = azureStorage.getBlockBlobLength(container, taskKey);

if (offset > 0 && offset < length) {
start = offset;
Expand All @@ -144,7 +144,7 @@ private Optional<InputStream> streamTaskFile(final String taskid, final long off
start = 0;
}

InputStream stream = azureStorage.getBlobInputStream(container, taskKey);
InputStream stream = azureStorage.getBlockBlobInputStream(container, taskKey);
stream.skip(start);

return Optional.of(stream);
Expand Down
Loading

0 comments on commit 8f102f9

Please sign in to comment.