Skip to content

Commit

Permalink
remove dependency on ledger/errors from ledger-api (digital-asset#16263)
Browse files Browse the repository at this point in the history
  • Loading branch information
mziolekda authored Feb 9, 2023
1 parent 7413bf8 commit 9b81120
Show file tree
Hide file tree
Showing 12 changed files with 17 additions and 22 deletions.
2 changes: 0 additions & 2 deletions ledger-api/rs-grpc-akka/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ da_scala_library(
],
deps = [
"//ledger-api/rs-grpc-bridge",
"//ledger/error",
"//libs-scala/contextualized-logging",
"@maven//:io_grpc_grpc_api",
"@maven//:io_grpc_grpc_stub",
"@maven//:org_reactivestreams_reactive_streams",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
package com.daml.grpc.adapter.server.akka

import akka.stream.scaladsl.Sink
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.error.definitions.CommonErrors
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.rs.ServerSubscriber
import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
Expand All @@ -15,10 +13,9 @@ import scala.concurrent.{Future, Promise}

object ServerAdapter {

private val errorLogger = DamlContextualizedErrorLogger.forClass(getClass)

def toSink[Resp](
streamObserver: StreamObserver[Resp]
streamObserver: StreamObserver[Resp],
errorHandler: Throwable => Throwable,
)(implicit executionSequencerFactory: ExecutionSequencerFactory): Sink[Resp, Future[Unit]] = {
val subscriber =
new ServerSubscriber[Resp](
Expand All @@ -32,10 +29,7 @@ object ServerAdapter {
throwable match {
case t: StatusException => t
case t: StatusRuntimeException => t
case _ =>
CommonErrors.ServiceInternalError
.UnexpectedOrUnknownException(throwable)(errorLogger)
.asGrpcError
case _ => errorHandler(throwable)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class HelloServiceAkkaImplementation(implicit
Source
.single(request)
.via(Flow[HelloRequest].mapConcat(responses))
.runWith(ServerAdapter.toSink(responseObserver))
.runWith(ServerAdapter.toSink(responseObserver, identity))
.onComplete(_ => serverStreamingCalls.incrementAndGet())

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.error.DamlContextualizedErrorLogger
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.v1.command_completion_service._
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.validation.CompletionServiceRequestValidator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.health.HealthChecks
import com.daml.ledger.api.validation.ValidationErrors.invalidArgument
import com.daml.logging.{ContextualizedLogger, LoggingContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.stream.Materializer
import akka.stream.scaladsl.Source
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.ledger_offset.LedgerOffset
import com.daml.ledger.api.v1.transaction_service._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
// Copyright (c) 2023 Digital Asset (Switzerland) GmbH and/or its affiliates. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

package com.daml.grpc.adapter.server.akka
package com.daml.platform.server.api.services.grpc

import akka.NotUsed
import akka.stream.scaladsl.{Keep, Source}
import akka.stream.{KillSwitch, KillSwitches, Materializer}
import com.daml.error.ContextualizedErrorLogger
import com.daml.error.definitions.CommonErrors
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.ServerAdapter
import io.grpc.StatusRuntimeException
import io.grpc.stub.StreamObserver

Expand All @@ -29,6 +30,11 @@ trait StreamingServiceLifecycleManagement extends AutoCloseable {
}
}

private def errorHandler(throwable: Throwable) =
CommonErrors.ServiceInternalError
.UnexpectedOrUnknownException(throwable)(contextualizedErrorLogger)
.asGrpcError

protected def registerStream[RespT](
responseObserver: StreamObserver[RespT]
)(createSource: => Source[RespT, NotUsed])(implicit
Expand All @@ -42,7 +48,7 @@ trait StreamingServiceLifecycleManagement extends AutoCloseable {
// Double-checked locking to keep the (potentially expensive)
// by-name `source` evaluation out of the synchronized block
ifNotClosed { () =>
val sink = ServerAdapter.toSink(responseObserver)
val sink = ServerAdapter.toSink(responseObserver, errorHandler)
// Force evaluation before synchronized block
val source = createSource

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import akka.stream.scaladsl.Source
import com.daml.error.definitions.LedgerApiErrors
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.active_contracts_service.ActiveContractsServiceGrpc.ActiveContractsService
import com.daml.ledger.api.v1.active_contracts_service._
Expand All @@ -21,6 +20,7 @@ import com.daml.platform.ApiOffset
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.server.api.ValidationLogger
import com.daml.platform.server.api.services.grpc.Logging.traceId
import com.daml.platform.server.api.services.grpc.StreamingServiceLifecycleManagement
import com.daml.platform.server.api.validation.ActiveContractsServiceValidation
import com.daml.platform.server.api.validation.FieldValidations
import com.daml.tracing.Telemetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import akka.stream.Materializer
import com.daml.api.util.DurationConversion._
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.domain.LedgerId
import com.daml.ledger.api.v1.ledger_configuration_service._
import com.daml.ledger.participant.state.index.v2.IndexConfigurationService
import com.daml.logging.{ContextualizedLogger, LoggingContext}
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.server.api.services.grpc.StreamingServiceLifecycleManagement
import com.daml.platform.server.api.validation.LedgerConfigurationServiceValidation
import io.grpc.stub.StreamObserver
import io.grpc.{BindableService, ServerServiceDefinition}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import akka.stream.scaladsl.Source
import com.daml.api.util.TimestampConversion._
import com.daml.error.{ContextualizedErrorLogger, DamlContextualizedErrorLogger}
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.ledger.api.domain.{LedgerId, optionalLedgerId}
import com.daml.ledger.api.v1.testing.time_service.TimeServiceGrpc.TimeService
import com.daml.ledger.api.v1.testing.time_service._
Expand All @@ -19,6 +18,7 @@ import com.daml.platform.akkastreams.dispatcher.SignalDispatcher
import com.daml.platform.api.grpc.GrpcApiService
import com.daml.platform.apiserver.TimeServiceBackend
import com.daml.platform.server.api.ValidationLogger
import com.daml.platform.server.api.services.grpc.StreamingServiceLifecycleManagement
import com.daml.platform.server.api.validation.FieldValidations
import com.google.protobuf.empty.Empty
import io.grpc.{ServerServiceDefinition, StatusRuntimeException}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ object MetricsInterceptorSpec {
.mapAsync(1)(response =>
after(delay, materializer.system.scheduler)(Future.successful(response))
)
.runWith(ServerAdapter.toSink(responseObserver))
.runWith(ServerAdapter.toSink(responseObserver, identity))
()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import com.daml.error._
import com.daml.error.definitions.{CommonErrors, DamlError}
import com.daml.error.utils.ErrorDetails
import com.daml.grpc.adapter.ExecutionSequencerFactory
import com.daml.grpc.adapter.server.akka.StreamingServiceLifecycleManagement
import com.daml.grpc.sampleservice.HelloServiceResponding
import com.daml.ledger.api.testing.utils.{AkkaBeforeAndAfterAll, TestingServerInterceptors}
import com.daml.ledger.resources.{ResourceOwner, TestResourceContext}
import com.daml.platform.hello.HelloServiceGrpc.HelloService
import com.daml.platform.hello.{HelloRequest, HelloResponse, HelloServiceGrpc}
import com.daml.platform.server.api.services.grpc.StreamingServiceLifecycleManagement
import com.daml.platform.testing.LogCollector.{ThrowableCause, ThrowableEntry}
import com.daml.platform.testing.{LogCollector, LogCollectorAssertions, StreamConsumer}
import io.grpc._
Expand Down

0 comments on commit 9b81120

Please sign in to comment.