Skip to content

Commit

Permalink
[RPC] android process isolation/watchdog (apache#1387)
Browse files Browse the repository at this point in the history
  • Loading branch information
eqy authored and tqchen committed Jul 8, 2018
1 parent 8a90f87 commit 0d673a9
Show file tree
Hide file tree
Showing 8 changed files with 242 additions and 48 deletions.
12 changes: 8 additions & 4 deletions apps/android_rpc/app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="ml.dmlc.tvm.tvmrpc" >

<uses-permission android:name="android.permission.INTERNET" />

<application
android:allowBackup="true"
android:label="@string/app_name"
android:supportsRtl="true"
android:theme="@style/AppTheme" >
android:theme="@style/AppTheme"
android:icon="@mipmap/ic_launcher" >
<activity
android:name=".MainActivity"
android:label="@string/app_name"
Expand All @@ -17,8 +20,9 @@
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<service android:name=".RPCService"
android:process=":RPCServiceProcess"
android:permission="android.permission.BIND_JOB_SERVICE" />
</application>

<uses-permission android:name="android.permission.INTERNET" />

</manifest>
</manifest>
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,18 @@
import android.os.Bundle;
import android.os.Handler;
import android.os.Message;

import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.Toolbar;
import android.widget.CompoundButton;
import android.widget.EditText;
import android.widget.Switch;
import android.content.Intent;


public class MainActivity extends AppCompatActivity {
static final int MSG_RPC_ERROR = 0;
static final String MSG_RPC_ERROR_DATA_KEY = "msg_rpc_error_data_key";

private RPCProcessor tvmServerWorker;
@SuppressLint("HandlerLeak")
private final Handler rpcHandler = new Handler() {
@Override
public void dispatchMessage(Message msg) {
Switch switchConnect = findViewById(R.id.switch_connect);
if (msg.what == MSG_RPC_ERROR && switchConnect.isChecked()) {
// switch off and show alert dialog.
switchConnect.setChecked(false);
String msgBody = msg.getData().getString(MSG_RPC_ERROR_DATA_KEY);
showDialog("Error", msgBody);
}
}
};

private RPCWatchdog watchdog;

private void showDialog(String title, String msg) {
AlertDialog.Builder builder = new AlertDialog.Builder(this);
Expand All @@ -71,10 +59,6 @@ protected void onCreate(Bundle savedInstanceState) {
Toolbar toolbar = findViewById(R.id.toolbar);
setSupportActionBar(toolbar);

tvmServerWorker = new RPCProcessor(rpcHandler);
tvmServerWorker.setDaemon(true);
tvmServerWorker.start();

Switch switchConnect = findViewById(R.id.switch_connect);
switchConnect.setOnCheckedChangeListener(new CompoundButton.OnCheckedChangeListener() {
@Override
Expand All @@ -88,25 +72,33 @@ public void onCheckedChanged(CompoundButton buttonView, boolean isChecked) {
}
}
});

enableInputView(true);

}

@Override
protected void onDestroy() {
super.onDestroy();
tvmServerWorker.disconnect();
if (watchdog != null) {
watchdog.disconnect();
watchdog = null;
}
}

private void connectProxy() {
EditText edProxyAddress = findViewById(R.id.input_address);
EditText edProxyPort = findViewById(R.id.input_port);
EditText edAppKey = findViewById(R.id.input_key);

final String proxyHost = edProxyAddress.getText().toString();
final int proxyPort = Integer.parseInt(edProxyPort.getText().toString());
final String key = edAppKey.getText().toString();

tvmServerWorker.connect(proxyHost, proxyPort, key);
System.err.println("creating watchdog thread...");
watchdog = new RPCWatchdog(proxyHost, proxyPort, key, this);

System.err.println("starting watchdog thread...");
watchdog.start();

SharedPreferences pref = getApplicationContext().getSharedPreferences("RPCProxyPreference", Context.MODE_PRIVATE);
SharedPreferences.Editor editor = pref.edit();
Expand All @@ -117,8 +109,10 @@ private void connectProxy() {
}

private void disconnect() {
tvmServerWorker.disconnect();
System.err.println("Disconnected.");
if (watchdog != null) {
watchdog.disconnect();
watchdog = null;
}
}

private void enableInputView(boolean enable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ class RPCProcessor extends Thread {
private String host;
private int port;
private String key;

private boolean running = false;
private long startTime;
private ConnectProxyServerProcessor currProcessor;
private final Handler uiHandler;
private boolean kill = false;
public static final int SESSION_TIMEOUT = 30000;

static final SocketFileDescriptorGetter socketFdGetter
= new SocketFileDescriptorGetter() {
Expand All @@ -46,9 +47,18 @@ public int get(Socket socket) {
return ParcelFileDescriptor.fromSocket(socket).getFd();
}
};
// callback to initialize the start time of an rpc session
class setTimeCallback implements Runnable {
private RPCProcessor rPCProcessor;

public setTimeCallback(RPCProcessor rPCProcessor) {
this.rPCProcessor = rPCProcessor;
}

RPCProcessor(Handler uiHandler) {
this.uiHandler = uiHandler;
@Override
public void run() {
rPCProcessor.setStartTime();
}
}

@Override public void run() {
Expand All @@ -61,23 +71,51 @@ public int get(Socket socket) {
} catch (InterruptedException e) {
}
}
currProcessor = new ConnectProxyServerProcessor(host, port, key, socketFdGetter);
}
try {
currProcessor.run();
} catch (Throwable e) {
disconnect();
// turn connect switch off.
Message message = new Message();
message.what = MainActivity.MSG_RPC_ERROR;
Bundle bundle = new Bundle();
bundle.putString(MainActivity.MSG_RPC_ERROR_DATA_KEY, e.getMessage());
message.setData(bundle);
uiHandler.sendMessage(message);
// if kill, we do nothing and wait for app restart
// to prevent race where timedOut was reported but restart has not
// happened yet
if (kill) {
System.err.println("waiting for restart...");
currProcessor = null;
}
else {
startTime = 0;
currProcessor = new ConnectProxyServerProcessor(host, port, key, socketFdGetter);
currProcessor.setStartTimeCallback(new setTimeCallback(this));
}
}
if (currProcessor != null)
currProcessor.run();
}
}

/**
* check if the current RPCProcessor has timed out while in a session
*/
synchronized boolean timedOut(long curTime) {
if (startTime == 0) {
return false;
}
else if ((curTime - startTime) > SESSION_TIMEOUT) {
System.err.println("set kill flag...");
kill = true;
return true;
}
return false;
}

/**
* set the start time of the current RPC session (used in callback)
*/
synchronized void setStartTime() {
startTime = System.currentTimeMillis();
System.err.println("start time set to: " + startTime);
}

synchronized long getStartTime() {
return startTime;
}

/**
* Disconnect from the proxy server.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ml.dmlc.tvm.tvmrpc;

import android.app.Service;
import android.os.IBinder;
import android.content.Intent;

public class RPCService extends Service {
private String host;
private int port;
private String key;
private int intentNum;
private RPCProcessor tvmServerWorker;

@Override
public int onStartCommand(Intent intent, int flags, int startId) {
synchronized(this) {
System.err.println("start command intent");
// use an alternate kill to prevent android from recycling the
// process
if (intent.getBooleanExtra("kill", false)) {
System.err.println("rpc service received kill...");
System.exit(0);
}

this.host = intent.getStringExtra("host");
this.port = intent.getIntExtra("port", 9090);
this.key = intent.getStringExtra("key");
System.err.println("got the following: " + this.host + ", " + this.port + ", " + this.key);
System.err.println("intent num: " + this.intentNum);

if (tvmServerWorker == null) {
System.err.println("service created worker...");
tvmServerWorker = new RPCProcessor();
tvmServerWorker.setDaemon(true);
tvmServerWorker.start();
tvmServerWorker.connect(this.host, this.port, this.key);
}
else if (tvmServerWorker.timedOut(System.currentTimeMillis())) {
System.err.println("rpc service timed out, killing self...");
System.exit(0);
}
this.intentNum++;
}
// do not restart unless watchdog/app expliciltly does so
return START_NOT_STICKY;
}

@Override
public IBinder onBind(Intent intent) {
System.err.println("rpc service got onBind, doing nothing...");
return null;
}

@Override
public void onCreate() {
System.err.println("rpc service onCreate...");
}

@Override
public void onDestroy() {
tvmServerWorker.disconnect();
System.err.println("rpc service onDestroy...");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package ml.dmlc.tvm.tvmrpc;

import android.content.Context;
import android.content.Intent;

/**
* Watchdog for RPCService
*/
class RPCWatchdog extends Thread {
public static final int WATCHDOG_POLL_INTERVAL = 5000;
private String host;
private int port;
private String key;
private Context context;
private boolean done = false;

public RPCWatchdog(String host, int port, String key, Context context) {
super();
this.host = host;
this.port = port;
this.key = key;
this.context = context;
}

/**
* Polling loop to check on RPCService status
*/
@Override public void run() {
try {
while (true) {
synchronized (this) {
if (done) {
System.err.println("watchdog done, returning...");
return;
}
else {
System.err.println("polling rpc service...");
System.err.println("sending rpc service intent...");
Intent intent = new Intent(context, RPCService.class);
intent.putExtra("host", host);
intent.putExtra("port", port);
intent.putExtra("key", key);
// will implicilty restart the service if it died
context.startService(intent);
}
}
Thread.sleep(WATCHDOG_POLL_INTERVAL);
}
} catch (InterruptedException e) {
}
}

/**
* Disconnect from the proxy server.
*/
synchronized void disconnect() {
// kill service
System.err.println("watchdog disconnect call...");
System.err.println("stopping rpc service...");
done = true;
Intent intent = new Intent(context, RPCService.class);
intent.putExtra("kill", true);
context.startService(intent);
}
}
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit 0d673a9

Please sign in to comment.