Skip to content

Commit

Permalink
Merge pull request #1420 from acemilio/fixes/SPARTA-410
Browse files Browse the repository at this point in the history
[SPARTA-410] delete policy context when deleting policy
  • Loading branch information
aalfonso-stratio committed Feb 24, 2016
2 parents da01048 + f779cb8 commit 70d6366
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,25 @@ package com.stratio.sparkta.serving.api.service.http

import java.io.File
import javax.ws.rs.Path
import scala.concurrent.Await
import scala.util.{Failure, Success}

import akka.pattern.ask
import com.stratio.sparkta.serving.api.actor.PolicyActor._
import com.stratio.sparkta.serving.api.actor.SparkStreamingContextActor
import com.stratio.sparkta.serving.api.constants.HttpConstant
import com.stratio.sparkta.serving.core.constants.AkkaConstant
import com.stratio.sparkta.serving.core.helpers.PolicyHelper
import com.stratio.sparkta.serving.core.models._
import com.stratio.sparkta.serving.core.policy.status.{PolicyStatusActor, PolicyStatusEnum}
import com.wordnik.swagger.annotations._
import org.json4s.jackson.Serialization.write
import spray.http.HttpHeaders.`Content-Disposition`
import spray.http.{HttpResponse, StatusCodes}
import spray.httpx.marshalling.ToResponseMarshallable
import spray.routing._

import scala.concurrent.Await
import scala.util.{Failure, Success}
import com.stratio.sparkta.serving.api.actor.PolicyActor._
import com.stratio.sparkta.serving.api.actor.SparkStreamingContextActor
import com.stratio.sparkta.serving.api.constants.HttpConstant
import com.stratio.sparkta.serving.core.constants.AkkaConstant
import com.stratio.sparkta.serving.core.helpers.PolicyHelper
import com.stratio.sparkta.serving.core.models._
import com.stratio.sparkta.serving.core.policy.status.PolicyStatusActor.ResponseDelete
import com.stratio.sparkta.serving.core.policy.status.{PolicyStatusActor, PolicyStatusEnum}

@Api(value = HttpConstant.PolicyPath, description = "Operations over policies.")
trait PolicyHttpService extends BaseHttpService with SparktaSerializer {
Expand Down Expand Up @@ -239,10 +240,14 @@ trait PolicyHttpService extends BaseHttpService with SparktaSerializer {
path(HttpConstant.PolicyPath / Segment) { (id) =>
delete {
complete {
val future = supervisor ? new Delete(id)
Await.result(future, timeout.duration) match {
case Response(Failure(exception)) => throw exception
case Response(Success(_)) => HttpResponse(StatusCodes.OK)
for {
Response(Success(_)) <- (supervisor ? Delete(id)).mapTo[Response]
policyStatusActor = actors.get(AkkaConstant.PolicyStatusActor).get
deleteContextResponse <- (policyStatusActor ? PolicyStatusActor.Delete(id))
.mapTo[PolicyStatusActor.ResponseDelete]
} yield deleteContextResponse match {
case PolicyStatusActor.ResponseDelete(Success(_)) => StatusCodes.OK
case PolicyStatusActor.ResponseDelete(Failure(exception)) => throw exception
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,33 @@ with HttpServiceBaseTest {

"PolicyHttpService.remove" should {
"return an OK because the policy was deleted" in {
val policyStatusActorAutoPilot = Option(new TestActor.AutoPilot {
def run(sender: ActorRef, msg: Any): TestActor.AutoPilot =
msg match {
case PolicyStatusActor.Delete(id) =>
sender ! PolicyStatusActor.ResponseDelete(Success(true))
TestActor.NoAutoPilot
}
})
startAutopilot(Response(Success(getFragmentModel())))
startAutopilot(None, policyStatusActorTestProbe, policyStatusActorAutoPilot)
Delete(s"/${HttpConstant.PolicyPath}/id") ~> routes ~> check {
testProbe.expectMsgType[Delete]
status should be(StatusCodes.OK)
}
}
"return a 500 if there was any error" in {
startAutopilot(Response(Failure(new MockException())))
val policyStatusActorAutoPilot = Option(new TestActor.AutoPilot {
def run(sender: ActorRef, msg: Any): TestActor.AutoPilot =
msg match {
case PolicyStatusActor.Delete(id) =>
sender ! PolicyStatusActor.ResponseDelete(Success(true))
TestActor.NoAutoPilot
}
})
startAutopilot(Response(Failure(new MockException())))
startAutopilot(None, policyStatusActorTestProbe, policyStatusActorAutoPilot)
Delete(s"/${HttpConstant.PolicyPath}/id") ~> routes ~> check {
testProbe.expectMsgType[Delete]
status should be(StatusCodes.InternalServerError)
Expand All @@ -219,9 +238,19 @@ with HttpServiceBaseTest {
}
}
"return a 500 if there was any error" in {
val policyAutoPilot = Option(new TestActor.AutoPilot {
def run(sender: ActorRef, msg: Any): TestActor.AutoPilot =
msg match {
case SparkStreamingContextActor.Create(policy) =>
sender ! Success(getPolicyModel())
TestActor.NoAutoPilot
case Delete => TestActor.NoAutoPilot
}
})
startAutopilot(Response(Failure(new MockException())))
Delete(s"/${HttpConstant.PolicyPath}/id") ~> routes ~> check {
testProbe.expectMsgType[Delete]
startAutopilot(None, sparkStreamingTestProbe, policyAutoPilot)
Get(s"/${HttpConstant.PolicyPath}/run/id") ~> routes ~> check {
testProbe.expectMsgType[Find]
status should be(StatusCodes.InternalServerError)
}
}
Expand All @@ -238,8 +267,8 @@ with HttpServiceBaseTest {
}
"return a 500 if there was any error" in {
startAutopilot(Response(Failure(new MockException())))
Delete(s"/${HttpConstant.PolicyPath}/id") ~> routes ~> check {
testProbe.expectMsgType[Delete]
Get(s"/${HttpConstant.PolicyPath}/download/id") ~> routes ~> check {
testProbe.expectMsgType[Find]
status should be(StatusCodes.InternalServerError)
}
}
Expand Down
22 changes: 22 additions & 0 deletions serving-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,28 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.spray</groupId>
<artifactId>spray-testkit_${scala.binary.version}</artifactId>
<version>${spray.version}</version>
<exclusions>
<exclusion>
<groupId>org.specs2</groupId>
<artifactId>specs2_${scala.binary.version}</artifactId>
</exclusion>
<exclusion>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.spark-project.akka</groupId>
<artifactId>akka-testkit_${scala.binary.version}</artifactId>
<version>${akka.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.spray</groupId>
<artifactId>spray-can_${scala.binary.version}</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@
package com.stratio.sparkta.serving.core.policy.status

import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions
import scala.util.{Failure, Success, Try}

import akka.actor.{ActorRef, Actor}
import akka.actor.Actor
import akka.event.slf4j.SLF4JLogging
import akka.util.Timeout
import com.stratio.sparkta.serving.core.CuratorFactoryHolder
import com.stratio.sparkta.serving.core.constants.{AkkaConstant, AppConstant}
import com.stratio.sparkta.serving.core.models.{PolicyStatusModel, SparktaSerializer}
import com.stratio.sparkta.serving.core.policy.status.PolicyStatusActor._
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.recipes.cache.{NodeCache, NodeCacheListener}
import org.json4s.jackson.Serialization.{read, write}

import scala.collection.JavaConversions
import scala.concurrent.Future
import scala.util.{Failure, Success, Try}
import com.stratio.sparkta.serving.core.CuratorFactoryHolder
import com.stratio.sparkta.serving.core.constants.{AkkaConstant, AppConstant}
import com.stratio.sparkta.serving.core.exception.ServingCoreException
import com.stratio.sparkta.serving.core.models._
import com.stratio.sparkta.serving.core.policy.status.PolicyStatusActor._

class PolicyStatusActor(curatorFramework: CuratorFramework)
extends Actor with SLF4JLogging with SparktaSerializer {
Expand All @@ -42,16 +42,15 @@ class PolicyStatusActor(curatorFramework: CuratorFramework)
case FindAll => findAll
case Kill(name) => sender ! kill(name)
case AddListener(name, callback) => addListener(name, callback)
case Delete(id) => sender ! delete(id)
}



def kill(policyName: String): Boolean = {
implicit val timeout:Timeout = Timeout(2L, TimeUnit.SECONDS)
implicit val timeout: Timeout = Timeout(2L, TimeUnit.SECONDS)
val Stopped = true
val NotStopped = false
val pActor = context.actorSelection(s"${AkkaConstant.SparkStreamingContextActor}-${policyName.replace(" ", "_")}")
.resolveOne().value
.resolveOne().value

pActor match {
case Some(Success(actor)) => {
Expand Down Expand Up @@ -123,9 +122,24 @@ class PolicyStatusActor(curatorFramework: CuratorFramework)
}))
}

def delete(id: String): ResponseDelete =
ResponseDelete(
Try {
val statusPath = s"${AppConstant.ContextPath}/$id"
if (Option(curatorFramework.checkExists.forPath(statusPath)).isDefined) {
log.info(s">> Deleting context $id >")
curatorFramework.delete().forPath(statusPath)
} else {
throw new ServingCoreException(ErrorModel.toString(new ErrorModel(ErrorModel.CodeNotExistsPolicyWithId,
s"No policy context with id $id.")))
}
}
)

/**
* Adds a listener to one policy and executes the callback when it changed.
* @param id of the policy.
*
* @param id of the policy.
* @param callback with a function that will be executed.
*/
def addListener(id: String, callback: (PolicyStatusModel, NodeCache) => Unit): Unit = {
Expand All @@ -148,18 +162,22 @@ class PolicyStatusActor(curatorFramework: CuratorFramework)

object PolicyStatusActor {

case class Kill(name:String)
case class Kill(name: String)

case class Update(policyStatus: PolicyStatusModel)

case class Create(policyStatus: PolicyStatusModel)

case class AddListener(name: String, callback: (PolicyStatusModel, NodeCache) => Unit)

case class Delete(id: String)

case object FindAll

case class Response(policyStatus: Try[Seq[PolicyStatusModel]])

case class ResponseDelete(value: Try[Unit])

/**
* This map represents the state machine of one context.
*/
Expand All @@ -177,8 +195,9 @@ object PolicyStatusActor {

/**
* Validates with the StateMachine if one status could be changed to another.
*
* @param initialStatus that contains the currently status.
* @param finalStatus to change. If not one exception will be thrown.
* @param finalStatus to change. If not one exception will be thrown.
*/
def validate(initialStatus: Option[PolicyStatusEnum.Value], finalStatus: PolicyStatusEnum.Value): Unit = {
if (!StateMachine.exists(_._1 == initialStatus)) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/**
* Copyright (C) 2016 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.sparkta.serving.core.policy.status

import scala.util.{Failure, Success}

import akka.actor.{ActorSystem, Props}
import akka.testkit._
import akka.util.Timeout
import org.apache.curator.framework.CuratorFramework
import org.apache.curator.framework.api._
import org.apache.zookeeper.data.Stat
import org.junit.runner.RunWith
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.junit.JUnitRunner
import org.scalatest.mock.MockitoSugar
import scala.concurrent.duration._

import com.stratio.sparkta.serving.core.constants.AppConstant
import com.stratio.sparkta.serving.core.exception.ServingCoreException
import com.stratio.sparkta.serving.core.policy.status.PolicyStatusActor.ResponseDelete

@RunWith(classOf[JUnitRunner])
class PolicyStatusActorTest extends TestKit(ActorSystem("FragmentActorSpec"))
with WordSpecLike
with Matchers
with ImplicitSender
with MockitoSugar {

val curatorFramework = mock[CuratorFramework]
val getChildrenBuilder = mock[GetChildrenBuilder]
val getDataBuilder = mock[GetDataBuilder]
val existsBuilder = mock[ExistsBuilder]
val createBuilder = mock[CreateBuilder]
val deleteBuilder = mock[DeleteBuilder]

val actor = system.actorOf(Props(new PolicyStatusActor(curatorFramework)))
implicit val timeout: Timeout = Timeout(15.seconds)
val id = "existingID"

"PolicyStatusActor" must {

"delete: returns success when deleting an existing ID " in {
when(curatorFramework
.checkExists())
.thenReturn(existsBuilder)
when(curatorFramework.checkExists()
.forPath(s"${AppConstant.ContextPath}/$id"))
.thenReturn(new Stat)
// scalastyle:off null

when(curatorFramework.delete())
.thenReturn(deleteBuilder)
when(curatorFramework.delete()
.forPath(s"${AppConstant.ContextPath}/$id"))
.thenReturn(null)

actor ! PolicyStatusActor.Delete(id)

expectMsg(ResponseDelete(Success(null)))
// scalastyle:on null

}

"delete: returns failure when deleting an unexisting ID " in {
// scalastyle:off null
when(curatorFramework
.checkExists())
.thenReturn(existsBuilder)
when(curatorFramework.checkExists()
.forPath(s"${AppConstant.ContextPath}/$id"))
.thenReturn(null)

actor ! PolicyStatusActor.Delete(id)

expectMsgAnyClassOf(classOf[ResponseDelete])
// scalastyle:on null

}

"delete: returns failure when deleting an existing ID and an error occurs while deleling" in {
// scalastyle:off null
when(curatorFramework
.checkExists())
.thenReturn(existsBuilder)
when(curatorFramework.checkExists()
.forPath(s"${AppConstant.ContextPath}/$id"))
.thenReturn(new Stat())

when(curatorFramework.delete())
.thenReturn(deleteBuilder)
when(curatorFramework.delete()
.forPath(s"${AppConstant.ContextPath}/$id"))
.thenThrow(new RuntimeException())
actor ! PolicyStatusActor.Delete(id)

expectMsgAnyClassOf(classOf[ResponseDelete])
// scalastyle:on null

}
}
}
Loading

0 comments on commit 70d6366

Please sign in to comment.