Skip to content

Commit

Permalink
Using prefixes returned by delimiters.
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 346870018
  • Loading branch information
pabloem authored and cloud-teleport committed Dec 10, 2020
1 parent a233c3f commit a485017
Showing 1 changed file with 9 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public PCollection<ReadableFile> expand(PCollection<String> input) {
directories = input
.apply("FindTableDirectory",
Watch
.growthOf(new DirectoryMatchPollFn(1, 1, null, null))
.growthOf(new DirectoryMatchPollFn(1, 1, null, "/"))
.withPollInterval(Duration.standardSeconds(120)))
.apply(Values.create())
.apply("FindTablePerMinuteDirectory",
Expand Down Expand Up @@ -165,12 +165,18 @@ private List<TimestampedValue<String>> getMatchingObjects(GcsPath path) throws I
GcsUtil util = getUtil();
String pageToken = null;
do {
String prefix = path.getObject();
Objects objects = util.listObjects(
path.getBucket(), prefix, pageToken, delimiter);
path.getBucket(), path.getObject(), pageToken, delimiter);
pageToken = objects.getNextPageToken();
List<StorageObject> items = firstNonNull(
objects.getItems(), Lists.newArrayList());
if (objects.getPrefixes() != null) {
for (String prefix : objects.getPrefixes()) {
result.add(TimestampedValue.of(
"gs://" + path.getBucket() + "/" + prefix,
Instant.EPOCH));
}
}
for (StorageObject object : items) {
String fullName = "gs://" + object.getBucket() + "/" + object.getName();
if (!object.getName().endsWith("/")) {
Expand Down

0 comments on commit a485017

Please sign in to comment.