Skip to content

Commit

Permalink
Merge pull request Netflix#178 from Netflix/ISSUE-176
Browse files Browse the repository at this point in the history
control SuroInput pause with just one time, not accumulative one
  • Loading branch information
metacret committed Jan 15, 2015
2 parents 8a3e7c3 + 2614897 commit c6db346
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public String getId() {

private AtomicLong pausedTime = new AtomicLong(0);

public static long MAX_PAUSE = 1000; // not final for the test

@Override
public void start() throws Exception {
executor = Executors.newCachedThreadPool(
Expand All @@ -93,10 +95,10 @@ public void start() throws Exception {
public void run() {
while (running) {
try {
long pause = pausedTime.get();
long pause = Math.min(pausedTime.get(), MAX_PAUSE);
if (pause > 0) {
Thread.sleep(pause);
pausedTime.addAndGet(-pause);
pausedTime.set(0);
}
byte[] message = iterator.next().message();
router.process(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

properties.setProperty("consumer.timeout.ms", "1000");
KafkaConsumer consumer = new KafkaConsumer(properties, TOPIC_NAME, numPartitions, router, jsonMapper);
KafkaConsumer.MAX_PAUSE = 10000; // for testing

consumer.start();
latch.await(1000 * 5, TimeUnit.MILLISECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ public String getId() {
return id;
}

private static final long MAX_PAUSE = 10000;

@Override
public void start() throws Exception {
if (s3Service == null) {
Expand Down Expand Up @@ -126,7 +128,7 @@ public boolean offer(Runnable runnable) {
return true;
}
},
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("S3Consumer-%d").build());
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("S3Consumer-" + id + "-%d").build());

notice.init();

Expand All @@ -136,10 +138,10 @@ public boolean offer(Runnable runnable) {
public void run() {
while (running) {
try {
long pause = pausedTime.get();
long pause = Math.min(pausedTime.get(), MAX_PAUSE);
if (pause > 0) {
Thread.sleep(pause);
pausedTime.addAndGet(-pause);
pausedTime.set(0);
}
Pair<String, String> msg = notice.peek();
if (msg != null) {
Expand Down

0 comments on commit c6db346

Please sign in to comment.