forked from gocardless/amqpc
-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
127 lines (108 loc) · 2.82 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"fmt"
"log"
"github.com/streadway/amqp"
)
type Consumer struct {
connection *amqp.Connection
channel *amqp.Channel
tag string
done chan error
}
func NewConsumer(amqpURI, exchange, exchangeType, queue, key, ctag string, durable bool) (*Consumer, error) {
c := &Consumer{
connection: nil,
channel: nil,
tag: ctag,
done: make(chan error),
}
var err error
log.Printf("Connecting to %s", amqpURI)
c.connection, err = amqp.Dial(amqpURI)
if err != nil {
return nil, fmt.Errorf("Dial: %s", err)
}
log.Printf("Getting Channel")
c.channel, err = c.connection.Channel()
if err != nil {
return nil, fmt.Errorf("hannel: %s", err)
}
log.Printf("Declaring Exchange (%s)", exchange)
if err = c.channel.ExchangeDeclare(
exchange, // name of the exchange
exchangeType, // type
durable, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("Exchange Declare: %s", err)
}
if exchangeType != "x-modulus-hash" {
log.Printf("Declaring Queue (%s)", queue)
state, err := c.channel.QueueDeclare(
queue, // name of the queue
true, // durable
false, // delete when usused
false, // exclusive
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Queue Declare: %s", err)
}
log.Printf("Declared Queue (%d messages, %d consumers)", state.Messages, state.Consumers)
log.Printf("Binding to Exchange (key '%s')", key)
if err = c.channel.QueueBind(
queue, // name of the queue
key, // routingKey
exchange, // sourceExchange
false, // noWait
nil, // arguments
); err != nil {
return nil, fmt.Errorf("Queue Bind: %s", err)
}
log.Printf("Queue bound to Exchange", c.tag)
}
log.Printf("Starting Consume (consumer tag '%s')", c.tag)
deliveries, err := c.channel.Consume(
queue, // name
c.tag, // consumerTag,
true, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
)
if err != nil {
return nil, fmt.Errorf("Queue Consume: %s", err)
}
go handle(deliveries, c.done)
return c, nil
}
func (c *Consumer) Shutdown() error {
// will close() the deliveries channel
if err := c.channel.Cancel(c.tag, true); err != nil {
return fmt.Errorf("Consumer cancel failed: %s", err)
}
if err := c.connection.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
}
defer log.Printf("AMQP shutdown OK")
// wait for handle() to exit
return <-c.done
}
func handle(deliveries <-chan amqp.Delivery, done chan error) {
for d := range deliveries {
log.Printf(
"Got %dB delivery: [%v] %s",
len(d.Body),
d.DeliveryTag,
d.Body,
)
}
log.Printf("Handle: deliveries channel closed")
done <- nil
}