|
26 | 26 | import org.apache.hudi.avro.model.FloatWrapper;
|
27 | 27 | import org.apache.hudi.avro.model.HoodieCleanMetadata;
|
28 | 28 | import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
|
| 29 | +import org.apache.hudi.avro.model.HoodieMetadataFileInfo; |
29 | 30 | import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
|
30 | 31 | import org.apache.hudi.avro.model.HoodieRestoreMetadata;
|
31 | 32 | import org.apache.hudi.avro.model.HoodieRollbackMetadata;
|
|
79 | 80 | import org.apache.hudi.common.util.collection.ClosableIterator;
|
80 | 81 | import org.apache.hudi.common.util.collection.Pair;
|
81 | 82 | import org.apache.hudi.common.util.collection.Tuple3;
|
| 83 | +import org.apache.hudi.common.util.hash.ColumnIndexID; |
| 84 | +import org.apache.hudi.common.util.hash.PartitionIndexID; |
82 | 85 | import org.apache.hudi.exception.HoodieException;
|
83 | 86 | import org.apache.hudi.exception.HoodieIOException;
|
84 | 87 | import org.apache.hudi.exception.HoodieMetadataException;
|
|
118 | 121 | import java.util.LinkedList;
|
119 | 122 | import java.util.List;
|
120 | 123 | import java.util.Map;
|
| 124 | +import java.util.Objects; |
121 | 125 | import java.util.Set;
|
122 | 126 | import java.util.UUID;
|
123 | 127 | import java.util.function.BiFunction;
|
|
131 | 135 | import static org.apache.hudi.avro.HoodieAvroUtils.getNestedFieldSchemaFromWriteSchema;
|
132 | 136 | import static org.apache.hudi.avro.HoodieAvroUtils.getSchemaForFields;
|
133 | 137 | import static org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
|
| 138 | +import static org.apache.hudi.avro.HoodieAvroUtils.wrapValueIntoAvro; |
134 | 139 | import static org.apache.hudi.common.config.HoodieCommonConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
|
135 | 140 | import static org.apache.hudi.common.config.HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED;
|
136 | 141 | import static org.apache.hudi.common.config.HoodieCommonConfig.MAX_MEMORY_FOR_COMPACTION;
|
|
140 | 145 | import static org.apache.hudi.common.util.ConfigUtils.getReaderConfigs;
|
141 | 146 | import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
|
142 | 147 | import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
|
| 148 | +import static org.apache.hudi.common.util.ValidationUtils.checkArgument; |
143 | 149 | import static org.apache.hudi.common.util.ValidationUtils.checkState;
|
144 | 150 | import static org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_MISSING_FILEINDEX_FALLBACK;
|
145 | 151 | import static org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
|
@@ -2141,6 +2147,118 @@ private static List<HoodieColumnRangeMetadata<Comparable>> translateWriteStatToF
|
2141 | 2147 | return getFileStatsRangeMetadata(writeStat.getPartitionPath(), writeStat.getPath(), datasetMetaClient, columnsToIndex, false);
|
2142 | 2148 | }
|
2143 | 2149 |
|
| 2150 | + public static String getPartitionStatsIndexKey(String partitionPath, String columnName) { |
| 2151 | + final PartitionIndexID partitionIndexID = new PartitionIndexID(getColumnStatsIndexPartitionIdentifier(partitionPath)); |
| 2152 | + final ColumnIndexID columnIndexID = new ColumnIndexID(columnName); |
| 2153 | + return columnIndexID.asBase64EncodedString().concat(partitionIndexID.asBase64EncodedString()); |
| 2154 | + } |
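For readers unfamiliar with the key layout, the sketch below shows what `getPartitionStatsIndexKey` concatenates: the Base64-encoded column-ID hash followed by the Base64-encoded partition-ID hash. The column name and partition path are made up, and for brevity the sketch hashes the raw partition path rather than the `getColumnStatsIndexPartitionIdentifier(...)` result the real method uses.

```java
import org.apache.hudi.common.util.hash.ColumnIndexID;
import org.apache.hudi.common.util.hash.PartitionIndexID;

// Minimal sketch of the partition-stats index key layout: Base64-encoded column-ID hash
// first, then Base64-encoded partition-ID hash. NOTE: the real method hashes
// getColumnStatsIndexPartitionIdentifier(partitionPath), not the raw path used here.
public class PartitionStatsKeySketch {
  public static void main(String[] args) {
    String columnPart = new ColumnIndexID("price").asBase64EncodedString();          // hypothetical column
    String partitionPart = new PartitionIndexID("2023/10/01").asBase64EncodedString(); // hypothetical partition
    System.out.println(columnPart + partitionPart); // same ordering the method produces
  }
}
```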
| 2155 | + |
| 2156 | + @SuppressWarnings({"rawtypes", "unchecked"}) |
| 2157 | + public static HoodieMetadataColumnStats mergeColumnStatsRecords(HoodieMetadataColumnStats prevColumnStats, |
| 2158 | + HoodieMetadataColumnStats newColumnStats) { |
| 2159 | + checkArgument(Objects.equals(prevColumnStats.getColumnName(), newColumnStats.getColumnName())); |
| 2160 | + |
| 2161 | + // We're handling two cases here: |
| 2162 | + // - The new record is a tombstone: it simply overwrites the previous state |
| 2163 | + // - The previous record is a tombstone: in that case the new, proper record also |
| 2164 | + // simply overwrites the previous state |
| 2165 | + if (newColumnStats.getIsDeleted() || prevColumnStats.getIsDeleted()) { |
| 2166 | + return newColumnStats; |
| 2167 | + } |
| 2168 | + |
| 2169 | + Comparable minValue = |
| 2170 | + (Comparable) Stream.of( |
| 2171 | + (Comparable) unwrapAvroValueWrapper(prevColumnStats.getMinValue()), |
| 2172 | + (Comparable) unwrapAvroValueWrapper(newColumnStats.getMinValue())) |
| 2173 | + .filter(Objects::nonNull) |
| 2174 | + .min(Comparator.naturalOrder()) |
| 2175 | + .orElse(null); |
| 2176 | + |
| 2177 | + Comparable maxValue = |
| 2178 | + (Comparable) Stream.of( |
| 2179 | + (Comparable) unwrapAvroValueWrapper(prevColumnStats.getMaxValue()), |
| 2180 | + (Comparable) unwrapAvroValueWrapper(newColumnStats.getMaxValue())) |
| 2181 | + .filter(Objects::nonNull) |
| 2182 | + .max(Comparator.naturalOrder()) |
| 2183 | + .orElse(null); |
| 2184 | + |
| 2185 | + return HoodieMetadataColumnStats.newBuilder(HoodieMetadataPayload.METADATA_COLUMN_STATS_BUILDER_STUB.get()) |
| 2186 | + .setFileName(newColumnStats.getFileName()) |
| 2187 | + .setColumnName(newColumnStats.getColumnName()) |
| 2188 | + .setMinValue(wrapValueIntoAvro(minValue)) |
| 2189 | + .setMaxValue(wrapValueIntoAvro(maxValue)) |
| 2190 | + .setValueCount(prevColumnStats.getValueCount() + newColumnStats.getValueCount()) |
| 2191 | + .setNullCount(prevColumnStats.getNullCount() + newColumnStats.getNullCount()) |
| 2192 | + .setTotalSize(prevColumnStats.getTotalSize() + newColumnStats.getTotalSize()) |
| 2193 | + .setTotalUncompressedSize(prevColumnStats.getTotalUncompressedSize() + newColumnStats.getTotalUncompressedSize()) |
| 2194 | + .setIsDeleted(newColumnStats.getIsDeleted()) |
| 2195 | + .build(); |
| 2196 | + } |
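The core of the merge above is a null-tolerant min/max over the unwrapped previous and new values, with the counts and sizes simply summed. The following self-contained sketch reproduces just that min/max step using plain Longs in place of the unwrapped Avro values; the numbers are made up for illustration.

```java
import java.util.Comparator;
import java.util.Objects;
import java.util.stream.Stream;

// Sketch of the null-tolerant min/max merge used by mergeColumnStatsRecords
// (plain Longs stand in for the unwrapped Avro value wrappers; values are illustrative).
public class ColumnStatsMergeSketch {
  public static void main(String[] args) {
    Long prevMin = 10L, newMin = 5L;   // a null on either side is simply filtered out
    Long prevMax = 50L, newMax = null;

    Long mergedMin = Stream.of(prevMin, newMin)
        .filter(Objects::nonNull)
        .min(Comparator.naturalOrder())
        .orElse(null);                 // 5  -> min of the non-null mins
    Long mergedMax = Stream.of(prevMax, newMax)
        .filter(Objects::nonNull)
        .max(Comparator.naturalOrder())
        .orElse(null);                 // 50 -> max of the non-null maxes

    System.out.println(mergedMin + " .. " + mergedMax);
    // value/null counts and sizes, by contrast, are simply summed across the two records
  }
}
```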
| 2197 | + |
| 2198 | + public static Map<String, HoodieMetadataFileInfo> combineFileSystemMetadata(HoodieMetadataPayload older, HoodieMetadataPayload newer) { |
| 2199 | + Map<String, HoodieMetadataFileInfo> combinedFileInfo = new HashMap<>(); |
| 2200 | + // First, add all files listed in the previous record |
| 2201 | + if (older.filesystemMetadata != null) { |
| 2202 | + combinedFileInfo.putAll(older.filesystemMetadata); |
| 2203 | + } |
| 2204 | + |
| 2205 | + // Second, merge in the files listed in the new record |
| 2206 | + if (newer.filesystemMetadata != null) { |
| 2207 | + validatePayload(newer.type, newer.filesystemMetadata); |
| 2208 | + |
| 2209 | + newer.filesystemMetadata.forEach((key, fileInfo) -> { |
| 2210 | + combinedFileInfo.merge(key, fileInfo, |
| 2211 | + // Combine the previous record w/ the new one, the new record taking precedence over |
| 2212 | + // the old one |
| 2213 | + // |
| 2214 | + // NOTE: If the previous listing contains the file that is being deleted by the tombstone |
| 2215 | + // record (`IsDeleted` = true) in the new one, we simply delete the file from the resulting |
| 2216 | + // listing as well as drop the tombstone itself. |
| 2217 | + // However, if the file is not present in the previous record, we have to persist the tombstone |
| 2218 | + // record in the listing to make sure we carry forward the information that this file |
| 2219 | + // was deleted. This special case could occur since the merging flow is 2-stage: |
| 2220 | + // - First we merge records from all of the delta log-files |
| 2221 | + // - Then we merge records from base-files with the delta ones (coming as a result |
| 2222 | + // of the previous step) |
| 2223 | + (oldFileInfo, newFileInfo) -> { |
| 2224 | + // NOTE: We can’t assume that MT update records will be ordered the same way as actual |
| 2225 | + // FS operations (since they are not atomic), therefore MT record merging should be a |
| 2226 | + // _commutative_ & _associative_ operation (ie one that would work even in case records |
| 2227 | + // will get re-ordered), which is |
| 2228 | + // - Possible for file-sizes (since file sizes only ever grow, we can simply |
| 2229 | + // take the max of the old and new records) |
| 2230 | + // - Not possible for is-deleted flags* |
| 2231 | + // |
| 2232 | + // *However, we're assuming that concurrent write and deletion of the same |
| 2233 | + // file is _impossible_ -- it would only be possible with a concurrent upsert and |
| 2234 | + // rollback operation (affecting the same log-file), which is implausible, b/c one |
| 2235 | + // of the following would have to be true: |
| 2236 | + // - We're appending to a failed log-file (and the other writer is trying to |
| 2237 | + // roll it back concurrently, before its own write) |
| 2238 | + // - Rollback (of a completed instant) is running concurrently with an append (meaning |
| 2239 | + // that restore is running concurrently with a write, which is also not supported |
| 2240 | + // currently) |
| 2241 | + if (newFileInfo.getIsDeleted()) { |
| 2242 | + if (oldFileInfo.getIsDeleted()) { |
| 2243 | + LOG.warn("A file is repeatedly deleted in the files partition of the metadata table: {}", key); |
| 2244 | + return newFileInfo; |
| 2245 | + } |
| 2246 | + return null; |
| 2247 | + } |
| 2248 | + return new HoodieMetadataFileInfo( |
| 2249 | + Math.max(newFileInfo.getSize(), oldFileInfo.getSize()), false); |
| 2250 | + }); |
| 2251 | + }); |
| 2252 | + } |
| 2253 | + return combinedFileInfo; |
| 2254 | + } |
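The behavior `combineFileSystemMetadata` leans on is `Map.merge()`: when the remapping function returns `null`, the entry is removed (how a tombstone cancels a file the older record still lists), and when the key is absent the new value (including a tombstone) is inserted as-is, which is how tombstones for unseen files get carried forward. The sketch below demonstrates both paths; plain Longs stand in for `HoodieMetadataFileInfo`, a negative size marks a tombstone, and all file names and sizes are made up.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the Map.merge() semantics relied on above: a null result from the remapping
// function removes the entry; sizes take the max since they only ever grow; tombstones
// for files absent from the older listing are kept. (Illustrative stand-in types/values.)
public class FileListingMergeSketch {
  public static void main(String[] args) {
    Map<String, Long> combined = new HashMap<>();
    combined.put("f1.parquet", 100L);   // listed by the older record
    combined.put("f2.parquet", 200L);

    Map<String, Long> newer = new HashMap<>();
    newer.put("f1.parquet", 150L);      // f1 grew
    newer.put("f2.parquet", -1L);       // f2 was deleted (tombstone)
    newer.put("f3.parquet", -1L);       // tombstone for a file the older record never saw

    newer.forEach((file, size) -> combined.merge(file, size,
        (oldSize, newSize) -> newSize < 0
            ? null                          // tombstone meets existing entry -> drop the file
            : Math.max(oldSize, newSize))); // sizes only grow -> max is commutative

    System.out.println(combined);           // {f1.parquet=150, f3.parquet=-1} (order may vary)
  }
}
```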
| 2255 | + |
| 2256 | + private static void validatePayload(int type, Map<String, HoodieMetadataFileInfo> filesystemMetadata) { |
| 2257 | + if (type == MetadataPartitionType.FILES.getRecordType()) { |
| 2258 | + filesystemMetadata.forEach((fileName, fileInfo) -> checkState(fileInfo.getIsDeleted() || fileInfo.getSize() > 0, "Existing files should have size > 0")); |
| 2259 | + } |
| 2260 | + } |
| 2261 | + |
2144 | 2262 | /**
|
2145 | 2263 | * A class which represents a directory and the files and directories inside it.
|
2146 | 2264 | * <p>
|
|