Skip to content

Commit

Permalink
Merge pull request Netflix#237 from Netflix/ISSUE-236
Browse files Browse the repository at this point in the history
counters are added
  • Loading branch information
metacret committed Mar 26, 2015
2 parents 1c31295 + ac45241 commit 5d735b2
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 2 deletions.
1 change: 1 addition & 0 deletions suro-core/src/main/java/com/netflix/suro/TagKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ public class TagKey {
public static final String RETRIED_COUNT = "retriedCount";
public static final String ROUTING_KEY = "routingKey";
public static final String REJECTED_REASON = "rejectedReason";
public static final String ATTEMPTED_COUNT = "attemptedMessageCount";
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ public void process(SuroInput input, MessageContainer msg) throws Exception {
return; // discard message
}

DynamicCounter.increment(
MonitorConfig
.builder(TagKey.RECV_COUNT)
.withTag("routingKey", msg.getRoutingKey())
.build());

RoutingMap.RoutingInfo info = routingMap.getRoutingInfo(msg.getRoutingKey());

if (info == null) {
Expand All @@ -83,6 +89,13 @@ public void process(SuroInput input, MessageContainer msg) throws Exception {
} else {
sink.writeTo(msg);
}

DynamicCounter.increment(
MonitorConfig
.builder(TagKey.ATTEMPTED_COUNT)
.withTag("routingKey", msg.getRoutingKey())
.withTag("sinkId", route.getSink())
.build());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,19 @@ public void shouldDateTypeReturnsCorrectOne() {
public void shouldThrowExceptionOnUnsupportedType() {
IndexSuffixFormatter formatter = new IndexSuffixFormatter("invalid", null);
}

@Test
public void testWeeklyRepresentation() {
System.setProperty("user.timezone", "GMT");

Properties props = new Properties();
props.put("dateFormat", "YYYYMM_ww");

DateTime dt = new DateTime("2014-10-12T00:00:00.000Z");
IndexSuffixFormatter formatter = new IndexSuffixFormatter("date", props);
IndexInfo info = mock(IndexInfo.class);
doReturn(dt.getMillis()).when(info).getTimestamp();

assertEquals(formatter.format(info), "201410_41");
}
}
2 changes: 1 addition & 1 deletion suro-kafka-producer/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apply plugin: 'nebula-test-jar'
dependencies {
compile project(':suro-core')
compile 'com.netflix.rxjava:rxjava-core:0.19.1'
compile 'org.apache.kafka:kafka-clients:0.8.2.0'
compile 'org.apache.kafka:kafka-clients:0.8.2.1'

testCompile 'junit:junit:4.11'
testCompile 'org.apache.curator:curator-test:2.4.2'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void run() {

this.prng = new Random();
// seed with a random integer
this.indexCache = new ConcurrentHashMap<String, Integer>();
this.indexCache = new ConcurrentHashMap<>();
// increment index every interval
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ public void setRecordCounterListener(Action3 action) {
@Override
public void writeTo(final MessageContainer message) {
queuedRecords.incrementAndGet();
DynamicCounter.increment(
MonitorConfig
.builder("queuedRecord")
.withTag(TagKey.ROUTING_KEY, message.getRoutingKey())
.build());
runRecordCounterListener();

if (metadataFetchedTopicSet.contains(message.getRoutingKey())) {
Expand Down

0 comments on commit 5d735b2

Please sign in to comment.