Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,24 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
val ec = ExecutionContext.fromExecutor(fjp)

// Map of request id to the runnable responsible for executing that request id
val activeRequests = new ConcurrentHashMap[Int, CancellableTask[Int]](poolSize)
val activeRequests = new ConcurrentHashMap[Int, (WorkerProtocol.WorkRequest, CancellableTask[Int])](poolSize)

def writeResponse(
requestId: Int,
maybeOutStream: Option[OutputStream],
maybeExitCode: Option[Int],
wasCancelled: Boolean = false,
): Unit = {
// Remove the request from our book keeping right before we respond to Bazel. If
// we respond to Bazel about the request before removing it,then there is a race:
// Bazel could make a request with the same requestId to this worker before the
// requestId is removed from the worker's book keeping.
//
// Ideally Bazel will not send a request to this worker with the same requestId
// as another request before we've responded to the original request. If that
// happens, then there's a race regardless of what we do.
activeRequests.remove(requestId)

// Defined here so all writes to stdout are synchronized
stdout.synchronized {
val builder = WorkerProtocol.WorkResponse.newBuilder
Expand All @@ -88,8 +98,6 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
.build()
.writeDelimitedTo(stdout)
}

activeRequests.remove(requestId)
}

/**
Expand Down Expand Up @@ -128,10 +136,10 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream

// From the Bazel doc: "The server may send cancel requests for requests that the worker
// has already responded to, in which case the cancel request must be ignored."
Option(activeRequests.get(requestId)).foreach { activeRequest =>
Option(activeRequests.get(requestId)).foreach { case (_, workTask) =>
// Cancel will wait for the thread to complete or be interrupted, so we do it in a future
// to prevent blocking the worker from processing more requests
Future(activeRequest.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
Future(workTask.cancel(mayInterruptIfRunning = mayInterruptWorkerTasks))(
scala.concurrent.ExecutionContext.global,
)
}
Expand Down Expand Up @@ -227,9 +235,14 @@ abstract class WorkerMain[S](stdin: InputStream = System.in, stdout: PrintStream
// for this requestId. If that's the case, we have a book keeping error or there are
// two active requests with the same ID. Either of which is not good and something we
// should just crash on.
if (activeRequests.putIfAbsent(requestId, workTask) != null) {
val alreadyActiveRequest = activeRequests.putIfAbsent(requestId, (request, workTask))
if (alreadyActiveRequest != null) {
val (activeRequest, _) = alreadyActiveRequest
throw new AnnexDuplicateActiveRequestException(
s"Received a WorkRequest with an already active request id: ${requestId}",
s"""Received a WorkRequest with an already active request id: ${requestId}.
Currently active request: ${activeRequest.toString}
New request with the same id: ${request.toString}
""",
)
} else {
workTask.execute(ec)
Expand Down