Skip to content

Commit

Permalink
[CROSSDATA-733] Query Cancellation: HTTP + ClusterClient (Stratio#767)
Browse files Browse the repository at this point in the history
* Working cancellations through HTTP

* Query cancellation for ClusterClient and HTTP Client with tests

* Added license

* Fix: BasicShell wasn't flushing IO buffers in async mode.

* Added changelog

* Snippet style change.

* Fixed file location

* Changed test name so it gets processed by CD

* Added derby abrupt termination guard
  • Loading branch information
pfcoperez authored and pianista215 committed Nov 7, 2016
1 parent f6f4c0d commit d61c01e
Show file tree
Hide file tree
Showing 11 changed files with 124 additions and 56 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Only listing significant user-visible, not internal code cleanups and minor bug

## 1.8.0 (upcoming)

* Pending changelog
* Query cancellation service for both HTTP & ClusterClient channels.
* Updated dependencies with curator and stratio-commons-utils compatible with Zookeeper 3.5.x

## 1.7.0 (upcoming)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private[crossdata] trait ServerReply {
def requestId: UUID
}

private[crossdata] case class QueryCancelledReply(requestId: UUID) extends ServerReply
private[crossdata] case class QueryCancelledReply(requestId: UUID, cancellationRequest: UUID) extends ServerReply

private[crossdata] case class SQLReply(requestId: UUID, sqlResult: SQLResult) extends ServerReply

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,10 @@ class ClusterClientDriver private[driver](driverConf: DriverConf,
case _ =>
val sqlCommand = new SQLCommand(query, retrieveColNames = driverConf.getFlattenTables)
val futureReply = askCommand(securitizeCommand(sqlCommand)).map {
case SQLReply(_, sqlResult) => sqlResult
case other => throw new RuntimeException(s"SQLReply expected. Received: $other")
case SQLReply(_, sqlResult) =>
sqlResult
case other =>
throw new RuntimeException(s"SQLReply expected. Received: $other")
}
new SQLResponse(sqlCommand.requestId, futureReply) {
// TODO cancel sync => 5 secs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ class HttpDriver private[driver](driverConf: DriverConf,

private def obtainHttpContext: HttpExt = {
val ext = Http(system)
if(driverConf.httpTlsEnable){ //Set for all the requests the Https configurated context with keystores
if(driverConf.httpTlsEnable){ //Set for all the requests the Https configured context with key stores
ext.setDefaultClientHttpsContext(getTlsContext)
}
ext
Expand Down Expand Up @@ -180,11 +180,13 @@ class HttpDriver private[driver](driverConf: DriverConf,
val entity = HttpEntity(ContentTypes.`application/json`, bs)
um(entity)
}
}.runFold(List.empty[Row]) { case (acc: List[Row], StreamedRow(row, None)) => row::acc }
}.runFold(List.empty[Row]) {
case (acc: List[Row], StreamedRow(row, None)) => row::acc
case _ => Nil
}

} yield SuccessfulSQLResult(rrows.reverse toArray, schema) /* TODO: Performance could be increased if
`SuccessfulSQLResult`#resultSet were of type `Seq[Row]`*/

} else {

Unmarshal(httpResponse.entity).to[SQLReply] map {
Expand All @@ -202,8 +204,8 @@ class HttpDriver private[driver](driverConf: DriverConf,
simpleRequest(
securitizeCommand(command),
s"query/${command.requestId}", {
case reply: QueryCancelledReply => reply
}: PartialFunction[SQLReply, QueryCancelledReply]
reply: QueryCancelledReply => reply
}
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class ProxyActor(clusterClientActor: ActorRef, driver: ClusterClientDriver) exte
case secureSQLCommand @ CommandEnvelope(_: OpenSessionCommand | _: CloseSessionCommand, _, _) =>
clusterClientActor ! ClusterClient.Send(ServerClusterClientParameters.ServerPath, secureSQLCommand, localAffinity = true)

case secureSQLCommand @ CommandEnvelope(_: ControlCommand, _, _) =>
clusterClientActor ! ClusterClient.Send(ServerClusterClientParameters.ServerPath, secureSQLCommand, localAffinity = false)

case sqlCommand: SQLCommand =>
logger.warn(s"Command message not securitized: ${sqlCommand.sql}. Message won't be sent to the Crossdata cluster")
}
Expand Down Expand Up @@ -126,10 +129,10 @@ class ProxyActor(clusterClientActor: ActorRef, driver: ClusterClientDriver) exte
case reply @ SQLReply(_, result) =>
logger.info(s"Successful SQL execution: $result")
p.success(reply)
// TODO review query cancelation
case reply @ QueryCancelledReply(id) =>
logger.info(s"Query $id cancelled")
p.success(reply)
case reply @ QueryCancelledReply(queryRqId, cancellationRqId) =>
logger.info(s"Query $queryRqId cancelled")
p.success(SQLReply(queryRqId, ErrorSQLResult("Query cancelled")))
promisesByIds.promises.get(cancellationRqId).foreach(_.success(reply))
case reply @ ClusterStateReply(_, clusterState, _) =>
logger.debug(s"Cluster snapshot received $clusterState")
p.success(reply)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,14 +145,14 @@ object BasicShell extends App {
printResult(sqlResponse.id, sqlResult)
case Failure(throwable) =>
console.println(s"Unexpected error while processing the query ${throwable.getMessage}")
console.flush
}

} else {
printResult(sqlResponse.id, sqlResponse.waitForResult(timeout))
}

}
console.flush
}
}

Expand All @@ -166,6 +166,7 @@ object BasicShell extends App {
console.println("ERROR")
console.println(message)
}
console.flush
}

sys addShutdownHook {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,23 +120,36 @@ class CrossdataHttpServer(config: Config, serverActor: ActorRef, implicit val sy
post {
entity(as[CommandEnvelope]) { rq: CommandEnvelope =>

implicit val _ = Timeout(requestExecutionTimeout)

def completeWithErrorResult(desc: String) = {
val httpErrorReply = SQLReply(rq.cmd.requestId, ErrorSQLResult(desc))
complete(StatusCodes.InternalServerError -> httpErrorReply)
}

rq.cmd match {

case _: CloseSessionCommand => // Commands with no confirmation
case _: CloseSessionCommand => // Commands with no confirmation

serverActor ! rq
complete(StatusCodes.OK)

case _ => // Commands requiring confirmation
case _: CancelQueryExecution => // Management commands

onSuccess(serverActor ? rq) {
case qcr: QueryCancelledReply => complete(qcr)
}

implicit val _ = Timeout(requestExecutionTimeout)
case _ => // SQL Commands

onComplete(serverActor ? rq) {

case Success(SQLReply(requestId, _)) if requestId != rq.cmd.requestId =>
complete(StatusCodes.ServerError, s"Request ids do not match: (${rq.cmd.requestId}, $requestId)")

case Success(reply: ServerReply) =>
reply match {
case qcr: QueryCancelledReply => complete(qcr)

case SQLReply(_, SuccessfulSQLResult(resultSet, schema)) =>

implicit val jsonStreamingSupport = EntityStreamingSupport.json()
Expand All @@ -153,12 +166,11 @@ class CrossdataHttpServer(config: Config, serverActor: ActorRef, implicit val sy

complete(responseStream)

case _ => complete(reply)
case _ => complete(StatusCodes.InternalServerError -> reply)

}
case other =>
val httpErrorReply = SQLReply(rq.cmd.requestId, ErrorSQLResult(s"Internal XD server error: $other"))
complete(StatusCodes.ServerError -> httpErrorReply)
completeWithErrorResult(s"Internal XD server error: $other")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.stratio.crossdata.server.actors

import java.util.UUID
import java.util.concurrent.{CancellationException, Executor}

import akka.actor.{Actor, ActorRef, Props}
Expand Down Expand Up @@ -46,11 +47,8 @@ object JobActor {
trait JobEvent

object Events {

case object JobCompleted extends JobEvent

case class JobFailed(err: Throwable) extends JobEvent

}

object Commands {
Expand All @@ -59,7 +57,7 @@ object JobActor {

case object GetJobStatus

case object CancelJob
case class CancelJob(cancellationRequester: ActorRef, cancellationRequestId: Option[UUID])

case object StartJob
}
Expand Down Expand Up @@ -124,7 +122,7 @@ class JobActor(
requester ! queryRes
self ! JobCompleted
case Failure(_: CancellationException) => // Job cancellation
requester ! QueryCancelledReply(command.requestId)
requester ! SQLReply(command.requestId, ErrorSQLResult("Query cancelled"))
self ! JobCompleted
case Failure(e: ExecutionException) => self ! JobFailed(e.getCause) // Spark exception
case Failure(reason) => self ! JobFailed(reason) // Job failure
Expand All @@ -139,9 +137,15 @@ class JobActor(

context.become(receive(st.copy(runningTask = Some(runningTask))))

case CancelJob =>
case CancelJob(cancellationRequester, cancelRequestId) =>
st.runningTask.foreach{ tsk =>
logger.debug(s"Cancelling ${self.path}'s task ")
import context.dispatcher
cancelRequestId foreach { cancelRqId =>
tsk.future onFailure { case _: CancellationException =>
cancellationRequester ! QueryCancelledReply(command.requestId, cancelRqId)
}
}
tsk.cancel()
}

Expand All @@ -155,9 +159,10 @@ class JobActor(
context.parent ! event
requester ! SQLReply(command.requestId, ErrorSQLResult(e.getMessage, Some(new Exception(e.getMessage))))
throw e //Let It Crash: It'll be managed by its supervisor
case JobCompleted if sender == self =>

case msg @ JobCompleted if sender == self =>
logger.debug(s"Completed or cancelled ${self.path} task")
context.parent ! JobCompleted
context.parent ! msg
}

private def launchTask: Cancellable[SQLReply] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ class ServerActor(cluster: Cluster, sessionProvider: XDSessionProvider)
logger.debug(s"Session identifier $session")
sessionProvider.session(id) match {
case Success(xdSession) =>
val jobActor = context.actorOf(JobActor.props(xdSession, sqlCommand, sender(), timeout))
val jobActor = context.actorOf(JobActor.props(xdSession, sqlCommand, requester, timeout))
jobActor ! StartJob
context.become(
ready(st.copy(jobsById = st.jobsById + (JobId(id, sqlCommand.queryId) -> jobActor)))
)

case Failure(error) =>
logger.warn(s"Received message with an unknown sessionId $id", error)
sender ! SQLReply(
requester ! SQLReply(
sqlCommand.requestId,
ErrorSQLResult(s"Unable to recover the session ${session.id}. Cause: ${error.getMessage}")
)
Expand All @@ -131,8 +131,8 @@ class ServerActor(cluster: Cluster, sessionProvider: XDSessionProvider)
else
sender ! SQLReply(addAppCommand.requestId, ErrorSQLResult("App can't be stored in the catalog"))

case CommandEnvelope(cc@CancelQueryExecution(queryId), session@Session(id, _), _) =>
st.jobsById(JobId(id, queryId)) ! CancelJob
case CommandEnvelope(cc@CancelQueryExecution(queryId), session@Session(id, Some(cancellationRequester)), _) =>
st.jobsById(JobId(id, queryId)) ! CancelJob(cancellationRequester, Some(cc.requestId))
}


Expand Down Expand Up @@ -256,7 +256,7 @@ class ServerActor(cluster: Cluster, sessionProvider: XDSessionProvider)
}

def gracefullyKill(victim: ActorRef): Unit = {
victim ! CancelJob
victim ! CancelJob(self, None)
victim ! PoisonPill
}

Expand Down
22 changes: 0 additions & 22 deletions testsIT/src/test/scala/com/stratio/crossdata/driver/DriverIT.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package com.stratio.crossdata.driver

import java.nio.file.Paths

import com.stratio.crossdata.common.QueryCancelledReply
import com.stratio.crossdata.common.result.{ErrorSQLResult, SuccessfulSQLResult}
import com.stratio.crossdata.driver.test.Utils._
import org.junit.runner.RunWith
Expand All @@ -38,7 +37,6 @@ class DriverIT extends EndToEndTest with ScalaFutures {

val factoryDesc = s" $description"


"CrossdataDriver" should "return an ErrorResult when running an unparseable query" + factoryDesc in {

assumeCrossdataUpAndRunning()
Expand Down Expand Up @@ -187,26 +185,6 @@ class DriverIT extends EndToEndTest with ScalaFutures {
}
}


it should "be able to cancel queries" + factoryDesc ignore {
assumeCrossdataUpAndRunning()

withDriverDo { driver =>

driver.sql(s"CREATE TEMPORARY TABLE jsonTable USING org.apache.spark.sql.json OPTIONS (path '${Paths.get(getClass.getResource("/tabletest.json").toURI).toString}')").waitForResult()

val queryRq = driver.sql("SELECT DEBUG_SLEEP_MS(2000) FROM jsonTable")
val cancellationResponseFuture = queryRq.cancelCommand()

whenReady(cancellationResponseFuture) { res =>
res shouldBe a[QueryCancelledReply]
} (PatienceConfig(timeout = 3 seconds))


}

}

}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (C) 2015 Stratio (http://stratio.com)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.stratio.crossdata.driver

import java.nio.file.Paths

import com.stratio.crossdata.common.QueryCancelledReply
import com.stratio.crossdata.common.result.ErrorSQLResult
import com.stratio.crossdata.driver.test.Utils._
import org.junit.runner.RunWith
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.junit.JUnitRunner

import scala.concurrent.duration._
import scala.util.Try

@RunWith(classOf[JUnitRunner])
class DriverQueryManagementIT extends EndToEndTest with ScalaFutures {

driverFactories foreach { case (factory, description) =>

implicit val ctx = DriverTestContext(factory)

val factoryDesc = s" $description"

"CrossdataDriver" should "be able to cancel queries" + factoryDesc in {
assumeCrossdataUpAndRunning()

withDriverDo { driver =>

driver.sql(s"CREATE TEMPORARY TABLE jsonTable2 USING org.apache.spark.sql.json OPTIONS (path '${Paths.get (getClass.getResource("/tabletest.json").toURI).toString}')").waitForResult()

val queryRq = driver.sql("SELECT DEBUG_SLEEP_MS(5000) FROM jsonTable2")
val cancellationResponseFuture = queryRq.cancelCommand()

whenReady(cancellationResponseFuture) { res =>
res shouldBe a[QueryCancelledReply]
} (PatienceConfig(timeout = 2 seconds))

whenReady(queryRq.sqlResult) { sqlResult =>
sqlResult shouldBe a[ErrorSQLResult]
} (PatienceConfig(timeout = 3 seconds))

}

}

}

override def stop(): Unit = Try(super.stop()) //TODO

}

0 comments on commit d61c01e

Please sign in to comment.