用GDB除錯 Redis網路通訊模組

[實戰] 除錯 Redis 準備工作

時下的業界,相對於傳統的關係型資料庫,以 key-value 思想實現的 NoSQL 記憶體資料庫非常流行,而提到記憶體資料庫,很多讀者第一反應就是 Redis 。確實,Redis 以其高效的效能和優雅的實現成為眾多記憶體資料庫中的翹楚。

1、Redis 原始碼下載與編譯

Redis 的最新原始碼下載地址可以在Redis 官網獲得。我使用的是 CentOS 7。0 系統,使用 wget 命令將 Redis 原始碼檔案下載下來:

[root@localhost gdbtest]# wget http://download。redis。io/releases/redis-4。0。11。tar。gz

–2018-09-08 13:08:41– http://download。redis。io/releases/redis-4。0。11。tar。gz

Resolving download。redis。io (download。redis。io)… 109。74。203。151

Connecting to download。redis。io (download。redis。io)|109。74。203。151|:80… connected。

HTTP request sent, awaiting response… 200 OK

Length: 1739656 (1。7M) [application/x-gzip]

Saving to: ‘redis-4。0。11。tar。gz’

54% [======================> ] 940,876 65。6KB/s eta 9s

解壓:

[root@localhost gdbtest]# tar zxvf redis-4。0。11。tar。gz

進入生成的 redis-4。0。11 目錄使用 makefile 進行編譯:

[root@localhost gdbtest]# cd redis-4。0。11

[root@localhost redis-4。0。11]# make -j 4

編譯成功後,會在 src 目錄下生成多個可執行程式,其中 redis-server 和 redis-cli 是我們即將除錯的程式。

進入 src 目錄,使用 GDB 啟動 redis-server 這個程式:

[root@localhost src]# gdb redis-server

Reading symbols from /root/redis-4。0。9/src/redis-server…done。

(gdb) r

Starting program: /root/redis-4。0。9/src/redis-server

[Thread debugging using libthread_db enabled]

Using host libthread_db library “/lib64/libthread_db。so。1”。

31212:C 17 Sep 11:59:50。781 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo

31212:C 17 Sep 11:59:50。781 # Redis version=4。0。9, bits=64, commit=00000000, modified=0, pid=31212, just started

31212:C 17 Sep 11:59:50。781 # Warning: no config file specified, using the default config。 In order to specify a config file use /root/redis-4。0。9/src/redis-server /path/to/redis。conf

31212:M 17 Sep 11:59:50。781 * Increased maximum number of open files to 10032 (it was originally set to 1024)。

[New Thread 0x7ffff07ff700 (LWP 31216)]

[New Thread 0x7fffefffe700 (LWP 31217)]

[New Thread 0x7fffef7fd700 (LWP 31218)]

_。-__ ‘’-。_

_。- 。。 ”-。 Redis 4。0。9 (00000000/0) 64 bit

。-。-```。 ```\/ _。,_ ‘’-。_

( ‘ , 。-` | `, ) Running in standalone mode

|`-。_`-。。。-` __。。。-。-。_|’_。-’| Port: 6379

|-。_ 。_ / _。-‘ | PID: 31212

-。_ -。_-。/ 。-’ 。-’

|-。_-。_ -。__。-’ _。-‘_。-’|

|-。_-。_ _。-‘_。-’ | http://redis。io

-。_ -。_-。_。-‘。-’ _。-’

|-。_-。_ -。__。-‘ _。-’_。-‘|

|-。_-。_ _。-’_。-‘ |

-。_ -。_-。_。-‘。-’ _。-’

-。_-。_。-’ 。-’

-。_ _。-’

-。__。-’

以上是 redis-server 啟動成功後的畫面。

我們再開一個 session,再次進入 Redis 原始碼所在的 src 目錄,然後使用 GDB 啟動 Redis 客戶端 redis-cli:

[root@localhost src]# gdb redis-cli

Reading symbols from /root/redis-4。0。9/src/redis-cli…done。

(gdb) r

Starting program: /root/redis-4。0。9/src/redis-cli

[Thread debugging using libthread_db enabled]

Using host libthread_db library “/lib64/libthread_db。so。1”。

127。0。0。1:6379>

以上是 redis-cli 啟動成功後的畫面。

2、通訊示例

本課程的學習目的是研究 Redis 的網路通訊模組,為了說明問題方便,我們使用一個簡單的通訊例項,即透過 redis-cli 產生一個 key 為“hello”、值為“world”的 key-value 資料,然後得到 redis-server 的響應。

127。0。0。1:6379> set hello world

OK

127。0。0。1:6379>

讀者需要注意的是,我這裡說是一個“簡單”的例項,其實並不簡單。有兩個原因:

我們是在 redis-cli (Redis 客戶端)輸入的命令,這個命令經 redis-cli 處理後封裝成網路通訊包,透過客戶端的網路通訊模組發給 redis-server,然後 redis-server 網路通訊模組收到後解析出命令,執行命令後得到結果再封裝成相應的網路資料包,返回給 redis-cli。這個過程中涉及到兩端的網路通訊模組是我們研究和學習的重點

redis-server 基本的資料型別都是可以透過類似的命令產生,因此這個例子是一個典型的研究 redis 的典範。

[實戰] Redis 網路通訊模組原始碼分析(1)

我們這裡先研究 redis-server 端的網路通訊模組。除去 Redis 本身的業務功能以外,Redis 的網路通訊模組實現思路和細節非常有代表性。由於網路通訊模組的設計也是 Linux C++ 後臺開發一個很重要的模組,雖然網路上有很多現成的網路庫,但是簡單易學且可以作為典範的並不多,而 redis-server 就是這方面值得借鑑學習的材料之一。

1、偵聽 socket 初始化工作

透過前面課程的介紹,我們知道網路通訊在應用層上的大致流程如下:

伺服器端建立偵聽 socket;

將偵聽 socket 繫結到需要的 IP 地址和埠上(呼叫 Socket API bind 函式);

啟動偵聽(呼叫 socket API listen 函式);

無限等待客戶端連線到來,呼叫 Socket API accept 函式接受客戶端連線,併產生一個與該客戶端對應的客戶端 socket;

處理客戶端 socket 上網路資料的收發,必要時關閉該 socket。

根據上面的流程,先來探究前三步的流程。由於 redis-server 預設對客戶端的埠號是 6379,可以使用這個資訊作為依據。

全域性搜尋一下 Redis 的程式碼,尋找呼叫了 bind() 函式的程式碼,經過過濾和篩選,我們確定了位於 anet。c 的 anetListen() 函式。

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {

if (bind(s,sa,len) == -1) {

anetSetError(err, “bind: %s”, strerror(errno));

close(s);

return ANET_ERR;

}

if (listen(s, backlog) == -1) {

anetSetError(err, “listen: %s”, strerror(errno));

close(s);

return ANET_ERR;

}

return ANET_OK;

}

用 GDB 的 b 命令在這個函式上加個斷點,然後重新執行 redis-server:

(gdb) b anetListen

Breakpoint 1 at 0x426cd0: file anet。c, line 440。

(gdb) r

The program being debugged has been started already。

Start it from the beginning? (y or n) y

Starting program: /root/redis-4。0。9/src/redis-server

[Thread debugging using libthread_db enabled]

Using host libthread_db library “/lib64/libthread_db。so。1”。

31546:C 17 Sep 14:20:43。861 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo

31546:C 17 Sep 14:20:43。861 # Redis version=4。0。9, bits=64, commit=00000000, modified=0, pid=31546, just started

31546:C 17 Sep 14:20:43。861 # Warning: no config file specified, using the default config。 In order to specify a config file use /root/redis-4。0。9/src/redis-server /path/to/redis。conf

31546:M 17 Sep 14:20:43。862 * Increased maximum number of open files to 10032 (it was originally set to 1024)。

Breakpoint 1, anetListen (err=0x745bb0 “”, s=10, sa=0x75dfe0, len=28, backlog=511) at anet。c:440

440 static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {

當 GDB 中斷在這個函式時,使用 bt 命令檢視一下此時的呼叫堆疊:

(gdb) bt

#0 anetListen (err=0x745bb0 “”, s=10, sa=0x75dfe0, len=28, backlog=511) at anet。c:440

#1 0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 “”, port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)

at anet。c:487

#2 0x000000000042792d in anetTcp6Server (err=err@entry=0x745bb0 “”, port=port@entry=6379, bindaddr=bindaddr@entry=0x0, backlog=

at anet。c:510

#3 0x000000000042b01f in listenToPort (port=6379, fds=fds@entry=0x745ae4 , count=count@entry=0x745b24 ) at server。c:1728

#4 0x000000000042f917 in initServer () at server。c:1852

#5 0x0000000000423803 in main (argc=, argv=0x7fffffffe588) at server。c:3857

透過這個堆疊,結合堆疊 #2 的 6379 埠號可以確認這就是我們要找的邏輯,並且這個邏輯在主執行緒(因為從堆疊上看,最頂層堆疊是 main() 函式)中進行。

我們看下堆疊 #1 處的程式碼:

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)

{

int s = -1, rv;

char _port[6]; /* strlen(“65535”) */

struct addrinfo hints, *servinfo, *p;

snprintf(_port,6,“%d”,port);

memset(&hints,0,sizeof(hints));

hints。ai_family = af;

hints。ai_socktype = SOCK_STREAM;

hints。ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */

if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {

anetSetError(err, “%s”, gai_strerror(rv));

return ANET_ERR;

}

for (p = servinfo; p != NULL; p = p->ai_next) {

if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)

continue;

if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;

if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;

if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;

goto end;

}

if (p == NULL) {

anetSetError(err, “unable to bind socket, errno: %d”, errno);

goto error;

}

error:

if (s != -1) close(s);

s = ANET_ERR;

end:

freeaddrinfo(servinfo);

return s;

}

將堆疊切換至 #1,然後輸入 info arg 檢視傳入給這個函式的引數:

(gdb) f 1

#1 0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0 “”, port=port@entry=6379, bindaddr=bindaddr@entry=0x0, af=af@entry=10, backlog=511)

at anet。c:487

487 if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;

(gdb) info args

err = 0x745bb0 “”

port = 6379

bindaddr = 0x0

af = 10

backlog = 511

使用系統 API getaddrinfo 來解析得到當前主機的 IP 地址和埠資訊。這裡沒有選擇使用 gethostbyname 這個 API 是因為 gethostbyname 僅能用於解析 ipv4 相關的主機資訊,而 getaddrinfo 既可以用於 ipv4 也可以用於 ipv6 ,這個函式的簽名如下:

int getaddrinfo(const char *node, const char *service,

const struct addrinfo *hints,

struct addrinfo **res);

這個函式的具體用法可以在 Linux man 手冊上檢視。通常伺服器端在呼叫 getaddrinfo 之前,將 hints 引數的 ai_flags 設定為 AI_PASSIVE,用於 bind;主機名 nodename 通常會設定為 NULL,返回通配地址 [::]。當然,客戶端呼叫 getaddrinfo 時,hints 引數的 ai_flags 一般不設定 AI_PASSIVE,但是主機名 node 和服務名 service(更願意稱之為埠)則應該不為空。

解析完協議資訊後,利用得到的協議資訊建立偵聽 socket,並開啟該 socket 的 reuseAddr 選項。然後呼叫 anetListen 函式,在該函式中先 bind 後 listen。至此,redis-server 就可以在 6379 埠上接受客戶端連線了。

2、接受客戶端連線

同樣的道理,要研究 redis-server 如何接受客戶端連線,只要搜尋 socket API accept 函式即可。

經定位,我們最終在 anet。c 檔案中找到 anetGenericAccept 函式:

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {

int fd;

while(1) {

fd = accept(s,sa,len);

if (fd == -1) {

if (errno == EINTR)

continue;

else {

anetSetError(err, “accept: %s”, strerror(errno));

return ANET_ERR;

}

}

break;

}

return fd;

}

我們用 b 命令在這個函式處加個斷點,然後重新執行 redis-server。一直到程式全部執行起來,GDB 都沒有觸發該斷點,這時新開啟一個 redis-cli,以模擬新客戶端連線到 redis-server 上的行為。斷點觸發了,此時檢視一下呼叫堆疊。

Breakpoint 2, anetGenericAccept (err=0x745bb0 “”, s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet。c:531

531 static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {

(gdb) bt

#0 anetGenericAccept (err=0x745bb0 “”, s=s@entry=11, sa=sa@entry=0x7fffffffe2b0, len=len@entry=0x7fffffffe2ac) at anet。c:531

#1 0x0000000000427a1d in anetTcpAccept (err=, s=s@entry=11, ip=ip@entry=0x7fffffffe370 “\317P\237[”, ip_len=ip_len@entry=46,

port=port@entry=0x7fffffffe36c) at anet。c:552

#2 0x0000000000437fb1 in acceptTcpHandler (el=, fd=11, privdata=, mask=) at networking。c:689

#3 0x00000000004267f0 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff083a0a0, flags=flags@entry=11) at ae。c:440

#4 0x0000000000426adb in aeMain (eventLoop=0x7ffff083a0a0) at ae。c:498

#5 0x00000000004238ef in main (argc=, argv=0x7fffffffe588) at server。c:3894

分析這個呼叫堆疊,梳理一下這個呼叫流程。在 main 函式的 initServer 函式中建立偵聽 socket、繫結地址然後開啟偵聽,接著呼叫 aeMain 函式啟動一個迴圈不斷地處理“事件”。

void aeMain(aeEventLoop *eventLoop) {

eventLoop->stop = 0;

while (!eventLoop->stop) {

if (eventLoop->beforesleep != NULL)

eventLoop->beforesleep(eventLoop);

aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);

}

}

迴圈的退出條件是 eventLoop→stop 為 1。事件處理的程式碼如下:

int aeProcessEvents(aeEventLoop *eventLoop, int flags)

{

int processed = 0, numevents;

/* Nothing to do? return ASAP */

if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no

* file events to process as long as we want to process time

* events, in order to sleep until the next time event is ready

* to fire。 */

if (eventLoop->maxfd != -1 ||

((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {

int j;

aeTimeEvent *shortest = NULL;

struct timeval tv, *tvp;

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))

shortest = aeSearchNearestTimer(eventLoop);

if (shortest) {

long now_sec, now_ms;

aeGetTime(&now_sec, &now_ms);

tvp = &tv;

/* How many milliseconds we need to wait for the next

* time event to fire? */

long long ms =

(shortest->when_sec - now_sec)*1000 +

shortest->when_ms - now_ms;

if (ms > 0) {

tvp->tv_sec = ms/1000;

tvp->tv_usec = (ms % 1000)*1000;

} else {

tvp->tv_sec = 0;

tvp->tv_usec = 0;

}

} else {

/* If we have to check for events but need to return

* ASAP because of AE_DONT_WAIT we need to set the timeout

* to zero */

if (flags & AE_DONT_WAIT) {

tv。tv_sec = tv。tv_usec = 0;

tvp = &tv;

} else {

/* Otherwise we can block */

tvp = NULL; /* wait forever */

}

}

/* Call the multiplexing API, will return only on timeout or when

* some event fires。 */

numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback。 */

if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)

eventLoop->aftersleep(eventLoop);

for (j = 0; j < numevents; j++) {

aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j]。fd];

int mask = eventLoop->fired[j]。mask;

int fd = eventLoop->fired[j]。fd;

int rfired = 0;

/* note the fe->mask & mask & 。。。 code: maybe an already processed

* event removed an element that fired and we still didn‘t

* processed, so we check if the event is still valid。 */

if (fe->mask & mask & AE_READABLE) {

rfired = 1;

fe->rfileProc(eventLoop,fd,fe->clientData,mask);

}

if (fe->mask & mask & AE_WRITABLE) {

if (!rfired || fe->wfileProc != fe->rfileProc)

fe->wfileProc(eventLoop,fd,fe->clientData,mask);

}

processed++;

}

}

/* Check time events */

if (flags & AE_TIME_EVENTS)

processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */

}

這段程式碼先透過 flag 引數檢查是否有事件需要處理。如果有定時器事件( AE_TIME_EVENTS 標誌 ),則尋找最近要到期的定時器。

/* Search the first timer to fire。

* This operation is useful to know how many time the select can be

* put in sleep without to delay any event。

* If there are no timers NULL is returned。

*

* Note that’s O(N) since time events are unsorted。

* Possible optimizations (not needed by Redis so far, but。。。):

* 1) Insert the event in order, so that the nearest is just the head。

* Much better but still insertion or deletion of timers is O(N)。

* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N))。

*/

static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)

{

aeTimeEvent *te = eventLoop->timeEventHead;

aeTimeEvent *nearest = NULL;

while(te) {

if (!nearest || te->when_sec < nearest->when_sec ||

(te->when_sec == nearest->when_sec &&

te->when_ms < nearest->when_ms))

nearest = te;

te = te->next;

}

return nearest;

}

這段程式碼有詳細的註釋,也非常好理解。註釋告訴我們,由於這裡的定時器集合是無序的,所以需要遍歷一下這個連結串列,演算法複雜度是 O(n) 。同時,註釋中也“暗示”了我們將來 Redis 在這塊的最佳化方向,即把這個連結串列按到期時間從小到大排序,這樣連結串列的頭部就是我們要的最近時間點的定時器物件,演算法複雜度是 O(1) 。或者使用 Redis 中的 skiplist ,演算法複雜度是 O(log(N)) 。

接著獲取當前系統時間( aeGetTime(&now_sec, &now_ms); )將最早要到期的定時器時間減去當前系統時間獲得一個間隔。這個時間間隔作為 numevents = aeApiPoll(eventLoop, tvp); 呼叫的引數,aeApiPoll() 在 Linux 平臺上使用 epoll 技術,Redis 在這個 IO 複用技術上、在不同的作業系統平臺上使用不同的系統函式,在 Windows 系統上使用 select,在 Mac 系統上使用 kqueue。這裡重點看下 Linux 平臺下的實現:

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {

aeApiState *state = eventLoop->apidata;

int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,

tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);

if (retval > 0) {

int j;

numevents = retval;

for (j = 0; j < numevents; j++) {

int mask = 0;

struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;

if (e->events & EPOLLOUT) mask |= AE_WRITABLE;

if (e->events & EPOLLERR) mask |= AE_WRITABLE;

if (e->events & EPOLLHUP) mask |= AE_WRITABLE;

eventLoop->fired[j]。fd = e->data。fd;

eventLoop->fired[j]。mask = mask;

}

}

return numevents;

}

epoll_wait 這個函式的簽名如下:

int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);

最後一個引數 timeout 的設定非常有講究,如果傳入進來的 tvp 是 NULL ,根據上文的分析,說明沒有定時器事件,則將等待時間設定為 -1 ,這會讓 epoll_wait 無限期地掛起來,直到有事件時才會被喚醒。掛起的好處就是不浪費 CPU 時間片。反之,將 timeout 設定成最近的定時器事件間隔,將 epoll_wait 的等待時間設定為最近的定時器事件來臨的時間間隔,可以及時喚醒 epoll_wait ,這樣程式流可以儘快處理這個到期的定時器事件(下文會介紹)。

對於 epoll_wait 這種系統呼叫,所有的 fd(對於網路通訊,也叫 socket)資訊包括偵聽 fd 和普通客戶端 fd 都記錄在事件迴圈物件 aeEventLoop 的 apidata 欄位中,當某個 fd 上有事件觸發時,從 apidata 中找到該 fd,並把事件型別(mask 欄位)一起記錄到 aeEventLoop 的 fired 欄位中去。我們先把這個流程介紹完,再介紹 epoll_wait 函式中使用的 epfd 是在何時何地建立的,偵聽 fd、客戶端 fd 是如何掛載到 epfd 上去的。

在得到了有事件的 fd 以後,接下來就要處理這些事件了。在主迴圈 aeProcessEvents 中從 aeEventLoop 物件的 fired 陣列中取出上一步記錄的 fd,然後根據事件型別(讀事件和寫事件)分別進行處理。

for (j = 0; j < numevents; j++) {

aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j]。fd];

int mask = eventLoop->fired[j]。mask;

int fd = eventLoop->fired[j]。fd;

int rfired = 0;

/* note the fe->mask & mask & 。。。 code: maybe an already processed

* event removed an element that fired and we still didn‘t

* processed, so we check if the event is still valid。 */

if (fe->mask & mask & AE_READABLE) {

rfired = 1;

fe->rfileProc(eventLoop,fd,fe->clientData,mask);

}

if (fe->mask & mask & AE_WRITABLE) {

if (!rfired || fe->wfileProc != fe->rfileProc)

fe->wfileProc(eventLoop,fd,fe->clientData,mask);

}

processed++;

}

讀事件欄位 rfileProc 和寫事件欄位 wfileProc 都是函式指標,在程式早期設定好,這裡直接呼叫就可以了。

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);

/* File event structure */

typedef struct aeFileEvent {

int mask; /* one of AE_(READABLE|WRITABLE) */

aeFileProc *rfileProc;

aeFileProc *wfileProc;

void *clientData;

} aeFileEvent;

3、EPFD 的建立

我們透過搜尋關鍵字 epoll_create 在 ae_poll。c 檔案中找到 EPFD 的建立函式 aeApiCreate 。

static int aeApiCreate(aeEventLoop *eventLoop) {

aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;

state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);

if (!state->events) {

zfree(state);

return -1;

}

state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */

if (state->epfd == -1) {

zfree(state->events);

zfree(state);

return -1;

}

eventLoop->apidata = state;

return 0;

}

使用 GDB 的 b 命令在這個函式上加個斷點,然後使用 run 命令重新執行一下 redis-server,觸發斷點,使用 bt 命令檢視此時的呼叫堆疊。發現 EPFD 也是在上文介紹的 initServer 函式中建立的。

(gdb) bt

#0 aeCreateEventLoop (setsize=10128) at ae。c:79

#1 0x000000000042f542 in initServer () at server。c:1841

#2 0x0000000000423803 in main (argc=, argv=0x7fffffffe588) at server。c:3857

在 aeCreateEventLoop 中不僅建立了 EPFD,也建立了整個事件迴圈需要的 aeEventLoop 物件,並把這個物件記錄在 Redis 的一個全域性變數的 el 欄位中。這個全域性變數叫 server,這是一個結構體型別。其定義如下:

//位於 server。c 檔案中

struct redisServer server; /* Server global state */

//位於 server。h 檔案中

struct redisServer {

/* General */

//省略部分欄位…

aeEventLoop *el;

unsigned int lruclock; /* Clock for LRU eviction */

//太長了,省略部分欄位…

}

[實戰] Redis 網路通訊模組原始碼分析(2)

接著上一課的內容繼續分析。

1、偵聽 fd 與客戶端 fd 是如何掛載到 EPFD 上去的

同樣的方式,要把一個 fd 掛載到 EPFD 上去,需要呼叫系統 API epoll_ctl ,搜尋一下這個函式名。在檔案 ae_epoll。c 中我們找到 aeApiAddEvent 函式:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {

aeApiState *state = eventLoop->apidata;

struct epoll_event ee = {0}; /* avoid valgrind warning */

/* If the fd was already monitored for some event, we need a MOD

* operation。 Otherwise we need an ADD operation。 */

int op = eventLoop->events[fd]。mask == AE_NONE ?

EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee。events = 0;

mask |= eventLoop->events[fd]。mask; /* Merge old events */

if (mask & AE_READABLE) ee。events |= EPOLLIN;

if (mask & AE_WRITABLE) ee。events |= EPOLLOUT;

ee。data。fd = fd;

if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;

return 0;

}

當把一個 fd 繫結到 EPFD 上去的時候,先從 eventLoop( aeEventLoop型別 )中尋找是否存在已關注的事件型別,如果已經有了,說明使用 epoll_ctl 是更改已繫結的 fd 事件型別( EPOLL_CTL_MOD ),否則就是新增 fd 到 EPFD 上。

在 aeApiAddEvent 加個斷點,再重啟下 redis-server 。觸發斷點後的呼叫堆疊如下:

#0 aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1, proc=0x437f50 , clientData=clientData@entry=0x0) at ae。c:145

#1 0x000000000042f83b in initServer () at server。c:1927

#2 0x0000000000423803 in main (argc=, argv=0x7fffffffe588) at server。c:3857

同樣在 initServer 函式中,結合上文分析的偵聽 fd 的建立過程,去掉無關程式碼,抽出這個函式的主脈絡得到如下虛擬碼:

void initServer(void) {

//記錄程式程序 ID

server。pid = getpid();

//建立程式的 aeEventLoop 物件和 epfd 物件

server。el = aeCreateEventLoop(server。maxclients+CONFIG_FDSET_INCR);

//建立偵聽 fd

listenToPort(server。port,server。ipfd,&server。ipfd_count) == C_ERR)

//將偵聽 fd 設定為非阻塞的

anetNonBlock(NULL,server。sofd);

//建立 Redis 的定時器,用於執行定時任務 cron

/* Create the timer callback, this is our way to process many background

* operations incrementally, like clients timeout, eviction of unaccessed

* expired keys and so forth。 */

aeCreateTimeEvent(server。el, 1, serverCron, NULL, NULL) == AE_ERR

//將偵聽 fd 繫結到 epfd 上去

/* Create an event handler for accepting new connections in TCP and Unix

* domain sockets。 */

aeCreateFileEvent(server。el, server。ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

//建立一個管道,用於在需要時去喚醒 epoll_wait 掛起的整個 EventLoop

/* Register a readable event for the pipe used to awake the event loop

* when a blocked client in a module needs attention。 */

aeCreateFileEvent(server。el, server。module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR)

}

注意:這裡所說的“主脈絡”是指我們關心的網路通訊的主脈絡,不代表這個函式中其他程式碼就不是主要的。

如何驗證這個斷點處掛載到 EPFD 上的 fd 就是偵聽 fd 呢?很簡單,建立偵聽 fd 時,用 GDB 記錄下這個 fd 的值。例如,當我的電腦某次執行時,偵聽 fd 的值是 15 。如下圖( 除錯工具用的是 CGDB ):

用GDB除錯 Redis網路通訊模組

然後在執行程式至繫結 fd 的地方,確認一下繫結到 EPFD 上的 fd 值:

用GDB除錯 Redis網路通訊模組

這裡的 fd 值也是 15 ,說明繫結的 fd 是偵聽 fd 。當然在繫結偵聽 fd 時,同時也指定了只關注可讀事件,並設定事件回撥函式為 acceptTcpHandler 。對於偵聽 fd ,一般只要關注可讀事件就可以了,當觸發可讀事件,說明有新的連線到來。

aeCreateFileEvent(server。el, server。ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR

acceptTcpHandler 函式定義如下( 位於檔案 networking。c 中 ):

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {

int cport, cfd, max = MAX_ACCEPTS_PER_CALL;

char cip[NET_IP_STR_LEN];

UNUSED(el);

UNUSED(mask);

UNUSED(privdata);

while(max——) {

cfd = anetTcpAccept(server。neterr, fd, cip, sizeof(cip), &cport);

if (cfd == ANET_ERR) {

if (errno != EWOULDBLOCK)

serverLog(LL_WARNING,

“Accepting client connection: %s”, server。neterr);

return;

}

serverLog(LL_VERBOSE,“Accepted %s:%d”, cip, cport);

acceptCommonHandler(cfd,0,cip);

}

}

anetTcpAccept 函式中呼叫的就是我們上面說的 anetGenericAccept 函數了。

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {

int fd;

struct sockaddr_storage sa;

socklen_t salen = sizeof(sa);

if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)

return ANET_ERR;

if (sa。ss_family == AF_INET) {

struct sockaddr_in *s = (struct sockaddr_in *)&sa;

if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);

if (port) *port = ntohs(s->sin_port);

} else {

struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;

if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);

if (port) *port = ntohs(s->sin6_port);

}

return fd;

}

至此,這段流程總算連起來了,在 acceptTcpHandler 上加個斷點,然後重新執行一下 redis-server ,再開個 redis-cli 去連線 redis-server 。看看是否能觸發該斷點,如果能觸發該斷點,說明我們的分析是正確的。

經驗證,確實觸發了該斷點。

用GDB除錯 Redis網路通訊模組

在 acceptTcpHandler 中成功接受新連線後,產生客戶端 fd ,然後呼叫 acceptCommonHandler 函式,在該函式中呼叫 createClient 函式,在 createClient 函式中先將客戶端 fd 設定成非阻塞的,然後將該 fd 關聯到 EPFD 上去,同時記錄到整個程式的 aeEventLoop 物件上。

注意:這裡客戶端 fd 繫結到 EPFD 上時也只關注可讀事件。將無關的程式碼去掉,然後抽出我們關注的部分,整理後如下( 位於 networking。c 檔案中 ):

client *createClient(int fd) {

//將客戶端 fd 設定成非阻塞的

anetNonBlock(NULL,fd);

//啟用 tcp NoDelay 選項

anetEnableTcpNoDelay(NULL,fd);

//根據配置,決定是否啟動 tcpkeepalive 選項

if (server。tcpkeepalive)

anetKeepAlive(NULL,fd,server。tcpkeepalive);

//將客戶端 fd 繫結到 epfd,同時記錄到 aeEventLoop 上,關注的事件為 AE_READABLE,回撥函式為

//readQueryFromClient

aeCreateFileEvent(server。el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR

return c;

}

2、如何處理 fd 可讀事件

客戶端 fd 觸發可讀事件後,回撥函式是 readQueryFromClient 。該函式實現如下( 位於 networking。c 檔案中):

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {

client *c = (client*) privdata;

int nread, readlen;

size_t qblen;

UNUSED(el);

UNUSED(mask);

readlen = PROTO_IOBUF_LEN;

if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1

&& c->bulklen >= PROTO_MBULK_BIG_ARG)

{

int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);

if (remaining < readlen) readlen = remaining;

}

qblen = sdslen(c->querybuf);

if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;

c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);

nread = read(fd, c->querybuf+qblen, readlen);

if (nread == -1) {

if (errno == EAGAIN) {

return;

} else {

serverLog(LL_VERBOSE, “Reading from client: %s”,strerror(errno));

freeClient(c);

return;

}

} else if (nread == 0) {

serverLog(LL_VERBOSE, “Client closed connection”);

freeClient(c);

return;

} else if (c->flags & CLIENT_MASTER) {

c->pending_querybuf = sdscatlen(c->pending_querybuf,

c->querybuf+qblen,nread);

}

sdsIncrLen(c->querybuf,nread);

c->lastinteraction = server。unixtime;

if (c->flags & CLIENT_MASTER) c->read_reploff += nread;

server。stat_net_input_bytes += nread;

if (sdslen(c->querybuf) > server。client_max_querybuf_len) {

sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);

serverLog(LL_WARNING,“Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)”, ci, bytes);

sdsfree(ci);

sdsfree(bytes);

freeClient(c);

return;

}

if (!(c->flags & CLIENT_MASTER)) {

processInputBuffer(c);

} else {

size_t prev_offset = c->reploff;

processInputBuffer(c);

size_t applied = c->reploff - prev_offset;

if (applied) {

replicationFeedSlavesFromMasterStream(server。slaves,

c->pending_querybuf, applied);

sdsrange(c->pending_querybuf,applied,-1);

}

}

}

給這個函式加個斷點,然後重新執行下 redis-server ,再啟動一個客戶端,然後嘗試給伺服器傳送一個命令“set hello world”。但是在我們實際除錯的時候會發現。只要 redis-cli 一連線成功,GDB 就觸發該斷點,此時並沒有傳送我們預想的命令。我們單步除錯 readQueryFromClient 函式,將收到的資料打印出來,得到如下字串:

(gdb) p c->querybuf

$8 = (sds) 0x7ffff09b8685 “*1\r\n$7\r\nCOMMAND\r\n”

c → querybuf 是什麼呢?這裡 c 的型別是 client 結構體,它是上文中連線接收成功後產生的新客戶端 fd 繫結回撥函式時產生的、並傳遞給 readQueryFromClient 函式的引數。我們可以在 server。h 中找到它的定義:

* With multiplexing we need to take per-client state。

* Clients are taken in a linked list。 */

typedef struct client {

uint64_t id; /* Client incremental unique ID。 */

int fd; /* Client socket。 */

redisDb *db; /* Pointer to currently SELECTed DB。 */

robj *name; /* As set by CLIENT SETNAME。 */

sds querybuf; /* Buffer we use to accumulate client queries。 */

//省略掉部分欄位

} client;

client 實際上是儲存每個客戶端連線資訊的物件,其 fd 欄位就是當前連線的 fd,querybuf 欄位就是當前連線的接收緩衝區,也就是說每個新客戶端連線都會產生這樣一個物件。從 fd 上收取資料後就儲存在這個 querybuf 欄位中。

我們貼一下完整的 createClient 函式的程式碼:

client *createClient(int fd) {

//略

}

[實戰] Redis 網路通訊模組原始碼分析(3)

接著上一課的內容繼續分析。

1、redis-server 接收到客戶端的第一條命令

redis-cli 給 redis-server 傳送的第一條資料是 *1\r\n$7\r\nCOMMAND\r\n 。我們來看下對於這條資料如何處理,單步除錯一下 readQueryFromClient 呼叫 read 函式收取完資料,接著繼續處理 c→querybuf 的程式碼即可。經實際跟蹤除錯,呼叫的是 processInputBuffer 函式,位於 networking。c 檔案中:

/* This function is called every time, in the client structure ’c‘, there is

* more query buffer to process, because we read more data from the socket

* or because a client was blocked and later reactivated, so there could be

* pending query buffer, already representing a full command, to process。 */

void processInputBuffer(client *c) {

server。current_client = c;

/* Keep processing while there is something in the input buffer */

while(sdslen(c->querybuf)) {

/* Return if clients are paused。 */

if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

/* Immediately abort if the client is in the middle of something。 */

if (c->flags & CLIENT_BLOCKED) break;

/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is

* written to the client。 Make sure to not let the reply grow after

* this flag has been set (i。e。 don’t process more commands)。

*

* The same applies for clients we want to terminate ASAP。 */

if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

/* Determine request type when unknown。 */

if (!c->reqtype) {

if (c->querybuf[0] == ‘*’) {

c->reqtype = PROTO_REQ_MULTIBULK;

} else {

c->reqtype = PROTO_REQ_INLINE;

}

}

if (c->reqtype == PROTO_REQ_INLINE) {

if (processInlineBuffer(c) != C_OK) break;

} else if (c->reqtype == PROTO_REQ_MULTIBULK) {

if (processMultibulkBuffer(c) != C_OK) break;

} else {

serverPanic(“Unknown request type”);

}

/* Multibulk processing could see a <= 0 length。 */

if (c->argc == 0) {

resetClient(c);

} else {

/* Only reset the client when the command was executed。 */

if (processCommand(c) == C_OK) {

if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {

/* Update the applied replication offset of our master。 */

c->reploff = c->read_reploff - sdslen(c->querybuf);

}

/* Don‘t reset the client structure for clients blocked in a

* module blocking command, so that the reply callback will

* still be able to access the client argv and argc field。

* The client will be reset in unblockClientFromModule()。 */

if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)

resetClient(c);

}

/* freeMemoryIfNeeded may flush slave output buffers。 This may

* result into a slave, that may be the active client, to be

* freed。 */

if (server。current_client == NULL) break;

}

}

server。current_client = NULL;

}

processInputBuffer 先判斷接收到的字串是不是以星號( * )開頭,這裡是以星號開頭,然後設定 client 物件的 reqtype 欄位值為 PROTO_REQ_MULTIBULK 型別,接著呼叫 processMultibulkBuffer 函式繼續處理剩餘的字串。處理後的字串被解析成 redis 命令,記錄在 client 物件的 argc 和 argv 兩個欄位中,前者記錄當前命令的數目,後者儲存的是命令對應結構體物件的地址。這些命令的相關內容不是我們本課程的關注點,不再贅述。

命令解析完成以後,從 processMultibulkBuffer 函式返回,在 processCommand 函式中處理剛才記錄在 client 物件 argv 欄位中的命令。

//為了與原始碼保持一致,程式碼縮排未調整

if (c->argc == 0) {

resetClient(c);

} else {

/* Only reset the client when the command was executed。 */

if (processCommand(c) == C_OK) {

//省略部分程式碼

}

}

在 processCommand 函式中處理命令,流程大致如下:

(1)先判斷是不是 quit 命令,如果是,則往傳送緩衝區中新增一條應答命令( 應答 redis 客戶端 ),並給當前 client 物件設定 CLIENT_CLOSE_AFTER_REPLY 標誌,這個標誌見名知意,即應答完畢後關閉連線。

(2)如果不是 quit 命令,則使用 lookupCommand 函式從全域性命令字典表中查詢相應的命令,如果出錯,則向傳送緩衝區中添加出錯應答。出錯不是指程式邏輯出錯,有可能是客戶端傳送的非法命令。如果找到相應的命令,則執行命令後新增應答。

int processCommand(client *c) {

/* The QUIT command is handled separately。 Normal command procs will

* go through checking for replication and QUIT will cause trouble

* when FORCE_REPLICATION is enabled and would be implemented in

* a regular command proc。 */

if (!strcasecmp(c->argv[0]->ptr,“quit”)) {

addReply(c,shared。ok);

c->flags |= CLIENT_CLOSE_AFTER_REPLY;

return C_ERR;

}

/* Now lookup the command and check ASAP about trivial error conditions

* such as wrong arity, bad command name and so forth。 */

c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

if (!c->cmd) {

flagTransaction(c);

addReplyErrorFormat(c,“unknown command ’%s‘”,

(char*)c->argv[0]->ptr);

return C_OK;

} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||

(c->argc < -c->cmd->arity)) {

flagTransaction(c);

addReplyErrorFormat(c,“wrong number of arguments for ’%s‘ command”,

c->cmd->name);

return C_OK;

}

//。。。省略部分程式碼

}

全域性字典表是前面介紹的 server 全域性變數(型別是 redisServer)的一個欄位 commands 。

struct redisServer {

/* General */

pid_t pid; /* Main process pid。 */

//無關欄位省略

dict *commands; /* Command table */

//無關欄位省略

}

至於這個全域性字典表在哪裡初始化以及相關的資料結構型別,由於與本課程主題無關,這裡就不分析了。

下面重點探究如何將應答命令(包括出錯的應答)新增到傳送緩衝區去。我們以新增一個“ok”命令為例:

void addReply(client *c, robj *obj) {

if (prepareClientToWrite(c) != C_OK) return;

/* This is an important place where we can avoid copy-on-write

* when there is a saving child running, avoiding touching the

* refcount field of the object if it’s not needed。

*

* If the encoding is RAW and there is room in the static buffer

* we‘ll be able to send the object to the client without

* messing with its page。 */

if (sdsEncodedObject(obj)) {

if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)

_addReplyObjectToList(c,obj);

} else if (obj->encoding == OBJ_ENCODING_INT) {

/* Optimization: if there is room in the static buffer for 32 bytes

* (more than the max chars a 64 bit integer can take as string) we

* avoid decoding the object and go for the lower level approach。 */

if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {

char buf[32];

int len;

len = ll2string(buf,sizeof(buf),(long)obj->ptr);

if (_addReplyToBuffer(c,buf,len) == C_OK)

return;

/* else。。。 continue with the normal code path, but should never

* happen actually since we verified there is room。 */

}

obj = getDecodedObject(obj);

if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)

_addReplyObjectToList(c,obj);

decrRefCount(obj);

} else {

serverPanic(“Wrong obj->encoding in addReply()”);

}

}

addReply 函式中有兩個關鍵的地方,一個是 prepareClientToWrite 函式呼叫,另外一個是 _addReplyToBuffer 函式呼叫。先來看 prepareClientToWrite ,這個函式中有這樣一段程式碼:

if (!clientHasPendingReplies(c) &&

!(c->flags & CLIENT_PENDING_WRITE) &&

(c->replstate == REPL_STATE_NONE ||

(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))

{

/* Here instead of installing the write handler, we just flag the

* client and put it into a list of clients that have something

* to write to the socket。 This way before re-entering the event

* loop, we can try to directly write to the client sockets avoiding

* a system call。 We’ll only really install the write handler if

* we‘ll not be able to write the whole reply at once。 */

c->flags |= CLIENT_PENDING_WRITE;

listAddNodeHead(server。clients_pending_write,c);

}

這段程式碼先判斷髮送緩衝區中是否還有未傳送的應答命令——透過判斷 client 物件的 bufpos 欄位( int 型 )和 reply 欄位( 這是一個連結串列 )的長度是否大於 0 。

/* Return true if the specified client has pending reply buffers to write to

* the socket。 */

int clientHasPendingReplies(client *c) {

return c->bufpos || listLength(c->reply);

}

如果當前 client 物件不是處於 CLIENT_PENDING_WRITE 狀態,且在傳送緩衝區沒有剩餘資料,則給該 client 物件設定 CLIENT_PENDING_WRITE 標誌,並將當前 client 物件新增到全域性 server 物件的名叫 clients_pending_write 連結串列中去。這個連結串列中存的是所有有資料要傳送的 client 物件,注意和上面說的 reply 連結串列區分開來。

關於 CLIENT_PENDING_WRITE 標誌,redis 解釋是:

Client has output to send but a write handler is yet not installed

翻譯成中文就是:一個有資料需要傳送,但是還沒有註冊可寫事件的 client 物件。

下面討論 _addReplyToBuffer 函式,位於 networking。c 檔案中。

int _addReplyToBuffer(client *c, const char *s, size_t len) {

size_t available = sizeof(c->buf)-c->bufpos;

if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

/* If there already are entries in the reply list, we cannot

* add anything more to the static buffer。 */

if (listLength(c->reply) > 0) return C_ERR;

/* Check that the buffer has enough space available for this string。 */

if (len > available) return C_ERR;

memcpy(c->buf+c->bufpos,s,len);

c->bufpos+=len;

return C_OK;

}

在這個函式中再次確保了 client 物件的 reply 連結串列長度不能大於 0( if 判斷,如果不滿足條件,則退出該函式 )。reply 連結串列儲存的是待發送的應答命令。應答命令被儲存在 client 物件的 buf 欄位中,其長度被記錄在 bufpos 欄位中。buf 欄位是一個固定大小的位元組陣列:

typedef struct client {

uint64_t id; /* Client incremental unique ID。 */

int fd; /* Client socket。 */

redisDb *db; /* Pointer to currently SELECTed DB。 */

robj *name; /* As set by CLIENT SETNAME。 */

sds querybuf; /* Buffer we use to accumulate client queries。 */

sds pending_querybuf; /* If this is a master, this buffer represents the

yet not applied replication stream that we

are receiving from the master。 */

//省略部分欄位。。。

/* Response buffer */

int bufpos;

char buf[PROTO_REPLY_CHUNK_BYTES];

} client;

PROTO_REPLY_CHUNK_BYTES 在 redis 中的定義是 16*1024 ,也就是說應答命令資料包最長是 16k 。

回到我們上面提的命令:*1\r\n$7\r\nCOMMAND\r\n ,透過 lookupCommand 解析之後得到 command 命令,在 GDB 中顯示如下:

2345 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);

(gdb) n

2346 if (!c->cmd) {

(gdb) p c->cmd

$23 = (struct redisCommand *) 0x742db0

(gdb) p *c->cmd

$24 = {name = 0x4fda67 “command”, proc = 0x42d920 , arity = 0, sflags = 0x50dc3e “lt”, flags = 1536, getkeys_proc = 0x0, firstkey = 0, lastkey = 0,

keystep = 0, microseconds = 1088, calls = 1}