Skip to content

Commit

Permalink
[ISSUE apache#2152] Add isRunning method in DefaultLitePullConsumerIm…
Browse files Browse the repository at this point in the history
…pl class and test suit (apache#2302)
  • Loading branch information
jjz921024 authored Sep 17, 2020
1 parent 2a8ba5a commit 9ddcab4
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,11 @@ public void shutdown() {
this.defaultLitePullConsumerImpl.shutdown();
}

@Override
public boolean isRunning() {
return this.defaultLitePullConsumerImpl.isRunning();
}

@Override
public void subscribe(String topic, String subExpression) throws MQClientException {
this.defaultLitePullConsumerImpl.subscribe(withNamespace(topic), subExpression);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,13 @@ public interface LitePullConsumer {
*/
void shutdown();

/**
* This consumer is still running
*
* @return true if consumer is still running
*/
boolean isRunning();

/**
* Subscribe some topic with subExpression
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ public synchronized void shutdown() {
}
}

public synchronized boolean isRunning() {
return this.serviceState == ServiceState.RUNNING;
}

public synchronized void start() throws MQClientException {
switch (this.serviceState) {
case CREATE_JUST:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,32 @@ public void testComputePullByTimeStamp() throws Exception{
assertThat(offset).isEqualTo(100);
}

@Test
public void testConsumerAfterShutdown() throws Exception {
DefaultLitePullConsumer defaultLitePullConsumer = createStartLitePullConsumer();
defaultLitePullConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultLitePullConsumer.subscribe(topic, "*");
new AsyncConsumer().executeAsync(defaultLitePullConsumer);
Thread.sleep(10 * 1000);
defaultLitePullConsumer.shutdown();
assertThat(defaultLitePullConsumer.isRunning()).isFalse();
}

static class AsyncConsumer {
public void executeAsync(final DefaultLitePullConsumer consumer) {
new Thread(new Runnable() {
@Override
public void run() {
while (consumer.isRunning()) {
List<MessageExt> poll = consumer.poll(2 * 1000);
System.out.println("consumer is still running");
}
System.out.println("consumer shutdown");
}
}).start();
}
}

private void initDefaultLitePullConsumer(DefaultLitePullConsumer litePullConsumer) throws Exception {

Field field = DefaultLitePullConsumer.class.getDeclaredField("defaultLitePullConsumerImpl");
Expand Down

0 comments on commit 9ddcab4

Please sign in to comment.