Skip to content

Commit

Permalink
remove unneeded bpf map update calls
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <[email protected]>
  • Loading branch information
msherif1234 committed Nov 26, 2024
1 parent 02d1817 commit 27be1cb
Show file tree
Hide file tree
Showing 16 changed files with 135 additions and 139 deletions.
7 changes: 5 additions & 2 deletions bpf/dns_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ static __always_inline int track_dns_packet(struct __sk_buff *skb, pkt_info *pkt

if ((flags & DNS_QR_FLAG) == 0) { /* dns query */
fill_dns_id(pkt->id, &dns_req, dns_id, false);
if (bpf_map_lookup_elem(&dns_flows, &dns_req) == NULL) {
bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_ANY);
ret = bpf_map_update_elem(&dns_flows, &dns_req, &ts, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages) {
bpf_printk("error creating new dns entry %d\n", ret);
}
}
} else { /* dns response */
fill_dns_id(pkt->id, &dns_req, dns_id, true);
Expand Down
92 changes: 45 additions & 47 deletions bpf/flows.c
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,24 @@
*/
#include "network_events_monitoring.h"

// Aggregate one more packet into an already-existing flow entry:
// bump packet/byte counters, refresh timestamps and merge the TCP flags,
// DSCP and DNS metadata carried in *pkt.
//
// NOTE(review): parameter order is (dns_errno, len) to match every call
// site, which invokes update_existing_flow(flow, &pkt, dns_errno, len).
// The previous declaration (len, dns_errno) silently swapped the byte
// count and the DNS errno through implicit integer conversion.
static inline void update_existing_flow(flow_metrics *aggregate_flow, pkt_info *pkt, int dns_errno,
                                        u64 len) {
    aggregate_flow->packets += 1;
    aggregate_flow->bytes += len;
    aggregate_flow->end_mono_time_ts = pkt->current_ts;
    // it might happen that start_mono_time hasn't been set due to
    // the way the percpu hashmap deals with concurrent map entries
    if (aggregate_flow->start_mono_time_ts == 0) {
        aggregate_flow->start_mono_time_ts = pkt->current_ts;
    }
    aggregate_flow->flags |= pkt->flags;
    aggregate_flow->dscp = pkt->dscp;
    aggregate_flow->dns_record.id = pkt->dns_id;
    aggregate_flow->dns_record.flags = pkt->dns_flags;
    aggregate_flow->dns_record.latency = pkt->dns_latency;
    aggregate_flow->dns_record.errno = dns_errno;
}

static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// If sampling is defined, will only parse 1 out of "sampling" flows
if (sampling > 1 && (bpf_get_prandom_u32() % sampling) != 0) {
Expand All @@ -70,6 +88,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
void *data_end = (void *)(long)skb->data_end;
void *data = (void *)(long)skb->data;
struct ethhdr *eth = (struct ethhdr *)data;
u64 len = skb->len;

if (fill_ethhdr(eth, data_end, &pkt) == DISCARD) {
return TC_ACT_OK;
Expand All @@ -93,33 +112,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
// a spinlocked alternative version and use it selectively https://lwn.net/Articles/779120/
flow_metrics *aggregate_flow = (flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
aggregate_flow->packets += 1;
aggregate_flow->bytes += skb->len;
aggregate_flow->end_mono_time_ts = pkt.current_ts;
// it might happen that start_mono_time hasn't been set due to
// the way percpu hashmap deal with concurrent map entries
if (aggregate_flow->start_mono_time_ts == 0) {
aggregate_flow->start_mono_time_ts = pkt.current_ts;
}
aggregate_flow->flags |= pkt.flags;
aggregate_flow->dscp = pkt.dscp;
aggregate_flow->dns_record.id = pkt.dns_id;
aggregate_flow->dns_record.flags = pkt.dns_flags;
aggregate_flow->dns_record.latency = pkt.dns_latency;
aggregate_flow->dns_record.errno = dns_errno;
long ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret != 0) {
// usually error -16 (-EBUSY) is printed here.
// In this case, the flow is dropped, as submitting it to the ringbuffer would cause
// a duplicated UNION of flows (two different flows with partial aggregation of the same packets),
// which can't be deduplicated.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error updating flow %d\n", ret);
}
// Update global counter for hashmap update errors
increase_counter(HASHMAP_FLOWS_DROPPED);
}
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
} else {
// Key does not exist in the map, and will need to create a new entry.
u64 rtt = 0;
Expand All @@ -128,7 +121,7 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
}
flow_metrics new_flow = {
.packets = 1,
.bytes = skb->len,
.bytes = len,
.start_mono_time_ts = pkt.current_ts,
.end_mono_time_ts = pkt.current_ts,
.flags = pkt.flags,
Expand All @@ -140,31 +133,36 @@ static inline int flow_monitor(struct __sk_buff *skb, u8 direction) {
.flow_rtt = rtt,
};

// even if we know that the entry is new, another CPU might be concurrently inserting a flow
// so we need to specify BPF_ANY
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
long ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated in userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
if (trace_messages) {
bpf_printk("error adding flow %d\n", ret);
}

new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
if (ret == -EEXIST) {
flow_metrics *aggregate_flow =
(flow_metrics *)bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
update_existing_flow(aggregate_flow, &pkt, dns_errno, len);
}
} else {
// usually error -16 (-EBUSY) or -7 (E2BIG) is printed here.
// In this case, we send the single-packet flow via ringbuffer as in the worst case we can have
// a repeated INTERSECTION of flows (different flows aggregating different packets),
// which can be re-aggregated in userspace.
// other possible values https://chromium.googlesource.com/chromiumos/docs/+/master/constants/errnos.md
new_flow.errno = -ret;
flow_record *record =
(flow_record *)bpf_ringbuf_reserve(&direct_flows, sizeof(flow_record), 0);
if (!record) {
if (trace_messages) {
bpf_printk("couldn't reserve space in the ringbuf. Dropping flow");
}
return TC_ACT_OK;
}
return TC_ACT_OK;
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
record->id = id;
record->metrics = new_flow;
bpf_ringbuf_submit(record, 0);
}
}
return TC_ACT_OK;
Expand Down
10 changes: 3 additions & 7 deletions bpf/flows_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,9 @@

#include "utils.h"

// remove the comment below to enable debug prints
//#define ENABLE_BPF_PRINTK
#ifdef ENABLE_BPF_PRINTK
#define BPF_PRINTK(fmt, args...) bpf_printk(fmt, ##args)
#else
#define BPF_PRINTK(fmt, args...)
#endif
// Trace-gated printk: emits only when the userspace agent enabled
// trace_messages. Wrapped in do { } while (0) so the macro behaves as a
// single statement — a bare `if` body here would swallow a following
// `else` and break `if (x) BPF_PRINTK(...); else ...` at call sites.
#define BPF_PRINTK(fmt, args...)                                                                   \
    do {                                                                                           \
        if (trace_messages) {                                                                      \
            bpf_printk(fmt, ##args);                                                               \
        }                                                                                          \
    } while (0)

static __always_inline int is_zero_ip(u8 *ip, u8 len) {
for (int i = 0; i < len; i++) {
Expand Down
64 changes: 33 additions & 31 deletions bpf/network_events_monitoring.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,44 @@ static inline bool md_already_exists(u8 network_events[MAX_NETWORK_EVENTS][MAX_E
return false;
}

// Look up an existing flow for *id and, if found, append the network-event
// metadata cookie (md_len bytes read from user_cookie) to its ring of
// MAX_NETWORK_EVENTS slots, refreshing end_mono_time_ts.
// Returns 0 when the flow existed and was updated, -1 otherwise.
static inline int lookup_and_update_existing_flow_network_events(flow_id *id, u8 md_len,
                                                                 u8 *user_cookie) {
    u8 cookie[MAX_EVENT_MD];

    // Zero the whole buffer first: bpf_probe_read below fills only md_len
    // bytes, while md_already_exists() and the memcpy compare/copy the full
    // MAX_EVENT_MD, so the tail must not be uninitialized stack (the BPF
    // verifier can also reject reads of uninitialized stack slots).
    __builtin_memset(cookie, 0, sizeof(cookie));
    bpf_probe_read(cookie, md_len, user_cookie);

    flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, id);
    if (aggregate_flow != NULL) {
        u8 idx = aggregate_flow->network_events_idx;
        aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
        // Needed to check length here again to keep the BPF verifier happy
        if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
            if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
                __builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
                aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
            }
            return 0;
        }
    }
    return -1;
}

static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_metadata *md) {
u8 dscp = 0, protocol = 0, md_len = 0;
u16 family = 0, flags = 0;
u8 *user_cookie = NULL;
u8 cookie[MAX_EVENT_MD];
long ret = 0;
u64 len = 0;
flow_id id;

__builtin_memset(&id, 0, sizeof(id));
__builtin_memset(cookie, 0, sizeof(cookie));

md_len = BPF_CORE_READ(md, user_cookie_len);
user_cookie = (u8 *)BPF_CORE_READ(md, user_cookie);
if (md_len == 0 || md_len > MAX_EVENT_MD || user_cookie == NULL) {
return -1;
}

bpf_probe_read(cookie, md_len, user_cookie);

id.if_index = BPF_CORE_READ(md, in_ifindex);

len = BPF_CORE_READ(skb, len);
Expand Down Expand Up @@ -88,33 +106,12 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me

for (direction dir = INGRESS; dir < MAX_DIRECTION; dir++) {
id.direction = dir;
flow_metrics *aggregate_flow = bpf_map_lookup_elem(&aggregated_flows, &id);
if (aggregate_flow != NULL) {
u8 idx = aggregate_flow->network_events_idx;
aggregate_flow->end_mono_time_ts = bpf_ktime_get_ns();
// Needed to check length here again to keep JIT verifier happy
if (idx < MAX_NETWORK_EVENTS && md_len <= MAX_EVENT_MD) {
if (!md_already_exists(aggregate_flow->network_events, (u8 *)cookie)) {
__builtin_memcpy(aggregate_flow->network_events[idx], cookie, MAX_EVENT_MD);
aggregate_flow->network_events_idx = (idx + 1) % MAX_NETWORK_EVENTS;
}
ret = bpf_map_update_elem(&aggregated_flows, &id, aggregate_flow, BPF_ANY);
if (ret == 0) {
return 0;
}
} else {
return -1;
}
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
if (ret == 0) {
return ret;
}
}

if (ret != 0) {
if (trace_messages) {
bpf_printk("error network events updating existing flow %d\n", ret);
}
return ret;
}

// there is no matching flows so lets create new one and add the network event metadata
u64 current_time = bpf_ktime_get_ns();
id.direction = INGRESS;
Expand All @@ -128,9 +125,14 @@ static inline int trace_network_events(struct sk_buff *skb, struct rh_psample_me
};
bpf_probe_read(new_flow.network_events[0], md_len, user_cookie);
new_flow.network_events_idx++;
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error network events creating new flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages) {
bpf_printk("error network events creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = lookup_and_update_existing_flow_network_events(&id, md_len, user_cookie);
}
}
return ret;
}
Expand Down
15 changes: 8 additions & 7 deletions bpf/pkt_drops.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ static inline long pkt_drop_lookup_and_update_flow(flow_id *id, u8 state, u16 fl
aggregate_flow->pkt_drops.latest_state = state;
aggregate_flow->pkt_drops.latest_flags = flags;
aggregate_flow->pkt_drops.latest_drop_cause = reason;
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_EXIST);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -93,9 +89,14 @@ static inline int trace_pkt_drop(void *ctx, u8 state, struct sk_buff *skb,
.pkt_drops.latest_flags = flags,
.pkt_drops.latest_drop_cause = reason,
};
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error packet drop creating new flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages) {
bpf_printk("error packet drop creating new flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = pkt_drop_lookup_and_update_flow(&id, state, flags, reason, len);
}
}

return ret;
Expand Down
15 changes: 8 additions & 7 deletions bpf/rtt_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ static inline int rtt_lookup_and_update_flow(flow_id *id, u16 flags, u64 rtt) {
if (aggregate_flow->flow_rtt < rtt) {
aggregate_flow->flow_rtt = rtt;
}
long ret = bpf_map_update_elem(&aggregated_flows, id, aggregate_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt updating flow %d\n", ret);
}
return 0;
}
return -1;
Expand Down Expand Up @@ -87,9 +83,14 @@ static inline int calculate_flow_rtt_tcp(struct sock *sk, struct sk_buff *skb) {
.flow_rtt = rtt,
.dscp = dscp,
};
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_ANY);
if (trace_messages && ret != 0) {
bpf_printk("error rtt track creating flow %d\n", ret);
ret = bpf_map_update_elem(&aggregated_flows, &id, &new_flow, BPF_NOEXIST);
if (ret != 0) {
if (trace_messages) {
bpf_printk("error rtt track creating flow %d\n", ret);
}
if (ret == -EEXIST) {
ret = rtt_lookup_and_update_flow(&id, flags, rtt);
}
}

return 0;
Expand Down
2 changes: 1 addition & 1 deletion bpf/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define DISCARD 1
#define SUBMIT 0

#define EEXIST 17
// Flags according to RFC 9293 & https://www.iana.org/assignments/ipfix/ipfix.xhtml
typedef enum tcp_flags_t {
FIN_FLAG = 0x01,
Expand Down Expand Up @@ -184,7 +185,6 @@ typedef struct dns_flow_id_t {

// Enum to define global counters keys and share it with userspace
typedef enum global_counters_key_t {
HASHMAP_FLOWS_DROPPED,
FILTER_REJECT,
FILTER_ACCEPT,
FILTER_NOMATCH,
Expand Down
17 changes: 8 additions & 9 deletions pkg/ebpf/bpf_arm64_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_arm64_bpfel.o
Binary file not shown.
17 changes: 8 additions & 9 deletions pkg/ebpf/bpf_powerpc_bpfel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified pkg/ebpf/bpf_powerpc_bpfel.o
Binary file not shown.
Loading

0 comments on commit 27be1cb

Please sign in to comment.