Skip to content

Commit

Permalink
Spark / SparkSQL interpreter progress update.
Browse files Browse the repository at this point in the history
Make zeppelin display progress information when paragraph is running
  • Loading branch information
Leemoonsoo committed Sep 9, 2014
1 parent bb92b62 commit 41381f5
Show file tree
Hide file tree
Showing 22 changed files with 278 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,9 @@ public void bindValue(String name, Object o) {
public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,9 @@ public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress() {
return 0;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
import org.apache.spark.repl.SparkIMain;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.scheduler.TaskInfo;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.ui.jobs.JobProgressListener;
import org.apache.spark.ui.jobs.TaskUIData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.nflabs.zeppelin.interpreter.Interpreter;
import com.nflabs.zeppelin.interpreter.InterpreterResult;
Expand All @@ -26,20 +33,26 @@

import scala.None;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;
import scala.tools.nsc.Settings;
import scala.tools.nsc.settings.MutableSettings.BooleanSetting;
import scala.tools.nsc.settings.MutableSettings.PathSetting;

public class SparkInterpreter extends Interpreter {

Logger logger = LoggerFactory.getLogger(SparkInterpreter.class);

private SparkILoop interpreter;
private SparkIMain intp;
private SparkContext sc;
private ByteArrayOutputStream out;
private SQLContext sqlc;
private DependencyResolver dep;

private JobProgressListener sparkListener;


public SparkInterpreter(Properties property) {
Expand All @@ -52,9 +65,13 @@ public SparkContext getSparkContext(){
// save / load sc from common share
Map<String, Object> share = (Map<String, Object>)getProperty().get("share");
sc = (SparkContext) share.get("sc");
sparkListener = (JobProgressListener) share.get("sparkListener");
if(sc==null) {
sc = createSparkContext();
share.put("sc", sc);
sparkListener = new JobProgressListener(sc.getConf());
sc.listenerBus().addListener(sparkListener);
share.put("sc", sc);
share.put("sparkListener", sparkListener);
}
}
return sc;
Expand Down Expand Up @@ -277,21 +294,48 @@ public void cancel(){
sc.cancelJobGroup(jobGroup);
}


public int getProgress(){
// howto get progress from sparkListener? check this out
// https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

int completedTasks = 0;
int totalTasks = 0;

DAGScheduler scheduler = sc.dagScheduler();
HashSet<ActiveJob> jobs = scheduler.activeJobs();
Iterator<ActiveJob> it = jobs.iterator();
while(it.hasNext()) {
ActiveJob job = it.next();
String g = (String) job.properties().get("spark.jobGroup.id");
if (jobGroup.equals(g)) {
// TODO
int[] progressInfo = getProgressFromStage(sparkListener, job.finalStage());
totalTasks+=progressInfo[0];
completedTasks+=progressInfo[1];
}
}
return 0;

if(totalTasks==0) return 0;
return completedTasks*100/totalTasks;
}

private int [] getProgressFromStage(JobProgressListener sparkListener, Stage stage){
int numTasks = stage.numTasks();
int completedTasks = 0;
Object completedTaskInfo = JavaConversions.asJavaMap(sparkListener.stageIdToTasksComplete()).get(stage.id());
if(completedTaskInfo!=null) {
completedTasks += (int) completedTaskInfo;
}
List<Stage> parents = JavaConversions.asJavaList(stage.parents());
if(parents!=null) {
for(Stage s : parents) {
int[] p = getProgressFromStage(sparkListener, s);
numTasks+= p[0];
completedTasks+= p[1];
}
}

return new int[]{numTasks, completedTasks};
}
private Code getResultCode(scala.tools.nsc.interpreter.Results.Result r){
if (r instanceof scala.tools.nsc.interpreter.Results.Success$) {
return Code.SUCCESS;
Expand All @@ -311,4 +355,8 @@ public void destroy() {
public FormType getFormType() {
return FormType.NATIVE;
}

public JobProgressListener getJobProgressListener(){
return sparkListener;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,22 @@
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.ActiveJob;
import org.apache.spark.scheduler.DAGScheduler;
import org.apache.spark.scheduler.Stage;
import org.apache.spark.scheduler.StageInfo;
import org.apache.spark.scheduler.TaskInfo;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SchemaRDD;
import org.apache.spark.sql.catalyst.expressions.Attribute;
import org.apache.spark.sql.catalyst.expressions.Row;
import org.apache.spark.ui.jobs.JobProgressListener;

import scala.collection.Iterator;
import scala.collection.JavaConversions;
import scala.collection.mutable.HashMap;
import scala.collection.mutable.HashSet;

import com.nflabs.zeppelin.interpreter.ClassloaderInterpreter;
import com.nflabs.zeppelin.interpreter.Interpreter;
Expand All @@ -18,7 +30,9 @@
public class SparkSqlInterpreter extends Interpreter {
private ClassloaderInterpreter sparkClassloaderRepl;
AtomicInteger num = new AtomicInteger(0);


private final String jobGroup = "zeppelin-"+this.hashCode();

public SparkSqlInterpreter(Properties property) {
super(property);
}
Expand All @@ -35,7 +49,6 @@ public void setSparkClassloaderRepl(ClassloaderInterpreter repl) {
this.sparkClassloaderRepl = (ClassloaderInterpreter) repl;
}


private void findSpark(){
if(sparkClassloaderRepl!=null) return;
Map<String, Interpreter> repls = (Map<String, Interpreter>) this.getProperty().get("repls");
Expand All @@ -53,17 +66,20 @@ public void destroy() {
public Object getValue(String name) {
return null;
}



@Override
public InterpreterResult interpret(String st) {
findSpark();
SQLContext sqlc = ((SparkInterpreter)sparkClassloaderRepl.getInnerRepl()).getSQLContext();
SparkContext sc = sqlc.sparkContext();
sc.setJobGroup(jobGroup, "Zeppelin", false);
SchemaRDD rdd = sqlc.sql(st);
Row[] rows = null;
try {
rows = rdd.take(10000);
} catch(Exception e){
sc.clearJobGroup();
return new InterpreterResult(Code.ERROR, e.getMessage());
}

Expand Down Expand Up @@ -108,13 +124,18 @@ public InterpreterResult interpret(String st) {
}
msg += "\n";
}

return new InterpreterResult(Code.SUCCESS, "%table "+msg);
InterpreterResult ret = new InterpreterResult(Code.SUCCESS, "%table "+msg);
sc.clearJobGroup();
return ret;
}

@Override
public void cancel() {
findSpark();
SQLContext sqlc = ((SparkInterpreter)sparkClassloaderRepl.getInnerRepl()).getSQLContext();
SparkContext sc = sqlc.sparkContext();

sc.cancelJobGroup(jobGroup);
}

@Override
Expand All @@ -127,4 +148,54 @@ public FormType getFormType() {
return FormType.SIMPLE;
}

@Override
public int getProgress() {
// howto get progress from sparkListener? check this out
// https://github.com/apache/spark/blob/v1.0.1/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala

JobProgressListener sparkListener = ((SparkInterpreter)sparkClassloaderRepl.getInnerRepl()).getJobProgressListener();
if(sparkListener==null) return -1;

int completedTasks = 0;
int totalTasks = 0;

SQLContext sqlc = ((SparkInterpreter)sparkClassloaderRepl.getInnerRepl()).getSQLContext();
SparkContext sc = sqlc.sparkContext();

DAGScheduler scheduler = sc.dagScheduler();
HashSet<ActiveJob> jobs = scheduler.activeJobs();
Iterator<ActiveJob> it = jobs.iterator();
while(it.hasNext()) {
ActiveJob job = it.next();
String g = (String) job.properties().get("spark.jobGroup.id");
if (jobGroup.equals(g)) {
int[] progressInfo = getProgressFromStage(sparkListener, job.finalStage());
totalTasks+=progressInfo[0];
completedTasks+=progressInfo[1];
}
}

if(totalTasks==0) return 0;
return completedTasks*100/totalTasks;
}

private int [] getProgressFromStage(JobProgressListener sparkListener, Stage stage){
int numTasks = stage.numTasks();
int completedTasks = 0;
Object completedTaskInfo = JavaConversions.asJavaMap(sparkListener.stageIdToTasksComplete()).get(stage.id());
if(completedTaskInfo!=null) {
completedTasks += (int) completedTaskInfo;
}
List<Stage> parents = JavaConversions.asJavaList(stage.parents());
if(parents!=null) {
for(Stage s : parents) {
int[] p = getProgressFromStage(sparkListener, s);
numTasks+= p[0];
completedTasks+= p[1];
}
}

return new int[]{numTasks, completedTasks};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ public static enum OP {
PARAGRAPH, // [s-c] paragraph info
// @param paragraph serialized paragraph object

PROGRESS, // [s-c] progress update
// @param id paragraph id
// @param progress percentage progress

NEW_NOTE, // [c-s] create new notebook
DEL_NOTE, // [c-s] delete notebook
// @param id note id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -381,6 +380,12 @@ public void afterStatusChange(Job job, Status before, Status after) {
// note.getParagraph(paragraphId)));
}
}

@Override
public void onProgressUpdate(Job job, int progress){
System.err.println("NotebookServer: JOb pprogresss update "+progress);
broadcastNote(note.id(), new Message(OP.PROGRESS).put("id", paragraphId).put("progress", job.progress()));
}
});
}
}
2 changes: 2 additions & 0 deletions zeppelin-web/app/scripts/controllers/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ angular.module('zeppelinWebApp')
$rootScope.$emit('setNoteMenu', data.notes);
} else if (op === 'PARAGRAPH') {
$rootScope.$emit('updateParagraph', data);
} else if (op === 'PROGRESS') {
$rootScope.$emit('updateProgress', data);
}
});

Expand Down
10 changes: 10 additions & 0 deletions zeppelin-web/app/scripts/controllers/paragraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,16 @@ angular.module('zeppelinWebApp')
return $scope.editor.getValue();
};

$scope.getProgress = function(){
return ($scope.currentProgress) ? $scope.currentProgress : 0;
};

$rootScope.$on('updateProgress', function(event, data) {
if (data.id === $scope.paragraph.id) {
$scope.currentProgress = data.progress
}
});

$scope.getResultType = function(paragraph){
var pdata = (paragraph) ? paragraph : $scope.paragraph;
if (pdata.result && pdata.result.type) {
Expand Down
4 changes: 4 additions & 0 deletions zeppelin-web/app/styles/notebook.css
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@
text-align: right;
}

.paragraph .status div {
display: inline;
}

.disable {
opacity:0.4!important;
pointer-events: none;
Expand Down
11 changes: 10 additions & 1 deletion zeppelin-web/app/views/paragraph.html
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@
</div>
</div>

<div id="{{paragraph.id}}_status" class="status">
<div ng-if="paragraph.status == 'RUNNING'" class="">
Running ... {{getProgress()}}%.
</div>

<div>
Shift+Enter to Run. {{paragraph.status}}
</div>
</div>

<form id="{{paragraph.id}}_form" role="form" class="form-horizontal" style="margin-top: 15px;">
<div class="form-group"
ng-repeat="formulaire in paragraph.settings.forms"
Expand Down Expand Up @@ -131,6 +141,5 @@
</nvd3new>

</div>
<div id="{{paragraph.id}}_status" class="status">Shift+Enter to Run. {{paragraph.status}}</div>
</div>
</div>
2 changes: 1 addition & 1 deletion zeppelin-web/bower.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"name": "zeppelin-web2",
"name": "zeppelin-web",
"version": "0.0.0",
"dependencies": {
"angular": "1.2.16",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,18 @@ public FormType getFormType() {
Thread.currentThread().setContextClassLoader(oldcl);
}
}

@Override
public int getProgress() {
ClassLoader oldcl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(cl);
try {
return intp.getProgress();
} catch (Exception e){
throw new InterpreterException(e);
} finally {
cl = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(oldcl);
}
}
}
Loading

0 comments on commit 41381f5

Please sign in to comment.