Skip to content

Commit

Permalink
Merge pull request #286 from CleverCloud/chore/rewrite-errors
Browse files Browse the repository at this point in the history
chore(http): better WarpScript error extraction
  • Loading branch information
KannarFr authored Jan 16, 2025
2 parents 5e3cd52 + fb9081f commit 1078088
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 15 deletions.
39 changes: 26 additions & 13 deletions src/main/scala/com/clevercloud/warp10client/Runner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import scala.util.{ Failure, Success, Try }
import org.apache.pekko
import pekko.NotUsed
import pekko.http.scaladsl.model.*
import pekko.http.scaladsl.model.StatusCodes.{OK, InternalServerError}
import pekko.stream.scaladsl.{ Flow, Source }
import io.circe.*
import io.circe.parser.*
import com.clevercloud.warp10client.models.gts_module.*

import scala.concurrent.Future

object Runner {
type WarpScript = String

Expand Down Expand Up @@ -48,20 +51,30 @@ object Runner {
import warpClientContext._

Flow[Try[HttpResponse]].flatMapConcat {
case Success(httpResponse) => {
if (httpResponse.status == StatusCodes.OK) {
Source.future(
WarpClientUtils.readAllDataBytes(httpResponse.entity.dataBytes)
)
} else {
Source.future(
WarpClientUtils
.readAllDataBytes(httpResponse.entity.dataBytes)
.map(content => WarpException(s"HTTP status: ${httpResponse.status.intValue.toString}: $content"))
.map(throw _)
case Success(httpResponse) =>
Source.future(
httpResponse.status match
case OK => WarpClientUtils.readAllDataBytes(httpResponse.entity.dataBytes)
case InternalServerError =>
// In case of 500, we try to read headers which are human-readable and fallback on response body which is HTML
val error: Option[String] = httpResponse.headers.findLast(_.is("x-warp10-error-message")).map(_.value)
val line: Option[Int] = httpResponse
.headers
.findLast(_.is("x-warp10-error-line"))
.map(_.value)
.flatMap(_.toIntOption)

error match
case Some(err) => Future.successful(throw WarpException(err, line))
case None => WarpClientUtils
.readAllDataBytes(httpResponse.entity.dataBytes)
.map(WarpException(_, line))
.map(throw _)
case status => WarpClientUtils
.readAllDataBytes(httpResponse.entity.dataBytes)
.map(content => WarpException(s"HTTP status: ${status.intValue.toString}: $content"))
.map(throw _)
)
}
}
case Failure(e) => throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ case class WarpClientContext(
implicit def implicitWarpConfiguration: WarpConfiguration = configuration
}

case class WarpException(error: String) extends Exception(error)
case class WarpException(error: String, line: Option[Int] = None) extends Exception(error)

object `X-Warp10-Token` {

Expand Down
45 changes: 45 additions & 0 deletions src/test/scala/MockServer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import org.apache.pekko
import org.apache.pekko.actor.ActorSystem
import org.apache.pekko.http.scaladsl.Http
import org.apache.pekko.http.scaladsl.model.{ HttpMethod, HttpRequest, HttpResponse, Uri }
import org.apache.pekko.stream.scaladsl.{ Sink, Source }

import scala.concurrent.Future

/**
* Mock Akka HTTP Server, will handle requests which is provided with handle request
*/
object MockServer {

val interface = "localhost"
val port = 8888

def handleRequest(
method: HttpMethod,
uri: Uri,
response: HttpResponse
// httpRequest: HttpRequest
)(implicit
system: ActorSystem
): Future[Http.ServerBinding] = {

val serverSource: Source[Http.IncomingConnection, Future[Http.ServerBinding]] =
Http().newServerAt(interface, port).connectionSource()

// val requestPath = httpRequest.uri.path.toString()

val requestHandler: HttpRequest => HttpResponse = {
case HttpRequest(method, uri, _, _, _) => // Uri.Path(`requestPath`)
response
case _: HttpRequest =>
HttpResponse(404, entity = "Unknown resource!")
}

serverSource
.to(Sink.foreach { connection =>
println("Mock Server accepted new connection from " + connection.remoteAddress)
connection handleWithSyncHandler requestHandler
})
.run()
}
}
31 changes: 30 additions & 1 deletion src/test/scala/Warp10ClientSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ import java.util.UUID
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration as Period
import scala.concurrent.duration.MILLISECONDS
import scala.concurrent.duration.SECONDS
import scala.util.{Failure, Success}
import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import pekko.http.scaladsl.model.{HttpMethod, HttpMethods, HttpRequest, HttpResponse, StatusCodes}
import pekko.stream.Materializer
import pekko.stream.scaladsl.Flow
import com.clevercloud.warp10client.*
Expand Down Expand Up @@ -35,6 +36,9 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer {
WarpException -> on invalid data $p6

Seq[GTS] contains data -> on fetch success $e2

invalid WS throw comprehensive error $r3
invalid response throw error $r4
"""

val zonedNow: ZonedDateTime = ZonedDateTime.now
Expand Down Expand Up @@ -203,6 +207,31 @@ class Warp10ClientSpec extends Specification with Warp10TestContainer {
Period(10000, MILLISECONDS)
) must beAnInstanceOf[Right[?, ?]]

def e3: Warp10Stack = Await.result(realWarpClient.execStack("fdsfds"), Period(10, SECONDS))
val r3: MatchResult[Warp10Stack] = e3 must throwA[WarpException].like {
case e: WarpException =>
e.error must beEqualTo("Exception at '=>fdsfds<=' in section [TOP] (Unknown function 'fdsfds')")
e.line.getOrElse(0) must beEqualTo(1)

}

// If no 'error' header is set, fallback to HTML body
Await.result(
MockServer.handleRequest(
HttpMethods.POST,
"/api/v0/exec",
HttpResponse(status = 500, entity = "<h1>some error</h1>")
),
Period(3, SECONDS)
)
val mockWarpClient: Warp10Client = WarpClient(MockServer.interface, MockServer.port)
def e4: Warp10Stack = Await.result(mockWarpClient.execStack("token"), Period(10, SECONDS))
val r4: MatchResult[Warp10Stack] = e4 must throwAn[WarpException].like {
case e: WarpException =>
e.error must beEqualTo("<h1>some error</h1>")
e.line must be(None)
}

// private def getNbGTSPoints(gtsSeq: Seq[GTS]): Int = gtsSeq.map(_.points.size).sum

// close http pool
Expand Down

0 comments on commit 1078088

Please sign in to comment.