Skip to content

Commit

Permalink
Allow connection to environment daemon from another port
Browse files Browse the repository at this point in the history
  • Loading branch information
maxsklar committed Oct 3, 2023
1 parent efd3c79 commit 07dc32a
Show file tree
Hide file tree
Showing 4 changed files with 144 additions and 52 deletions.
10 changes: 6 additions & 4 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
allow-java-serialization = on
}
remote {
enabled-transports = ["akka.remote.netty.tcp"]
netty.tcp {
hostname = "127.0.0.1"
port = 0
artery {
enabled = on
transport = tcp
canonical.hostname = "127.0.0.1"
canonical.port = 0
}
}
}
50 changes: 50 additions & 0 deletions src/main/scala/ai/newmap/interpreter/DaemonActor.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package ai.newmap.interpreter

import akka.actor.{Actor, ActorSystem, ExtendedActorSystem, Props}
import akka.pattern.after
import ai.newmap.util.{Success, Failure}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class DaemonActor extends Actor {
private val envInterpreter = new EnvironmentInterpreter()

def getBoundPort(): Future[Int] = {
implicit val actorSystem: ActorSystem = context.system
context.system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port match {
case Some(port) => Future.successful(port)
case None => {
after(100.millis, actorSystem.scheduler) {
getBoundPort()
}
}
}
}

def receive = {
case (code: String) => sender() ! passCode(code)
case (_: Unit) => sender() ! {
val boundPort = Await.result(getBoundPort(), 400.millis)
//val boundPortStr = boundPort.toOption.map(_.toString).getOrElse("not avilable")
s"Connected to the Environment Daemon on port $boundPort at path ${self.path}"
}
case other => sender() ! s"Can't interpret: $other"
}

def passCode(code: String): DaemonActor.CodeResponse = {
val response = envInterpreter(code)

response match {
case Success(s) => DaemonActor.CodeResponse(s, (s == ":exit"))
case Failure(s) => DaemonActor.CodeResponse("Error:\n" + s)
}
}
}

object DaemonActor {
case class CodeResponse(
response: String,
timeToQuit: Boolean = false
)
}
111 changes: 73 additions & 38 deletions src/main/scala/ai/newmap/interpreter/EnvironmentDaemon.scala
Original file line number Diff line number Diff line change
@@ -1,55 +1,90 @@
package ai.newmap.interpreter

import akka.actor.{Actor, ActorSystem, ExtendedActorSystem, Props}
import akka.actor.{Actor, ActorRef, ActorSystem, ActorIdentity, ExtendedActorSystem, Identify, Props}
import akka.pattern.after
import akka.util.Timeout
import ai.newmap.util.{Success, Failure}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import java.net.Socket
import java.net.InetSocketAddress
import akka.pattern.ask
import scala.concurrent.ExecutionContext.Implicits.global

class EnvironmentDaemon() {
val system = ActorSystem("DaemonActorSystem")
val daemonActor = system.actorOf(Props[DaemonActor], name = "daemonActor")
}

class DaemonActor extends Actor {
private val envInterpreter = new EnvironmentInterpreter()

def getBoundPort(): Future[Int] = {
implicit val actorSystem: ActorSystem = context.system
context.system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress.port match {
case Some(port) => Future.successful(port)
case None => {
after(100.millis, actorSystem.scheduler) {
getBoundPort()
}
/**
* This handles either creating an environment daemon, or connecting to
* an existing daemon on another port.
*/
class EnvironmentDaemon(portOpt: Option[Int]) {
implicit val timeout: Timeout = 5.seconds

def isPortOpen(host: String, port: Int, timeoutMillis: Int = 2000): Boolean = {
val socket = new Socket()
try {
socket.connect(new InetSocketAddress(host, port), timeoutMillis)
true
} catch {
case _: Throwable => false
} finally {
try {
socket.close()
} catch {
case _: Throwable => // Ignore
}
}
}

def receive = {
case (code: String) => sender() ! passCode(code)
case (_: Unit) => sender() ! {
val boundPort = Await.result(getBoundPort(), 400.millis)
//val boundPortStr = boundPort.toOption.map(_.toString).getOrElse("not avilable")
s"Connected to the Environment Daemon on port $boundPort"
}
case other => sender() ! s"Can't interpret: $other"
}
val unit: Unit = ()

def passCode(code: String): DaemonActor.CodeResponse = {
val response = envInterpreter(code)
// This ensures that the environment daemon is initialized
val resultF: Future[(ActorRef, ActorSystem)] = {
val actorSystem: ActorSystem = ActorSystem("DaemonActorSystem")
for {
result: ActorRef <- portOpt match {
case None => {
val actorRef = actorSystem.actorOf(Props[DaemonActor], name = "daemonActor")

for {
pingResponse <- actorRef ? unit
} yield {
println(pingResponse)
actorRef
}
}
case Some(port) if !isPortOpen("localhost", port) => {
println(s"port $port not open, creating a new EnvironemntDaemon")
val actorRef = actorSystem.actorOf(Props[DaemonActor], name = "daemonActor")

for {
pingResponse <- actorRef ? unit
} yield {
println(pingResponse)
actorRef
}
}
case Some(port) => {
val daemonActorPath = s"akka://[email protected]:$port/user/daemonActor"
println(s"Connecting to path $daemonActorPath")
val selection = actorSystem.actorSelection(daemonActorPath)

response match {
case Success(s) => DaemonActor.CodeResponse(s, (s == ":exit"))
case Failure(s) => DaemonActor.CodeResponse("Error:\n" + s)
}
for {
pingResponse <- (selection ? unit)
actorF = (selection ? Identify(None)).mapTo[ActorIdentity]
actor <- actorF
} yield {
println(pingResponse)
actor.ref.get
}
}
}
} yield (result -> actorSystem)
}
}

object DaemonActor {
case class CodeResponse(
response: String,
timeToQuit: Boolean = false
)
val result = Await.result(resultF, 5.seconds)

val (daemon, actorSystemRef) = result

def terminate(): Unit = actorSystemRef.terminate()

def sendCode(code: String): Future[Any] = daemon ? code
}
25 changes: 15 additions & 10 deletions src/main/scala/ai/newmap/interpreter/repl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,14 @@ import org.jline.reader.impl.history.DefaultHistory
import org.jline.reader.{LineReader, LineReaderBuilder}
import org.jline.terminal.TerminalBuilder
import akka.util.Timeout
import akka.pattern.ask
import akka.actor.{Actor, ActorRef, ActorSystem, ActorIdentity, ExtendedActorSystem, Identify, Props}
import java.nio.file.Paths
import scala.concurrent.duration._
import scala.concurrent.Await
import scala.concurrent.{Await, Future}
import java.net.Socket
import java.net.InetSocketAddress
import scala.concurrent.ExecutionContext.Implicits.global
import com.typesafe.config.ConfigFactory

/**
* This is the main class that opens a repl to the appropriate environment daemon.
Expand All @@ -28,18 +32,19 @@ object repl extends App {
history.attach(lineReader)
history.load()

// This ensures that the environment daemon is initialized
val unit: Unit = ()
val envDaemon = new EnvironmentDaemon()
val pingResponseF = (envDaemon.daemonActor ? unit)
val pingResponse = Await.result(pingResponseF, 5.seconds)
println(pingResponse)
def getPortArg(args: Array[String]): Option[Int] = {
args.sliding(2, 1).collectFirst {
case Array("--port", port: String) if port.forall(_.isDigit) => port.toInt
}
}

val envDaemon = new EnvironmentDaemon(getPortArg(args))

var continue = true
while(continue) {
val code = lineReader.readLine("> ")

val responseF = envDaemon.daemonActor ? code
val responseF = envDaemon.sendCode(code)

val response = Await.result(responseF, 5.seconds)

Expand All @@ -55,5 +60,5 @@ object repl extends App {
}
}

envDaemon.system.terminate()
envDaemon.terminate()
}

0 comments on commit 07dc32a

Please sign in to comment.