Skip to content

Commit

Permalink
make flushing services easier.
Browse files Browse the repository at this point in the history
  • Loading branch information
MammatusPlatypus committed Jun 28, 2016
1 parent b6dfe0b commit 27606e4
Showing 1 changed file with 19 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.advantageous.qbit.client.ClientProxy;
import io.advantageous.qbit.reactive.Callback;
import io.advantageous.qbit.reakt.Reakt;
import io.advantageous.qbit.service.ServiceProxyUtils;
import io.advantageous.qbit.service.health.HealthFailReason;
import io.advantageous.qbit.service.health.HealthServiceClient;
import io.advantageous.qbit.service.health.ServiceHealthManager;
Expand All @@ -12,8 +13,12 @@
import io.advantageous.reakt.Expected;
import io.advantageous.reakt.promise.Promise;
import io.advantageous.reakt.reactor.Reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;

/**
Expand All @@ -31,7 +36,9 @@ public class ServiceManagementBundle implements ServiceHealthManager, StatsColle
private final Timer timer;
private final String statKeyPrefix;
private final HashMap<String, String> statNameMap;
private final List<Object> servicesToFlush;
private final Expected<Runnable> processHandler;
private final Logger logger = LoggerFactory.getLogger(ServiceManagementBundle.class);
protected long time;

public ServiceManagementBundle(final Reactor reactor,
Expand All @@ -51,15 +58,27 @@ public ServiceManagementBundle(final Reactor reactor,
this.healthServiceClient = Expected.ofNullable(healthServiceClient);
this.statNameMap = new HashMap<>();
this.processHandler = Expected.ofNullable(processHandler);
this.servicesToFlush = new ArrayList<>();
}

public void addServiceToFlush(Object service) {
servicesToFlush.add(service);
}

public void process() {
time = timer.time();
reactor.process();
processHandler.ifPresent(Runnable::run);
stats.clientProxyFlush();
healthServiceClient.ifPresent(ClientProxy::clientProxyFlush);

servicesToFlush.forEach((service) -> {
try {
ServiceProxyUtils.flushServiceProxy(service);
} catch (Exception ex) {
logger.error("Unable to flush service on behalf of service " + serviceName, ex);
}
});
}

/**
Expand Down

0 comments on commit 27606e4

Please sign in to comment.