-
Notifications
You must be signed in to change notification settings - Fork 18
/
metadata_request.go
104 lines (86 loc) · 2.31 KB
/
metadata_request.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package healer
import (
"encoding/binary"
)
// MetadataRequest is a metadata request: the embedded request header followed
// by the topics to query and an auto-topic-creation flag.
type MetadataRequest struct {
*RequestHeader
// Topics lists the topic names to put in the request. Encode writes a nil
// slice as a topic count of -1 and a non-nil slice as its length followed
// by the length-prefixed names.
Topics []string
// AllowAutoTopicCreation is written by Encode as a single trailing byte
// (1 or 0) for request versions that include that field.
AllowAutoTopicCreation bool
}
// metadataRequestHasAutoCreateFlag reports whether a metadata request of the
// given version carries the AllowAutoTopicCreation byte. The field is present
// from version 4 on, which is also the threshold DecodeMetadataRequest uses.
func metadataRequestHasAutoCreateFlag(version uint16) bool {
	return version >= 4
}

// length returns the number of bytes Encode writes after the 4-byte size
// prefix: the request header, the int32 topic count, every length-prefixed
// topic name and, when the version carries it, the auto-creation flag byte.
func (r *MetadataRequest) length(version uint16) int {
	n := r.RequestHeader.length() + 4 // header + int32 topic count
	for _, topic := range r.Topics {
		n += 2 + len(topic) // int16 length prefix + name bytes
	}
	if metadataRequestHasAutoCreateFlag(version) {
		n++
	}
	return n
}

// Encode serializes the request for the given version and returns the wire
// bytes, starting with a big-endian 4-byte size that counts everything after
// itself.
//
// A nil Topics slice is written as a topic count of -1; a non-nil slice
// (including an empty one) is written as its length followed by the names.
// The AllowAutoTopicCreation byte is appended for every version that carries
// it (4 and later), not only for version 7, so that the output matches what
// DecodeMetadataRequest expects to read back.
func (r *MetadataRequest) Encode(version uint16) []byte {
	requestLength := r.length(version)
	payload := make([]byte, requestLength+4)

	offset := 0
	binary.BigEndian.PutUint32(payload[offset:], uint32(requestLength))
	offset += 4

	offset += r.RequestHeader.EncodeTo(payload[offset:])

	topicCount := int32(len(r.Topics))
	if r.Topics == nil {
		topicCount = -1
	}
	binary.BigEndian.PutUint32(payload[offset:], uint32(topicCount))
	offset += 4

	for _, topic := range r.Topics {
		binary.BigEndian.PutUint16(payload[offset:], uint16(len(topic)))
		offset += 2
		offset += copy(payload[offset:], topic)
	}

	// payload is zero-initialised, so only the "true" case needs a write.
	if metadataRequestHasAutoCreateFlag(version) && r.AllowAutoTopicCreation {
		payload[offset] = 1
	}
	return payload
}
// NewMetadataRequest returns a MetadataRequest for the given topics whose
// header carries the metadata API key and a pointer to clientID. The topics
// slice is stored as passed (not copied), and AllowAutoTopicCreation is
// enabled by default.
func NewMetadataRequest(clientID string, topics []string) *MetadataRequest {
	header := &RequestHeader{
		APIKey:   API_MetadataRequest,
		ClientID: &clientID,
	}
	return &MetadataRequest{
		RequestHeader:          header,
		Topics:                 topics,
		AllowAutoTopicCreation: true,
	}
}
// metadataRequestDecodeError is the error type DecodeMetadataRequest returns
// for malformed payloads.
type metadataRequestDecodeError string

// Error implements the error interface.
func (e metadataRequestDecodeError) Error() string { return string(e) }

// DecodeMetadataRequest parses the bytes produced by MetadataRequest.Encode
// back into a MetadataRequest. It is intended for tests only.
//
// The payload layout is: a 4-byte size prefix (skipped), the request header,
// an int32 topic count, that many uint16-length-prefixed topic names and, when
// the header's APIVersion is 4 or later, one byte for AllowAutoTopicCreation.
// A negative topic count (Encode writes -1 for nil Topics) yields nil Topics;
// a count of zero yields an empty, non-nil slice.
//
// A payload that ends before a field is complete produces a non-nil error and
// a zero MetadataRequest instead of a panic. Header parsing is delegated to
// DecodeRequestHeader, which is not bounds-checked here.
func DecodeMetadataRequest(payload []byte) (r MetadataRequest, err error) {
	const errTruncated = metadataRequestDecodeError("metadata request: payload is truncated")

	// Skip the 4-byte request size; nothing below depends on its value.
	offset := 4
	if len(payload) < offset {
		return MetadataRequest{}, errTruncated
	}

	header, n := DecodeRequestHeader(payload[offset:])
	offset += n

	if len(payload)-offset < 4 {
		return MetadataRequest{}, errTruncated
	}
	// The count is signed on the wire: -1 marks nil Topics.
	topicCount := int32(binary.BigEndian.Uint32(payload[offset:]))
	offset += 4

	var topics []string
	if topicCount >= 0 {
		// Every topic needs at least its 2-byte length prefix, so reject
		// counts the remaining bytes cannot hold before allocating.
		if int(topicCount) > (len(payload)-offset)/2 {
			return MetadataRequest{}, errTruncated
		}
		topics = make([]string, 0, topicCount)
		for i := int32(0); i < topicCount; i++ {
			if len(payload)-offset < 2 {
				return MetadataRequest{}, errTruncated
			}
			topicLength := int(binary.BigEndian.Uint16(payload[offset:]))
			offset += 2
			if len(payload)-offset < topicLength {
				return MetadataRequest{}, errTruncated
			}
			topics = append(topics, string(payload[offset:offset+topicLength]))
			offset += topicLength
		}
	}

	allowAutoTopicCreation := false
	if header.APIVersion >= 4 {
		if offset >= len(payload) {
			return MetadataRequest{}, errTruncated
		}
		allowAutoTopicCreation = payload[offset] == 1
	}

	return MetadataRequest{
		RequestHeader:          &header,
		Topics:                 topics,
		AllowAutoTopicCreation: allowAutoTopicCreation,
	}, nil
}