Skip to content

Commit

Permalink
specs for serving-api http services
Browse files Browse the repository at this point in the history
  • Loading branch information
anistal authored and Sergio Gomez committed Oct 21, 2015
1 parent 98a5cc8 commit 19afc2b
Show file tree
Hide file tree
Showing 22 changed files with 897 additions and 157 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,12 @@

package com.stratio.sparkta.sdk.exception

import scala.util.control.NoStackTrace

/**
* This class it's used to create custom exceptions that will be mostly used in tests
* To throw a MockException: new MockException(mockErrorMessage)
* @author gschiavon
*/
class MockException(msg: String) extends RuntimeException(msg)

object MockException {
def create(msg: String): MockException = new MockException(msg)
class MockException extends NoStackTrace {

def create(msg: String, cause: Throwable): Throwable = new MockException(msg).initCause(cause)
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ object Sparkta extends App with SLF4JLogging {
SparktaConfig.initApiConfig()
SparktaConfig.initSwaggerConfig()
SparktaHelper.initAkkaSystem(AppConstant.ConfigAppName)
ResetStatusHelper.ResetStatuses
ResetStatusHelper.resetStatuses


}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ package com.stratio.sparkta.serving.api.actor

import java.util.UUID

import akka.actor.{ActorRef, Actor}
import akka.actor.{Actor, ActorRef}
import akka.event.slf4j.SLF4JLogging
import akka.pattern.ask
import com.stratio.sparkta.serving.api.actor.PolicyActor._
import com.stratio.sparkta.serving.api.exception.ServingApiException
import com.stratio.sparkta.serving.core.{CuratorFactoryHolder, AppConstant}
import com.stratio.sparkta.serving.core.models._
import com.stratio.sparkta.serving.core.policy.status.{PolicyStatusActor, PolicyStatusEnum}
import com.stratio.sparkta.serving.core.{AppConstant, CuratorFactoryHolder}
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.KeeperException.NoNodeException
import org.json4s.jackson.Serialization.{read, write}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,45 +16,15 @@

package com.stratio.sparkta.serving.api.constants

/**
* HttpConstants used in http services mainly.
*/
object HttpConstant {
object HttpConstant {

/**
* Serving api prefix endpoints
*/
final val FragmentPath = "fragment"
final val TemplatePath = "template"
final val PolicyPath = "policy"
final val PolicyContextPath = "policyContext"
final val SwaggerPath = "swagger"
final val JobServerPath = "jobServer"
final val JobsPath = "jobs"
final val ContextsPath = "contexts"
final val AppStatus= "status"

/**
* Http codes.
*/
final val NotFound = 400
final val BadRequest = 500

/**
* Http messages.
*/
final val NotFoundMessage = "Not Found"
final val BadRequestMessage = "Bad Request"
final val JobServerOkMessage = "OK"

/**
* Exceptions messages.
*/
final val JobServerHostPortExceptionMsg = "JobServer host and port is not defined in configuration file"

/**
* RequestTimeouts
*/
final val ConnectionTimeout = 2000
final val ReadTimeout = 60000
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
package com.stratio.sparkta.serving.api.helpers
/**
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.stratio.sparkta.serving.api.helpers

import com.stratio.sparkta.serving.api.Sparkta._
import com.stratio.sparkta.serving.core.models.{SparktaSerializer, PolicyStatusModel}
Expand All @@ -10,12 +25,9 @@ import org.json4s.jackson.Serialization._
import scala.collection.JavaConversions
import scala.util.{Failure, Success, Try}

/**
* Created by arincon on 23/09/15.
*/
object ResetStatusHelper extends SparktaSerializer {

def ResetStatuses {
def resetStatuses {
Try {
if (SparktaConfig.getClusterConfig.isEmpty) {
val curator = CuratorFactoryHolder.getInstance()
Expand All @@ -40,5 +52,4 @@ object ResetStatusHelper extends SparktaSerializer {

}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ import com.stratio.sparkta.serving.core.models.ErrorModel
import com.wordnik.swagger.annotations._
import spray.routing._

@Api(value = HttpConstant.AppStatus, description = "Operations about sparkta status.", position = 9)
@Api(value = HttpConstant.AppStatus, description = "Operations about sparkta status.")
trait AppStatusHttpService extends BaseHttpService {


override def routes: Route = checkStatus

@ApiOperation(value = "Finds all policy contexts",
Expand All @@ -41,16 +40,12 @@ trait AppStatusHttpService extends BaseHttpService {
path(HttpConstant.AppStatus) {
get {
complete {

if (!CuratorFactoryHolder.getInstance().getZookeeperClient.getZooKeeper.getState.isConnected)

throw new ServingApiException(ErrorModel.toString(
new ErrorModel(ErrorModel.CodeUnknow, s"Zk isn't connected at" +
s" ${CuratorFactoryHolder.getInstance().getZookeeperClient.getCurrentConnectionString}.")
))

else
"OK"
else "OK"
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@ trait FragmentHttpService extends BaseHttpService {
complete {
val future = supervisor ? new Update(fragment)
Await.result(future, timeout.duration) match {
case Response(Failure(exception)) => throw exception
case Response(Success(fragment)) => HttpResponse(StatusCodes.OK)
case Response(Failure(exception)) => throw exception
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ import com.stratio.sparkta.serving.api.actor.SparkStreamingContextActor._
@Api(value = HttpConstant.PolicyContextPath, description = "Operations about policy contexts.", position = 0)
trait PolicyContextHttpService extends BaseHttpService {

case class Result(policyId: String, policyName: String)

override def routes: Route = findAll ~ update ~ create

@ApiOperation(value = "Finds all policy contexts",
Expand All @@ -56,7 +54,6 @@ trait PolicyContextHttpService extends BaseHttpService {
case Response(Failure(exception)) => throw exception
case Response(Success(policyStatuses)) => policyStatuses
}

}
}
}
Expand Down Expand Up @@ -97,7 +94,7 @@ trait PolicyContextHttpService extends BaseHttpService {
@ApiOperation(value = "Creates a policy context.",
notes = "Returns the result",
httpMethod = "POST",
response = classOf[Result])
response = classOf[PolicyResult])
@ApiImplicitParams(Array(
new ApiImplicitParam(name = "policy",
value = "policy json",
Expand All @@ -123,7 +120,7 @@ trait PolicyContextHttpService extends BaseHttpService {
} yield {
response match {
case Success(policy) =>
Result(policy.id.getOrElse(""), p.name)
PolicyResult(policy.id.getOrElse(""), p.name)
case Failure(e) => throw e
}
}
Expand All @@ -133,5 +130,4 @@ trait PolicyContextHttpService extends BaseHttpService {
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,6 @@ trait PolicyHttpService extends BaseHttpService with SparktaSerializer {

override def routes: Route = find ~ findAll ~ findByFragment ~ create ~ update ~ remove ~ run ~ download ~ findByName

def getPolicyWithFragments(policy: AggregationPoliciesModel): AggregationPoliciesModel =
PolicyHelper.parseFragments(PolicyHelper.fillFragments(policy, actors.get(AkkaConstant.FragmentActor).get, timeout))

@Path("/find/{id}")
@ApiOperation(value = "Find a policy from its id.",
notes = "Find a policy from its id.",
Expand Down Expand Up @@ -141,33 +138,6 @@ trait PolicyHttpService extends BaseHttpService with SparktaSerializer {
}
}

def withStatus(policies: Seq[AggregationPoliciesModel]): ToResponseMarshallable = {

if (!policies.isEmpty) {
val policyStatusActor = actors.get(AkkaConstant.PolicyStatusActor).get
for {
response <- (policyStatusActor ? PolicyStatusActor.FindAll)
.mapTo[PolicyStatusActor.Response]
} yield {
policies.map(policy => getPolicyWithStatus(policy, response.policyStatus.get))
}
} else {
Seq()
}
}

def getPolicyWithStatus(policy: AggregationPoliciesModel, statuses: Seq[PolicyStatusModel]): PolicyWithStatus = {
val status = statuses.find(_.id == policy.id.get) match {
case Some(statusPolicy) => statusPolicy.status
case None => PolicyStatusEnum.NotStarted
}
PolicyWithStatus(status, policy)
}

//TODO @dvallejo Refactor this case class to a better place
case class PolicyWithStatus(status: PolicyStatusEnum.Value,
policy: AggregationPoliciesModel)

@Path("/all")
@ApiOperation(value = "Finds all policies.",
notes = "Finds all policies.",
Expand Down Expand Up @@ -240,7 +210,7 @@ trait PolicyHttpService extends BaseHttpService with SparktaSerializer {
val future = supervisor ? new Update(policy)
Await.result(future, timeout.duration) match {
case Response(Failure(exception)) => throw exception
case Response(Success(_)) => HttpResponse(StatusCodes.Created)
case Response(Success(_)) => HttpResponse(StatusCodes.OK)
}
}
}
Expand Down Expand Up @@ -343,17 +313,45 @@ trait PolicyHttpService extends BaseHttpService with SparktaSerializer {
case ResponsePolicy(Success(policy)) => {
val isValidAndMessageTuple = AggregationPoliciesValidator.validateDto(getPolicyWithFragments(policy))
validate(isValidAndMessageTuple._1, isValidAndMessageTuple._2) {
complete {
val tempFile = File.createTempFile(s"${policy.id.get}-${policy.name}-", ".json")
respondWithHeader(`Content-Disposition`("attachment", Map("filename" -> s"${policy.name}.json"))) {
scala.tools.nsc.io.File(tempFile).writeAll(write(policy))
getFromFile(tempFile)
}
val tempFile = File.createTempFile(s"${policy.id.get}-${policy.name}-", ".json")
tempFile.deleteOnExit()
respondWithHeader(`Content-Disposition`("attachment", Map("filename" -> s"${policy.name}.json"))) {
scala.tools.nsc.io.File(tempFile).writeAll(write(policy))
getFromFile(tempFile)
}
}
}
}
}
}
}

// XXX Protected methods

def getPolicyWithFragments(policy: AggregationPoliciesModel): AggregationPoliciesModel =
PolicyHelper.parseFragments(PolicyHelper.fillFragments(policy, actors.get(AkkaConstant.FragmentActor).get, timeout))

protected def withStatus(policies: Seq[AggregationPoliciesModel]): ToResponseMarshallable = {

if (!policies.isEmpty) {
val policyStatusActor = actors.get(AkkaConstant.PolicyStatusActor).get
for {
response <- (policyStatusActor ? PolicyStatusActor.FindAll)
.mapTo[PolicyStatusActor.Response]
} yield {
policies.map(policy => getPolicyWithStatus(policy, response.policyStatus.get))
}
} else {
Seq()
}
}

protected def getPolicyWithStatus(policy: AggregationPoliciesModel, statuses: Seq[PolicyStatusModel])
: PolicyWithStatus = {
val status = statuses.find(_.id == policy.id.get) match {
case Some(statusPolicy) => statusPolicy.status
case None => PolicyStatusEnum.NotStarted
}
PolicyWithStatus(status, policy)
}
}
Loading

0 comments on commit 19afc2b

Please sign in to comment.