Manjusaka

Manjusaka

利用動態 tracing 技術來 trace 內核中的網絡請求

這周幫朋友用 eBPF/SystemTap 這樣的動態 tracing 工具做了一些很有趣的功能。這篇文章算是一個總結

開篇#

實際上這周的一些想法,最開始是實際上來源於某天一個朋友問我的一個問題

我們能不能監控機器上哪些進程在發出 ICMP 請求?需要拿到 PID,ICMP 包出口地址,目標地址,進程啟動命令

很有趣的問題。實際上首先拿到這個問題時,我們第一反應肯定是 “讓機器上的進程在發 ICMP 包的時候” 直接往一個地方寫日誌不就好了,emmmm,用一個 meme 鎮樓吧

雞生蛋蛋生雞

嗯,可能大家都知道我想說什麼了,我們這種場景實際上只能選擇旁路,無侵入的方式去做。

那麼涉及到包的旁路的 trace,大家第一反應肯定是 tcpdump 去抓包。但是在我們今天的問題下,tcpdump 只能拿到包信息,但拿不到具體的 PID,啟動命令等信息。

所以我們可能需要用另外一些方式去實現我們的需求

在需求最開始之初,我們還可能的選擇的方式有這樣一些

  1. /proc/net/tcp 去拿具體的 socket 的 inode 信息,然後反查 pid 關聯

  2. eBPF + kprobe 內核打點做監控

  3. SystemTap + kprobe 內核打點做監控

第一種方式,實際上只能拿到 TCP 一層的信息,但是 ICMP 並不是 TCP 協議啊(衰(雖然同屬 L4

那麼看到最後,我們貌似就只有用 eBPF/SystemTap 配合 kprobe 的一條路可以走了

基礎的 trace#

Kprobe#

在繼續下面的代碼實際操作之前,我們首先要來認識一下 Kprobe

先引用一段官方文檔的介紹

Kprobes enables you to dynamically break into any kernel routine and collect debugging and performance information non-disruptively. You can trap at almost any kernel code address 1, specifying a handler routine to be invoked when the breakpoint is hit.
There are currently two types of probes: kprobes, and kretprobes (also called return probes). A kprobe can be inserted on virtually any instruction in the kernel. A return probe fires when a specified function returns.
In the typical case, Kprobes-based instrumentation is packaged as a kernel module. The module’s init function installs (“registers”) one or more probes, and the exit function unregisters them. A registration function such as register_kprobe() specifies where the probe is to be inserted and what handler is to be called when the probe is hit.

簡單來說,kprobe 是內核的一個提供的 trace 機制,在執行我們所設定特定的內核函數時 / 後,會按照我們所設定的規則觸發我們的回調函數。用官方的話來說,“You can trap at almost any kernel code address”

在我們今天的場景下,不管利用 eBPF 還是 SystemTap 都需要依賴 Kprobe 並選擇合適的 hook 點來完成我們內核調用的 trace

那麼,在我們今天的場景下,我們應該選擇在什麼函數上加上對應的 hook 呢?

首先我們來想一下,ICMP 是一個四層的包,最終封裝在一個 IP 報文中分發出去,那麼我們來看一下,內核中 IP 報文發送中的關鍵調用,參見下圖

IP Layer 關鍵系統調用

在這裡我選擇將 ip_finish_output 作為我們的 hook 點。

OK,Hook 點確認後,在開始正式編碼前,我們來大概介紹下 ip_finish_output

ip_finish_output#

首先來看下這個函數

static int ip_finish_output(struct net *net, struct sock *sk, struct sk_buff *skb)
{
	int ret;

	ret = BPF_CGROUP_RUN_PROG_INET_EGRESS(sk, skb);
	switch (ret) {
	case NET_XMIT_SUCCESS:
		return __ip_finish_output(net, sk, skb);
	case NET_XMIT_CN:
		return __ip_finish_output(net, sk, skb) ? : ret;
	default:
		kfree_skb(skb);
		return ret;
	}
}

具體細節先不在這裡展開(因為實在是太多了 Orz),在系統調用 ip_finish_output 時,會觸發我們設定的 kprobe 的鉤子,在我們所設定的 hook 函數中會收到 net, sk, skb 三個參數(這三個參數也是調用 ip_finish_output 時的值。

在這三個參數中,我們主要來將視線放在 struct sk_buff *skb 上。

熟悉 Linux Kernel 協議棧實現的同學肯定對 sk_buff 這個數據結構非常非常熟悉了。這個數據結構是 Linux Kernel 中網絡相關的核心數據結構。通過不斷的偏移指針,這個數據結構能夠很方便幫助我們確認我們待發送 / 已接收的數據在內存中所存放的位置。

空口直說好像有點抽象,我們來看個圖

sk_buff

以發送一個 TCP 包為例,我們能看到這個圖中,sk_buff 經歷了六個階段

a. 根據 TCP 中的一些選項如 MSS 等,分配一個 buffer
b. 根據 MAX_TCP_HEADER 在我們申請好的內存 buffer 中預留一段足夠容納所有網絡層的 header 的空間(TCP/IP/Link 等)
c. 填入 TCP 的 payload
d. 填入 TCP header
e. 填入 IP header
d. 填入 link header

可以參照一下 TCP 報文結構,這樣大家會有一個更直觀的理解

TCP Segement Format

大家能看到,通過 sk_buff 的一些指針的操作,我們就能很方便的獲取到其中不同 layer 的 header 和具體的 payload

OK,現在讓我們正式的來開始實現我們所需要的功能

eBPF + KProbe#

首先簡單介紹下 eBPF。BPF 指 Berkeley Packet Filter ,最早期是用來設計在內核中實現一些網絡包過濾的功能。但是後續社區對其做了非常多的強化增強,使其不僅能應用於網絡目的。這也是名字中 e 的來歷(extend)

本質上而言,eBPF 在內核維護了一層 VM,可以加載特定規則生成的代碼,讓內核變得更具有可編程性(後面我爭取寫一篇 eBPF 從入門到入土的介紹文章)

Tips: Tcpdump 的背後就是 BPF

然後在這次實現中,我們使用了 BCC 來簡化我們 eBPF 相關的編寫難度

OK,先上代碼

from bcc import BPF
import ctypes

bpf_text = """
#include <linux/ptrace.h>
#include <linux/sched.h>        /* For TASK_COMM_LEN */
#include <linux/icmp.h>
#include <linux/ip.h>
#include <linux/netdevice.h>

struct probe_icmp_sample {
    u32 pid;
    u32 daddress;
    u32 saddress;
};

BPF_PERF_OUTPUT(probe_events);

static inline unsigned char *custom_skb_network_header(const struct sk_buff *skb)
{
	return skb->head + skb->network_header;
}

static inline struct iphdr *get_iphdr_in_icmp(const struct sk_buff *skb)
{
    return (struct iphdr *)custom_skb_network_header(skb);
}

int probe_icmp(struct pt_regs *ctx, struct net *net, struct sock *sk, struct sk_buff *skb){
    struct iphdr * ipdata=get_iphdr_in_icmp(skb);
    if (ipdata->protocol!=1){
        return 1;
    }
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    u32 __pid = __pid_tgid;
    struct probe_icmp_sample __data = {0};
    __data.pid = __pid;
    u32 daddress;
    u32 saddress;
    bpf_probe_read(&daddress, sizeof(ipdata->daddr), &ipdata->daddr);
    bpf_probe_read(&saddress, sizeof(ipdata->daddr), &ipdata->saddr);
    __data.daddress=daddress;
    __data.saddress=saddress;
    probe_events.perf_submit(ctx, &__data, sizeof(__data));
    return 0;
}

"""


class IcmpSamples(ctypes.Structure):
    _fields_ = [
        ("pid", ctypes.c_uint32),
        ("daddress", ctypes.c_uint32),
        ("saddress", ctypes.c_uint32),
    ]


bpf = BPF(text=bpf_text)

filters = {}


def parse_ip_address(data):
    results = [0, 0, 0, 0]
    results[3] = data & 0xFF
    results[2] = (data >> 8) & 0xFF
    results[1] = (data >> 16) & 0xFF
    results[0] = (data >> 24) & 0xFF
    return ".".join([str(i) for i in results[::-1]])


def print_icmp_event(cpu, data, size):
    # event = b["probe_icmp_events"].event(data)
    event = ctypes.cast(data, ctypes.POINTER(IcmpSamples)).contents
    daddress = parse_ip_address(event.daddress)
    print(
        f"pid:{event.pid}, daddress:{daddress}, saddress:{parse_ip_address(event.saddress)}"
    )


bpf.attach_kprobe(event="ip_finish_output", fn_name="probe_icmp")

bpf["probe_events"].open_perf_buffer(print_icmp_event)
while 1:
    try:
        bpf.kprobe_poll()
    except KeyboardInterrupt:
        exit()

OK,這段代碼嚴格意義上來說是混編的,一部分是 C,一部分是 Python,。Python 部分大家肯定都很熟悉,BCC 幫我們加載我們的 C 代碼,並 attch 到 kprobe 上。然後不斷輸出我們從內核中往外傳輸的數據

那我們重點來看看 C 部分的代碼(實際上這嚴格來說不算標準 C,算是 BCC 封裝的一層 DSL)

首先看一下我們輔助的兩個函數

static inline unsigned char *custom_skb_network_header(const struct sk_buff *skb)
{
	return skb->head + skb->network_header;
}

static inline struct iphdr *get_iphdr_in_icmp(const struct sk_buff *skb)
{
    return (struct iphdr *)custom_skb_network_header(skb);
}

如前面所說,我們可以根據 sk_buff 中的 head 和 network_header 就能計算出我們 IP 頭部在內存中的地址,然後我們將其 cast 成一個 iphdr 結構體指針

我們還得再來看一下 iphdr

struct iphdr {
#if defined(__LITTLE_ENDIAN_BITFIELD)
	__u8	ihl:4,
		version:4;
#elif defined (__BIG_ENDIAN_BITFIELD)
	__u8	version:4,
  		ihl:4;
#else
#error	"Please fix <asm/byteorder.h>"
#endif
	__u8	tos;
	__be16	tot_len;
	__be16	id;
	__be16	frag_off;
	__u8	ttl;
	__u8	protocol;
	__sum16	check;
	__be32	saddr;
	__be32	daddr;
	/*The options start here. */
};

熟悉 IP 報文結構的同學肯定就很眼熟了對吧,其中 saddrdaddr 就是我們的源地址和目標地址,protocol 代表著我們 L4 協議的類型,其中為 1 的時候代表著 ICMP 協議

OK 然後來看一下我們的 trace 函數

int probe_icmp(struct pt_regs *ctx, struct net *net, struct sock *sk, struct sk_buff *skb){
    struct iphdr * ipdata=get_iphdr_in_icmp(skb);
    if (ipdata->protocol!=1){
        return 1;
    }
    u64 __pid_tgid = bpf_get_current_pid_tgid();
    u32 __pid = __pid_tgid;
    struct probe_icmp_sample __data = {0};
    __data.pid = __pid;
    u32 daddress;
    u32 saddress;
    bpf_probe_read(&daddress, sizeof(ipdata->daddr), &ipdata->daddr);
    bpf_probe_read(&saddress, sizeof(ipdata->daddr), &ipdata->saddr);
    __data.daddress=daddress;
    __data.saddress=saddress;
    probe_events.perf_submit(ctx, &__data, sizeof(__data));
    return 0;
}

如前面所說,kprobe 觸發調用時,會將 ip_finish_output 的三個參數傳入到我們的 trace 函數中來,那我們就可以根據傳入的數據做很多的事了,現在來介紹下上面的代碼中所做的事

  1. 將 sk_buff 轉換成對應的 iphdr
  2. 判斷當前報文是否為 ICMP 協議
  3. 利用內核 BPF 提供的 helper bpf_get_current_pid_tgid 獲取當前調用 ip_finish_output 進程的 pid
  4. 獲取 saddr 和 daddr。注意我們這裡用的 bpf_probe_read 也是 BPF 提供的 helper function,原則上來講,在 eBPF 中為了保證安全,我們所有從內核中讀取數據的行為都應該利用 bpf_probe_readbpf_probe_read_kernel 來實現
  5. 通過 perf 將數據提交出去

這樣一來,我們就能排查到機器上具體什麼進程在發送 ICMP 請求了

來看下效果

image

OK,我們的需求基本上達到了,不過這裡算是留了一個小問題,大家可以思考下,我們怎麼樣根據 pid 獲取啟動進程時的 cmdline ?

SystemTap + kprobe#

eBPF 的版本實現了,但是有個問題啊,eBPF 只能在高版本的內核中使用。一般而言,在 xb86_64 上,Linux 3.16 中支持了 eBPF。而我們依賴的 kprobe 對於 eBPF 的支持則是在 Linux 4.1 中實現的。通常而言,我們一般推薦使用 4.9 及以上內核來配合 eBPF 使用

那麼問題來了。實際上我們現在有很多 Centos 7 + Linux 3.10 這樣的傳統的搭配,那麼他們怎麼辦呢?

Linux 3.10 live's matter! Centos 7 live's matter!

那沒辦法,只能換一個技術棧來做了。這個時候,我們就首先考慮由 RedHat 開發,貢獻進入社區,低版本可用的 SystemTap

%{
#include<linux/byteorder/generic.h>
#include<linux/if_ether.h>
#include<linux/skbuff.h>
#include<linux/ip.h>
#include<linux/in.h>
#include<linux/tcp.h>
#include <linux/sched.h>
#include <linux/list.h>
#include <linux/pid.h>
#include <linux/mm.h>
%}

function isicmp:long (data:long)
%{
    struct iphdr *ip;
    struct sk_buff *skb;
    int tmp = 0;

    skb = (struct sk_buff *) STAP_ARG_data;

    if (skb->protocol == htons(ETH_P_IP)){
            ip = (struct iphdr *) skb->data;
            tmp = (ip->protocol == 1);
    }
    STAP_RETVALUE = tmp;
%}

function task_execname_by_pid:string (pid:long) %{
    struct task_struct *task;

    task = pid_task(find_vpid(STAP_ARG_pid), PIDTYPE_PID);

//     proc_pid_cmdline(p, STAP_RETVALUE);
    snprintf(STAP_RETVALUE, MAXSTRINGLEN, "%s", task->comm);
    
%}

function ipsource:long (data:long)
%{
    struct sk_buff *skb;
    struct iphdr *ip;
    __be32 src;

    skb = (struct sk_buff *) STAP_ARG_data;

    ip = (struct iphdr *) skb->data;
    src = (__be32) ip->saddr;

    STAP_RETVALUE = src;
%}

/* Return ip destination address */
function ipdst:long (data:long)
%{
    struct sk_buff *skb;
    struct iphdr *ip;
    __be32 dst;

    skb = (struct sk_buff *) STAP_ARG_data;

    ip = (struct iphdr *) skb->data;
    dst = (__be32) ip->daddr;

    STAP_RETVALUE = dst;
%}

function parseIp:string (data:long) %{ 
    sprintf(STAP_RETVALUE,"%d.%d,%d.%d",(int)STAP_ARG_data &0xFF,(int)(STAP_ARG_data>>8)&0xFF,(int)(STAP_ARG_data>>16)&0xFF,(int)(STAP_ARG_data>>24)&0xFF);
%}


probe kernel.function("ip_finish_output").call {
    if (isicmp($skb)) {
        pid_data = pid()
        /* IP */
        ipdst = ipdst($skb)
        ipsrc = ipsource($skb)
        printf("pid is:%d,source address is:%s, destination address is %s, command is: '%s'\n",pid_data,parseIp(ipsrc),parseIp(ipdst),task_execname_by_pid(pid_data))
    
    } else {
        next
    }
}

實際上大家可以看到,我們思路還是這樣,利用 ip_finish_output 來作為 kprobe 的 hook 點,然後我們獲取對應的 iphdr 然後進行操作。

嗯,我們的需求的基礎功能差不多就是這樣了,大家可以在額外進行一些功能增強,比如獲取完整的進程 cmdline 等等

更近一步的想法和實驗#

大家可能對於 ICMP 這樣的冷門協議沒有太明顯的感覺,那麼我們換個需求大家可能就更為有感覺了

監控機器上哪些進程在發出 HTTP 1.1 請求

嗯,一如往的,我們先來看一下系統中的關鍵調用

TCP

嗯,這裡我們選擇 tcp_sendmsg 來作為我們的切入點

int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size)
{
	int ret;

	lock_sock(sk);
	ret = tcp_sendmsg_locked(sk, msg, size);
	release_sock(sk);

	return ret;
}

嗯,其中 sock 是包含我們一些關鍵元數據的結構體

struct sock {
	/*
	 * Now struct inet_timewait_sock also uses sock_common, so please just
	 * don't add nothing before this first member (__sk_common) --acme
	 */
	struct sock_common	__sk_common;
    ...
}

struct sock_common {
	/* skc_daddr and skc_rcv_saddr must be grouped on a 8 bytes aligned
	 * address on 64bit arches : cf INET_MATCH()
	 */
	union {
		__addrpair	skc_addrpair;
		struct {
			__be32	skc_daddr;
			__be32	skc_rcv_saddr;
		};
	};
	union  {
		unsigned int	skc_hash;
		__u16		skc_u16hashes[2];
	};
	/* skc_dport && skc_num must be grouped as well */
	union {
		__portpair	skc_portpair;
		struct {
			__be16	skc_dport;
			__u16	skc_num;
		};
	};
    ...
}

大家可以看到,我們能在 sock 中獲取到我們端口的五元組數據,然後我們從 msghdr 中能獲取到具體的數據

那麼,以我們需求中的 HTTP 為例,我們實際上只需要判斷,我們獲取到的 TCP 包中是否包含 HTTP/1.1 ,便可粗略判斷,這個請求是否是 HTTP 1.1 請求(很暴力的做法 Hhhhh

OK,我們來看下代碼

from bcc import BPF
import ctypes
import binascii

bpf_text = """
#include <linux/ptrace.h>
#include <linux/ip.h>
#include <linux/tcp.h>
#include <uapi/linux/ptrace.h>
#include <net/sock.h>
#include <bcc/proto.h>
#include <linux/socket.h>

struct ipv4_data_t {
    u32 pid;
    u64 ip;
    u32 saddr;
    u32 daddr;
    u16 lport;
    u16 dport;
    u64 state;
    u64 type;
    u8 data[300];
    u16 data_size;
};


BPF_PERF_OUTPUT(ipv4_events);

int trace_event(struct pt_regs *ctx,struct sock *sk, struct msghdr *msg, size_t size){
    if (sk == NULL)
        return 0;
    u32 pid = bpf_get_current_pid_tgid() >> 32;
    

    // pull in details
    u16 family = sk->__sk_common.skc_family;
    u16 lport = sk->__sk_common.skc_num;
    u16 dport = sk->__sk_common.skc_dport;
    char state = sk->__sk_common.skc_state;

    if (family == AF_INET) {
        struct ipv4_data_t data4 = {};
        data4.pid = pid;
        data4.ip = 4;
        //data4.type = type;
        data4.saddr = sk->__sk_common.skc_rcv_saddr;
        data4.daddr = sk->__sk_common.skc_daddr;
        // lport is host order
        data4.lport = lport;
        data4.dport = ntohs(dport);
        data4.state = state;
        struct iov_iter temp_iov_iter=msg->msg_iter;
        struct iovec *temp_iov=temp_iov_iter.iov;
        bpf_probe_read_kernel(&data4.data_size, 4, &temp_iov->iov_len);
        u8 * temp_ptr;
        bpf_probe_read_kernel(&temp_ptr, sizeof(temp_ptr), &temp_iov->iov_base);
        bpf_probe_read_kernel(&data4.data, sizeof(data4.data), temp_ptr);
        ipv4_events.perf_submit(ctx, &data4, sizeof(data4));
    }
    return 0;
}

"""

bpf = BPF(text=bpf_text)

filters = {}


def parse_ip_address(data):
    results = [0, 0, 0, 0]
    results[3] = data & 0xFF
    results[2] = (data >> 8) & 0xFF
    results[1] = (data >> 16) & 0xFF
    results[0] = (data >> 24) & 0xFF
    return ".".join([str(i) for i in results[::-1]])


def print_http_payload(cpu, data, size):
    # event = b["probe_icmp_events"].event(data)
    # event = ctypes.cast(data, ctypes.POINTER(IcmpSamples)).contents
    event= bpf["ipv4_events"].event(data)
    daddress = parse_ip_address(event.daddr)
    # data=list(event.data)
    # temp=binascii.hexlify(data) 
    body = bytearray(event.data).hex()
    if "48 54 54 50 2f 31 2e 31".replace(" ", "") in body:
        # if "68747470" in temp.decode():
        print(
            f"pid:{event.pid}, daddress:{daddress}, saddress:{parse_ip_address(event.saddr)}, {event.lport}, {event.dport}, {event.data_size}"
        )


bpf.attach_kprobe(event="tcp_sendmsg", fn_name="trace_event")

bpf["ipv4_events"].open_perf_buffer(print_http_payload)
while 1:
    try:
        bpf.perf_buffer_poll()
    except KeyboardInterrupt:
        exit()

OK,我們來看下效果

效果

實際上這個我們還可以再擴展一下。比如針對 Go 這樣,所發出的 HTTPS 連接有著固定特徵的語言,我們也可以用相對簡單的做法去完成機器上的包來源的溯源(大家可以參考下無辄的這篇文章,為什麼用 Go 訪問某網站始終會 503 Service Unavailable ?)

我自己也做了個測試,大家可以參考下代碼:https://github.com/Zheaoli/linux-traceing-script/blob/main/ebpf/go-https-tracing.py

總結#

實際上無論是 eBPF 還是 SystemTap ,這類動態 tracing 技術可以 Linux Kernel 變得更具被可編程性。相較於傳統的 recompile kernel 這些手段來說,更為方便快捷。而 BCC/BPFTrace 這類的更進一步的封裝框架的出現,更進一步的降低了我們去觀測內核的難度

很多時候我們很多需求都可以選擇旁路的方式去更快捷的實現。但是要注意的一點是,動態 tracing 技術的引入勢必增加了內核的不穩定性,而且一定程度上會影響性能。所以我們需要根據具體的場景去做 trade-off

好了,這篇文章差不多就水到這裡,後面有時間爭取出一個 eBPF 從入門到入土的系列文章(flag++

載入中......
此文章數據所有權由區塊鏈加密技術和智能合約保障僅歸創作者所有。