Refactoring some classes
Leemoonsoo committed Aug 30, 2014
1 parent a4c378b commit f2bdd5c
Showing 52 changed files with 353 additions and 602 deletions.
29 changes: 16 additions & 13 deletions README.md
@@ -2,35 +2,38 @@
[![Build Status](https://secure.travis-ci.org/NFLabs/zeppelin.png?branch=master)](https://travis-ci.org/NFLabs/zeppelin)


**Zeppelin** is a complete large-scale data analysis environment, including
**Zeppelin** is a data analytics environment

* Web-based GUI
* With interactive visualization
* Super easy SQL-like analysis language called **ZQL**
* Custom user routine support
* Central archive of libraries called **ZAN** (Zeppelin Archive Network)
* On top of Hive (or any Hive compatible system like Shark)
* Web-based notebook style editor.
* Built-in Apache Spark support


To learn more about Zeppelin, visit our website http://zeppelin-project.org

### Build

    mvn clean package

### Run Unit Tests

    mvn test

With a specific Spark version:

    mvn clean package -Dspark.version=1.0.1

### Configure

Configure the following configuration files:

    ./conf/zeppelin-env.sh
    ./conf/zeppelin-site.xml

### Run

To run Zeppelin in _local-mode_ using Hive 0.9 + an embedded Derby metastore:

    ./bin/zeppelin-daemon.sh start

    # make sure hadoop is available through PATH or HADOOP_HOME
    ./bin/zeppelin.sh

Browse localhost:8080 in your browser.

For configuration details, check the __./conf__ subdirectory.

### Package

To package the final distribution, do:

    mvn clean package -P build-distr

The archive is generated under the _zeppelin-distribution/target_ directory.
4 changes: 2 additions & 2 deletions bin/common.sh
@@ -34,8 +34,8 @@ if [ "x$ZEPPELIN_LOG_DIR" == "x" ]; then
export ZEPPELIN_LOG_DIR="$ZEPPELIN_HOME/logs"
fi

if [ "x$ZEPPELIN_DATA_DIR" == "x" ]; then
export ZEPPELIN_DATA_DIR="$ZEPPELIN_HOME/data"
if [ "x$ZEPPELIN_NOTEBOOK_DIR" == "x" ]; then
export ZEPPELIN_NOTEBOOK_DIR="$ZEPPELIN_HOME/notebook"
fi

if [ "x$ZEPPELIN_PID_DIR" == "x" ]; then
5 changes: 5 additions & 0 deletions bin/zeppelin-daemon.sh
@@ -72,6 +72,11 @@ function init(){
echo "Pid dir doesn't exist, create $ZEPPELIN_PID_DIR"
mkdir -p "$ZEPPELIN_PID_DIR"
fi

if [ ! -d "$ZEPPELIN_NOTEBOOK_DIR" ]; then
echo "Pid dir doesn't exist, create $ZEPPELIN_NOTEBOOK_DIR"
mkdir -p "$ZEPPELIN_NOTEBOOK_DIR"
fi
}

function start(){
6 changes: 3 additions & 3 deletions markdown/pom.xml
@@ -12,7 +12,7 @@
<artifactId>zeppelin-markdown</artifactId>
<packaging>jar</packaging>
<version>0.3.4-SNAPSHOT</version>
<name>Zeppelin: Markdown Repl driver</name>
<name>Zeppelin: Markdown interpreter</name>
<url>http://www.nflabs.com</url>

<dependencies>
@@ -70,7 +70,7 @@
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../repl/md</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/md</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
@@ -84,7 +84,7 @@
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../repl/md</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/md</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
Markdown.java
@@ -5,11 +5,11 @@

import org.markdown4j.Markdown4jProcessor;

import com.nflabs.zeppelin.repl.Repl;
import com.nflabs.zeppelin.repl.ReplResult;
import com.nflabs.zeppelin.repl.ReplResult.Code;
import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;

public class Markdown extends Repl {
public class Markdown extends Interpreter {
private Markdown4jProcessor md;

public Markdown(Properties property){
@@ -31,14 +31,14 @@ public Object getValue(String name) {
}

@Override
public ReplResult interpret(String st) {
public InterpreterResult interpret(String st) {
String html;
try {
html = md.process(st);
} catch (IOException e) {
return new ReplResult(Code.ERROR, e.getMessage());
return new InterpreterResult(Code.ERROR, e.getMessage());
}
return new ReplResult(Code.SUCCESS, html);
return new InterpreterResult(Code.SUCCESS, html);
}

@Override
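The diff above shows the shape of the renamed API: an interpreter now extends `Interpreter` and returns an `InterpreterResult` carrying a `Code` plus a message. Below is a minimal sketch of another interpreter written against that API. It assumes the base class takes a `Properties` in its constructor and declares only the members visible in this diff (`initialize`, `getValue`, `interpret`); any other abstract methods the real base class declares would also need stubs, and the `EchoInterpreter` class and its package are hypothetical.

```java
package com.nflabs.zeppelin.example; // hypothetical package

import java.util.Properties;

import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;

// Hypothetical interpreter that upper-cases its input, mirroring the
// structure of the Markdown interpreter above.
public class EchoInterpreter extends Interpreter {

  public EchoInterpreter(Properties property) {
    super(property); // assumes the base constructor accepts Properties
  }

  @Override
  public void initialize() {
    // nothing to set up for this interpreter
  }

  @Override
  public Object getValue(String name) {
    return null; // this interpreter exposes no variables
  }

  @Override
  public InterpreterResult interpret(String st) {
    if (st == null) {
      return new InterpreterResult(Code.ERROR, "no input");
    }
    return new InterpreterResult(Code.SUCCESS, st.toUpperCase());
  }
}
```

The MarkdownTest change below shows the call pattern, unchanged apart from the result type: construct, `initialize()`, then `interpret(...)` and read `result.message()`.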
MarkdownTest.java
@@ -8,7 +8,7 @@
import org.junit.Before;
import org.junit.Test;

import com.nflabs.zeppelin.repl.ReplResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult;

public class MarkdownTest {

@@ -24,7 +24,7 @@ public void tearDown() throws Exception {
public void test() {
Markdown md = new Markdown(new Properties());
md.initialize();
ReplResult result = md.interpret("This is ~~deleted~~ text");
InterpreterResult result = md.interpret("This is ~~deleted~~ text");
assertEquals("<p>This is <s>deleted</s> text</p>\n", result.message());
}

2 changes: 1 addition & 1 deletion pom.xml
@@ -284,7 +284,7 @@
<configuration>
<filesets>
<fileset>
<directory>repl</directory>
<directory>interpreter</directory>
<followSymlinks>false</followSymlinks>
</fileset>
</filesets>
4 changes: 2 additions & 2 deletions spark/pom.xml
@@ -349,7 +349,7 @@
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../repl/spark</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/spark</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
@@ -363,7 +363,7 @@
<goal>copy</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/../../repl/spark</outputDirectory>
<outputDirectory>${project.build.directory}/../../interpreter/spark</outputDirectory>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
18 changes: 9 additions & 9 deletions spark/src/main/java/com/nflabs/zeppelin/spark/SparkRepl.java
@@ -20,10 +20,10 @@
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.sql.SQLContext;

import com.nflabs.zeppelin.repl.Repl;
import com.nflabs.zeppelin.repl.ReplFactory;
import com.nflabs.zeppelin.repl.ReplResult;
import com.nflabs.zeppelin.repl.ReplResult.Code;
import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterFactory;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;

import scala.None;
import scala.Some;
@@ -34,7 +34,7 @@
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;

public class SparkRepl extends Repl {
public class SparkRepl extends Interpreter {

private SparkILoop interpreter;
private SparkIMain intp;
@@ -216,11 +216,11 @@ public Object getValue(String name){
/**
* Interpret a single line
*/
public ReplResult interpret(String line){
public InterpreterResult interpret(String line){
return interpret(line.split("\n"));
}

public ReplResult interpret(String [] lines){
public InterpreterResult interpret(String [] lines){
synchronized(this){
intp.interpret("Console.setOut(_binder.get(\"out\").asInstanceOf[java.io.PrintStream])");
out.reset();
@@ -233,15 +233,15 @@ public ReplResult interpret(String [] lines){

if (r == Code.ERROR) {
sc.clearJobGroup();
return new ReplResult(r, out.toString());
return new InterpreterResult(r, out.toString());
} else if(r==Code.INCOMPLETE) {
incomplete += s +"\n";
} else {
incomplete = "";
}
}
sc.clearJobGroup();
return new ReplResult(r, out.toString());
return new InterpreterResult(r, out.toString());
}
}

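The loop in `interpret(String[])` above implements multi-line input handling: any line the underlying Scala interpreter reports as `INCOMPLETE` is buffered and re-submitted together with the next line, until the statement either completes or errors. Below is a minimal sketch of that pattern in isolation, with a hypothetical `Evaluator` interface standing in for the `SparkIMain` instance the real code drives:

```java
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;

public class IncompleteLineBuffering {

  // Hypothetical stand-in for SparkIMain: evaluates one snippet and
  // reports SUCCESS, ERROR, or INCOMPLETE, with captured output.
  interface Evaluator {
    Code evaluate(String snippet);
    String output();
  }

  public static InterpreterResult interpret(String[] lines, Evaluator eval) {
    String incomplete = "";
    Code r = Code.SUCCESS;
    for (String s : lines) {
      // re-submit any buffered partial statement together with this line
      r = eval.evaluate(incomplete + s);
      if (r == Code.ERROR) {
        return new InterpreterResult(r, eval.output());
      } else if (r == Code.INCOMPLETE) {
        incomplete += s + "\n"; // keep buffering until the statement parses
      } else {
        incomplete = "";        // statement completed; reset the buffer
      }
    }
    return new InterpreterResult(r, eval.output());
  }
}
```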
30 changes: 15 additions & 15 deletions spark/src/main/java/com/nflabs/zeppelin/spark/SparkSqlRepl.java
@@ -10,13 +10,13 @@
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Row;

import com.nflabs.zeppelin.repl.ClassloaderRepl;
import com.nflabs.zeppelin.repl.Repl;
import com.nflabs.zeppelin.repl.ReplResult;
import com.nflabs.zeppelin.repl.ReplResult.Code;
import com.nflabs.zeppelin.interpreter.ClassloaderInterpreter;
import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
import com.nflabs.zeppelin.interpreter.InterpreterResult.Code;

public class SparkSqlRepl extends Repl {
private ClassloaderRepl sparkClassloaderRepl;
public class SparkSqlRepl extends Interpreter {
private ClassloaderInterpreter sparkClassloaderRepl;
AtomicInteger num = new AtomicInteger(0);

public SparkSqlRepl(Properties property) {
@@ -25,22 +25,22 @@ public SparkSqlRepl(Properties property) {

@Override
public void initialize() {
Map<String, Repl> repls = (Map<String, Repl>) this.getProperty().get("repls");
Map<String, Interpreter> repls = (Map<String, Interpreter>) this.getProperty().get("repls");
if(repls!=null) {
sparkClassloaderRepl = (ClassloaderRepl) repls.get("spark");
sparkClassloaderRepl = (ClassloaderInterpreter) repls.get("spark");
}
}

public void setSparkClassloaderRepl(ClassloaderRepl repl) {
this.sparkClassloaderRepl = (ClassloaderRepl) repl;
public void setSparkClassloaderRepl(ClassloaderInterpreter repl) {
this.sparkClassloaderRepl = (ClassloaderInterpreter) repl;
}


private void findSpark(){
if(sparkClassloaderRepl!=null) return;
Map<String, Repl> repls = (Map<String, Repl>) this.getProperty().get("repls");
Map<String, Interpreter> repls = (Map<String, Interpreter>) this.getProperty().get("repls");
if(repls!=null) {
sparkClassloaderRepl = (ClassloaderRepl) repls.get("spark");
sparkClassloaderRepl = (ClassloaderInterpreter) repls.get("spark");
}
}

@@ -56,15 +56,15 @@ public Object getValue(String name) {


@Override
public ReplResult interpret(String st) {
public InterpreterResult interpret(String st) {
findSpark();
SQLContext sqlc = ((SparkRepl)sparkClassloaderRepl.getInnerRepl()).getSQLContext();
SchemaRDD rdd = sqlc.sql(st);
Row[] rows = null;
try {
rows = rdd.take(10000);
} catch(Exception e){
return new ReplResult(Code.ERROR, e.getMessage());
return new InterpreterResult(Code.ERROR, e.getMessage());
}

String msg = null;
@@ -109,7 +109,7 @@ public ReplResult interpret(String st) {
msg += "\n";
}

return new ReplResult(Code.SUCCESS, "%table "+msg);
return new InterpreterResult(Code.SUCCESS, "%table "+msg);
}

@Override
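`interpret` above takes up to 10,000 rows from the `SchemaRDD` (bounding how much data is pulled back to the driver) and renders them into Zeppelin's `%table` display format. The middle of the method is elided in this diff, so the exact separators it uses aren't visible; the sketch below assumes tab-separated columns and newline-separated rows with a header row first, and the `TableFormatter` class is hypothetical:

```java
import java.util.List;

public class TableFormatter {

  // Builds a "%table"-prefixed string from column names and row values,
  // assuming tab-separated columns and newline-separated rows.
  public static String format(List<String> columnNames, List<List<Object>> rows) {
    StringBuilder msg = new StringBuilder();
    msg.append(String.join("\t", columnNames)).append("\n");
    for (List<Object> row : rows) {
      for (int i = 0; i < row.size(); i++) {
        if (i > 0) {
          msg.append("\t");
        }
        msg.append(row.get(i));
      }
      msg.append("\n");
    }
    return "%table " + msg; // Zeppelin renders %table output as a grid
  }
}
```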
ReflectSparkILoop.scala
@@ -0,0 +1,54 @@
package com.nflabs.zeppelin.spark

import scala.tools.nsc._
import scala.tools.nsc.interpreter._
import org.apache.spark.repl.SparkILoop
import org.apache.spark.repl.SparkIMain
import org.apache.spark.util.Utils
import java.io.BufferedReader
import scala.tools.nsc.util.{ ClassPath, Exceptional, stringFromWriter, stringFromStream }


class ReflectSparkILoop(in0: Option[BufferedReader], override protected val out: JPrintWriter, override val master: Option[String])
extends SparkILoop(in0, out, master) {
def this(in0: BufferedReader, out: JPrintWriter, master: String) = this(Some(in0), out, Some(master))
def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out, None)
def this() = this(None, new JPrintWriter(Console.out, true), None)


class ReflectSparkILoopInterpreter extends ReflectSparkIMain(settings, out) {
outer =>

override lazy val formatting = new Formatting {
def prompt = ReflectSparkILoop.this.prompt
}
override protected def parentClassLoader = SparkHelper.explicitParentLoader(settings).getOrElse(classOf[SparkILoop].getClassLoader)
}

/** Create a new interpreter. */
override def createInterpreter() {
require(settings != null)

if (addedClasspath != "") settings.classpath.append(addedClasspath)
// work around for Scala bug
val totalClassPath = SparkILoop.getAddedJars.foldLeft(
settings.classpath.value)((l, r) => ClassPath.join(l, r))
this.settings.classpath.value = totalClassPath

intp = new ReflectSparkILoopInterpreter
}

/** Create a new interpreter. */
def createReflectInterpreter(settings : Settings) : SparkIMain = {
require(settings != null)

if (addedClasspath != "") settings.classpath.append(addedClasspath)
// work around for Scala bug
val totalClassPath = SparkILoop.getAddedJars.foldLeft(
settings.classpath.value)((l, r) => ClassPath.join(l, r))
this.settings.classpath.value = totalClassPath

intp = new ReflectSparkILoopInterpreter
intp
}
}
ReflectSparkIMain.scala
@@ -0,0 +1,17 @@
package com.nflabs.zeppelin.spark

import scala.tools.nsc._
import scala.tools.nsc.interpreter._
import reporters._
import org.apache.spark.repl.SparkIMain
import scala.tools.reflect._
class ReflectSparkIMain(initialSettings: Settings, override val out: JPrintWriter) extends SparkIMain(initialSettings, out) {

override def newCompiler(settings: Settings, reporter: Reporter): ReplGlobal = {
settings.outputDirs setSingleOutput virtualDirectory
settings.exposeEmptyPackage.value = true
new ReflectGlobal(settings, reporter, classLoader) with ReplGlobal {
override def toString: String = "<global>"
}
}
}