Skip to content

Commit fa0524f

Browse files
holdenk
authored and pwendell committed
Spark-939: allow user jars to take precedence over spark jars
I still need to do a small bit of re-factoring [mostly the one Java file I'll switch it back to a Scala file and use it in both the close loaders], but comments on other things I should do would be great. Author: Holden Karau <[email protected]> Closes apache#217 from holdenk/spark-939-allow-user-jars-to-take-precedence-over-spark-jars and squashes the following commits: cf0cac9 [Holden Karau] Fix the executorclassloader 1955232 [Holden Karau] Fix long line in TestUtils 8f89965 [Holden Karau] Fix tests for new class name 7546549 [Holden Karau] CR feedback, merge some of the testutils methods down, rename the classloader 644719f [Holden Karau] User the class generator for the repl class loader tests too f0b7114 [Holden Karau] Fix the core/src/test/scala/org/apache/spark/executor/ExecutorURLClassLoaderSuite.scala tests 204b199 [Holden Karau] Fix the generated classes 9f68f10 [Holden Karau] Start rewriting the ExecutorURLClassLoaderSuite to not use the hard coded classes 858aba2 [Holden Karau] Remove a bunch of test junk 261aaee [Holden Karau] simplify executorurlclassloader a bit 7a7bf5f [Holden Karau] CR feedback d4ae848 [Holden Karau] rewrite component into scala aa95083 [Holden Karau] CR feedback 7752594 [Holden Karau] re-add https comment a0ef85a [Holden Karau] Fix style issues 125ea7f [Holden Karau] Easier to just remove those files, we don't need them bb8d179 [Holden Karau] Fix issues with the repl class loader 241b03d [Holden Karau] fix my rat excludes a343350 [Holden Karau] Update rat-excludes and remove a useless file d90d217 [Holden Karau] Fix fall back with custom class loader and add a test for it 4919bf9 [Holden Karau] Fix parent calling class loader issue 8a67302 [Holden Karau] Test are good 9e2d236 [Holden Karau] It works comrade 691ee00 [Holden Karau] It works ish dc4fe44 [Holden Karau] Does not depend on being in my home directory 47046ff [Holden Karau] Remove bad import' 22d83cb [Holden Karau] Add a test suite for the executor url class loader suite 
7ef4628 [Holden Karau] Clean up 792d961 [Holden Karau] Almost works 16aecd1 [Holden Karau] Doesn't quite work 8d2241e [Holden Karau] Adda FakeClass for testing ClassLoader precedence options 648b559 [Holden Karau] Both class loaders compile. Now for testing e1d9f71 [Holden Karau] One loader workers.
1 parent b9e0c93 commit fa0524f

File tree

10 files changed

+287
-21
lines changed

10 files changed

+287
-21
lines changed

.rat-excludes

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,4 @@ work
3939
.*\.q
4040
golden
4141
test.out/*
42-
.*iml
42+
.*iml

core/src/test/scala/org/apache/spark/TestUtils.scala core/src/main/scala/org/apache/spark/TestUtils.scala

+15-5
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,29 @@ import scala.collection.JavaConversions._
2626
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
2727
import com.google.common.io.Files
2828

29-
object TestUtils {
29+
/**
30+
* Utilities for tests. Included in main codebase since it's used by multiple
31+
* projects.
32+
*
33+
* TODO: See if we can move this to the test codebase by specifying
34+
* test dependencies between projects.
35+
*/
36+
private[spark] object TestUtils {
3037

3138
/**
3239
* Create a jar that defines classes with the given names.
3340
*
3441
* Note: if this is used during class loader tests, class names should be unique
3542
* in order to avoid interference between tests.
3643
*/
37-
def createJarWithClasses(classNames: Seq[String]): URL = {
44+
def createJarWithClasses(classNames: Seq[String], value: String = ""): URL = {
3845
val tempDir = Files.createTempDir()
39-
val files = for (name <- classNames) yield createCompiledClass(name, tempDir)
46+
val files = for (name <- classNames) yield createCompiledClass(name, tempDir, value)
4047
val jarFile = new File(tempDir, "testJar-%s.jar".format(System.currentTimeMillis()))
4148
createJar(files, jarFile)
4249
}
4350

51+
4452
/**
4553
* Create a jar file that contains this set of files. All files will be located at the root
4654
* of the jar.
@@ -80,9 +88,11 @@ object TestUtils {
8088
}
8189

8290
/** Creates a compiled class with the given name. Class file will be placed in destDir. */
83-
def createCompiledClass(className: String, destDir: File): File = {
91+
def createCompiledClass(className: String, destDir: File, value: String = ""): File = {
8492
val compiler = ToolProvider.getSystemJavaCompiler
85-
val sourceFile = new JavaSourceFromString(className, s"public class $className {}")
93+
val sourceFile = new JavaSourceFromString(className,
94+
"public class " + className + " { @Override public String toString() { " +
95+
"return \"" + value + "\";}}")
8696

8797
// Calling this outputs a class file in pwd. It's easier to just rename the file than
8898
// build a custom FileManager that controls the output location.

core/src/main/scala/org/apache/spark/executor/Executor.scala

+12-5
Original file line numberDiff line numberDiff line change
@@ -291,15 +291,19 @@ private[spark] class Executor(
291291
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
292292
* created by the interpreter to the search path
293293
*/
294-
private def createClassLoader(): ExecutorURLClassLoader = {
295-
val loader = Thread.currentThread().getContextClassLoader
294+
private def createClassLoader(): MutableURLClassLoader = {
295+
val loader = this.getClass.getClassLoader
296296

297297
// For each of the jars in the jarSet, add them to the class loader.
298298
// We assume each of the files has already been fetched.
299299
val urls = currentJars.keySet.map { uri =>
300300
new File(uri.split("/").last).toURI.toURL
301301
}.toArray
302-
new ExecutorURLClassLoader(urls, loader)
302+
val userClassPathFirst = conf.getBoolean("spark.files.userClassPathFirst", false)
303+
userClassPathFirst match {
304+
case true => new ChildExecutorURLClassLoader(urls, loader)
305+
case false => new ExecutorURLClassLoader(urls, loader)
306+
}
303307
}
304308

305309
/**
@@ -310,11 +314,14 @@ private[spark] class Executor(
310314
val classUri = conf.get("spark.repl.class.uri", null)
311315
if (classUri != null) {
312316
logInfo("Using REPL class URI: " + classUri)
317+
val userClassPathFirst: java.lang.Boolean =
318+
conf.getBoolean("spark.files.userClassPathFirst", false)
313319
try {
314320
val klass = Class.forName("org.apache.spark.repl.ExecutorClassLoader")
315321
.asInstanceOf[Class[_ <: ClassLoader]]
316-
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader])
317-
constructor.newInstance(classUri, parent)
322+
val constructor = klass.getConstructor(classOf[String], classOf[ClassLoader],
323+
classOf[Boolean])
324+
constructor.newInstance(classUri, parent, userClassPathFirst)
318325
} catch {
319326
case _: ClassNotFoundException =>
320327
logError("Could not find org.apache.spark.repl.ExecutorClassLoader on classpath!")

core/src/main/scala/org/apache/spark/executor/ExecutorURLClassLoader.scala

+44-1
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,56 @@ package org.apache.spark.executor
1919

2020
import java.net.{URLClassLoader, URL}
2121

22+
import org.apache.spark.util.ParentClassLoader
23+
2224
/**
2325
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
26+
* We also make changes so user classes can come before the default classes.
2427
*/
28+
29+
private[spark] trait MutableURLClassLoader extends ClassLoader {
30+
def addURL(url: URL)
31+
def getURLs: Array[URL]
32+
}
33+
34+
private[spark] class ChildExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
35+
extends MutableURLClassLoader {
36+
37+
private object userClassLoader extends URLClassLoader(urls, null){
38+
override def addURL(url: URL) {
39+
super.addURL(url)
40+
}
41+
override def findClass(name: String): Class[_] = {
42+
super.findClass(name)
43+
}
44+
}
45+
46+
private val parentClassLoader = new ParentClassLoader(parent)
47+
48+
override def findClass(name: String): Class[_] = {
49+
try {
50+
userClassLoader.findClass(name)
51+
} catch {
52+
case e: ClassNotFoundException => {
53+
parentClassLoader.loadClass(name)
54+
}
55+
}
56+
}
57+
58+
def addURL(url: URL) {
59+
userClassLoader.addURL(url)
60+
}
61+
62+
def getURLs() = {
63+
userClassLoader.getURLs()
64+
}
65+
}
66+
2567
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
26-
extends URLClassLoader(urls, parent) {
68+
extends URLClassLoader(urls, parent) with MutableURLClassLoader {
2769

2870
override def addURL(url: URL) {
2971
super.addURL(url)
3072
}
3173
}
74+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util
19+
20+
/**
21+
* A class loader which makes findClass accesible to the child
22+
*/
23+
private[spark] class ParentClassLoader(parent: ClassLoader) extends ClassLoader(parent) {
24+
25+
override def findClass(name: String) = {
26+
super.findClass(name)
27+
}
28+
29+
override def loadClass(name: String): Class[_] = {
30+
super.loadClass(name)
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.executor
19+
20+
import java.io.File
21+
import java.net.URLClassLoader
22+
23+
import org.scalatest.FunSuite
24+
25+
import org.apache.spark.TestUtils
26+
27+
class ExecutorURLClassLoaderSuite extends FunSuite {
28+
29+
val childClassNames = List("FakeClass1", "FakeClass2")
30+
val parentClassNames = List("FakeClass1", "FakeClass2", "FakeClass3")
31+
val urls = List(TestUtils.createJarWithClasses(childClassNames, "1")).toArray
32+
val urls2 = List(TestUtils.createJarWithClasses(parentClassNames, "2")).toArray
33+
34+
test("child first") {
35+
val parentLoader = new URLClassLoader(urls2, null)
36+
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
37+
val fakeClass = classLoader.loadClass("FakeClass2").newInstance()
38+
val fakeClassVersion = fakeClass.toString
39+
assert(fakeClassVersion === "1")
40+
}
41+
42+
test("parent first") {
43+
val parentLoader = new URLClassLoader(urls2, null)
44+
val classLoader = new ExecutorURLClassLoader(urls, parentLoader)
45+
val fakeClass = classLoader.loadClass("FakeClass1").newInstance()
46+
val fakeClassVersion = fakeClass.toString
47+
assert(fakeClassVersion === "2")
48+
}
49+
50+
test("child first can fall back") {
51+
val parentLoader = new URLClassLoader(urls2, null)
52+
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
53+
val fakeClass = classLoader.loadClass("FakeClass3").newInstance()
54+
val fakeClassVersion = fakeClass.toString
55+
assert(fakeClassVersion === "2")
56+
}
57+
58+
test("child first can fail") {
59+
val parentLoader = new URLClassLoader(urls2, null)
60+
val classLoader = new ChildExecutorURLClassLoader(urls, parentLoader)
61+
intercept[java.lang.ClassNotFoundException] {
62+
classLoader.loadClass("FakeClassDoesNotExist").newInstance()
63+
}
64+
}
65+
66+
67+
}

docs/configuration.md

+9
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,15 @@ Apart from these, the following properties are also available, and may be useful
596596
the driver.
597597
</td>
598598
</tr>
599+
<tr>
600+
<td>spark.files.userClassPathFirst</td>
601+
<td>false</td>
602+
<td>
603+
(Experimental) Whether to give user-added jars precedence over Spark's own jars when
604+
loading classes in Executors. This feature can be used to mitigate conflicts between
605+
Spark's dependencies and user dependencies. It is currently an experimental feature.
606+
</td>
607+
</tr>
599608
<tr>
600609
<td>spark.authenticate</td>
601610
<td>false</td>

project/SparkBuild.scala

+1
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ object SparkBuild extends Build {
195195
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
196196

197197
resolvers ++= Seq(
198+
// HTTPS is unavailable for Maven Central
198199
"Maven Repository" at "http://repo.maven.apache.org/maven2",
199200
"Apache Repository" at "https://repository.apache.org/content/repositories/releases",
200201
"JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",

repl/src/main/scala/org/apache/spark/repl/ExecutorClassLoader.scala

+30-9
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,23 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2626

2727
import org.apache.spark.SparkEnv
2828
import org.apache.spark.util.Utils
29-
29+
import org.apache.spark.util.ParentClassLoader
3030

3131
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm._
3232
import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
3333

34-
3534
/**
3635
* A ClassLoader that reads classes from a Hadoop FileSystem or HTTP URI,
37-
* used to load classes defined by the interpreter when the REPL is used
38-
*/
39-
class ExecutorClassLoader(classUri: String, parent: ClassLoader)
40-
extends ClassLoader(parent) {
36+
* used to load classes defined by the interpreter when the REPL is used.
37+
* Allows the user to specify if user class path should be first
38+
*/
39+
class ExecutorClassLoader(classUri: String, parent: ClassLoader,
40+
userClassPathFirst: Boolean) extends ClassLoader {
4141
val uri = new URI(classUri)
4242
val directory = uri.getPath
4343

44+
val parentLoader = new ParentClassLoader(parent)
45+
4446
// Hadoop FileSystem object for our URI, if it isn't using HTTP
4547
var fileSystem: FileSystem = {
4648
if (uri.getScheme() == "http") {
@@ -49,8 +51,27 @@ extends ClassLoader(parent) {
4951
FileSystem.get(uri, new Configuration())
5052
}
5153
}
52-
54+
5355
override def findClass(name: String): Class[_] = {
56+
userClassPathFirst match {
57+
case true => findClassLocally(name).getOrElse(parentLoader.loadClass(name))
58+
case false => {
59+
try {
60+
parentLoader.loadClass(name)
61+
} catch {
62+
case e: ClassNotFoundException => {
63+
val classOption = findClassLocally(name)
64+
classOption match {
65+
case None => throw new ClassNotFoundException(name, e)
66+
case Some(a) => a
67+
}
68+
}
69+
}
70+
}
71+
}
72+
}
73+
74+
def findClassLocally(name: String): Option[Class[_]] = {
5475
try {
5576
val pathInDirectory = name.replace('.', '/') + ".class"
5677
val inputStream = {
@@ -68,9 +89,9 @@ extends ClassLoader(parent) {
6889
}
6990
val bytes = readAndTransformClass(name, inputStream)
7091
inputStream.close()
71-
return defineClass(name, bytes, 0, bytes.length)
92+
Some(defineClass(name, bytes, 0, bytes.length))
7293
} catch {
73-
case e: Exception => throw new ClassNotFoundException(name, e)
94+
case e: Exception => None
7495
}
7596
}
7697

0 commit comments

Comments
 (0)