Skip to content

Commit

Permalink
[HUDI-6333] Allow using manifest file directly to create a table (apa…
Browse files Browse the repository at this point in the history
…che#8898)

Co-authored-by: jp0317 <[email protected]>
  • Loading branch information
jp0317 and jp0317 authored Jun 22, 2023
1 parent 70877e5 commit 5fd9263
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,14 @@ public class BigQuerySyncConfig extends HoodieSyncConfig implements Serializable
.markAdvanced()
.withDocumentation("Name of the target table in BigQuery");

public static final ConfigProperty<Boolean> BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE = ConfigProperty
.key("hoodie.gcp.bigquery.sync.use_bq_manifest_file")
.defaultValue(false)
.markAdvanced()
.withDocumentation("If true, generate a manifest file with data file absolute paths and use BigQuery manifest file support to "
+ "directly create one external table over the Hudi table. If false (default), generate a manifest file with data file "
+ "names and create two external tables and one view in BigQuery. Query the view for the same results as querying the Hudi table");

public static final ConfigProperty<String> BIGQUERY_SYNC_SOURCE_URI = ConfigProperty
.key("hoodie.gcp.bigquery.sync.source_uri")
.noDefaultValue()
Expand Down Expand Up @@ -136,9 +144,14 @@ public static class BigQuerySyncConfigParams {
public String datasetName;
@Parameter(names = {"--dataset-location"}, description = "Location of the target dataset in BigQuery", required = true)
public String datasetLocation;
@Parameter(names = {"--use-bq-manifest-file"}, description = "If true, generate a manifest file with data file absolute paths and use "
+ " BigQuery manifest file support to directly create one external table over the Hudi table. If false (default), generate a manifest "
+ " file with data file names and create two external tables and one view in BigQuery. Query the view for the same results as querying "
+ "the Hudi table")
public Boolean useBqManifestFile;
@Parameter(names = {"--source-uri"}, description = "Name of the source uri gcs path of the table", required = true)
public String sourceUri;
@Parameter(names = {"--source-uri-prefix"}, description = "Name of the source uri gcs path prefix of the table", required = true)
@Parameter(names = {"--source-uri-prefix"}, description = "Name of the source uri gcs path prefix of the table", required = false)
public String sourceUriPrefix;

public boolean isHelp() {
Expand All @@ -151,6 +164,7 @@ public TypedProperties toProps() {
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_NAME.key(), datasetName);
props.setPropertyIfNonNull(BIGQUERY_SYNC_DATASET_LOCATION.key(), datasetLocation);
props.setPropertyIfNonNull(BIGQUERY_SYNC_TABLE_NAME.key(), hoodieSyncConfigParams.tableName);
props.setPropertyIfNonNull(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), useBqManifestFile);
props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI.key(), sourceUri);
props.setPropertyIfNonNull(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), sourceUriPrefix);
props.setPropertyIfNonNull(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), hoodieSyncConfigParams.basePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
Expand Down Expand Up @@ -82,6 +83,14 @@ public void syncHoodieTable() {
}
}

private boolean tableExists(HoodieBigQuerySyncClient bqSyncClient, String tableName) {
if (bqSyncClient.tableExists(tableName)) {
LOG.info(tableName + " already exists");
return true;
}
return false;
}

private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) {
ValidationUtils.checkState(bqSyncClient.getTableType() == HoodieTableType.COPY_ON_WRITE);
LOG.info("Sync hoodie table " + snapshotViewName + " at base path " + bqSyncClient.getBasePath());
Expand All @@ -96,21 +105,39 @@ private void syncCoWTable(HoodieBigQuerySyncClient bqSyncClient) {
.setUseFileListingFromMetadata(config.getBoolean(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA))
.setAssumeDatePartitioning(config.getBoolean(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING))
.build();

if (config.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE)) {
manifestFileWriter.writeManifestFile(true);

if (!tableExists(bqSyncClient, tableName)) {
bqSyncClient.createTableUsingBqManifestFile(
tableName,
manifestFileWriter.getManifestSourceUri(true),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX));
LOG.info("Completed table " + tableName + " creation using the manifest file");
}

LOG.info("Sync table complete for " + tableName);
return;
}

manifestFileWriter.writeManifestFile(false);

if (!bqSyncClient.tableExists(manifestTableName)) {
bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri());
if (!tableExists(bqSyncClient, manifestTableName)) {
bqSyncClient.createManifestTable(manifestTableName, manifestFileWriter.getManifestSourceUri(false));
LOG.info("Manifest table creation complete for " + manifestTableName);
}
if (!bqSyncClient.tableExists(versionsTableName)) {

if (!tableExists(bqSyncClient, versionsTableName)) {
bqSyncClient.createVersionsTable(
versionsTableName,
config.getString(BIGQUERY_SYNC_SOURCE_URI),
config.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX),
config.getSplitStrings(BIGQUERY_SYNC_PARTITION_FIELDS));
LOG.info("Versions table creation complete for " + versionsTableName);
}
if (!bqSyncClient.tableExists(snapshotViewName)) {

if (!tableExists(bqSyncClient, snapshotViewName)) {
bqSyncClient.createSnapshotView(snapshotViewName, versionsTableName, manifestTableName);
LOG.info("Snapshot view creation complete for " + snapshotViewName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.hudi.gcp.bigquery;

import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.sync.common.HoodieSyncClient;

import com.google.cloud.bigquery.BigQuery;
Expand All @@ -31,6 +32,10 @@
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.HivePartitioningOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
Expand All @@ -43,6 +48,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_LOCATION;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
Expand Down Expand Up @@ -78,6 +84,44 @@ private void createBigQueryConnection() {
}
}

public void createTableUsingBqManifestFile(String tableName, String bqManifestFileUri, String sourceUriPrefix) {
try {
String withClauses = "";
String extraOptions = "";
if (!StringUtils.isNullOrEmpty(sourceUriPrefix)) {
withClauses = "WITH PARTITION COLUMNS";
extraOptions = String.format("hive_partition_uri_prefix=\"%s\",", sourceUriPrefix);
}
String query =
String.format(
"CREATE EXTERNAL TABLE `%s.%s` %s OPTIONS (%s "
+ "uris=[\"%s\"], format=\"PARQUET\", file_set_spec_type=\"NEW_LINE_DELIMITED_MANIFEST\")",
datasetName,
tableName,
withClauses,
extraOptions,
bqManifestFileUri);

QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(query)
.setUseLegacySql(false)
.build();
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

queryJob = queryJob.waitFor();

if (queryJob == null) {
LOG.error("Job for table creation no longer exists");
} else if (queryJob.getStatus().getError() != null) {
LOG.error("Job for table creation failed: " + queryJob.getStatus().getError().toString());
} else {
LOG.info("External table created using manifest file.");
}
} catch (InterruptedException | BigQueryException e) {
throw new HoodieBigQuerySyncException("Failed to create external table using manifest file. ", e);
}
}

public void createManifestTable(String tableName, String sourceUri) {
try {
TableId tableId = TableId.of(projectId, datasetName, tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
Expand All @@ -51,6 +52,7 @@ public void testGetConfigs() {
props.setProperty(BIGQUERY_SYNC_DATASET_NAME.key(), "foodataset");
props.setProperty(BIGQUERY_SYNC_DATASET_LOCATION.key(), "US");
props.setProperty(BIGQUERY_SYNC_TABLE_NAME.key(), "footable");
props.setProperty(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key(), "true");
props.setProperty(BIGQUERY_SYNC_SOURCE_URI.key(), "gs://test-bucket/dwh/table_name/dt=*");
props.setProperty(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key(), "gs://test-bucket/dwh/table_name/");
props.setProperty(BIGQUERY_SYNC_SYNC_BASE_PATH.key(), "gs://test-bucket/dwh/table_name");
Expand All @@ -62,6 +64,7 @@ public void testGetConfigs() {
assertEquals("foodataset", syncConfig.getString(BIGQUERY_SYNC_DATASET_NAME));
assertEquals("US", syncConfig.getString(BIGQUERY_SYNC_DATASET_LOCATION));
assertEquals("footable", syncConfig.getString(BIGQUERY_SYNC_TABLE_NAME));
assertEquals(true, syncConfig.getBoolean(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE));
assertEquals("gs://test-bucket/dwh/table_name/dt=*", syncConfig.getString(BIGQUERY_SYNC_SOURCE_URI));
assertEquals("gs://test-bucket/dwh/table_name/", syncConfig.getString(BIGQUERY_SYNC_SOURCE_URI_PREFIX));
assertEquals("gs://test-bucket/dwh/table_name", syncConfig.getString(BIGQUERY_SYNC_SYNC_BASE_PATH));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_DATASET_NAME;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PARTITION_FIELDS;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_PROJECT_ID;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SOURCE_URI_PREFIX;
import static org.apache.hudi.gcp.bigquery.BigQuerySyncConfig.BIGQUERY_SYNC_SYNC_BASE_PATH;
Expand All @@ -51,6 +52,7 @@ public void testArgsParse() {
"--source-uri-prefix", "gs://foobartable/",
"--base-path", "gs://foobartable",
"--partitioned-by", "year,month,day",
"--use-bq-manifest-file",
"--use-file-listing-from-metadata"
};
cmd.parse(args);
Expand All @@ -64,6 +66,7 @@ public void testArgsParse() {
assertEquals("gs://foobartable/", props.getProperty(BIGQUERY_SYNC_SOURCE_URI_PREFIX.key()));
assertEquals("gs://foobartable", props.getProperty(BIGQUERY_SYNC_SYNC_BASE_PATH.key()));
assertEquals("year,month,day", props.getProperty(BIGQUERY_SYNC_PARTITION_FIELDS.key()));
assertEquals("true", props.getProperty(BIGQUERY_SYNC_USE_BQ_MANIFEST_FILE.key()));
assertEquals("true", props.getProperty(BIGQUERY_SYNC_USE_FILE_LISTING_FROM_METADATA.key()));
assertFalse(props.containsKey(BIGQUERY_SYNC_ASSUME_DATE_PARTITIONING.key()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
public class ManifestFileWriter {

public static final String MANIFEST_FOLDER_NAME = "manifest";
public static final String ABSOLUTE_PATH_MANIFEST_FOLDER_NAME = "absolute-path-manifest";
public static final String MANIFEST_FILE_NAME = "latest-snapshot.csv";
private static final Logger LOG = LoggerFactory.getLogger(ManifestFileWriter.class);

Expand All @@ -69,7 +70,7 @@ public synchronized void writeManifestFile(boolean useAbsolutePath) {
} else {
LOG.info("Writing base file names to manifest file: " + baseFiles.size());
}
final Path manifestFilePath = getManifestFilePath();
final Path manifestFilePath = getManifestFilePath(useAbsolutePath);
try (FSDataOutputStream outputStream = metaClient.getFs().create(manifestFilePath, true);
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8))) {
for (String f : baseFiles) {
Expand Down Expand Up @@ -101,16 +102,16 @@ public static Stream<String> fetchLatestBaseFilesForAllPartitions(HoodieTableMet
}
}

public Path getManifestFolder() {
return new Path(metaClient.getMetaPath(), MANIFEST_FOLDER_NAME);
public Path getManifestFolder(boolean useAbsolutePath) {
return new Path(metaClient.getMetaPath(), useAbsolutePath ? ABSOLUTE_PATH_MANIFEST_FOLDER_NAME : MANIFEST_FOLDER_NAME);
}

public Path getManifestFilePath() {
return new Path(getManifestFolder(), MANIFEST_FILE_NAME);
public Path getManifestFilePath(boolean useAbsolutePath) {
return new Path(getManifestFolder(useAbsolutePath), MANIFEST_FILE_NAME);
}

public String getManifestSourceUri() {
return new Path(getManifestFolder(), "*").toUri().toString();
public String getManifestSourceUri(boolean useAbsolutePath) {
return new Path(getManifestFolder(useAbsolutePath), "*").toUri().toString();
}

public static Builder builder() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void testCreateManifestFile() throws Exception {
createTestDataForPartitionedTable(metaClient, 3);
ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
manifestFileWriter.writeManifestFile(false);
Path manifestFilePath = manifestFileWriter.getManifestFilePath();
Path manifestFilePath = manifestFileWriter.getManifestFilePath(false);
try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
List<String> expectedLines = FileIOUtils.readAsUTFStringLines(is);
assertEquals(9, expectedLines.size(), "there should be 9 base files in total; 3 per partition.");
Expand All @@ -73,7 +73,7 @@ public void testCreateManifestFileWithAbsolutePath() throws Exception {
createTestDataForPartitionedTable(metaClient, 3);
ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
manifestFileWriter.writeManifestFile(true);
Path manifestFilePath = manifestFileWriter.getManifestFilePath();
Path manifestFilePath = manifestFileWriter.getManifestFilePath(true);
try (InputStream is = metaClient.getFs().open(manifestFilePath)) {
List<String> expectedLines = FileIOUtils.readAsUTFStringLines(is);
assertEquals(9, expectedLines.size(), "there should be 9 base files in total; 3 per partition.");
Expand All @@ -93,7 +93,10 @@ private static void createTestDataForPartitionedTable(HoodieTableMetaClient meta
@Test
public void getManifestSourceUri() {
ManifestFileWriter manifestFileWriter = ManifestFileWriter.builder().setConf(metaClient.getHadoopConf()).setBasePath(basePath).build();
String sourceUri = manifestFileWriter.getManifestSourceUri();
String sourceUri = manifestFileWriter.getManifestSourceUri(false);
assertEquals(new Path(basePath, ".hoodie/manifest/*").toUri().toString(), sourceUri);

sourceUri = manifestFileWriter.getManifestSourceUri(true);
assertEquals(new Path(basePath, ".hoodie/absolute-path-manifest/*").toUri().toString(), sourceUri);
}
}

0 comments on commit 5fd9263

Please sign in to comment.