Skip to content

Commit

Permalink
Add option to use single actor system for local execution. Use local …
Browse files Browse the repository at this point in the history
…connection manager if a single task manager is used for local execution. Remove synchronized blcok in getReceiverList of ChannelManager which effectively serialized the connection lookup calls of a single task manager.

Fix Java6 problem that File has no method toPath
  • Loading branch information
tillrohrmann committed Dec 18, 2014
1 parent a0f7785 commit c93d9ea
Show file tree
Hide file tree
Showing 22 changed files with 181 additions and 76 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ tmp
.DS_Store
_site
docs/api
atlassian-ide-plugin.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public void testExternalProgram() {
try {
Configuration config = new Configuration();
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 4);
testMiniCluster = new ForkableFlinkMiniCluster(config);
testMiniCluster = new ForkableFlinkMiniCluster(config, false);

String jarFile = JAR_FILE;
String testData = getClass().getResource(TEST_DATA_FILE).toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static void runOnMiniCluster(JobGraph jobGraph, int degreeOfParallelism,
}

try {
exec = new LocalFlinkMiniCluster(configuration);
exec = new LocalFlinkMiniCluster(configuration, true);

Client client = new Client(new InetSocketAddress("localhost", exec.getJobManagerRPCPort()),
configuration, ClusterUtil.class.getClassLoader());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ class ApplicationClient(appId: ApplicationId, port: Int, yarnClient: YarnClient,

writeYarnProperties(address)

jobManager = Some(AkkaUtils.getReference(JobManager.getAkkaURL(address))(system, timeout))
jobManager = Some(AkkaUtils.getReference(JobManager.getRemoteAkkaURL(address))(system,
timeout))
jobManager.get ! RegisterMessageListener

pollingTimer foreach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,39 @@ object YarnUtils {
AkkaUtils.createActorSystem(akkaConfig)
}

def createActorSystem: ActorSystem = {
def createActorSystem(): ActorSystem = {
val akkaConfig = ConfigFactory.parseString(AkkaUtils.getDefaultActorSystemConfigString +
getConfigString)

AkkaUtils.createActorSystem(akkaConfig)
}

def getConfigString: String = {
s"""akka.loglevel = "INFO"
|akka.stdout-loglevel = "INFO"
|akka.log-dead-letters-during-shutdown = off
|akka.log-dead-letters = off
|akka.remote.log-remote-lifecycle-events=off
|""".stripMargin
"""
|akka{
| loglevel = "INFO"
| stdout-loglevel = "INFO"
| log-dead-letters-during-shutdown = off
| log-dead-letters = off
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
|
| remote{
| log-remote-lifecycle-events = off
|
| netty{
| tcp{
| port = 0
| transport-class = "akka.remote.transport.netty.NettyTransport"
| tcp-nodelay = on
| maximum-frame-size = 1MB
| execution-pool-size = 4
| }
| }
| }
|}""".stripMargin
}

def startActorSystemAndTaskManager(args: Array[String]): (ActorSystem, ActorRef) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ public void start() throws Exception {
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
// start it up
this.flink = new LocalFlinkMiniCluster(configuration);
this.flink = new LocalFlinkMiniCluster(configuration, true);
} else {
throw new IllegalStateException("The local executor was already started.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,15 +380,9 @@ private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChanne

while (true) {
ConnectionInfoLookupResponse lookupResponse;
synchronized (this.channelLookup) {
try{
lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
sourceChannelID), timeout).response();
}catch(IOException ioe) {
throw ioe;
}
}
lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
sourceChannelID), timeout).response();

if (lookupResponse.receiverReady()) {
receiverList = new EnvelopeReceiverList(lookupResponse);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ object AkkaUtils {
createActorSystem(getDefaultActorSystemConfig)
}

def createLocalActorSystem(): ActorSystem = {
createActorSystem(getDefaultLocalActorSystemConfig)
}

def createActorSystem(akkaConfig: Config): ActorSystem = {
ActorSystem.create("flink", akkaConfig)
}
Expand Down Expand Up @@ -133,20 +137,47 @@ object AkkaUtils {
getDefaultActorSystemConfigString + configString
}

def getLocalConfigString(configuration: Configuration): String = {
val akkaThroughput = configuration.getInteger(ConfigConstants.AKKA_DISPATCHER_THROUGHPUT,
ConfigConstants.DEFAULT_AKKA_DISPATCHER_THROUGHPUT)
val lifecycleEvents = configuration.getBoolean(ConfigConstants.AKKA_LOG_LIFECYCLE_EVENTS,
ConfigConstants.DEFAULT_AKKA_LOG_LIFECYCLE_EVENTS)

val logLifecycleEvents = if (lifecycleEvents) "on" else "off"

val logLevel = configuration.getString(ConfigConstants.AKKA_LOG_LEVEL,
ConfigConstants.DEFAULT_AKKA_LOG_LEVEL)

val configString =
s"""
|akka {
| loglevel = $logLevel
| stdout-loglevel = $logLevel
|
| log-dead-letters = $logLifecycleEvents
| log-dead-letters-during-shutdown = $logLifecycleEvents
|
| actor{
| default-dispatcher{
| executor = "default-executor"
|
| throughput = ${akkaThroughput}
|
| fork-join-executor {
| parallelism-factor = 2.0
| }
| }
| }
|
|}
""".stripMargin

getDefaultLocalActorSystemConfigString + configString
}

def getDefaultActorSystemConfigString: String = {
"""
val config = """
|akka {
| daemonic = on
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
| loglevel = "WARNING"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "WARNING"
| jvm-exit-on-fatal-error = off
| log-config-on-start = on
| serialize-messages = on
|
| actor {
| provider = "akka.remote.RemoteActorRefProvider"
| }
Expand All @@ -164,7 +195,26 @@ object AkkaUtils {
| }
|}
""".stripMargin
}

getDefaultLocalActorSystemConfigString + config
}

def getDefaultLocalActorSystemConfigString: String = {
"""
|akka {
| daemonic = on
|
| loggers = ["akka.event.slf4j.Slf4jLogger"]
| logger-startup-timeout = 30s
| loglevel = "DEBUG"
| logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
| stdout-loglevel = "DEBUG"
| jvm-exit-on-fatal-error = off
| log-config-on-start = off
| serialize-messages = on
|}
""".stripMargin
}

// scalastyle:off line.size.limit

Expand Down Expand Up @@ -347,6 +397,10 @@ object AkkaUtils {
ConfigFactory.parseString(getDefaultActorSystemConfigString)
}

def getDefaultLocalActorSystemConfig = {
ConfigFactory.parseString(getDefaultLocalActorSystemConfigString)
}

def getChild(parent: ActorRef, child: String)(implicit system: ActorSystem, timeout:
FiniteDuration): ActorRef = {
Await.result(system.actorSelection(parent.path / child).resolveOne()(timeout), timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ object JobClient{
"configuration.")
}

JobManager.getAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
JobManager.getRemoteAkkaURL(jobManagerAddress + ":" + jobManagerRPCPort)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ Actor with ActorLogMessages with ActorLogging with WrapAsScala {

Execution.timeout = timeout;

log.info("Starting job manager.")
log.info(s"Starting job manager at ${self.path}.")

val (archiveCount,
profiling,
Expand Down Expand Up @@ -520,7 +520,7 @@ object JobManager {
actorSystem.actorOf(props, JOB_MANAGER_NAME)
}

def getAkkaURL(address: String): String = {
def getRemoteAkkaURL(address: String): String = {
s"akka.tcp://flink@${address}/user/${JOB_MANAGER_NAME}"
}

Expand All @@ -541,6 +541,6 @@ object JobManager {

def getJobManager(address: InetSocketAddress)(implicit system: ActorSystem, timeout:
FiniteDuration): ActorRef = {
AkkaUtils.getReference(getAkkaURL(address.getHostName + ":" + address.getPort))
AkkaUtils.getReference(getRemoteAkkaURL(address.getHostName + ":" + address.getPort))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit

import akka.pattern.ask
import akka.actor.{ActorRef, ActorSystem}
import com.typesafe.config.{ConfigFactory, Config}
import com.typesafe.config.{ConfigFactory}
import org.apache.flink.configuration.{ConfigConstants, Configuration}
import org.apache.flink.runtime.akka.AkkaUtils
import org.apache.flink.runtime.messages.TaskManagerMessages.NotifyWhenRegisteredAtJobManager
Expand All @@ -31,7 +31,8 @@ import org.slf4j.LoggerFactory
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Future, Await}

abstract class FlinkMiniCluster(userConfiguration: Configuration) {
abstract class FlinkMiniCluster(userConfiguration: Configuration,
val singleActorSystem: Boolean) {
import FlinkMiniCluster._

val HOSTNAME = "localhost"
Expand All @@ -41,14 +42,24 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {

val configuration = generateConfiguration(userConfiguration)

if(singleActorSystem){
configuration.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
}

val jobManagerActorSystem = startJobManagerActorSystem()
val jobManagerActor = startJobManager(jobManagerActorSystem)

val numTaskManagers = configuration.getInteger(ConfigConstants
.LOCAL_INSTANCE_MANAGER_NUMBER_TASK_MANAGER, 1)

val actorSystemsTaskManagers = for(i <- 0 until numTaskManagers) yield {
val actorSystem = startTaskManagerActorSystem(i)
val actorSystem = if(singleActorSystem) {
jobManagerActorSystem
}
else{
startTaskManagerActorSystem(i)
}

(actorSystem, startTaskManager(i)(actorSystem))
}

Expand All @@ -66,7 +77,12 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
val port = configuration.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, ConfigConstants
.DEFAULT_JOB_MANAGER_IPC_PORT)

AkkaUtils.getConfigString(HOSTNAME, port, configuration)
if(singleActorSystem){
AkkaUtils.getLocalConfigString(configuration)
}else{
AkkaUtils.getConfigString(HOSTNAME, port, configuration)
}

}

def startJobManagerActorSystem(): ActorSystem = {
Expand Down Expand Up @@ -111,13 +127,23 @@ abstract class FlinkMiniCluster(userConfiguration: Configuration) {
}

def shutdown(): Unit = {
taskManagerActorSystems foreach { _.shutdown() }
if(!singleActorSystem){
taskManagerActorSystems foreach {
_.shutdown()
}
}

jobManagerActorSystem.shutdown()
}

def awaitTermination(): Unit = {
jobManagerActorSystem.awaitTermination()
taskManagerActorSystems foreach { _.awaitTermination()}

if(!singleActorSystem) {
taskManagerActorSystems foreach {
_.awaitTermination()
}
}
}

def waitForTaskManagersToBeRegistered(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@ import org.apache.flink.runtime.taskmanager.TaskManager
import org.apache.flink.runtime.util.EnvironmentInformation
import org.slf4j.LoggerFactory

class LocalFlinkMiniCluster(userConfiguration: Configuration) extends
FlinkMiniCluster(userConfiguration){
class LocalFlinkMiniCluster(userConfiguration: Configuration, singleActorSystem: Boolean = true)
extends FlinkMiniCluster(userConfiguration, singleActorSystem){
import LocalFlinkMiniCluster._

val jobClientActorSystem = AkkaUtils.createActorSystem()
val jobClientActorSystem = if(singleActorSystem){
jobManagerActorSystem
}else{
AkkaUtils.createActorSystem()
}

var jobClient: Option[ActorRef] = None

override def generateConfiguration(userConfiguration: Configuration): Configuration = {
Expand Down Expand Up @@ -70,7 +75,7 @@ FlinkMiniCluster(userConfiguration){
}

val localExecution = if(numTaskManagers == 1){
false
true
}else{
false
}
Expand All @@ -87,6 +92,10 @@ FlinkMiniCluster(userConfiguration){
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, HOSTNAME)
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, getJobManagerRPCPort)

if(singleActorSystem){
config.setString(ConfigConstants.JOB_MANAGER_AKKA_URL, "akka://flink/user/jobmanager")
}

val jc = JobClient.startActorWithConfiguration(config)(jobClientActorSystem)
jobClient = Some(jc)
jc
Expand All @@ -101,11 +110,16 @@ FlinkMiniCluster(userConfiguration){

override def shutdown(): Unit = {
super.shutdown()
jobClientActorSystem.shutdown()

if(!singleActorSystem) {
jobClientActorSystem.shutdown()
}
}

override def awaitTermination(): Unit = {
jobClientActorSystem.awaitTermination()
if(!singleActorSystem) {
jobClientActorSystem.awaitTermination()
}
super.awaitTermination()
}

Expand Down
Loading

0 comments on commit c93d9ea

Please sign in to comment.