Skip to content

Commit

Permalink
migrated hornetq Twitter and Rest to use JBoss Logging
Browse files Browse the repository at this point in the history
  • Loading branch information
andytaylor committed Apr 23, 2012
1 parent 454c2e8 commit 88ca31c
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
*/
package org.hornetq.rest;

import org.hornetq.api.core.HornetQException;
import org.hornetq.rest.queue.push.xml.XmlLink;
import org.jboss.logging.BasicLogger;
import org.jboss.logging.Cause;
import org.jboss.logging.LogMessage;
import org.jboss.logging.Logger;
import org.jboss.logging.Message;
import org.jboss.logging.MessageLogger;

/**
Expand All @@ -41,6 +48,46 @@
* so an INFO message would be 191000 to 191999
*/
@MessageLogger(projectCode = "HQ")
public class HornetQRestLogger
public interface HornetQRestLogger extends BasicLogger
{
/**
* The twitter logger.
*/
HornetQRestLogger LOGGER = Logger.getMessageLogger(HornetQRestLogger.class, HornetQRestLogger.class.getPackage().getName());

@LogMessage(level = Logger.Level.INFO)
@Message(id = 181001, value = "Loading REST push store from: {0}", format = Message.Format.MESSAGE_FORMAT)
void loadingRestStore(String path);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 181002, value = "adding REST push registration: {0}", format = Message.Format.MESSAGE_FORMAT)
void addingPushRegistration(String id);

@LogMessage(level = Logger.Level.INFO)
@Message(id = 181003, value = "Push consumer started for: {0}", format = Message.Format.MESSAGE_FORMAT)
void startingPushConsumer(XmlLink link);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 182001, value = "shutdown REST consumer because of timeout for: {0}", format = Message.Format.MESSAGE_FORMAT)
void shutdownRestConsumer(String id);

@LogMessage(level = Logger.Level.WARN)
@Message(id = 182002, value = "shutdown REST subscription because of timeout for: {0}", format = Message.Format.MESSAGE_FORMAT)
void shutdownRestSubscription(String id);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 184001, value = "Failed to load push store {0}, it is probably corrupted", format = Message.Format.MESSAGE_FORMAT)
void errorLoadingStore(@Cause Exception e, String name);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 184002, value = "Error updating store", format = Message.Format.MESSAGE_FORMAT)
void errorUpdatingStore(@Cause Exception e);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 184003, value = "Failed to push message to {0} disabling push registration...", format = Message.Format.MESSAGE_FORMAT)
void errorPushingMessage(XmlLink link);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 184004, value = "Error deleting Subscriber queue", format = Message.Format.MESSAGE_FORMAT)
void errorDeletingSubscriberQueue(@Cause HornetQException e);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.util.TimeoutTask;

import javax.ws.rs.DELETE;
Expand All @@ -27,7 +27,6 @@
*/
public class ConsumersResource implements TimeoutTask.Callback
{
private static final Logger log = Logger.getLogger(ConsumersResource.class);
protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
protected ClientSessionFactory sessionFactory;
protected String destination;
Expand Down Expand Up @@ -87,7 +86,7 @@ public void testTimeout(String target)
{
if (System.currentTimeMillis() - consumer.getLastPingTime() > consumerTimeoutSeconds * 1000)
{
log.warn("shutdown REST consumer because of timeout for: " + consumer.getId());
HornetQRestLogger.LOGGER.shutdownRestConsumer(consumer.getId());
consumer.shutdown();
queueConsumers.remove(consumer.getId());
serviceManager.getTimeoutTask().remove(consumer.getId());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.hornetq.rest.queue.push;

import org.hornetq.core.logging.Logger;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.queue.push.xml.PushRegistration;
import org.hornetq.rest.topic.PushTopicRegistration;

Expand All @@ -19,7 +19,6 @@
*/
public class FilePushStore implements PushStore
{
private static final Logger log = Logger.getLogger(FilePushStore.class);
protected Map<String, PushRegistration> map = new HashMap<String, PushRegistration>();
protected File dir;
protected JAXBContext ctx;
Expand All @@ -30,7 +29,7 @@ public FilePushStore(String dirname) throws Exception
this.ctx = JAXBContext.newInstance(PushRegistration.class, PushTopicRegistration.class);
if (this.dir.exists())
{
log.info("Loading REST push store from: " + this.dir.getAbsolutePath());
HornetQRestLogger.LOGGER.loadingRestStore(dir.getAbsolutePath());
for (File file : this.dir.listFiles())
{
if (!file.isFile()) continue;
Expand All @@ -39,12 +38,12 @@ public FilePushStore(String dirname) throws Exception
{
reg = (PushRegistration) ctx.createUnmarshaller().unmarshal(file);
reg.setLoadedFrom(file);
log.info("adding REST push registration: " + reg.getId());
HornetQRestLogger.LOGGER.addingPushRegistration(reg.getId());
map.put(reg.getId(), reg);
}
catch (Exception e)
{
log.error("Failed to load push store" + file.getName() + " , it is probably corrupted", e);
HornetQRestLogger.LOGGER.errorLoadingStore(e, file.getName());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.logging.Logger;
import org.hornetq.jms.client.SelectorTranslator;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.queue.push.xml.PushRegistration;

/**
Expand All @@ -16,7 +16,6 @@
*/
public class PushConsumer implements MessageHandler
{
private static final Logger log = Logger.getLogger(PushConsumer.class);
protected PushRegistration registration;
protected ClientSessionFactory factory;
protected ClientSession session;
Expand Down Expand Up @@ -81,7 +80,7 @@ else if (registration.getTarget().getRelationship().equals("template"))
}
consumer.setMessageHandler(this);
session.start();
log.info("Push consumer started for: " + registration.getTarget());
HornetQRestLogger.LOGGER.startingPushConsumer(registration.getTarget());
}

public void stop()
Expand Down Expand Up @@ -119,7 +118,7 @@ public void disableFromFailure()
}
catch (Exception e)
{
log.error(e);
HornetQRestLogger.LOGGER.errorUpdatingStore(e);
}
stop();
}
Expand All @@ -143,7 +142,7 @@ public void onMessage(ClientMessage clientMessage)
{
try
{
log.debug("Acknowledging: " + clientMessage.getMessageID());
HornetQRestLogger.LOGGER.debug("Acknowledging: " + clientMessage.getMessageID());
session.commit();
return;
}
Expand All @@ -164,7 +163,7 @@ public void onMessage(ClientMessage clientMessage)
}
if (registration.isDisableOnFailure())
{
log.error("Failed to push message to " + registration.getTarget() + " disabling push registration...");
HornetQRestLogger.LOGGER.errorPushingMessage(registration.getTarget());
disableFromFailure();
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.commons.httpclient.UsernamePasswordCredentials;
import org.apache.commons.httpclient.auth.AuthScope;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.core.logging.Logger;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.queue.push.xml.BasicAuth;
import org.hornetq.rest.queue.push.xml.PushRegistration;
import org.hornetq.rest.queue.push.xml.XmlHttpHeader;
Expand All @@ -22,7 +22,6 @@
*/
public class UriStrategy implements PushStrategy
{
private static final Logger log = Logger.getLogger(UriStrategy.class);
protected HttpClient client = new HttpClient();
protected ApacheHttpClientExecutor executor = new ApacheHttpClientExecutor(client);
protected PushRegistration registration;
Expand Down Expand Up @@ -83,7 +82,7 @@ public boolean push(ClientMessage message)
ClientResponse res = null;
try
{
log.debug(method + " " + uri);
HornetQRestLogger.LOGGER.debug(method + " " + uri);
res = request.httpMethod(method);
int status = res.getStatus();
if (status == 503)
Expand All @@ -101,7 +100,7 @@ else if (status == 307)
}
else if ((status >= 200 && status < 299) || status == 303 || status == 304)
{
log.debug("Success");
HornetQRestLogger.LOGGER.debug("Success");
return true;
}
else if (status >= 400)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.queue.push.PushConsumer;
import org.hornetq.rest.queue.push.PushStore;
import org.hornetq.rest.queue.push.xml.PushRegistration;
Expand All @@ -14,8 +14,6 @@
*/
public class PushSubscription extends PushConsumer
{
private static final Logger log = Logger.getLogger(PushSubscription.class);

public PushSubscription(ClientSessionFactory factory, String destination, String id, PushRegistration registration, PushStore store)
{
super(factory, destination, id, registration, store);
Expand All @@ -40,7 +38,7 @@ protected void deleteSubscriberQueue()
}
catch (HornetQException e)
{
log.error(e);
HornetQRestLogger.LOGGER.errorDeletingSubscriberQueue(e);
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.logging.Logger;
import org.hornetq.rest.HornetQRestLogger;
import org.hornetq.rest.queue.AcknowledgedQueueConsumer;
import org.hornetq.rest.queue.Acknowledgement;
import org.hornetq.rest.queue.DestinationServiceManager;
Expand All @@ -34,7 +34,6 @@
*/
public class SubscriptionsResource implements TimeoutTask.Callback
{
private static final Logger log = Logger.getLogger(SubscriptionsResource.class);
protected ConcurrentHashMap<String, QueueConsumer> queueConsumers = new ConcurrentHashMap<String, QueueConsumer>();
protected ClientSessionFactory sessionFactory;
protected String destination;
Expand Down Expand Up @@ -93,7 +92,7 @@ public void testTimeout(String target)
{
if (System.currentTimeMillis() - consumer.getLastPingTime() > subscription.getTimeout())
{
log.warn("shutdown REST subscription because of session timeout for: " + consumer.getId());
HornetQRestLogger.LOGGER.shutdownRestSubscription(consumer.getId());
consumer.shutdown();
queueConsumers.remove(consumer.getId());
serviceManager.getTimeoutTask().remove(consumer.getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@
package org.hornetq.integration.twitter.impl;

import org.hornetq.api.core.SimpleString;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.ConnectorService;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.twitter.HornetQTwitterLogger;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
import twitter4j.http.AccessToken;
Expand All @@ -35,8 +35,6 @@
*/
public class IncomingTweetsHandler implements ConnectorService
{
private static final Logger log = Logger.getLogger(IncomingTweetsHandler.class);

private final String connectorName;

private final String consumerKey;
Expand Down Expand Up @@ -111,7 +109,7 @@ public void start() throws Exception
this.paging.setCount(TwitterConstants.FIRST_ATTEMPT_PAGE_SIZE);
ResponseList<Status> res = this.twitter.getHomeTimeline(paging);
this.paging.setSinceId(res.get(0).getId());
log.debug(connectorName + " initialise(): got latest ID: " + this.paging.getSinceId());
HornetQTwitterLogger.LOGGER.debug(connectorName + " initialise(): got latest ID: " + this.paging.getSinceId());

// TODO make page size configurable
this.paging.setCount(TwitterConstants.DEFAULT_PAGE_SIZE);
Expand Down Expand Up @@ -162,11 +160,11 @@ private void poll() throws Exception
putTweetIntoMessage(status, msg);

this.postOffice.route(msg, false);
log.debug(connectorName + ": routed: " + status.toString());
HornetQTwitterLogger.LOGGER.debug(connectorName + ": routed: " + status.toString());
}

this.paging.setSinceId(res.get(0).getId());
log.debug(connectorName + ": update latest ID: " + this.paging.getSinceId());
HornetQTwitterLogger.LOGGER.debug(connectorName + ": update latest ID: " + this.paging.getSinceId());
}

private void putTweetIntoMessage(final Status status, final ServerMessage msg)
Expand Down Expand Up @@ -215,7 +213,7 @@ public void run()
}
catch (Throwable t)
{
log.warn(connectorName, t);
HornetQTwitterLogger.LOGGER.errorPollingTwitter(t);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@

import org.hornetq.api.core.SimpleString;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.*;
import org.hornetq.integration.twitter.TwitterConstants;
import org.hornetq.twitter.HornetQTwitterLogger;
import org.hornetq.utils.ConfigurationHelper;
import twitter4j.*;
import twitter4j.http.AccessToken;
Expand All @@ -31,8 +31,6 @@
*/
public class OutgoingTweetsHandler implements Consumer, ConnectorService
{
private static final Logger log = Logger.getLogger(OutgoingTweetsHandler.class);

private final String connectorName;

private final String consumerKey;
Expand Down Expand Up @@ -118,7 +116,7 @@ public synchronized void start() throws Exception

this.queue.deliverAsync();
this.isStarted = true;
log.debug(connectorName + ": started");
HornetQTwitterLogger.LOGGER.debug(connectorName + ": started");
}

public boolean isStarted()
Expand All @@ -133,12 +131,12 @@ public synchronized void stop() throws Exception
return;
}

log.debug(connectorName + ": receive shutdown request");
HornetQTwitterLogger.LOGGER.debug(connectorName + ": receive shutdown request");

this.queue.removeConsumer(this);

this.isStarted = false;
log.debug(connectorName + ": shutdown");
HornetQTwitterLogger.LOGGER.debug(connectorName + ": shutdown");
}

public String getName()
Expand Down Expand Up @@ -200,7 +198,7 @@ public HandleStatus handle(final MessageReference ref) throws Exception
if(e.getStatusCode() == 403 )
{
// duplicated message
log.warn(connectorName + ": HTTP status code = 403: Ignore duplicated message");
HornetQTwitterLogger.LOGGER.error403(connectorName);
queue.acknowledge(ref);

return HandleStatus.HANDLED;
Expand All @@ -212,7 +210,7 @@ public HandleStatus handle(final MessageReference ref) throws Exception
}

queue.acknowledge(ref);
log.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
HornetQTwitterLogger.LOGGER.debug(connectorName + ": forwarded to twitter: " + message.getMessageID());
return HandleStatus.HANDLED;
}
}
Expand Down
Loading

0 comments on commit 88ca31c

Please sign in to comment.