Skip to content

Commit

Permalink
[query] fix and enforce that workers transmit stack traces (hail-is#14122)
Browse files Browse the repository at this point in the history

Co-authored-by: Edmund Higham <[email protected]>
  • Loading branch information
danking and ehigham authored Jan 3, 2024
1 parent dfc87fc commit ff8c7e2
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import hail as hl
import pytest
import re

from hail.utils.java import FatalError
from .helpers import qobtest


@qobtest
def test_exceptions_from_workers_have_stack_traces():
    # Provoke a worker-side failure: hl.int(hl.rand_norm(0, 0.1)) truncates
    # to zero for most draws, so the floor division raises on the workers
    # when the table is collected.
    ht = hl.utils.range_table(10, n_partitions=10)
    ht = ht.annotate(x=hl.int(1)//hl.int(hl.rand_norm(0, 0.1)))

    # The error surfaced to the driver must include the worker's stack trace:
    # the Java frame that threw, and a frame from BackendUtils.scala showing
    # the trace was transmitted through the backend machinery.
    fragments = [
        '.*',
        re.escape('java.lang.Math.floorDiv(Math.java:1052)'),
        '.*',
        re.escape('(BackendUtils.scala:'),
        '[0-9]+',
        re.escape(')\n'),
        '.*',
    ]
    pattern = ''.join(fragments)

    # DOTALL so '.*' spans the newlines of a multi-line stack trace.
    with pytest.raises(FatalError, match=re.compile(pattern, re.DOTALL)):
        ht.collect()
12 changes: 10 additions & 2 deletions hail/src/main/scala/is/hail/backend/spark/SparkBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,13 @@ import org.json4s.jackson.{JsonMethods, Serialization}
import org.json4s.{DefaultFormats, Formats}

import com.sun.net.httpserver.{HttpExchange}
import java.io.{Closeable, PrintWriter, OutputStream}
import java.io.{Closeable, PrintWriter, PrintStream, OutputStream}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal


class SparkBroadcastValue[T](bc: Broadcast[T]) extends BroadcastValue[T] with Serializable {
Expand Down Expand Up @@ -424,7 +425,14 @@ class SparkBackend(
val sp = partition.asInstanceOf[TaggedRDDPartition]
val fs = new HadoopFS(null)
// FIXME: this is broken: the partitionId of SparkTaskContext will be incorrect
val result = Try(f(sp.data, SparkTaskContext.get(), theHailClassLoaderForSparkWorkers, fs))
val result = try {
Success(f(sp.data, SparkTaskContext.get(), theHailClassLoaderForSparkWorkers, fs))
} catch {
case NonFatal(exc) =>
exc.getStackTrace() // Calling getStackTrace appears to ensure the exception is
// serialized with its stack trace.
Failure(exc)
}
Iterator.single((result, sp.tag))
}
}
Expand Down

0 comments on commit ff8c7e2

Please sign in to comment.