.collect() not entirely functional when using Scala 3.3.1 #40
Just to be clear, does the compiling issue also occur with 3.3.0?
No, not when I tested it.
Interesting! I wonder if this is due to something different in Scala 3.3.1.
I get different exceptions, and it has nothing to do with Scala 3.3.0 or 3.3.1, but rather with the collect and the StarWars files. I tried collecting the data of the last Dataset `sw_ds` in the starwars example and got an exception. The reason for this actually lies in the data and the case class definitions: they don't match.

```scala
// print each column of the inferred schema as "name: type"
println(sw_df.schema.fields.map { f =>
  s"${f.name}: ${f.dataType.toString().replace("Type", "")}"
}.mkString("\n"))
```

This yields:

```
name: String
gender: String
height: Double
weight: String
eyecolor: String
haircolor: String
skincolor: String
homeland: String
born: String
died: String
jedi: String
species: String
weapon: String
friends: String
```

which does not fit the case class definition, and the other types probably don't fit either. For instance, `gender` is missing from the case class; OK, missing fields should not be a problem, but `height` is actually a Double and was defined as Int, so here we have the parsing error. Also, `weight` is a String, not an Int: the String comes from "NA" being in the data, and "inferSchema" does not know about "NA". The same goes for the other columns where NA is present; they should actually be mapped to None, but aren't. The error message is a bit irritating; in Spark with Scala 2 it is probably a bit better.
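For reference, Spark's CSV reader can be told at load time to treat "NA" as null, which lets "inferSchema" pick proper numeric types and lets nullable columns map to `Option` fields. A minimal sketch, assuming the data is loaded from a CSV file (the path and option set here are illustrative, not taken from the example):

```scala
// Sketch: treat the literal string "NA" as null while reading, so numeric
// columns are inferred as numbers and missing values arrive as null.
// "starwars.csv" is a placeholder path.
val sw_df = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .option("nullValue", "NA")
  .csv("starwars.csv")
```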
btw - deriving the schema from the case class `T` works this way: `List.empty[T].toDS.schema`
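For instance, a small self-contained sketch using this library's encoders (the `Friends` field list here is illustrative, not necessarily the exact definition from the example):

```scala
import org.apache.spark.sql.SparkSession
import scala3encoders.given

case class Friends(name: String, friends: String)

object SchemaDemo extends App {
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits._

  // derive the Spark schema from the case class alone, without any data
  println(List.empty[Friends].toDS.schema.treeString)
}
```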
Ah yes, I forgot to mention that I had also noticed that the later datasets of case classes were broken either way, since I had modified them in my own example. But with the first `Dataset[Friends]` one should be able to reproduce the error with no other modification.
Or just create a small example with a dataset of a case class.
Should we close this issue and create a new one for deriving the schema of case classes?
I created the issue based on how to recreate the problem. If there is a reformulation of the same issue that more closely describes what is actually wrong, then sure.
Thank you @michael72 for the in-depth analysis! I had a look at your arguments and I have a few points to highlight:

- Indeed.
- This is already the case with Spark in Scala 2.x. For example, I think the library is able to derive a schema from case classes.
- I think there are already multiple libraries that focus on type safety for Spark jobs, such as Frameless and Iskra.
I added a PR which gives better error messages than the current code, so the collect still wouldn't work, because the data doesn't match the case class definitions. However, it was possible to do the data conversion using a udf defined on package level:

```scala
import scala3udf.{
  // "old" udf doesn't interfere with the new scala3udf.udf when renamed
  Udf => udf
}

case class Character(
    name: String,
    height: Double,
    weight: Option[Double],
    eyecolor: Option[String],
    haircolor: Option[String],
    jedi: String,
    species: String
)

// map "NA" in the raw data to None, anything else to Some(parsed value)
def toOption[T](what: Any, parse: Any => T) =
  if (what.toString() == "NA") None else Some(parse(what))

val character = udf(
  (
      name: String,
      height: Double,
      weight: String,
      eyecolor: String,
      haircolor: String,
      jedi: String,
      species: String
  ) =>
    Character(
      name,
      height,
      toOption(weight, _.toString.toDouble),
      toOption(eyecolor, _.toString),
      toOption(haircolor, _.toString),
      jedi,
      species
    )
)
```

and initialization:

```scala
implicit val spark: SparkSession =
  SparkSession.builder().master("local").getOrCreate
udf.register(character)
```

and the use later:

```scala
val characters = spark
  .sql(
    "SELECT character(name, height, weight, eyecolor, haircolor, jedi, species) as character from data"
  )
  .select("character.*")
  .as[Character]
```

which does the trick. The rest of the code would have to be adapted as well.
The fact you manage to get it working through the udf …
No, I can't get the following to work:

```scala
import org.apache.spark.sql.SparkSession
import scala3encoders.given

case class A(x: Double)
case class B(x: Int)

object MapSample extends App {
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits._

  val data = List(A(1.0), A(2.3)).toDF().as[A].map {
    case A(x) => B(x.toInt)
  }
  data.show() // with Scala 3 it simply hangs
}
```
Maybe I should create another ticket?
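A possible way to sidestep the typed `map` (a sketch under the assumption that only the typed map hangs, which this thread doesn't confirm) is to express the conversion as an untyped column cast:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col
import scala3encoders.given

case class A(x: Double)
case class B(x: Int)

object MapWorkaround extends App {
  val spark = SparkSession.builder().master("local").getOrCreate
  import spark.implicits._

  // untyped alternative: cast the column instead of mapping typed objects
  val converted = List(A(1.0), A(2.3)).toDF()
    .select(col("x").cast("int").as("x"))
    .as[B]
  converted.show()
}
```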
Have you tried lowering the logging level to debug to see if it prints something there while hanging?
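For example, via `SparkContext.setLogLevel` (a one-line sketch, assuming the `spark` session from the sample above):

```scala
// emit debug-level logs while the job runs
spark.sparkContext.setLogLevel("DEBUG")
```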
I tried with …
I tested upgrading this project to use Scala 3.3.1 but encountered an issue. It seems that using `.collect` on a dataset containing some Scala case class causes some generated code to fail to compile. The odd thing is that this only happens if you are running from a main def (either a def with `@main` or the usual `main(args: Array[String]): Unit`), but not when running from an object that extends App.
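To illustrate the two run styles (a sketch; the dataset-building code is elided, and `StarWarsApp` is a hypothetical name):

```scala
// Crashes on .collect per this issue: an explicit main method
object StarWars {
  def main(args: Array[String]): Unit = {
    // ... build the datasets as in the example ...
    // sw_ds.collect() // fails with the CompileException below
  }
}

// Works: the same body in an object that extends App
object StarWarsApp extends App {
  // ... same code, .collect succeeds ...
}
```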
This can be reproduced in the example code StarWars.scala by first changing the Scala version to 3.3.1 in build.sbt, then adding a main method to the StarWars object and deleting "extends App". Now if you add `.collect` on any dataset in the file and run it, it should crash with this or a similar error:
```
org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 60, Column 8: No applicable constructor/method found for actual parameters "java.lang.String, java.lang.String"; candidates are: "sql.StarWars$Friends$1(sql.StarWars$, java.lang.String, java.lang.String)"
```