Skip to content

Commit

Permalink
some cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
jrobles committed Dec 12, 2016
1 parent f33be8e commit 90df753
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 38 deletions.
21 changes: 11 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,25 +7,25 @@ import (

func Consume(conn *amqp.Connection, queue Queue, exchange Exchange, consumerTag string, cb chan string) error {

ch, err := conn.Channel()
channel, err := conn.Channel()
if err != nil {
log.Printf("[CONEJO] Could not declare channel %q", err)
}
defer ch.Close()
defer channel.Close()

err = declareExchange(exchange, ch)
err = declareExchange(exchange, channel)
if err != nil {
log.Printf("ERROR: Could not declare Exchange [%s] %q", exchange.Name, err)
return err
} else {

_, err = declareQueue(queue, ch)
err = declareQueue(queue, channel)
if err != nil {
log.Printf("ERROR: Could not declare queue [%s] %q", queue.Name, err)
return err
} else {

err = ch.QueueBind(
err = channel.QueueBind(
queue.Name, // queue name
queue.Name, // routing key @TODO
exchange.Name, // exchange
Expand All @@ -38,7 +38,7 @@ func Consume(conn *amqp.Connection, queue Queue, exchange Exchange, consumerTag
} else {

log.Printf("Queue %s declared", queue.Name)
err = ch.Qos(
err = channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
Expand All @@ -47,7 +47,8 @@ func Consume(conn *amqp.Connection, queue Queue, exchange Exchange, consumerTag
log.Printf("ERROR: %q", err)
return err
}
msgs, err := ch.Consume(

msgs, err := channel.Consume(
queue.Name, // queue
consumerTag, // consumer
false, // auto-ack
Expand All @@ -62,7 +63,6 @@ func Consume(conn *amqp.Connection, queue Queue, exchange Exchange, consumerTag
}

forever := make(chan bool)

go func() {
for d := range msgs {
d.Ack(false)
Expand All @@ -72,9 +72,10 @@ func Consume(conn *amqp.Connection, queue Queue, exchange Exchange, consumerTag
log.Printf("Consumer tag %s", consumerTag)
<-forever
return nil
}

}
} // Bound to queue

} // Queue declared

}
return nil
Expand Down
18 changes: 9 additions & 9 deletions exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ type Exchange struct {
Arguments amqp.Table
}

func declareExchange(e Exchange, ch *amqp.Channel) error {
err := ch.ExchangeDeclare(
e.Name, // name
e.Type, // type
e.Durable, // durable
e.AutoDeleted, // auto-deleted
e.Internal, // internal
e.NoWait, // noWait
e.Arguments, // arguments
func declareExchange(exchange Exchange, channel *amqp.Channel) error {
err := channel.ExchangeDeclare(
exchange.Name, // name
exchange.Type, // type
exchange.Durable, // durable
exchange.AutoDeleted, // auto-deleted
exchange.Internal, // internal
exchange.NoWait, // noWait
exchange.Arguments, // arguments
)
return err
}
21 changes: 11 additions & 10 deletions publisher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,24 +7,24 @@ import (

func Publish(conn *amqp.Connection, queue Queue, exchange Exchange, body string) {

ch, err := conn.Channel()
channel, err := conn.Channel()
if err != nil {
log.Printf("[CONEJO] Could not declare channel %q", err)
}
ch.Confirm(false)
ack, nack := ch.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))
channel.Confirm(false)
ack, nack := channel.NotifyConfirm(make(chan uint64, 1), make(chan uint64, 1))

err = declareExchange(exchange, ch)
err = declareExchange(exchange, channel)
if err != nil {
log.Printf("ERROR: Could not declare exchange %q", err)
} else {
log.Printf("[CONEJO] Declared exchange")
_, err := declareQueue(queue, ch)

err := declareQueue(queue, channel)
if err != nil {
log.Printf("ERROR: Could not declare queue %q", err)
} else {
log.Printf("[CONEJO] Declared queue")
err = ch.QueueBind(

err = channel.QueueBind(
queue.Name, // queue name
queue.Name, // @TODO - FIX ME!!!
exchange.Name, // exchange
Expand All @@ -35,7 +35,7 @@ func Publish(conn *amqp.Connection, queue Queue, exchange Exchange, body string)
log.Printf("ERROR: Could not bind queue '%s' to exchange '%s' using '%s' - ", queue.Name, exchange.Name, queue.Name, err)
} else {

if err = ch.Publish(
if err = channel.Publish(
exchange.Name, // publish to an exchange
queue.Name, // routing to 0 or more queues
false, // mandatory
Expand All @@ -51,13 +51,14 @@ func Publish(conn *amqp.Connection, queue Queue, exchange Exchange, body string)
); err != nil {
log.Printf("ERROR: Could not publish message %s", err)
} else {

select {
case tag := <-ack:
log.Println("Acked ", tag)
case tag := <-nack:
log.Println("Nack alert! ", tag)
}
defer ch.Close()
defer channel.Close()
}

}
Expand Down
18 changes: 9 additions & 9 deletions queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ type Queue struct {
Arguments amqp.Table
}

func declareQueue(q Queue, ch *amqp.Channel) (aq amqp.Queue, err error) {
aq, err = ch.QueueDeclare(
q.Name, // name
q.Durable, // durable
q.Delete, // delete when unused
q.Exclusive, // exclusive
q.NoWait, // no-wait
q.Arguments, // arguments
func declareQueue(queue Queue, channel *amqp.Channel) error {
_, err := channel.QueueDeclare(
queue.Name, // name
queue.Durable, // durable
queue.Delete, // delete when unused
queue.Exclusive, // exclusive
queue.NoWait, // no-wait
queue.Arguments, // arguments
)
return aq, err
return err
}

0 comments on commit 90df753

Please sign in to comment.