Vincent Bernat: Using eBPF to load-balance traffic across UDP sockets with Go

Akvorado collects sFlow and IPFIX flows over UDP. Because UDP does
not retransmit lost packets, it needs to process them quickly. Akvorado runs
several workers listening to the same port. The kernel should load-balance
received packets fairly between these workers. However, this does not work as
expected. A couple of workers exhibit high packet loss:

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n 's/akvorado_inlet_flow_input_udp_in_dropped_//p'
packets_total{listener="0.0.0.0:2055",worker="0"} 0
packets_total{listener="0.0.0.0:2055",worker="1"} 0
packets_total{listener="0.0.0.0:2055",worker="2"} 0
packets_total{listener="0.0.0.0:2055",worker="3"} 1.614933572278264e+15
packets_total{listener="0.0.0.0:2055",worker="4"} 0
packets_total{listener="0.0.0.0:2055",worker="5"} 0
packets_total{listener="0.0.0.0:2055",worker="6"} 9.59964121598348e+14
packets_total{listener="0.0.0.0:2055",worker="7"} 0

eBPF can help by implementing an alternate balancing algorithm. 🐝

Options for load-balancing

There are three methods to load-balance UDP packets across workers:

  1. One worker receives the packets and dispatches them to the other workers.
  2. All workers share the same socket.
  3. Each worker has its own socket, listening to the same port, with the
    SO_REUSEPORT socket option.

SO_REUSEPORT option

Tom Herbert added the SO_REUSEPORT socket option in Linux 3.9. The
cover letter for his patch series explains why this new option is better than
the two existing ones from a performance point of view:

SO_REUSEPORT allows multiple listener sockets to be bound to the same port.
[…] Received packets are distributed to multiple sockets bound to the same
port using a 4-tuple hash.

The motivating case for SO_REUSEPORT in TCP would be something like a web
server binding to port 80 running with multiple threads, where each thread
might have its own listener socket. This could be done as an alternative to
other models:

  1. have one listener thread which dispatches completed connections to workers, or
  2. accept on a single listener socket from multiple threads.

In case #1, the listener thread can easily become the bottleneck with high
connection turn-over rate. In case #2, the proportion of connections accepted
per thread tends to be uneven under high connection load. […] We have seen the
disproportion to be as high as 3:1 ratio between thread accepting most
connections and the one accepting the fewest. With SO_REUSEPORT the
distribution is uniform.

The motivating case for SO_REUSEPORT in UDP would be something like a DNS
server. An alternative would be to receive on the same socket from multiple
threads. As in the case of TCP, the load across these threads tends to be
disproportionate and we also see a lot of contention on the socket lock.

Akvorado uses the SO_REUSEPORT option to dispatch the packets across the
workers. However, because the distribution uses a 4-tuple hash, a single socket
handles all the flows from one exporter.

SO_ATTACH_REUSEPORT_EBPF option

In Linux 4.5, Craig Gallek added the SO_ATTACH_REUSEPORT_EBPF
option to attach an eBPF program to select the target UDP socket. In Linux 4.6,
he extended it to support TCP. The socket(7) manual page
documents this mechanism:1

The BPF program must return an index between 0 and N-1 representing the socket
which should receive the packet (where N is the number of sockets in the
group). If the BPF program returns an invalid index, socket selection will
fall back to the plain SO_REUSEPORT mechanism.

In Linux 4.19, Martin KaFai Lau added the
BPF_PROG_TYPE_SK_REUSEPORT program type. Such an eBPF program
selects the socket from a BPF_MAP_TYPE_REUSEPORT_SOCKARRAY map instead. This new
approach is more reliable when switching target sockets from one instance to
another—for example, when upgrading, a new instance can add its sockets and
remove the old ones.

Load-balancing with eBPF and Go

Altering the load-balancing algorithm for a group of sockets requires two steps:

  1. write and compile an eBPF program in C,2 and
  2. load it and attach it in Go.

eBPF program in C

A simple load-balancing algorithm is to randomly choose the destination socket.
The kernel provides the bpf_get_prandom_u32() helper function to get a
pseudo-random number.

volatile const __u32 num_sockets; // ❶

struct {
    __uint(type, BPF_MAP_TYPE_REUSEPORT_SOCKARRAY);
    __type(key, __u32);
    __type(value, __u64);
    __uint(max_entries, 256);
} socket_map SEC(".maps"); // ❷

SEC("sk_reuseport")
int reuseport_balance_prog(struct sk_reuseport_md *reuse_md)
{
    __u32 index = bpf_get_prandom_u32() % num_sockets; // ❸
    bpf_sk_select_reuseport(reuse_md, &socket_map, &index, 0); // ❹
    return SK_PASS; // ❺
}

char _license[] SEC("license") = "GPL";

In ❶, we declare a volatile constant for the number of sockets in the group. We
will initialize this constant before loading the eBPF program into the kernel.
In ❷, we define the socket map. We will populate it with the socket file
descriptors. In ❸, we randomly select the index of the target socket.3
In ❹, we invoke the bpf_sk_select_reuseport() helper to record our decision.
Finally, in ❺, we accept the packet.

Header files

If you compile the C source with clang, you get errors due to missing headers.
The recommended way to solve this is to generate a vmlinux.h file with
bpftool:

$ bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h

Then, include the following headers:4

#include "vmlinux.h"
#include <bpf/bpf_helpers.h>

For my 6.17 kernel, the generated vmlinux.h is quite big: 2.7 MiB. Moreover,
bpf/bpf_helpers.h is shipped with libbpf. This adds another dependency for
users. As the eBPF program is quite small, I prefer to put the strict minimum in
vmlinux.h by cherry-picking the definitions I need.

Compilation

The eBPF Library for Go ships bpf2go, a tool to compile eBPF programs and
to generate some scaffolding code. We create a gen.go file with the following
content:

package main

//go:generate go tool bpf2go -tags linux reuseport reuseport_kern.c

After running go generate ./..., we can inspect the resulting objects with
readelf and llvm-objdump:

$ readelf -S reuseport_bpfeb.o
There are 14 section headers, starting at offset 0x840:
  [Nr] Name              Type             Address           Offset
[…]
  [ 3] sk_reuseport      PROGBITS         0000000000000000  00000040
  [ 6] .maps             PROGBITS         0000000000000000  000000c8
  [ 7] license           PROGBITS         0000000000000000  000000e8
[…]
$ llvm-objdump -S reuseport_bpfeb.o
reuseport_bpfeb.o:  file format elf64-bpf
Disassembly of section sk_reuseport:
0000000000000000 <reuseport_balance_prog>:
; {
       0:   bf 61 00 00 00 00 00 00     r6 = r1
;     __u32 index = bpf_get_prandom_u32() % num_sockets;
       1:   85 00 00 00 00 00 00 07     call 0x7
[…]

Usage from Go

Let’s set up 10 workers listening to the same port.5 Each socket
enables the SO_REUSEPORT option before binding:6

var (
    err error
    fds []uintptr
    conns []*net.UDPConn
)
workers := 10
listenAddr := "127.0.0.1:0"
listenConfig := net.ListenConfig{
    Control: func(_, _ string, c syscall.RawConn) error {
        c.Control(func(fd uintptr) {
            err = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unix.SO_REUSEPORT, 1)
            fds = append(fds, fd)
        })
        return err
    },
}
for range workers {
    pconn, err := listenConfig.ListenPacket(t.Context(), "udp", listenAddr)
    if err != nil {
        t.Fatalf("ListenPacket() error:\n%+v", err)
    }
    udpConn := pconn.(*net.UDPConn)
    listenAddr = udpConn.LocalAddr().String()
    conns = append(conns, udpConn)
}

The second step is to load the eBPF program, initialize the num_sockets
variable, populate the socket map, and attach the program to the first
socket.7

// Load the eBPF collection.
spec, err := loadReuseport()
if err != nil {
    t.Fatalf("loadReuseport() error:\n%+v", err)
}

// Set "num_sockets" global variable to the number of file descriptors we will register
if err := spec.Variables["num_sockets"].Set(uint32(len(fds))); err != nil {
    t.Fatalf("NumSockets.Set() error:\n%+v", err)
}

// Load the map and the program into the kernel.
var objs reuseportObjects
if err := spec.LoadAndAssign(&objs, nil); err != nil {
    t.Fatalf("loadReuseportObjects() error:\n%+v", err)
}
t.Cleanup(func() { objs.Close() })

// Assign the file descriptors to the socket map.
for worker, fd := range fds {
    if err := objs.reuseportMaps.SocketMap.Put(uint32(worker), uint64(fd)); err != nil {
        t.Fatalf("SocketMap.Put() error:\n%+v", err)
    }
}

// Attach the eBPF program to the first socket.
socketFD := int(fds[0])
progFD := objs.reuseportPrograms.ReuseportBalanceProg.FD()
if err := unix.SetsockoptInt(socketFD, unix.SOL_SOCKET, unix.SO_ATTACH_REUSEPORT_EBPF, progFD); err != nil {
    t.Fatalf("SetsockoptInt() error:\n%+v", err)
}

We are now ready to process incoming packets. Each worker is a Go routine
incrementing a counter for each received packet:8

var wg sync.WaitGroup
receivedPackets := make([]int, workers)
for worker := range workers {
    conn := conns[worker]
    packets := &receivedPackets[worker]
    wg.Go(func() {
        payload := make([]byte, 9000)
        for {
            if _, err := conn.Read(payload); err != nil {
                if errors.Is(err, net.ErrClosed) {
                    return
                }
                t.Logf("Read() error:\n%+v", err)
            }
            *packets++
        }
    })
}

Let’s send 1000 packets:

sentPackets := 1000
conn, err := net.Dial("udp", conns[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
for range sentPackets {
    if _, err := conn.Write([]byte("hello world!")); err != nil {
        t.Fatalf("Write() error:\n%+v", err)
    }
}

If we print the content of the receivedPackets array, we can check the
balancing works as expected, with each worker getting about 100 packets:

=== RUN   TestUDPWorkerBalancing
    balancing_test.go:84: receivedPackets[0] = 107
    balancing_test.go:84: receivedPackets[1] = 92
    balancing_test.go:84: receivedPackets[2] = 99
    balancing_test.go:84: receivedPackets[3] = 105
    balancing_test.go:84: receivedPackets[4] = 107
    balancing_test.go:84: receivedPackets[5] = 96
    balancing_test.go:84: receivedPackets[6] = 102
    balancing_test.go:84: receivedPackets[7] = 105
    balancing_test.go:84: receivedPackets[8] = 99
    balancing_test.go:84: receivedPackets[9] = 88

    balancing_test.go:91: receivedPackets = 1000
    balancing_test.go:92: sentPackets     = 1000

Graceful restart

You can also use SO_ATTACH_REUSEPORT_EBPF to gracefully restart an
application. A new instance of the application binds to the same address and
prepares its own version of the socket map. Once it attaches the eBPF program to
the first socket, the kernel steers incoming packets to this new instance. The
old instance needs to drain the already received packets before shutting down.

To check that we are not losing any packets, we spawn a Go routine to send as many
packets as possible:

sentPackets := 0
notSentPackets := 0
done := make(chan bool)
conn, err := net.Dial("udp", conns1[0].LocalAddr().String())
if err != nil {
    t.Fatalf("Dial() error:\n%+v", err)
}
defer conn.Close()
go func() {
    for {
        if _, err := conn.Write([]byte("hello world!")); err != nil {
            notSentPackets++
        } else {
            sentPackets++
        }
        select {
        case <-done:
            return
        default:
        }
    }
}()

Then, while the Go routine runs, we start the second set of workers. Once they
are running, they start receiving packets. If we gracefully stop the initial set
of workers, not a single packet is lost!9

=== RUN   TestGracefulRestart
    graceful_test.go:135: receivedPackets1[0] = 165
    graceful_test.go:135: receivedPackets1[1] = 195
    graceful_test.go:135: receivedPackets1[2] = 194
    graceful_test.go:135: receivedPackets1[3] = 190
    graceful_test.go:135: receivedPackets1[4] = 213
    graceful_test.go:135: receivedPackets1[5] = 187
    graceful_test.go:135: receivedPackets1[6] = 170
    graceful_test.go:135: receivedPackets1[7] = 190
    graceful_test.go:135: receivedPackets1[8] = 194
    graceful_test.go:135: receivedPackets1[9] = 155

    graceful_test.go:139: receivedPackets2[0] = 1631
    graceful_test.go:139: receivedPackets2[1] = 1582
    graceful_test.go:139: receivedPackets2[2] = 1594
    graceful_test.go:139: receivedPackets2[3] = 1611
    graceful_test.go:139: receivedPackets2[4] = 1571
    graceful_test.go:139: receivedPackets2[5] = 1660
    graceful_test.go:139: receivedPackets2[6] = 1587
    graceful_test.go:139: receivedPackets2[7] = 1605
    graceful_test.go:139: receivedPackets2[8] = 1631
    graceful_test.go:139: receivedPackets2[9] = 1689

    graceful_test.go:147: receivedPackets = 18014
    graceful_test.go:148: sentPackets     = 18014

Unfortunately, gracefully shutting down a UDP socket is not trivial in Go.
Previously, we were terminating workers by closing their sockets. However, if we
close them too soon, the application loses packets that were assigned to them
but not yet processed. Before stopping, a worker needs to call conn.Read()
until there are no more packets. A solution is to set a deadline for
conn.Read() and check if we should stop the Go routine when the deadline is
exceeded:

payload := make([]byte, 9000)
for {
    conn.SetReadDeadline(time.Now().Add(50 * time.Millisecond))
    if _, err := conn.Read(payload); err != nil {
        if errors.Is(err, os.ErrDeadlineExceeded) {
            select {
            case <-done:
                return
            default:
                continue
            }
        }
        t.Logf("Read() error:\n%+v", err)
    }
    *packets++
}

With TCP, this aspect is simpler: after enabling the net.ipv4.tcp_migrate_req
sysctl, the kernel automatically migrates waiting connections to a random socket
in the same group. Alternatively, eBPF can also control this migration. Both
features are available since Linux 5.14.

Addendum

After implementing this strategy in Akvorado, all workers now drop packets! 😱

$ curl -s 127.0.0.1:8080/api/v0/inlet/metrics \
> | sed -n 's/akvorado_inlet_flow_input_udp_in_dropped_//p'
packets_total{listener="0.0.0.0:2055",worker="0"} 838673
packets_total{listener="0.0.0.0:2055",worker="1"} 843675
packets_total{listener="0.0.0.0:2055",worker="2"} 837922
packets_total{listener="0.0.0.0:2055",worker="3"} 841443
packets_total{listener="0.0.0.0:2055",worker="4"} 840668
packets_total{listener="0.0.0.0:2055",worker="5"} 850274
packets_total{listener="0.0.0.0:2055",worker="6"} 835488
packets_total{listener="0.0.0.0:2055",worker="7"} 834479

The root cause is the default limit of 32 records for Kafka batch sizes. This
limit is too low because the brokers have a large overhead when handling each
batch: they need to ensure they persist correctly before acknowledging them.
Increasing the limit to 4096 records fixes this issue.

While load-balancing incoming flows with eBPF remains useful, it did not solve
the main issue. At least the even distribution of dropped packets helped
identify the real bottleneck. 😅


  1. The current version of the manual page is incomplete and does not
    cover the evolution introduced in Linux 4.19. There is a pending patch
    about this. ↩︎

  2. Rust is another option. However, the program we use is so trivial that
    it does not make sense to use Rust. ↩︎

  3. As bpf_get_prandom_u32() returns a pseudo-random 32-bit unsigned
    value, this method exhibits a very slight bias towards the first indexes.
    This is unlikely to be worth fixing. ↩︎

  4. Some examples include <linux/bpf.h> instead of "vmlinux.h". This
    makes your eBPF program dependent on the installed kernel headers. ↩︎

  5. listenAddr is initially set to 127.0.0.1:0 to allocate a
    random port. After the first iteration, it is updated with the allocated
    port. ↩︎

  6. This is the setupSockets() function in fixtures_test.go. ↩︎

  7. This is the setupEBPF() function in fixtures_test.go. ↩︎

  8. The complete code is in balancing_test.go ↩︎

  9. The complete code is in graceful_test.go ↩︎