Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add cassandra trace #53

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions containers/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,10 @@ type ActiveConnection struct {
Timestamp uint64
Closed time.Time

http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
mysqlParser *l7.MysqlParser
http2Parser *l7.Http2Parser
postgresParser *l7.PostgresParser
mysqlParser *l7.MysqlParser
cassandraParser *l7.CassandraParser
}

type ListenDetails struct {
Expand Down Expand Up @@ -633,6 +634,13 @@ func (c *Container) onL7Request(pid uint32, fd uint64, timestamp uint64, r *l7.R
query := l7.ParseMongo(r.Payload)
trace.MongoQuery(query, r.Status.Error(), r.Duration)
case l7.ProtocolKafka, l7.ProtocolCassandra:
if conn.cassandraParser == nil {
conn.cassandraParser = l7.NewCassandraParser()
}
query := conn.cassandraParser.Parse(r.Payload, r.StatementId, uint32(r.Method))
if query != "" {
trace.CassandraQuery(query, r.Status.Error(), r.Duration)
}
stats.observe(r.Status.String(), "", r.Duration)
case l7.ProtocolRabbitmq, l7.ProtocolNats:
stats.observe(r.Status.String(), r.Method.String(), 0)
Expand Down
9 changes: 5 additions & 4 deletions containers/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ package containers
import (
"bytes"
"fmt"
"os"
"regexp"
"strings"
"time"

"github.com/coroot/coroot-node-agent/cgroup"
"github.com/coroot/coroot-node-agent/common"
"github.com/coroot/coroot-node-agent/ebpftracer"
Expand All @@ -11,10 +16,6 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/vishvananda/netns"
"k8s.io/klog/v2"
"os"
"regexp"
"strings"
"time"
)

var (
Expand Down
14 changes: 4 additions & 10 deletions ebpftracer/ebpf.go

Large diffs are not rendered by default.

21 changes: 19 additions & 2 deletions ebpftracer/ebpf/l7/cassandra.c
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
#define CASSANDRA_OPCODE_ERROR 0x00
#define CASSANDRA_OPCODE_QUERY 0x07
#define CASSANDRA_OPCODE_RESULT 0x08
#define CASSANDRA_OPCODE_PREPARE 0x09
#define CASSANDRA_OPCODE_EXECUTE 0x0A
#define CASSANDRA_OPCODE_BATCH 0x0D

#define CASSANDRA_OPCODE_RESULT_PREPARE 0x04

struct cassandra_header {
__u8 version;
__u8 flags;
Expand All @@ -17,7 +20,7 @@ struct cassandra_header {
};

static __always_inline
int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id) {
int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id, __u8 *request_type) {
struct cassandra_header h = {};
if (buf_size < sizeof(h)) {
return 0;
Expand All @@ -30,11 +33,16 @@ int is_cassandra_request(char *buf, __u64 buf_size, __s16 *stream_id) {
*stream_id = h.stream_id;
return 1;
}
if (h.opcode == CASSANDRA_OPCODE_PREPARE) {
*stream_id = h.stream_id;
*request_type = CASSANDRA_OPCODE_PREPARE;
return 1;
}
return 0;
}

static __always_inline
int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *status) {
int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *statement_id, __u32 *status) {
struct cassandra_header h = {};
if (buf_size < sizeof(h)) {
return 0;
Expand All @@ -46,6 +54,15 @@ int is_cassandra_response(char *buf, __u64 buf_size, __s16 *stream_id, __u32 *st
if (h.opcode == CASSANDRA_OPCODE_RESULT) {
*stream_id = h.stream_id;
*status = STATUS_OK;
__u8 b[4];
bpf_read(buf+9, b);
if (b[0] == 0 && b[1] == 0 && b[2] == 0 && b[3] == CASSANDRA_OPCODE_RESULT_PREPARE) {
__u8 l[2];
bpf_read(buf+13, l);
int length = (int)l[0] | (int)l[1] << 8;
// only read 4 first bytes
bpf_read(buf+15, *statement_id);
}
return 1;
}
if (h.opcode == CASSANDRA_OPCODE_ERROR) {
Expand Down
7 changes: 5 additions & 2 deletions ebpftracer/ebpf/l7/l7.c
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ int trace_enter_write(void *ctx, __u64 fd, __u16 is_tls, char *buf, __u64 size,
e->connection_timestamp = get_connection_timestamp(k.pid, k.fd);
bpf_perf_event_output(ctx, &l7_events, BPF_F_CURRENT_CPU, e, sizeof(*e));
return 0;
} else if (is_cassandra_request(payload, size, &k.stream_id)) {
} else if (is_cassandra_request(payload, size, &k.stream_id, &req->request_type)) {
req->protocol = PROTOCOL_CASSANDRA;
} else if (is_kafka_request(payload, size, &req->request_id)) {
req->protocol = PROTOCOL_KAFKA;
Expand Down Expand Up @@ -414,11 +414,14 @@ int trace_exit_read(void *ctx, __u64 id, __u32 pid, __u16 is_tls, long int ret)
struct l7_request *req = bpf_map_lookup_elem(&active_l7_requests, &k);
int response = 0;
if (!req) {
if (is_cassandra_response(payload, ret, &k.stream_id, &e->status)) {
if (is_cassandra_response(payload, ret, &k.stream_id, &e->statement_id, &e->status)) {
req = bpf_map_lookup_elem(&active_l7_requests, &k);
if (!req) {
return 0;
}
if (req->request_type == CASSANDRA_OPCODE_PREPARE) {
e->method = METHOD_STATEMENT_PREPARE;
}
response = 1;
} else if (looks_like_http2_frame(payload, ret, METHOD_HTTP2_SERVER_FRAMES)) {
e->protocol = PROTOCOL_HTTP2;
Expand Down
63 changes: 63 additions & 0 deletions ebpftracer/l7/cassandra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package l7

import (
"encoding/binary"
"fmt"
"strconv"
)

type CassandraParser struct {
preparedStatements map[string]string
}

func NewCassandraParser() *CassandraParser {
return &CassandraParser{preparedStatements: map[string]string{}}
}

func (p *CassandraParser) Parse(payload []byte, statementId, method uint32) string {
opcode := payload[4]
query := p.ParseMessages(int(opcode), payload[9:])
if method == uint32(MethodStatementPrepare) {
id := strconv.FormatUint(uint64(statementId), 10)
p.preparedStatements[id] = query
return fmt.Sprintf("PREPARE %s FROM %s", id, query)
}
return query
}

func (p *CassandraParser) ParseMessages(opcode int, messages []byte) string {
// klog.Infof("ParseMessages: opcode=%v, messages=%v", opcode, messages)
switch opcode {
case 7: // query
return p.ParseQuery(messages)
case 9: // prepare
return p.ParsePrepare(messages)
case 10: // execute
return p.ParseExecute(messages)
}
return ""
}

func (p *CassandraParser) ParseQuery(messages []byte) string {
query := messages[0:4]
length := binary.BigEndian.Uint32(query)
query = messages[4 : 4+length]
return string(query)
}

func (p *CassandraParser) ParsePrepare(messages []byte) string {
query := messages[0:4]
length := binary.BigEndian.Uint32(query)
query = messages[4 : 4+length]
return string(query)
}

func (p *CassandraParser) ParseExecute(messages []byte) string {
// next length bytes are the id but we just get the first 4 bytes
id := messages[2 : 2+4]
idstr := strconv.Itoa(int(binary.LittleEndian.Uint32(id)))
if query, ok := p.preparedStatements[idstr]; ok {
return query
}
return fmt.Sprintf(`EXECUTE %s /* unknown */`, idstr)
}
10 changes: 10 additions & 0 deletions tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,16 @@ func (t *Trace) MysqlQuery(query string, error bool, duration time.Duration) {
)
}

func (t *Trace) CassandraQuery(query string, error bool, duration time.Duration) {
if t == nil || query == "" {
return
}
t.createSpan("query", duration, error,
semconv.DBSystemCassandra,
semconv.DBStatement(query),
)
}

func (t *Trace) MongoQuery(query string, error bool, duration time.Duration) {
if t == nil || query == "" {
return
Expand Down