Skip to content

Commit

Permalink
Merge pull request #9 from rudijs/revert-8-feat/connection-option
Browse files Browse the repository at this point in the history
Revert "feat(connection): make possible to use external RabbitMQ connection"
  • Loading branch information
rudijs committed Apr 2, 2016
2 parents 87ebdc4 + c8f4370 commit d5eee4d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 48 deletions.
2 changes: 0 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ Check out the [examples](examples) for more details of the basic, advanced, adva
- `prefetch` : default : 1 - Set the prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement. To consume multiple requests in the same process, in parallel, this must be set to greater than 1.
- `logInfo`: Log non-error messages, default console.info - Can pass in custom logger (example Bunyan)
- `logError`: Log error messages, default console.warn - Can pass in custom logger (example Bunyan)
- `connection`: default: auto connect by `url` and `socketOptions` - [amqplib](https://www.npmjs.com/package/amqplib) connection, reconnect should be handled manually in this case via `initConsumer(connection)`

## Publisher Options

Expand All @@ -118,7 +117,6 @@ Check out the [examples](examples) for more details of the basic, advanced, adva
- `queueOptions` : default: ````{exclusive: true, autoDelete: true}```` - AMQP options passed to the queue.
- `logInfo`: Log non-error messages, default console.log - Can pass in custom logger (example Bunyan)
- `logError`: Log error messages, default console.log - Can pass in custom logger (example Bunyan)
- `connection`: default: auto connect by `url` and `socketOptions` - [amqplib](https://www.npmjs.com/package/amqplib) connection

## Tests

Expand Down
82 changes: 36 additions & 46 deletions lib/rpc-consumer-factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,53 @@ var rpcConsumerProto = {

this.processMessage = options.processMessage || processMessageDefault;

this.connection = null;

return this;
},

connect: function () {
if (this.connection) {
return this.initConsumer(conn);
}

return amqp.connect(this.uri(), this.socketOptions).then(function getConnectionSuccess(conn) {
amqp.connect(this.uri(), this.socketOptions).then(function getConnectionSuccess(conn) {

conn.on('error', function (err) {
this.logError(err.stack);
this.reconnect();
}.bind(this));

return this.initConsumer(conn);
return conn.createChannel().then(function createChannelSuccess(ch) {

// Capitalize this function as its used with a this binding
function Reply(msg) {

// process request
var response;

// If this.processMessage is a Q promise, returns the promise.
// If this.processMessage is not a promise, returns a promise that is fulfilled with value.
q(this.processMessage(msg))
.then(function processMessageSuccess(res) {
response = res;
})
.catch(function processMessageError(err) {
response = err;
})
.finally(function () {
// send response
ch.sendToQueue(msg.properties.replyTo, new Buffer(response.toString()), {correlationId: msg.properties.correlationId});
ch.ack(msg);
});

}

return ch.assertQueue(this.queue, this.queueOptions)
.then(function assertQueueSuccess() {
ch.prefetch(this.prefetch);
return ch.consume(this.queue, Reply.bind(this));
}.bind(this))
.then(function consumeSuccess() {
this.logInfo('Consumer: Waiting for RPC requests on: ' + this.queue);
}.bind(this));

}.bind(this));
}.bind(this))
.catch(function (error) {
this.logError('Consumer: AMQP Connect Error: ' + error.stack);
Expand All @@ -71,46 +99,8 @@ var rpcConsumerProto = {
setTimeout(this.connect.bind(this), this.connectionRetryInterval);
},

initConsumer: function (conn) {
return conn.createChannel().then(function createChannelSuccess(ch) {

// Capitalize this function as its used with a this binding
function Reply(msg) {

// process request
var response;

// If this.processMessage is a Q promise, returns the promise.
// If this.processMessage is not a promise, returns a promise that is fulfilled with value.
q(this.processMessage(msg))
.then(function processMessageSuccess(res) {
response = res;
})
.catch(function processMessageError(err) {
response = err;
})
.finally(function () {
// send response
ch.sendToQueue(msg.properties.replyTo, new Buffer(response.toString()), {correlationId: msg.properties.correlationId});
ch.ack(msg);
});

}

return ch.assertQueue(this.queue, this.queueOptions)
.then(function assertQueueSuccess() {
ch.prefetch(this.prefetch);
return ch.consume(this.queue, Reply.bind(this));
}.bind(this))
.then(function consumeSuccess() {
this.logInfo('Consumer: Waiting for RPC requests on: ' + this.queue);
}.bind(this));

}.bind(this));
},

run: function () {
return this.connect();
this.connect();
}
};

Expand Down

0 comments on commit d5eee4d

Please sign in to comment.