
Repartition and generate a specified num of files #625

Open
chenruotao opened this issue Mar 18, 2021 · 12 comments
Labels
acknowledged (This issue has been read and acknowledged by Delta admins) · enhancement (New feature or request)

Comments

@chenruotao

I do not understand why Delta's merge repartitions by partitionColumns; it may lead to data skew. The current code looks like this:

 protected def repartitionIfNeeded(
      spark: SparkSession,
      df: DataFrame,
      partitionColumns: Seq[String]): DataFrame = {
    if (partitionColumns.nonEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)) {
      df.repartition(partitionColumns.map(col): _*)
    } else {
      df
    }
  }

So I made the following change:

 protected def repartitionIfNeeded(
      spark: SparkSession,
      df: DataFrame): DataFrame = {
    val shuffleNum = spark.conf.get(DeltaSQLConf.MERGE_SHUFFLE_REPARTITION)
    if (spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE) && shuffleNum > 0) {
      df.repartition(shuffleNum)
    } else {
      df
    }
  }

This repartitions by hash and generates a specified number of files, which gives me more control over the size and number of small files. Does this seem reasonable?

@pranavanand
Contributor

I'm not certain what the MERGE_SHUFFLE_REPARTITION conf is. Can you point me to it?

@chenruotao
Author

chenruotao commented Mar 19, 2021

> I'm not certain what the MERGE_SHUFFLE_REPARTITION conf is. Can you point me to it?

Just a custom parameter like this:

  val MERGE_SHUFFLE_REPARTITION =
    buildConf("merge.shuffle.partitions")
      .internal()
      .doc("merge output data shuffle partitions")
      .intConf
      .checkValue(_ >= 0, "partitions can not be negative.")
      .createWithDefault(0)
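
For reference, a session-level way to set such confs might look like the sketch below. This is an assumption: it relies on DeltaSQLConf.buildConf prefixing keys with spark.databricks.delta., and the exact key behind MERGE_REPARTITION_BEFORE_WRITE is not shown in this thread.

  // Hypothetical session configuration for the custom conf above, assuming the
  // "spark.databricks.delta." prefix added by DeltaSQLConf.buildConf.
  spark.conf.set("spark.databricks.delta.merge.shuffle.partitions", 200)
  // Key assumed from the MERGE_REPARTITION_BEFORE_WRITE constant referenced earlier.
  spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", true)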

@pranavanand
Contributor

pranavanand commented Mar 19, 2021

I think this makes sense if you want to control the number of files written out; however, you will still have small files if there is data skew.

@awdavidson

Another implementation would be to add a synthetic key to .repartition(partitionColumns.map(col): _*), e.g.
.repartition((rand() * shuffleNum).cast("Integer") +: partitionColumns.map(col): _*). This should help reduce the many-small-files issue and address the skewed partitions.

Note: I previously commented the above with some logic, but it looks to have been removed or didn't submit correctly. I will happily regenerate it if required.
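
For illustration, a minimal, self-contained sketch of this salting approach; the helper name and shuffleNum parameter are illustrative, not part of Delta's actual merge code path:

  import org.apache.spark.sql.DataFrame
  import org.apache.spark.sql.functions.{col, rand}
  import org.apache.spark.sql.types.IntegerType

  def saltedRepartition(
      df: DataFrame,
      partitionColumns: Seq[String],
      shuffleNum: Int): DataFrame = {
    // rand() is uniform in [0, 1), so the salt takes integer values 0 until shuffleNum,
    // splitting each table partition across up to shuffleNum shuffle partitions.
    val salt = (rand() * shuffleNum).cast(IntegerType)
    df.repartition(salt +: partitionColumns.map(col): _*)
  }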

@chenruotao
Author

In fact, I think users need more precise control over the files they generate, but for now I use Spark's adaptive execution (by setting spark.sql.adaptive.enabled=true) to size the output, which is also a good strategy (the relevant settings are sketched at the end of this comment).
However, I think repartition(partitionColumns.map(col): _*) does not meet my needs; a more customizable parameter would be better. Below is my code:

  protected def repartitionIfNeeded(
      spark: SparkSession,
      df: DataFrame,
      sortKey: Option[String]): DataFrame = {
    val shuffleNum = spark.conf.get(DeltaSQLConf.MERGE_SHUFFLE_REPARTITION)
    if (spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE) && shuffleNum > 0) {
      sortKey match {
        case Some(key) => df.repartition(shuffleNum).sort(key)
        case None => df.repartition(shuffleNum)
      }
    } else {
      df
    }
  }

That sortKey helped me reduce the number of touched files (found by findTouchedFiles), which is very useful.
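
For reference, these are the AQE settings referenced at the start of this comment (available in Spark 3.x); the advisory size is what steers the coalesced partition, and hence output file, sizes:

  // Spark 3.x adaptive execution settings for coalescing small shuffle partitions.
  spark.conf.set("spark.sql.adaptive.enabled", "true")
  spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
  // Target size per post-shuffle partition; tune to the desired output file size.
  spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")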

@chenruotao
Author

chenruotao commented Mar 22, 2021

> I think this makes sense if you want to control the number of files written out; however, you will still have small files if there is data skew.

But df.repartition(shuffleNum) repartitions the data by hash, so I do not think it will lead to data skew within a partition. I have also made another patch so that all touched files are shuffled into the same partition.

@awdavidson

awdavidson commented Mar 23, 2021

Your implementation may still lead to many small files if your output is partitioned by a column or columns. E.g. df.repartition(10).write.partitionBy("country") would create up to 10 files per country. Obviously you can handle this later on.

One thing to be aware of is that the implementation may potentially hinder or bottleneck performance. For example, say the output is partitioned by country and we have 50 countries; 49 countries have an output size of 5 GB each and 1 country has 20 GB. Rather than the skewed country having one 20 GB part file, we want twenty 1 GB part files. Your implementation would look something like df.repartition(20)? If that's the case, the process will pull down 265 GB of data onto 20 partitions (approximately 13 GB per partition). You potentially end up losing a lot of the parallelism in the write phase and will make fewer workers do more work.
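
To make the 10-files-per-country point above concrete, a sketch of the write pattern being described (the path is hypothetical):

  // With a hash repartition to 10 followed by partitionBy on write, each country
  // directory can receive up to 10 part files, regardless of that country's size.
  df.repartition(10)
    .write
    .partitionBy("country")
    .format("delta")
    .save("/tmp/example_delta_table") // hypothetical path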

@chenruotao
Author

chenruotao commented Mar 24, 2021

> Your implementation may still lead to many small files if your output is partitioned by a column or columns. […]

Yes, you are right, but I cannot find a better way right now. Maybe we should reduce the number of hit partitions, or balance the data across partitions and turn on Spark's adaptive execution to generate the output.

@awdavidson

> Yes, you are right, but I cannot find a better way right now. […]

You could do something like this?

protected def repartitionIfNeeded(
      spark: SparkSession,
      df: DataFrame,
      partitionColumns: Seq[String]): DataFrame = {
    if (partitionColumns.nonEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)) {
      // MERGE_MAX_PARTITION_FILES defaults to 1, meaning no change to the original
      // functionality. When MERGE_MAX_PARTITION_FILES > 1, syntheticCol helps split each
      // partition into smaller chunks, which mitigates skewed partitions.
      val maxFiles = spark.conf.get(DeltaSQLConf.MERGE_MAX_PARTITION_FILES)
      val syntheticCol = (rand() * maxFiles).cast(IntegerType)
      df.repartition(syntheticCol +: partitionColumns.map(col): _*)
    } else if (
        partitionColumns.isEmpty && spark.conf.get(DeltaSQLConf.MERGE_REPARTITION_BEFORE_WRITE)) {
      df.repartition(spark.conf.get(DeltaSQLConf.MERGE_MAX_PARTITION_FILES))
    } else {
      df
    }
  }

This should keep the parallelism in the write phase when partitioning by a column or columns. When not partitioning by a column, we still have the potential to reduce the partition count, or we could remove that branch and let AQE do what it needs to. NB: this might not be the final implementation, just an idea :)
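
MERGE_MAX_PARTITION_FILES is not defined anywhere in this thread; by analogy with the MERGE_SHUFFLE_REPARTITION snippet earlier, a hypothetical definition might look like:

  // Hypothetical conf mirroring the earlier buildConf example; the key and doc are assumed.
  val MERGE_MAX_PARTITION_FILES =
    buildConf("merge.maxPartitionFiles")
      .internal()
      .doc("Maximum number of files to write per output partition during merge.")
      .intConf
      .checkValue(_ >= 1, "max partition files must be at least 1.")
      .createWithDefault(1)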

@chenruotao
Author

chenruotao commented Mar 24, 2021

> You could do something like this? […]

Well, your idea is worth trying; I will run some tests on it. I have been looking for a solution to the small-files problem for a long time, so it is great to discuss it with you.

@awdavidson

awdavidson commented Mar 24, 2021

> Well, your idea is worth trying; I will run some tests on it. […]

Ah yeah, it's an interesting area, and I commonly see people accept the issue rather than reduce it.
I've previously handled this with either an implementation similar to the above (calculating approximate sizes of the entire DataFrame or its partitions) or with a lightweight process that runs afterwards (leveraging file system statistics). I know Databricks has the OPTIMIZE feature, but I wouldn't expect them to ship it to OSS.

@scottsand-db added the acknowledged and enhancement labels on Oct 7, 2021
@scottsand-db
Collaborator

Thanks for bringing this to our attention. We will take a look.
