Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
adeet1 committed Jun 6, 2024
1 parent 1f88aeb commit 27533be
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -355,12 +355,8 @@ abstract class AbstractFileSystemStorage(

// TODO: move this class to OrcFileSystemStorage, since that's the only class that uses it
class UpdateObserver(partition: String, file: Path, action: StorageFileAction) extends MetadataObserver with BoundsObserver {

override def getBoundingBox: Envelope = super.getBoundingBox

override protected def onClose(bounds: Envelope, count: Long): Unit = {
val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action))
println(s"[UpdateObserver] metadata.addPartition(PartitionMetadata($partition, $files, $PartitionBounds($bounds), $count))")
metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(bounds), count))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ class ParquetFileSystemStorage(context: FileSystemContext, metadata: StorageMeta
// TODO: this is an exact copy of UpdateObserver::onClose, we might want to put this in AbstractFileSystemStorage to reduce code redundancy
private class StorageMetadataCallback(partition: String, action: StorageFileAction, file: Path) extends ((Envelope, Long) => Unit) {
override def apply(env: Envelope, count: Long): Unit = {
// TODO: i might be adding the same partition twice
val files = Seq(StorageFile(file.getName, System.currentTimeMillis(), action))
println(s"[StorageMetadataCallback] metadata.addPartition(PartitionMetadata($partition, $files, $PartitionBounds($env), $count))")
metadata.addPartition(PartitionMetadata(partition, files, PartitionBounds(env), count))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,11 @@ class SimpleFeatureWriteSupport(callback: (Envelope, Long) => Unit = ((_, _) =>
mergedBounds.expandToInclude(b)
}

// TODO: put this callback in the onClose method instead
// TODO: basically do: onClose(mergedBounds, count)
// Invoke the callback function for adding metadata to the storage partition
callback(mergedBounds, count)
onClose(mergedBounds, count)
}

override protected def onClose(bounds: Envelope, count: Long): Unit = {}
// Invokes the callback function that adds metadata to the storage partition
override protected def onClose(bounds: Envelope, count: Long): Unit = callback(bounds, count)
}

private val observer = new MultipleGeometriesObserver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import org.specs2.runner.JUnitRunner
import java.nio.file.Files
import scala.collection.mutable

// TODO: on a separate MR - might want to split this file into two: ParquetCompactCommandTest and OrcCompactCommandTest
@RunWith(classOf[JUnitRunner])
class CompactCommandTest extends Specification {

Expand All @@ -43,7 +42,6 @@ class CompactCommandTest extends Specification {

sequential

// val encodings = Seq("orc")
val encodings = Seq("parquet", "orc")

val tempDir: java.nio.file.Path = Files.createTempDirectory("compactCommand")
Expand Down Expand Up @@ -155,19 +153,20 @@ class CompactCommandTest extends Specification {
fs.getCount(Query.ALL) mustEqual numFeatures

val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()
val partitionNames = partitions.map(_.name)
partitionNames.foreach(partitionName => {
val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName)
filePaths.foreach(path => {
val filepath = path.path
if (encoding == "parquet") {

// For parquet files, get bounding boxes from each file in each partition
if (encoding == "parquet") {
val partitionNames = partitions.map(_.name)
partitionNames.foreach(partitionName => {
val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName)
filePaths.foreach(path => {
val filepath = path.path
val bbox = getBoundingBoxFromGeoParquetFile(filepath)
partitionBoundingBoxes.addBinding(partitionName, bbox)
}
})
})
})
}

// TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length??
partitions.map(_.files.size) mustEqual Seq.fill(10)(numFilesPerPartition)
}
}
Expand All @@ -181,7 +180,6 @@ class CompactCommandTest extends Specification {
command.params.runMode = RunModes.Distributed.toString
// invoke on our existing store so the cached metadata gets updated
command.compact(ds)// must not(throwAn[Exception])
1 mustEqual 1
ok
}
}
Expand All @@ -202,23 +200,22 @@ class CompactCommandTest extends Specification {

val partitions = ds.storage(sft.getTypeName).metadata.getPartitions()

// TODO: this block only runs for parquet files. Put the if encoding==parquet block on the outside
val partitionNames = partitions.map(_.name)
partitionNames.foreach(partitionName => {
val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path)
filePaths.foreach(path => {
if (encoding == "parquet") {
// In each partition, assert that the union of bounding boxes of the 2 files before compaction
// is the same as the bounding box of the 1 file after compaction
val bboxesUnion = new Envelope
partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox))
val metadataBbox = getBoundingBoxFromGeoParquetFile(path)
bboxesUnion mustEqual metadataBbox
}
// For parquet files, check that the union of bounding boxes of the 2 files before
// compaction is the same as the bounding box of the 1 file after compaction
if (encoding == "parquet") {
val partitionNames = partitions.map(_.name)
partitionNames.foreach(partitionName => {
val filePaths = ds.storage(sft.getTypeName).getFilePaths(partitionName).map(_.path)
filePaths.foreach(path => {
// In each partition, assert that the
val bboxesUnion = new Envelope
partitionBoundingBoxes(partitionName).foreach(bbox => bboxesUnion.expandToInclude(bbox))
val metadataBbox = getBoundingBoxFromGeoParquetFile(path)
bboxesUnion mustEqual metadataBbox
})
})
})
}

// TODO: might be able to replace the number 10 in Seq.fill with something like partitions.length??
partitions.map(_.files.size) mustEqual Seq.fill(10)(1)
}
}
Expand Down

0 comments on commit 27533be

Please sign in to comment.