Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[spark] Enable TagCreationMode#batch for spark writer #5185

Merged
merged 4 commits into from
Mar 1, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions paimon-core/src/main/java/org/apache/paimon/tag/TagBatchCreation.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.tag;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

/** Tag creation for batch mode. */
public class TagBatchCreation {

private static final String BATCH_WRITE_TAG_PREFIX = "batch-write-";
private final FileStoreTable table;
private final CoreOptions options;
private final TagManager tagManager;
private final SnapshotManager snapshotManager;
private final TagDeletion tagDeletion;

public TagBatchCreation(FileStoreTable table) {
this.table = table;
this.snapshotManager = table.snapshotManager();
this.tagManager = table.tagManager();
this.tagDeletion = table.store().newTagDeletion();
this.options = table.coreOptions();
}

public void createTag() {
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return;
}
Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String tagName =
options.tagBatchCustomizedName() != null
? options.tagBatchCustomizedName()
: BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
// Create a new tag
tagManager.createTag(
snapshot,
tagName,
table.coreOptions().tagDefaultTimeRetained(),
table.store().createTagCallbacks(),
false);
// Expire the tag
expireTag();
} catch (Exception e) {
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Waring-Logs should be printed here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

}
}

private void expireTag() {
Integer tagNumRetainedMax = options.tagNumRetainedMax();
if (tagNumRetainedMax != null) {
if (snapshotManager.latestSnapshot() == null) {
return;
}
long tagCount = tagManager.tagCount();

while (tagCount > tagNumRetainedMax) {
for (List<String> tagNames : tagManager.tags().values()) {
if (tagCount - tagNames.size() >= tagNumRetainedMax) {
tagManager.deleteAllTagsOfOneSnapshot(
tagNames, tagDeletion, snapshotManager);
tagCount = tagCount - tagNames.size();
} else {
List<String> sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames);
for (String toBeDeleted : sortedTagNames) {
tagManager.deleteTag(
toBeDeleted,
tagDeletion,
snapshotManager,
table.store().createTagCallbacks());
tagCount--;
if (tagCount == tagNumRetainedMax) {
break;
}
}
break;
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@

package org.apache.paimon.flink.sink;

import org.apache.paimon.Snapshot;
import org.apache.paimon.operation.TagDeletion;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.apache.paimon.tag.TagBatchCreation;

import org.apache.flink.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
Expand All @@ -37,12 +34,6 @@
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.List;

/**
* Commit {@link Committable} for snapshot using the {@link CommitterOperator}. When the task is
* completed, the corresponding tag is generated.
Expand All @@ -58,10 +49,13 @@ public class BatchWriteGeneratorTagOperator<CommitT, GlobalCommitT>

protected final FileStoreTable table;

protected final TagBatchCreation tagBatchCreation;

public BatchWriteGeneratorTagOperator(
CommitterOperator<CommitT, GlobalCommitT> commitOperator, FileStoreTable table) {
this.table = table;
this.commitOperator = commitOperator;
this.tagBatchCreation = new TagBatchCreation(table);
}

@Override
Expand Down Expand Up @@ -91,79 +85,6 @@ public void notifyCheckpointAborted(long checkpointId) throws Exception {
commitOperator.notifyCheckpointAborted(checkpointId);
}

private void createTag() {
SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot = snapshotManager.latestSnapshot();
if (snapshot == null) {
return;
}
TagManager tagManager = table.tagManager();
TagDeletion tagDeletion = table.store().newTagDeletion();
Instant instant = Instant.ofEpochMilli(snapshot.timeMillis());
LocalDateTime localDateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
String tagName =
table.coreOptions().tagBatchCustomizedName() != null
? table.coreOptions().tagBatchCustomizedName()
: BATCH_WRITE_TAG_PREFIX
+ localDateTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd"));
try {
// If the tag already exists, delete the tag
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
// Create a new tag
tagManager.createTag(
snapshot,
tagName,
table.coreOptions().tagDefaultTimeRetained(),
table.store().createTagCallbacks(),
false);
// Expire the tag
expireTag();
} catch (Exception e) {
if (tagManager.tagExists(tagName)) {
tagManager.deleteTag(
tagName, tagDeletion, snapshotManager, table.store().createTagCallbacks());
}
}
}

private void expireTag() {
Integer tagNumRetainedMax = table.coreOptions().tagNumRetainedMax();
if (tagNumRetainedMax != null) {
SnapshotManager snapshotManager = table.snapshotManager();
if (snapshotManager.latestSnapshot() == null) {
return;
}
TagManager tagManager = table.tagManager();
TagDeletion tagDeletion = table.store().newTagDeletion();
long tagCount = tagManager.tagCount();

while (tagCount > tagNumRetainedMax) {
for (List<String> tagNames : tagManager.tags().values()) {
if (tagCount - tagNames.size() >= tagNumRetainedMax) {
tagManager.deleteAllTagsOfOneSnapshot(
tagNames, tagDeletion, snapshotManager);
tagCount = tagCount - tagNames.size();
} else {
List<String> sortedTagNames = tagManager.sortTagsOfOneSnapshot(tagNames);
for (String toBeDeleted : sortedTagNames) {
tagManager.deleteTag(
toBeDeleted,
tagDeletion,
snapshotManager,
table.store().createTagCallbacks());
tagCount--;
if (tagCount == tagNumRetainedMax) {
break;
}
}
break;
}
}
}
}
}

@Override
public void open() throws Exception {
commitOperator.open();
Expand Down Expand Up @@ -191,7 +112,7 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {

@Override
public void finish() throws Exception {
createTag();
tagBatchCreation.createTag();
commitOperator.finish();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@
package org.apache.paimon.spark.commands

import org.apache.paimon.CoreOptions
import org.apache.paimon.CoreOptions.DYNAMIC_PARTITION_OVERWRITE
import org.apache.paimon.CoreOptions.{DYNAMIC_PARTITION_OVERWRITE, TagCreationMode}
import org.apache.paimon.options.Options
import org.apache.paimon.partition.actions.PartitionMarkDoneAction
import org.apache.paimon.spark._
import org.apache.paimon.spark.schema.SparkSystemColumns
import org.apache.paimon.table.FileStoreTable
import org.apache.paimon.table.sink.CommitMessage
import org.apache.paimon.tag.TagBatchCreation
import org.apache.paimon.utils.{InternalRowPartitionComputer, PartitionPathUtils, TypeUtils}

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -83,10 +84,19 @@ case class WriteIntoPaimonTable(
val commitMessages = writer.write(data)
writer.commit(commitMessages)

markDoneIfNeeded(commitMessages)
preFinish(commitMessages)
Seq.empty
}

private def preFinish(commitMessages: Seq[CommitMessage]): Unit = {
val coreOptions = table.coreOptions();
if (coreOptions.tagCreationMode() == TagCreationMode.BATCH) {
val tagCreation = new TagBatchCreation(table)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a little more concise : if (table.coreOptions().tagCreationMode() == TagCreationMode.BATCH)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

tagCreation.createTag()
}
markDoneIfNeeded(commitMessages)
}

private def markDoneIfNeeded(commitMessages: Seq[CommitMessage]): Unit = {
val coreOptions = table.coreOptions()
if (coreOptions.toConfiguration.get(CoreOptions.PARTITION_MARK_DONE_WHEN_END_INPUT)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this option is used several times, I think it's ok here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,20 @@ abstract class PaimonTagDdlTestBase extends PaimonSparkTestBase {
spark.sql("alter table T rename tag `tag-1` to `tag-2`")
checkAnswer(spark.sql("show tags T"), Row("tag-2"))
}

test("Tag creation: batch creation mode") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES (
|'file.format' = 'avro',
|'tag.automatic-creation'='batch',
|'tag.batch.customized-name' = 'haha')""".stripMargin)
spark.sql("insert into T values(1, 'a')")
assertResult(1)(loadTable("T").tagManager().tagObjects().size())
assertResult("haha")(loadTable("T").tagManager().tagObjects().get(0).getRight)
spark.sql("insert into T values(1, 'a')")
// tag overwrite
assertResult(1)(loadTable("T").tagManager().tagObjects().size())
assertResult("haha")(loadTable("T").tagManager().tagObjects().get(0).getRight)
}
}
Loading