Skip to content

Commit

Permalink
Add filter of valid nodes and sort Akka Cluster seed nodes and modify…
Browse files Browse the repository at this point in the history
… failure handling of any invalid seeds (filodb#64)

* Added sort, eager parsing of configured seed addresses and fail fast to bootstrapper.

* Made all suggested changes from code review.
  • Loading branch information
Helena Edelson committed Oct 23, 2017
1 parent 2d6206e commit 2b2b292
Show file tree
Hide file tree
Showing 12 changed files with 257 additions and 136 deletions.
2 changes: 1 addition & 1 deletion akka-bootstrapper/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ akka-bootstrapper {
seed-discovery {

# Class name of the discovery implementation
class = "filodb.akkabootstrapper.WhitelistAkkaClusterSeedDiscovery"
class = "filodb.akkabootstrapper.WhitelistClusterSeedDiscovery"

# DiscoveryTimeoutException will be thrown after this time if the cluster is not joined
timeout = 10 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import com.typesafe.scalalogging.StrictLogging
import spray.json._

object AkkaBootstrapperMessages {

Expand Down Expand Up @@ -47,7 +47,7 @@ class AkkaBootstrapper(protected val cluster: Cluster) extends StrictLogging wit
private[filodb] val settings = new AkkaBootstrapperSettings(cluster.system.settings.config)

/**
* Every instance that wants to join the akka cluster call this method.
* Every node needing to manually join the akka cluster should call this method.
* This is a blocking call and does not return until akka cluster is formed or joined.
*
* It first checks if a cluster already exists to join by invoking the seeds endpoint.
Expand All @@ -64,7 +64,7 @@ class AkkaBootstrapper(protected val cluster: Cluster) extends StrictLogging wit
*/
@throws(classOf[DiscoveryTimeoutException])
def bootstrap(): Unit = {
val seeds = AkkaClusterSeedDiscovery(cluster, settings).discoverAkkaClusterSeeds
val seeds = ClusterSeedDiscovery(cluster, settings).discoverClusterSeeds
logger.info(s"Joining seeds $seeds")
cluster.joinSeedNodes(seeds)
logger.info("Exited from AkkaBootstrapper.joinSeedNodes")
Expand All @@ -84,14 +84,14 @@ class AkkaBootstrapper(protected val cluster: Cluster) extends StrictLogging wit
* @return The akka http Route that should be started by the caller once the method returns.
*/
def getAkkaHttpRoute(membershipActor: Option[ActorRef] = None): Route = {
val clusterMembershipTracker = membershipActor.getOrElse(system.actorOf(Props[ClusterMembershipTracker],
name = "clusterListener"))
val seedsContextPath = settings.seedsPath
val clusterMembershipTracker = membershipActor.getOrElse(
system.actorOf(Props[ClusterMembershipTracker], name = "clusterListener"))

import AkkaBootstrapperMessages._

implicit val executionContext = system.dispatcher
val route =
path(seedsContextPath) {
path(settings.seedsPath) {
get {
implicit val timeout = Timeout(2 seconds) // TODO configurable timeout
val seedNodes = (clusterMembershipTracker ask ClusterMembershipRequest).mapTo[ClusterMembershipResponse]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
package filodb.akkabootstrapper

import java.net.MalformedURLException
import java.util.concurrent.TimeUnit

import scala.collection.JavaConverters._
import scala.util.Try

import akka.actor.AddressFromURIString
import com.typesafe.config.Config
import com.typesafe.scalalogging.StrictLogging

Expand Down Expand Up @@ -41,22 +39,4 @@ final class AkkaBootstrapperSettings(val config: Config) extends StrictLogging {

lazy val seedsWhitelist: List[String] = bootstrapper.getStringList("whitelist.seeds").asScala.toList

/** Eagerly attempts to create addresses from each whitelist seed config
* to fail fast if MalformedURIException thrown. Logs any invalids,
* and removes from the returned list.
*/
private[akkabootstrapper] lazy val seeds = {
val validate = (s: String) =>
try Right(AddressFromURIString(s)) catch {
case e: MalformedURLException =>
logger.error(s"Invalid configured whitelist seed node: $s.", e)
Left(s)
}

seedsWhitelist.map(validate).partition(_.isRight)
}

private[filodb] lazy val invalidSeeds: List[String] =
seeds._2.collect { case Left(string) => string }

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ import com.typesafe.scalalogging.StrictLogging
import spray.json._
import scalaj.http.Http

abstract class AkkaClusterSeedDiscovery(val cluster: Cluster,
val settings: AkkaBootstrapperSettings
) extends StrictLogging with ClusterMembershipJsonSuppport {
/** Seed node strategy. Some implementations discover, some simply read from immutable config. */
abstract class ClusterSeedDiscovery(val cluster: Cluster,
val settings: AkkaBootstrapperSettings)
extends StrictLogging with ClusterMembershipJsonSuppport {

@throws(classOf[DiscoveryTimeoutException])
def discoverAkkaClusterSeeds: Seq[Address] = {
discoverExistingAkkaCluster match {
case Seq() => discoverPeersForNewAkkaCluster
def discoverClusterSeeds: Seq[Address] = {
discoverExistingCluster match {
case Seq() => discoverPeersForNewCluster
case nonEmpty: Seq[Address] => nonEmpty
}
}

@throws(classOf[DiscoveryTimeoutException])
protected def discoverPeersForNewAkkaCluster: Seq[Address]
protected def discoverPeersForNewCluster: Seq[Address]

protected def discoverExistingAkkaCluster: Seq[Address] = {
protected def discoverExistingCluster: Seq[Address] = {

val seedsEndpoint = settings.seedsBaseUrl + settings.seedsPath

Expand All @@ -52,17 +53,19 @@ abstract class AkkaClusterSeedDiscovery(val cluster: Cluster,
}


object AkkaClusterSeedDiscovery {
object ClusterSeedDiscovery {

def apply(cluster: Cluster, settings: AkkaBootstrapperSettings): AkkaClusterSeedDiscovery = {
val className = settings.seedDiscoveryClass
val args: Seq[(Class[_], AnyRef)] = Seq((cluster.getClass, cluster), (settings.getClass, settings))
cluster.system.dynamicAccess.createInstanceFor[AkkaClusterSeedDiscovery](className, args) match {
case Failure(e) =>
throw new IllegalArgumentException(s"Could not instantiate seed discovery class " +
s"${settings.seedDiscoveryClass}. Please check your configuration", e)
case Success(clazz) => clazz
}
/** Seed node strategy. Some implementations discover, some simply read them. */
def apply(cluster: Cluster, settings: AkkaBootstrapperSettings): ClusterSeedDiscovery = {
import settings.{seedDiscoveryClass => fqcn}

cluster.system.dynamicAccess.createInstanceFor[ClusterSeedDiscovery](
fqcn, Seq((cluster.getClass, cluster), (settings.getClass, settings))) match {
case Failure(e) =>
throw new IllegalArgumentException(
s"Could not instantiate seed discovery class $fqcn. Please check your configuration", e)
case Success(clazz) => clazz
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package filodb.akkabootstrapper

import akka.ConfigurationException

import scala.annotation.tailrec
import scala.collection.immutable.Seq

Expand All @@ -8,22 +10,24 @@ import akka.cluster.Cluster
import com.typesafe.scalalogging.StrictLogging
import org.xbill.DNS._

abstract class DnsSrvAkkaClusterSeedDiscovery(override val cluster: Cluster,
override val settings: AkkaBootstrapperSettings)
extends AkkaClusterSeedDiscovery(cluster, settings) {
abstract class DnsSrvClusterSeedDiscovery(override val cluster: Cluster,
override val settings: AkkaBootstrapperSettings)
extends ClusterSeedDiscovery(cluster, settings) {

val srvLookup = new Lookup(settings.serviceName, Type.SRV, DClass.IN)
val selfAddress = cluster.selfAddress
val simpleResolver = settings.resolverHost.map(r => new SimpleResolver(r))

simpleResolver.foreach { _.setPort(settings.resolverPort) }
simpleResolver.foreach { r => srvLookup.setResolver(r) }

override protected def discoverPeersForNewAkkaCluster: Seq[Address] = {
override protected def discoverPeersForNewCluster: Seq[Address] = {
logger.info("Discovering cluster peers by looking up SRV records from DNS for {}", settings.serviceName)
val startTime = System.currentTimeMillis()
discover(startTime)
}

// TODO: 2 Thread.sleep calls, function too long, 4 if/else cases is too many
@tailrec
private def discover(startTime: Long): Seq[Address] = {
val currentTime = System.currentTimeMillis()
Expand Down Expand Up @@ -77,39 +81,43 @@ abstract class DnsSrvAkkaClusterSeedDiscovery(override val cluster: Cluster,

}


/**
* This concrete implementation assumes that registration and de-registration with DNS is not required.
* This implementation assumes that registration and de-registration with DNS is not required.
* And it is done automatically by the deployment environment, for example Mesos DNS.
*/
final class SimpleDnsSrvAkkaClusterSeedDiscovery(cluster: Cluster,
settings: AkkaBootstrapperSettings)
extends DnsSrvAkkaClusterSeedDiscovery(cluster, settings)
final class SimpleDnsSrvClusterSeedDiscovery(cluster: Cluster,
settings: AkkaBootstrapperSettings)
extends DnsSrvClusterSeedDiscovery(cluster, settings)

/**
* This concrete implementation is used for local development. It does a registration
* This implementation is used for local development. It does a registration
* and de-registration with Consul DNS.
*/
final class ConsulAkkaClusterSeedDiscovery(cluster: Cluster,
settings: AkkaBootstrapperSettings)
extends DnsSrvAkkaClusterSeedDiscovery(cluster, settings) with StrictLogging {
final class ConsulClusterSeedDiscovery(cluster: Cluster,
settings: AkkaBootstrapperSettings)
extends DnsSrvClusterSeedDiscovery(cluster, settings) with StrictLogging {

private val defaultAddress = cluster.system.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
val port = defaultAddress.port.get
val host = defaultAddress.host.get

val registrationServiceName: String = settings.registrationServiceName
val serviceId = s"$registrationServiceName-$host-$port"
val consulClient = new ConsulClient(settings)
consulClient.register(serviceId, registrationServiceName, host, port)
logger.info(s"Registered with consul $host:$port as $registrationServiceName ")

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
consulClient.deregister(serviceId)
logger.info(s"Deregistered $serviceId with consul")
}
})

defaultAddress match {
case Address(_, _, Some(host), Some(port)) =>
val registrationServiceName = settings.registrationServiceName
val consulClient = new ConsulClient(settings)
val serviceId = s"$registrationServiceName-$host-$port"
consulClient.register(serviceId, registrationServiceName, host, port)
logger.info(s"Registered with consul $host:$port as $registrationServiceName ")
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
consulClient.deregister(serviceId)
logger.info(s"Deregistered $serviceId with consul")
}
})
case _ =>
val context = s"""'host' and 'port' in $defaultAddress must be configured properly.
Check 'akka.remote.netty.tcp.{hostname,port}'."""
logger.error(context)
throw new ConfigurationException(context)
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package filodb.akkabootstrapper

import akka.actor.Address
import akka.cluster.Cluster

/** Collects invalid and valid seed nodes from configuration.
* Allows the user to decide error handling if any invalid are found.
*
* Can be extended for further validation behavior.
*/
trait SeedValidator {
self: ClusterSeedDiscovery =>

def cluster: Cluster

def settings: AkkaBootstrapperSettings

/** Collects invalid seed nodes. */
def invalidSeedNodes: List[String]

/** Collects valid seed nodes. */
def validSeedNodes: List[Address]

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package filodb.akkabootstrapper

import java.net.MalformedURLException

import scala.collection.immutable

import akka.actor.{Address, AddressFromURIString}
import akka.cluster.{Cluster, Member}

/**
* This implementation of discovery allows clients to whitelist nodes that form the
* cluster seeds. Essentially, this is just an adapter that allows for the simple
* implementation of `akka.cluster.Cluster.joinSeedNodes`.
*
* Collects invalid and valid seed nodes from configuration.
* Allows the user to decide error handling if any invalid are found.
*/
class WhitelistClusterSeedDiscovery(cluster: Cluster, settings: AkkaBootstrapperSettings)
extends ClusterSeedDiscovery(cluster, settings)
with SeedValidator { self: ClusterSeedDiscovery =>

/** Attempts to create addresses from a whitelist seed config. */
private val validate = (s: String) =>
try AddressFromURIString(s) catch { case e: MalformedURLException =>
logger.error("MalformedURLException: invalid cluster seed node [{}]", s)
s
}

private val validated = settings.seedsWhitelist.map(validate)

/** Collects invalid seed nodes. */
override val invalidSeedNodes: List[String] =
validated collect { case a: String => a }

/** Collects valid seed nodes. */
override val validSeedNodes: List[Address] =
validated collect { case a: Address => a }

/** Removes cluster self node unless it is in the head of the sorted list.
* First logs all invalid seeds first during validation, for full auditing,
* in `filodb.akkabootstrapper.WhitelistSeedValidator.validate`.
* Then raises exception to fail fast.
*/
override protected lazy val discoverPeersForNewCluster: immutable.Seq[Address] = {

if (invalidSeedNodes.nonEmpty) throw new MalformedURLException(
s"Detected ${invalidSeedNodes.size} invalid 'whitelist' seed node configurations.")

import Member.addressOrdering

val headOpt = validSeedNodes.headOption
val selfAddress = cluster.selfAddress
validSeedNodes
.filter(address => address != selfAddress || headOpt.contains(address))
.sorted
}
}
Loading

0 comments on commit 2b2b292

Please sign in to comment.