Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wip: Support UDF for spark-rapids-tools #1347

Draft
wants to merge 17 commits into
base: dev
Choose a base branch
from
Prev Previous commit
Next Next commit
outputDirectory for tools should be hdfs
  • Loading branch information
hezhengjie committed Sep 18, 2024
commit 3347da9ae2ceff3d4d3471b973b39df93e4a495b
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ class ToolTextFileWriter(
// this overwrites existing path
private var utf8Writer: Option[BufferedWriter] = {
try {
// pass hadoopConf to None
Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, None))
Some(FSUtils.getUTF8BufferedWriter(textOutputLoc, hadoopConf))
} catch {
case NonFatal(e) =>
logError(s"Failed to open output path [$textOutputLoc] for writing", e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@ object EstimateEventRapidsUDF extends Logging {
scoreFile: String = "onprem",
enabledML: Boolean = false): (Int, Seq[QualificationSummaryInfo], String) = {
val eventPath = eventDir + "/" + applicationId + "_1"
val outputDirectory = if (enabledML) {
s"tmp_$applicationId"
} else outputDir + "/" + applicationId
val outputDirectory = outputDir + "/" + applicationId
val numOutputRows = 1000
val hadoopConf = RapidsToolsConfUtil.newHadoopConf()
// timeout: 20min
Expand Down Expand Up @@ -107,8 +105,7 @@ object EstimateEventRapidsUDF extends Logging {
(0, res, predictScore)
} catch {
case NonFatal(e) =>
logError(s"Error when analyze ${applicationId}, path is ${eventPath}.")
e.printStackTrace()
logError(s"Error when analyze ${applicationId}, path is ${eventPath}.", e)
(1, Seq[QualificationSummaryInfo](), "")
}
}
Expand Down