Skip to content

Commit

Permalink
KAFKA-13080: Direct fetch snapshot request to kraft controller (apach…
Browse files Browse the repository at this point in the history
…e#11041)

Reviewers: Colin P. McCabe <[email protected]>
  • Loading branch information
jsancio authored Jul 13, 2021
1 parent f301768 commit d33a874
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 1 deletion.
6 changes: 6 additions & 0 deletions core/src/main/scala/kafka/server/ControllerApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class ControllerApis(val requestChannel: RequestChannel,
try {
request.header.apiKey match {
case ApiKeys.FETCH => handleFetch(request)
case ApiKeys.FETCH_SNAPSHOT => handleFetchSnapshot(request)
case ApiKeys.METADATA => handleMetadataRequest(request)
case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
case ApiKeys.DELETE_TOPICS => handleDeleteTopics(request)
Expand Down Expand Up @@ -141,6 +142,11 @@ class ControllerApis(val requestChannel: RequestChannel,
handleRaftRequest(request, response => new FetchResponse(response.asInstanceOf[FetchResponseData]))
}

def handleFetchSnapshot(request: RequestChannel.Request): Unit = {
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
handleRaftRequest(request, response => new FetchSnapshotResponse(response.asInstanceOf[FetchSnapshotResponseData]))
}

def handleMetadataRequest(request: RequestChannel.Request): Unit = {
val metadataRequest = request.body[MetadataRequest]
def createResponseCallback(requestThrottleMs: Int): MetadataResponse = {
Expand Down
55 changes: 54 additions & 1 deletion core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package kafka.server
import java.net.InetAddress
import java.util
import java.util.Properties
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutionException
import kafka.network.RequestChannel
import kafka.raft.RaftManager
Expand Down Expand Up @@ -50,8 +51,9 @@ import org.apache.kafka.common.message.AlterConfigsRequestData.{AlterableConfig
import org.apache.kafka.common.message.AlterConfigsResponseData.{AlterConfigsResourceResponse => OldAlterConfigsResourceResponse}
import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.protocol.ApiMessage
import org.apache.kafka.common.protocol.Errors._
import org.apache.kafka.common.requests._
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.controller.Controller
Expand Down Expand Up @@ -149,6 +151,57 @@ class ControllerApisTest {
handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12))))
}

@Test
def testFetchSentToKRaft(): Unit = {
when(
raftManager.handleRequest(
any(classOf[RequestHeader]),
any(classOf[ApiMessage]),
any(classOf[Long])
)
).thenReturn(
new CompletableFuture()
)

createControllerApis(None, new MockController.Builder().build())
.handleFetch(buildRequest(new FetchRequest(new FetchRequestData(), 12)))

verify(raftManager).handleRequest(
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)
}

@Test
def testUnauthorizedFetchSnapshot(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
Some(createDenyAllAuthorizer()), new MockController.Builder().build()).
handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new FetchSnapshotRequestData(), 0))))
}

@Test
def testFetchSnapshotSentToKRaft(): Unit = {
when(
raftManager.handleRequest(
any(classOf[RequestHeader]),
any(classOf[ApiMessage]),
any(classOf[Long])
)
).thenReturn(
new CompletableFuture()
)

createControllerApis(None, new MockController.Builder().build())
.handleFetchSnapshot(buildRequest(new FetchSnapshotRequest(new FetchSnapshotRequestData(), 0)))

verify(raftManager).handleRequest(
ArgumentMatchers.any(),
ArgumentMatchers.any(),
ArgumentMatchers.any()
)
}

@Test
def testUnauthorizedVote(): Unit = {
assertThrows(classOf[ClusterAuthorizationException], () => createControllerApis(
Expand Down

0 comments on commit d33a874

Please sign in to comment.