Skip to content

Commit

Permalink
[ZEPPELIN-4419]. Align functions in ZeppelinContext of Scala/Python/R
Browse files Browse the repository at this point in the history
### What is this PR for?

This PR is to align all the functions in ZeppelinContext in Scala/Python/R

### What type of PR is it?
[ Improvement | Refactoring]

### Todos
* [ ] - Task

### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-4419

### How should this be tested?
* CI pass

### Screenshots (if appropriate)

### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No

Author: Jeff Zhang <[email protected]>

Closes apache#3511 from zjffdu/ZEPPELIN-4419 and squashes the following commits:

7d5cdc8 [Jeff Zhang] [ZEPPELIN-4419]. Align functions in ZeppelinContext of Scala/Python/R
  • Loading branch information
zjffdu committed Nov 13, 2019
1 parent 83006c2 commit 2b251b6
Show file tree
Hide file tree
Showing 16 changed files with 718 additions and 178 deletions.
14 changes: 7 additions & 7 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,43 +106,43 @@ matrix:
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.4 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest24, SparkIntegrationTest24, JdbcIntegrationTest, Unit test of Spark 2.4 (Scala-2.12)
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.12" PROFILE="-Pspark-2.4 -Pspark-scala-2.12 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,jdbc,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest24,SparkIntegrationTest24,JdbcIntegrationTest,org.apache.zeppelin.spark.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest23, SparkIntegrationTest23, Unit test of Spark 2.3 (Scala-2.11) and Unit test PythonInterpreter under python2
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.11" PROFILE="-Pspark-2.3 -Pspark-scala-2.11 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest23,SparkIntegrationTest23,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest22, SparkIntegrationTest22, Unit test of Spark 2.2 (Scala-2.10) and Unit test PythonInterpreter under python3
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.2 -Pspark-scala-2.10 -Phadoop2 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest22,SparkIntegrationTest22,org.apache.zeppelin.spark.*,apache.zeppelin.python.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest21, SparkIntegrationTest21, Unit test of Spark 2.1 (Scala-2.10)
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="3" SCALA_VER="2.10" PROFILE="-Pspark-2.1 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest21,SparkIntegrationTest21,org.apache.zeppelin.spark.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest20, SparkIntegrationTest20, Unit test of Spark 2.0 (Scala-2.10), Use python 3.5 because spark 2.0 doesn't support python 3.6 +
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-2.0 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest20,SparkIntegrationTest20,org.apache.zeppelin.spark.* -DfailIfNoTests=false"

# ZeppelinSparkClusterTest16, SparkIntegrationTest16, Unit test of Spark 1.6 (Scala-2.10)
- sudo: required
jdk: "openjdk8"
dist: xenial
env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false"
env: BUILD_PLUGINS="true" PYTHON="2" SCALA_VER="2.10" PROFILE="-Pspark-1.6 -Phadoop2 -Pspark-scala-2.10 -Pintegration" SPARKR="true" BUILD_FLAG="install -DskipTests -DskipRat -am" TEST_FLAG="test -DskipRat -am" MODULES="-pl zeppelin-interpreter-integration,zeppelin-web,spark/spark-dependencies,markdown" TEST_PROJECTS="-Dtest=ZeppelinSparkClusterTest16,SparkIntegrationTest16,org.apache.zeppelin.spark.* -DfailIfNoTests=false"

# Test python/pyspark with python 2, livy 0.5
- sudo: required
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ class InterpreterLogic(val session: Session) {
case None => {
val listChoices:List[String] = choices.trim.split(CHOICES_SEPARATOR).toList
val paramOptions= listChoices.map(choice => new ParamOption(choice, choice))
val selected = context.getGui.select(variable, listChoices.head, paramOptions.toArray)
val selected = context.getGui.select(variable, paramOptions.toArray, listChoices.head)
statement.replaceAll(escapedExp,selected.toString)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void should_extract_variable_and_choices() throws Exception {
//Given
AngularObjectRegistry angularObjectRegistry = new AngularObjectRegistry("cassandra", null);
when(intrContext.getAngularObjectRegistry()).thenReturn(angularObjectRegistry);
when(intrContext.getGui().select(eq("name"), eq("'Paul'"), optionsCaptor.capture()))
when(intrContext.getGui().select(eq("name"), optionsCaptor.capture(), eq("'Paul'")))
.thenReturn("'Jack'");

//When
Expand Down
58 changes: 47 additions & 11 deletions python/src/main/resources/python/zeppelin_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,6 @@ def __delitem__(self, key):
def __contains__(self, item):
return self.z.containsKey(item)

def add(self, key, value):
self.__setitem__(key, value)

def put(self, key, value):
self.__setitem__(key, value)

Expand All @@ -72,8 +69,14 @@ def getAsDataFrame(self, key):
def angular(self, key, noteId = None, paragraphId = None):
return self.z.angular(key, noteId, paragraphId)

def angularBind(self, key, value, noteId = None, paragraphId = None):
return self.z.angularBind(key, value, noteId, paragraphId)
def remove(self, key):
self.z.remove(key)

def contains(self, key):
return self.contains(key)

def add(self, key, value):
self.__setitem__(key, value)

def getInterpreterContext(self):
return self.z.getInterpreterContext()
Expand All @@ -84,23 +87,56 @@ def input(self, name, defaultValue=""):
def textbox(self, name, defaultValue=""):
return self.z.textbox(name, defaultValue)

def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)

def password(self, name):
return self.z.password(name)

def noteTextbox(self, name, defaultValue=""):
return self.z.noteTextbox(name, defaultValue)
def notePassword(self, name):
return self.z.notePassword(name)

def select(self, name, options, defaultValue=""):
return self.z.select(name, defaultValue, self.getParamOptions(options))
return self.z.select(name, self.getParamOptions(options), defaultValue)

def noteSelect(self, name, options, defaultValue=""):
return self.z.noteSelect(name, defaultValue, self.getParamOptions(options))
return self.z.noteSelect(name, self.getParamOptions(options), defaultValue)

def checkbox(self, name, options, defaultChecked=[]):
return self.z.checkbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
return self.z.checkbox(name, self.getParamOptions(options), self.getDefaultChecked(defaultChecked))

def noteCheckbox(self, name, options, defaultChecked=[]):
return self.z.noteCheckbox(name, self.getDefaultChecked(defaultChecked), self.getParamOptions(options))
return self.z.noteCheckbox(name, self.getParamOptions(options), self.getDefaultChecked(defaultChecked))

def run(self, paragraphId):
return self.z.run(paragraphId)

def run(self, noteId, paragraphId):
return self .z.run(noteId, paragraphId)

def runNote(self, noteId):
return self.z.runNote(noteId)

def runAll(self):
return self.z.runAll()

def angular(self, key, noteId = None, paragraphId = None):
if noteId == None:
return self.z.angular(key, self.z.getInterpreterContext().getNoteId(), paragraphId)
else:
return self.z.angular(key, noteId, paragraphId)

def angularBind(self, name, value, noteId = None, paragraphId = None):
if noteId == None:
return self.z.angularBind(name, value, noteId, paragraphId)
else:
return self.z.angularBind(name, value, self.z.getInterpreterContext().getNoteId(), paragraphId)

def angularUnbind(self, name, noteId = None):
if noteId == None:
self.z.angularUnbind(name, self.z.getInterpreterContext().getNoteId())
else:
self.z.angularUnbind(name, noteId)

def registerHook(self, event, cmd, replName=None):
if replName is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void cancel(InterpreterContext context) {

@Override
public FormType getFormType() {
return FormType.NONE;
return FormType.NATIVE;
}

@Override
Expand Down
41 changes: 39 additions & 2 deletions spark/interpreter/src/main/resources/R/zeppelin_sparkr.R
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.Zeppe
z.put <- function(name, object) {
SparkR:::callJMethod(.zeppelinContext, "put", name, object)
}

z.get <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "get", name)
}
Expand All @@ -80,8 +81,44 @@ z.angularBind <- function(name, value, noteId=NULL, paragraphId=NULL) {
SparkR:::callJMethod(.zeppelinContext, "angularBind", name, value, noteId, paragraphId)
}

z.input <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "input", name, value)
z.textbox <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "textbox", name, value)
}

z.noteTextbox <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "noteTextbox", name, value)
}

z.password <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "password", name)
}

z.notePassword <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "notePassword", name)
}

z.run <- function(paragraphId) {
SparkR:::callJMethod(.zeppelinContext, "run", paragraphId)
}

z.runNote <- function(noteId) {
SparkR:::callJMethod(.zeppelinContext, "runNote", noteId)
}

z.runAll <- function() {
SparkR:::callJMethod(.zeppelinContext, "runAll")
}

z.angular <- function(name) {
SparkR:::callJMethod(.zeppelinContext, "angular", name)
}

z.angularBind <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "angularBind", name, value)
}

z.angularUnbind <- function(name, value) {
SparkR:::callJMethod(.zeppelinContext, "angularUnbind", name)
}

# notify script is initialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,27 +217,27 @@ public void run() {
assertTrue(completions.size() > 0);
completions.contains(new InterpreterCompletion("sc", "sc", ""));

// pyspark streaming
// pyspark streaming TODO(zjffdu) disable pyspark streaming test temporary
context = createInterpreterContext(mockIntpEventClient);
result = interpreter.interpret(
"from pyspark.streaming import StreamingContext\n" +
"import time\n" +
"ssc = StreamingContext(sc, 1)\n" +
"rddQueue = []\n" +
"for i in range(5):\n" +
" rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
"inputStream = ssc.queueStream(rddQueue)\n" +
"mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
"reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
"reducedStream.pprint()\n" +
"ssc.start()\n" +
"time.sleep(6)\n" +
"ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
Thread.sleep(1000);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
interpreterResultMessages = context.out.toInterpreterResultMessage();
assertEquals(1, interpreterResultMessages.size());
assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
// result = interpreter.interpret(
// "from pyspark.streaming import StreamingContext\n" +
// "import time\n" +
// "ssc = StreamingContext(sc, 1)\n" +
// "rddQueue = []\n" +
// "for i in range(5):\n" +
// " rddQueue += [ssc.sparkContext.parallelize([j for j in range(1, 1001)], 10)]\n" +
// "inputStream = ssc.queueStream(rddQueue)\n" +
// "mappedStream = inputStream.map(lambda x: (x % 10, 1))\n" +
// "reducedStream = mappedStream.reduceByKey(lambda a, b: a + b)\n" +
// "reducedStream.pprint()\n" +
// "ssc.start()\n" +
// "time.sleep(6)\n" +
// "ssc.stop(stopSparkContext=False, stopGraceFully=True)", context);
// Thread.sleep(1000);
// assertEquals(InterpreterResult.Code.SUCCESS, result.code());
// interpreterResultMessages = context.out.toInterpreterResultMessage();
// assertEquals(1, interpreterResultMessages.size());
// assertTrue(interpreterResultMessages.get(0).getData().contains("(0, 100)"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;


public class SparkInterpreterTest {
Expand Down Expand Up @@ -263,7 +262,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
assertEquals("pwd", pwd.getName());

context = getInterpreterContext();
result = interpreter.interpret("z.checkbox(\"checkbox_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
result = interpreter.interpret("z.checkbox(\"checkbox_1\", Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")), Seq(\"value_2\"))", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("checkbox_1") instanceof CheckBox);
Expand All @@ -278,7 +277,7 @@ public void testSparkInterpreter() throws IOException, InterruptedException, Int
assertEquals("name_2", checkBox.getOptions()[1].getDisplayName());

context = getInterpreterContext();
result = interpreter.interpret("z.select(\"select_1\", Seq(\"value_2\"), Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")))", context);
result = interpreter.interpret("z.select(\"select_1\", Seq((\"value_1\", \"name_1\"), (\"value_2\", \"name_2\")), Seq(\"value_2\"))", context);
assertEquals(InterpreterResult.Code.SUCCESS, result.code());
assertEquals(1, context.getGui().getForms().size());
assertTrue(context.getGui().getForms().get("select_1") instanceof Select);
Expand Down
Loading

0 comments on commit 2b251b6

Please sign in to comment.