Skip to content

Commit

Permalink
Added a new API endpoint under '/topic/$TOPIC/consumers' that shows t…
Browse files Browse the repository at this point in the history
…he active

consumers information for the current topic.

Changelog:
  - Changed endpoint resource from '/topiclist/$TOPIC' to '/topic/$TOPIC'
    so changed app.js and templates to match this change
  - Created a new case class 'TopicAndConsumersDetails' which is the
    return for the new method created
  - Added new method: 'getTopicAndConsumersDetail()' which returns the
    topic details plus the consumer details for each consumer of the topic
  - Added the name into the 'KafkaInfo' case class to provide a little bit
    of context
  - Added the new endpoint to the 'OffsetGetterWeb' intent() method with
    path: '/topic/$TOPIC/consumers'
  • Loading branch information
paugay committed Apr 2, 2015
1 parent bb45fa2 commit 773d4c5
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 6 deletions.
4 changes: 2 additions & 2 deletions src/main/resources/offsetapp/scripts/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ var app = angular.module('offsetapp',
templateUrl: "views/topiclist.html",
controller: "TopicListCtrl"
})
.when("/topicdetail/:group", {
.when("/topic/:group", {
templateUrl: "views/topic-detail.html",
controller: "TopicDetailCtrl"
});;
Expand Down Expand Up @@ -65,7 +65,7 @@ angular.module("offsetapp.services", ["ngResource"])
return $resource("./group/:group").get({group:group}, groupPartitions(cb));
},
topicDetail: function(group, cb) {
return $resource("./topicdetails/:group").get({group:group}, groupPartitions(cb));
return $resource("./topic/:group").get({group:group}, groupPartitions(cb));
},
loadClusterViz: function(group, cb) {
cb(loadViz("#dataviz-container", "./clusterlist"))
Expand Down
2 changes: 1 addition & 1 deletion src/main/resources/offsetapp/views/topiclist.html
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ <h1>Please select the topic you would like to view consumers on</h1>
</div>

<ul class="list-group">
<li ng-repeat="g in topics" class="list-group-item"><a href="./#/topicdetail/{{g}}">{{g}}</a></li>
<li ng-repeat="g in topics" class="list-group-item"><a href="./#/topic/{{g}}">{{g}}</a></li>
</ul>
22 changes: 21 additions & 1 deletion src/main/scala/com/quantifind/kafka/OffsetGetter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ case class Node(name: String, children: Seq[Node] = Seq())

case class TopicDetails(consumers: Seq[ConsumerDetail])

case class TopicAndConsumersDetails(consumers: Seq[KafkaInfo])

case class ConsumerDetail(name: String)

class OffsetGetter(zkClient: ZkClient) extends Logging {
Expand Down Expand Up @@ -122,6 +124,7 @@ class OffsetGetter(zkClient: ZkClient) extends Logging {
val off = offsetInfo(group, topics)
val brok = brokerInfo()
KafkaInfo(
name = group,
brokers = brok.toSeq,
offsets = off
)
Expand Down Expand Up @@ -155,6 +158,23 @@ class OffsetGetter(zkClient: ZkClient) extends Logging {
}
}

/**
* returns details for a given topic such as the active consumers pulling off of it
* and for each of the active consumers it will return the consumer data
*
* @param topic
* @return
*/
def getTopicAndConsumersDetail(topic: String): TopicAndConsumersDetails = {
val topicMap = getActiveTopicMap

if (topicMap.contains(topic)) {
TopicAndConsumersDetails(topicMap(topic).map(getInfo(_, Seq(topic))))
} else {
TopicAndConsumersDetails(Seq())
}
}

def getTopics: Seq[String] = {
try {
ZkUtils.getChildren(zkClient, ZkUtils.BrokerTopicsPath).sortWith(_ < _)
Expand Down Expand Up @@ -227,7 +247,7 @@ class OffsetGetter(zkClient: ZkClient) extends Logging {

object OffsetGetter {

case class KafkaInfo(brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo])
case class KafkaInfo(name: String, brokers: Seq[BrokerInfo], offsets: Seq[OffsetInfo])

case class BrokerInfo(id: Int, host: String, port: Int)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
_.getTopicDetail(topic)
}

def getTopicAndConsumersDetail(topic: String, args: OWArgs) = withOG(args) {
_.getTopicAndConsumersDetail(topic)
}

def getClusterViz(args: OWArgs) = withOG(args) {
_.getClusterViz
}
Expand Down Expand Up @@ -152,8 +156,10 @@ object OffsetGetterWeb extends UnfilteredWebApp[OWArgs] with Logging {
JsonContent ~> ResponseString(write(getTopics(args)))
case GET(Path(Seg("clusterlist" :: Nil))) =>
JsonContent ~> ResponseString(write(getClusterViz(args)))
case GET(Path(Seg("topicdetails" :: group :: Nil))) =>
JsonContent ~> ResponseString(write(getTopicDetail(group, args)))
case GET(Path(Seg("topic" :: topic :: Nil))) =>
JsonContent ~> ResponseString(write(getTopicDetail(topic, args)))
case GET(Path(Seg("topic" :: topic :: "consumers" :: Nil))) =>
JsonContent ~> ResponseString(write(getTopicAndConsumersDetail(topic, args)))
case GET(Path(Seg("activetopics" :: Nil))) =>
JsonContent ~> ResponseString(write(getActiveTopics(args)))
}
Expand Down

0 comments on commit 773d4c5

Please sign in to comment.