[實戰] 除錯 Redis 準備工作
時下的業界,相對於傳統的關係型資料庫,以 key-value 思想實現的 NoSQL 記憶體資料庫非常流行,而提到記憶體資料庫,很多讀者第一反應就是 Redis 。確實,Redis 以其高效的效能和優雅的實現成為眾多記憶體資料庫中的翹楚。
1、Redis 原始碼下載與編譯
Redis 的最新原始碼下載地址可以在Redis 官網獲得。我使用的是 CentOS 7。0 系統,使用 wget 命令將 Redis 原始碼檔案下載下來:
[root@localhost gdbtest]# wget http://download。redis。io/releases/redis-4。0。11。tar。gz
–2018-09-08 13:08:41– http://download。redis。io/releases/redis-4。0。11。tar。gz
Resolving download。redis。io (download。redis。io)… 109。74。203。151
Connecting to download。redis。io (download。redis。io)|109。74。203。151|:80… connected。
HTTP request sent, awaiting response… 200 OK
Length: 1739656 (1。7M) [application/x-gzip]
Saving to: ‘redis-4。0。11。tar。gz’
54% [======================> ] 940,876 65。6KB/s eta 9s
解壓:
[root@localhost gdbtest]# tar zxvf redis-4。0。11。tar。gz
進入生成的 redis-4。0。11 目錄使用 makefile 進行編譯:
[root@localhost gdbtest]# cd redis-4。0。11
[root@localhost redis-4。0。11]# make -j 4
編譯成功後,會在 src 目錄下生成多個可執行程式,其中 redis-server 和 redis-cli 是我們即將除錯的程式。
進入 src 目錄,使用 GDB 啟動 redis-server 這個程式:
[root@localhost src]# gdb redis-server
Reading symbols from /root/redis-4。0。9/src/redis-server…done。
(gdb) r
Starting program: /root/redis-4。0。9/src/redis-server
[Thread debugging using libthread_db enabled]
Using host libthread_db library “/lib64/libthread_db。so。1”。
31212:C 17 Sep 11:59:50。781 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31212:C 17 Sep 11:59:50。781 # Redis version=4。0。9, bits=64, commit=00000000, modified=0, pid=31212, just started
31212:C 17 Sep 11:59:50。781 # Warning: no config file specified, using the default config。 In order to specify a config file use /root/redis-4。0。9/src/redis-server /path/to/redis。conf
31212:M 17 Sep 11:59:50。781 * Increased maximum number of open files to 10032 (it was originally set to 1024)。
[New Thread 0x7ffff07ff700 (LWP 31216)]
[New Thread 0x7fffefffe700 (LWP 31217)]
[New Thread 0x7fffef7fd700 (LWP 31218)]
。
_。-__ ‘’-。_
_。- 。。 ”-。 Redis 4。0。9 (00000000/0) 64 bit
。-。-```。 ```\/ _。,_ ‘’-。_
( ‘ , 。-` | `, ) Running in standalone mode
|`-。_`-。。。-` __。。。-。-。_|’_。-’| Port: 6379
|-。_ 。_ / _。-‘ | PID: 31212
-。_ -。_-。/ 。-’ 。-’
|-。_-。_ -。__。-’ _。-‘_。-’|
|-。_-。_ _。-‘_。-’ | http://redis。io
-。_ -。_-。_。-‘。-’ _。-’
|-。_-。_ -。__。-‘ _。-’_。-‘|
|-。_-。_ _。-’_。-‘ |
-。_ -。_-。_。-‘。-’ _。-’
-。_-。_。-’ 。-’
-。_ _。-’
-。__。-’
以上是 redis-server 啟動成功後的畫面。
我們再開一個 session,再次進入 Redis 原始碼所在的 src 目錄,然後使用 GDB 啟動 Redis 客戶端 redis-cli:
[root@localhost src]# gdb redis-cli
Reading symbols from /root/redis-4。0。9/src/redis-cli…done。
(gdb) r
Starting program: /root/redis-4。0。9/src/redis-cli
[Thread debugging using libthread_db enabled]
Using host libthread_db library “/lib64/libthread_db。so。1”。
127。0。0。1:6379>
以上是 redis-cli 啟動成功後的畫面。
2、通訊示例
本課程的學習目的是研究 Redis 的網路通訊模組,為了說明問題方便,我們使用一個簡單的通訊例項,即透過 redis-cli 產生一個 key 為“hello”、值為“world”的 key-value 資料,然後得到 redis-server 的響應。
127。0。0。1:6379> set hello world
OK
127。0。0。1:6379>
讀者需要注意的是,我這裡說是一個“簡單”的例項,其實並不簡單。有兩個原因:
我們是在 redis-cli (Redis 客戶端)輸入的命令,這個命令經 redis-cli 處理後封裝成網路通訊包,透過客戶端的網路通訊模組發給 redis-server,然後 redis-server 網路通訊模組收到後解析出命令,執行命令後得到結果再封裝成相應的網路資料包,返回給 redis-cli。這個過程中涉及到兩端的網路通訊模組是我們研究和學習的重點
redis-server 基本的資料型別都是可以透過類似的命令產生,因此這個例子是一個典型的研究 redis 的典範。
[實戰] Redis 網路通訊模組原始碼分析(1)
我們這裡先研究 redis-server 端的網路通訊模組。除去 Redis 本身的業務功能以外,Redis 的網路通訊模組實現思路和細節非常有代表性。由於網路通訊模組的設計也是 Linux C++ 後臺開發一個很重要的模組,雖然網路上有很多現成的網路庫,但是簡單易學且可以作為典範的並不多,而 redis-server 就是這方面值得借鑑學習的材料之一。
1、偵聽 socket 初始化工作
透過前面課程的介紹,我們知道網路通訊在應用層上的大致流程如下:
伺服器端建立偵聽 socket;
將偵聽 socket 繫結到需要的 IP 地址和埠上(呼叫 Socket API bind 函式);
啟動偵聽(呼叫 socket API listen 函式);
無限等待客戶端連線到來,呼叫 Socket API accept 函式接受客戶端連線,併產生一個與該客戶端對應的客戶端 socket;
處理客戶端 socket 上網路資料的收發,必要時關閉該 socket。
根據上面的流程,先來探究前三步的流程。由於 redis-server 預設對客戶端的埠號是 6379,可以使用這個資訊作為依據。
全域性搜尋一下 Redis 的程式碼,尋找呼叫了 bind() 函式的程式碼,經過過濾和篩選,我們確定了位於 anet。c 的 anetListen() 函式。
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
if (bind(s,sa,len) == -1) {
anetSetError(err, “bind: %s”, strerror(errno));
close(s);
return ANET_ERR;
}
if (listen(s, backlog) == -1) {
anetSetError(err, “listen: %s”, strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}
用 GDB 的 b 命令在這個函式上加個斷點,然後重新執行 redis-server:
(gdb) b anetListen
Breakpoint 1 at 0x426cd0: file anet。c, line 440。
(gdb) r
The program being debugged has been started already。
Start it from the beginning? (y or n) y
Starting program: /root/redis-4。0。9/src/redis-server
[Thread debugging using libthread_db enabled]
Using host libthread_db library “/lib64/libthread_db。so。1”。
31546:C 17 Sep 14:20:43。861 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
31546:C 17 Sep 14:20:43。861 # Redis version=4。0。9, bits=64, commit=00000000, modified=0, pid=31546, just started
31546:C 17 Sep 14:20:43。861 # Warning: no config file specified, using the default config。 In order to specify a config file use /root/redis-4。0。9/src/redis-server /path/to/redis。conf
31546:M 17 Sep 14:20:43。862 * Increased maximum number of open files to 10032 (it was originally set to 1024)。
Breakpoint 1, anetListen (err=0x745bb0
440 static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
當 GDB 中斷在這個函式時,使用 bt 命令檢視一下此時的呼叫堆疊:
(gdb) bt
#0 anetListen (err=0x745bb0
#1 0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0
at anet。c:487
#2 0x000000000042792d in anetTcp6Server (err=err@entry=0x745bb0
at anet。c:510
#3 0x000000000042b01f in listenToPort (port=6379, fds=fds@entry=0x745ae4
#4 0x000000000042f917 in initServer () at server。c:1852
#5 0x0000000000423803 in main (argc=
透過這個堆疊,結合堆疊 #2 的 6379 埠號可以確認這就是我們要找的邏輯,並且這個邏輯在主執行緒(因為從堆疊上看,最頂層堆疊是 main() 函式)中進行。
我們看下堆疊 #1 處的程式碼:
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen(“65535”) */
struct addrinfo hints, *servinfo, *p;
snprintf(_port,6,“%d”,port);
memset(&hints,0,sizeof(hints));
hints。ai_family = af;
hints。ai_socktype = SOCK_STREAM;
hints。ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, “%s”, gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
goto end;
}
if (p == NULL) {
anetSetError(err, “unable to bind socket, errno: %d”, errno);
goto error;
}
error:
if (s != -1) close(s);
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
}
將堆疊切換至 #1,然後輸入 info arg 檢視傳入給這個函式的引數:
(gdb) f 1
#1 0x0000000000426e25 in _anetTcpServer (err=err@entry=0x745bb0
at anet。c:487
487 if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
(gdb) info args
err = 0x745bb0
port = 6379
bindaddr = 0x0
af = 10
backlog = 511
使用系統 API getaddrinfo 來解析得到當前主機的 IP 地址和埠資訊。這裡沒有選擇使用 gethostbyname 這個 API 是因為 gethostbyname 僅能用於解析 ipv4 相關的主機資訊,而 getaddrinfo 既可以用於 ipv4 也可以用於 ipv6 ,這個函式的簽名如下:
int getaddrinfo(const char *node, const char *service,
const struct addrinfo *hints,
struct addrinfo **res);
這個函式的具體用法可以在 Linux man 手冊上檢視。通常伺服器端在呼叫 getaddrinfo 之前,將 hints 引數的 ai_flags 設定為 AI_PASSIVE,用於 bind;主機名 nodename 通常會設定為 NULL,返回通配地址 [::]。當然,客戶端呼叫 getaddrinfo 時,hints 引數的 ai_flags 一般不設定 AI_PASSIVE,但是主機名 node 和服務名 service(更願意稱之為埠)則應該不為空。
解析完協議資訊後,利用得到的協議資訊建立偵聽 socket,並開啟該 socket 的 reuseAddr 選項。然後呼叫 anetListen 函式,在該函式中先 bind 後 listen。至此,redis-server 就可以在 6379 埠上接受客戶端連線了。
2、接受客戶端連線
同樣的道理,要研究 redis-server 如何接受客戶端連線,只要搜尋 socket API accept 函式即可。
經定位,我們最終在 anet。c 檔案中找到 anetGenericAccept 函式:
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
fd = accept(s,sa,len);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, “accept: %s”, strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}
我們用 b 命令在這個函式處加個斷點,然後重新執行 redis-server。一直到程式全部執行起來,GDB 都沒有觸發該斷點,這時新開啟一個 redis-cli,以模擬新客戶端連線到 redis-server 上的行為。斷點觸發了,此時檢視一下呼叫堆疊。
Breakpoint 2, anetGenericAccept (err=0x745bb0
531 static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
(gdb) bt
#0 anetGenericAccept (err=0x745bb0
#1 0x0000000000427a1d in anetTcpAccept (err=
port=port@entry=0x7fffffffe36c) at anet。c:552
#2 0x0000000000437fb1 in acceptTcpHandler (el=
#3 0x00000000004267f0 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff083a0a0, flags=flags@entry=11) at ae。c:440
#4 0x0000000000426adb in aeMain (eventLoop=0x7ffff083a0a0) at ae。c:498
#5 0x00000000004238ef in main (argc=
分析這個呼叫堆疊,梳理一下這個呼叫流程。在 main 函式的 initServer 函式中建立偵聽 socket、繫結地址然後開啟偵聽,接著呼叫 aeMain 函式啟動一個迴圈不斷地處理“事件”。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
迴圈的退出條件是 eventLoop→stop 為 1。事件處理的程式碼如下:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire。 */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv。tv_sec = tv。tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires。 */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback。 */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j]。fd];
int mask = eventLoop->fired[j]。mask;
int fd = eventLoop->fired[j]。fd;
int rfired = 0;
/* note the fe->mask & mask & 。。。 code: maybe an already processed
* event removed an element that fired and we still didn‘t
* processed, so we check if the event is still valid。 */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
這段程式碼先透過 flag 引數檢查是否有事件需要處理。如果有定時器事件( AE_TIME_EVENTS 標誌 ),則尋找最近要到期的定時器。
/* Search the first timer to fire。
* This operation is useful to know how many time the select can be
* put in sleep without to delay any event。
* If there are no timers NULL is returned。
*
* Note that’s O(N) since time events are unsorted。
* Possible optimizations (not needed by Redis so far, but。。。):
* 1) Insert the event in order, so that the nearest is just the head。
* Much better but still insertion or deletion of timers is O(N)。
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N))。
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;
while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}
這段程式碼有詳細的註釋,也非常好理解。註釋告訴我們,由於這裡的定時器集合是無序的,所以需要遍歷一下這個連結串列,演算法複雜度是 O(n) 。同時,註釋中也“暗示”了我們將來 Redis 在這塊的最佳化方向,即把這個連結串列按到期時間從小到大排序,這樣連結串列的頭部就是我們要的最近時間點的定時器物件,演算法複雜度是 O(1) 。或者使用 Redis 中的 skiplist ,演算法複雜度是 O(log(N)) 。
接著獲取當前系統時間( aeGetTime(&now_sec, &now_ms); )將最早要到期的定時器時間減去當前系統時間獲得一個間隔。這個時間間隔作為 numevents = aeApiPoll(eventLoop, tvp); 呼叫的引數,aeApiPoll() 在 Linux 平臺上使用 epoll 技術,Redis 在這個 IO 複用技術上、在不同的作業系統平臺上使用不同的系統函式,在 Windows 系統上使用 select,在 Mac 系統上使用 kqueue。這裡重點看下 Linux 平臺下的實現:
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j]。fd = e->data。fd;
eventLoop->fired[j]。mask = mask;
}
}
return numevents;
}
epoll_wait 這個函式的簽名如下:
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
最後一個引數 timeout 的設定非常有講究,如果傳入進來的 tvp 是 NULL ,根據上文的分析,說明沒有定時器事件,則將等待時間設定為 -1 ,這會讓 epoll_wait 無限期地掛起來,直到有事件時才會被喚醒。掛起的好處就是不浪費 CPU 時間片。反之,將 timeout 設定成最近的定時器事件間隔,將 epoll_wait 的等待時間設定為最近的定時器事件來臨的時間間隔,可以及時喚醒 epoll_wait ,這樣程式流可以儘快處理這個到期的定時器事件(下文會介紹)。
對於 epoll_wait 這種系統呼叫,所有的 fd(對於網路通訊,也叫 socket)資訊包括偵聽 fd 和普通客戶端 fd 都記錄在事件迴圈物件 aeEventLoop 的 apidata 欄位中,當某個 fd 上有事件觸發時,從 apidata 中找到該 fd,並把事件型別(mask 欄位)一起記錄到 aeEventLoop 的 fired 欄位中去。我們先把這個流程介紹完,再介紹 epoll_wait 函式中使用的 epfd 是在何時何地建立的,偵聽 fd、客戶端 fd 是如何掛載到 epfd 上去的。
在得到了有事件的 fd 以後,接下來就要處理這些事件了。在主迴圈 aeProcessEvents 中從 aeEventLoop 物件的 fired 陣列中取出上一步記錄的 fd,然後根據事件型別(讀事件和寫事件)分別進行處理。
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j]。fd];
int mask = eventLoop->fired[j]。mask;
int fd = eventLoop->fired[j]。fd;
int rfired = 0;
/* note the fe->mask & mask & 。。。 code: maybe an already processed
* event removed an element that fired and we still didn‘t
* processed, so we check if the event is still valid。 */
if (fe->mask & mask & AE_READABLE) {
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
processed++;
}
讀事件欄位 rfileProc 和寫事件欄位 wfileProc 都是函式指標,在程式早期設定好,這裡直接呼叫就可以了。
typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
3、EPFD 的建立
我們透過搜尋關鍵字 epoll_create 在 ae_poll。c 檔案中找到 EPFD 的建立函式 aeApiCreate 。
static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}
使用 GDB 的 b 命令在這個函式上加個斷點,然後使用 run 命令重新執行一下 redis-server,觸發斷點,使用 bt 命令檢視此時的呼叫堆疊。發現 EPFD 也是在上文介紹的 initServer 函式中建立的。
(gdb) bt
#0 aeCreateEventLoop (setsize=10128) at ae。c:79
#1 0x000000000042f542 in initServer () at server。c:1841
#2 0x0000000000423803 in main (argc=
在 aeCreateEventLoop 中不僅建立了 EPFD,也建立了整個事件迴圈需要的 aeEventLoop 物件,並把這個物件記錄在 Redis 的一個全域性變數的 el 欄位中。這個全域性變數叫 server,這是一個結構體型別。其定義如下:
//位於 server。c 檔案中
struct redisServer server; /* Server global state */
//位於 server。h 檔案中
struct redisServer {
/* General */
//省略部分欄位…
aeEventLoop *el;
unsigned int lruclock; /* Clock for LRU eviction */
//太長了,省略部分欄位…
}
[實戰] Redis 網路通訊模組原始碼分析(2)
接著上一課的內容繼續分析。
1、偵聽 fd 與客戶端 fd 是如何掛載到 EPFD 上去的
同樣的方式,要把一個 fd 掛載到 EPFD 上去,需要呼叫系統 API epoll_ctl ,搜尋一下這個函式名。在檔案 ae_epoll。c 中我們找到 aeApiAddEvent 函式:
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation。 Otherwise we need an ADD operation。 */
int op = eventLoop->events[fd]。mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee。events = 0;
mask |= eventLoop->events[fd]。mask; /* Merge old events */
if (mask & AE_READABLE) ee。events |= EPOLLIN;
if (mask & AE_WRITABLE) ee。events |= EPOLLOUT;
ee。data。fd = fd;
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
當把一個 fd 繫結到 EPFD 上去的時候,先從 eventLoop( aeEventLoop型別 )中尋找是否存在已關注的事件型別,如果已經有了,說明使用 epoll_ctl 是更改已繫結的 fd 事件型別( EPOLL_CTL_MOD ),否則就是新增 fd 到 EPFD 上。
在 aeApiAddEvent 加個斷點,再重啟下 redis-server 。觸發斷點後的呼叫堆疊如下:
#0 aeCreateFileEvent (eventLoop=0x7ffff083a0a0, fd=15, mask=mask@entry=1, proc=0x437f50
#1 0x000000000042f83b in initServer () at server。c:1927
#2 0x0000000000423803 in main (argc=
同樣在 initServer 函式中,結合上文分析的偵聽 fd 的建立過程,去掉無關程式碼,抽出這個函式的主脈絡得到如下虛擬碼:
void initServer(void) {
//記錄程式程序 ID
server。pid = getpid();
//建立程式的 aeEventLoop 物件和 epfd 物件
server。el = aeCreateEventLoop(server。maxclients+CONFIG_FDSET_INCR);
//建立偵聽 fd
listenToPort(server。port,server。ipfd,&server。ipfd_count) == C_ERR)
//將偵聽 fd 設定為非阻塞的
anetNonBlock(NULL,server。sofd);
//建立 Redis 的定時器,用於執行定時任務 cron
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth。 */
aeCreateTimeEvent(server。el, 1, serverCron, NULL, NULL) == AE_ERR
//將偵聽 fd 繫結到 epfd 上去
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets。 */
aeCreateFileEvent(server。el, server。ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR
//建立一個管道,用於在需要時去喚醒 epoll_wait 掛起的整個 EventLoop
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention。 */
aeCreateFileEvent(server。el, server。module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR)
}
注意:這裡所說的“主脈絡”是指我們關心的網路通訊的主脈絡,不代表這個函式中其他程式碼就不是主要的。
如何驗證這個斷點處掛載到 EPFD 上的 fd 就是偵聽 fd 呢?很簡單,建立偵聽 fd 時,用 GDB 記錄下這個 fd 的值。例如,當我的電腦某次執行時,偵聽 fd 的值是 15 。如下圖( 除錯工具用的是 CGDB ):
然後在執行程式至繫結 fd 的地方,確認一下繫結到 EPFD 上的 fd 值:
這裡的 fd 值也是 15 ,說明繫結的 fd 是偵聽 fd 。當然在繫結偵聽 fd 時,同時也指定了只關注可讀事件,並設定事件回撥函式為 acceptTcpHandler 。對於偵聽 fd ,一般只要關注可讀事件就可以了,當觸發可讀事件,說明有新的連線到來。
aeCreateFileEvent(server。el, server。ipfd[j], AE_READABLE, acceptTcpHandler,NULL) == AE_ERR
acceptTcpHandler 函式定義如下( 位於檔案 networking。c 中 ):
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max——) {
cfd = anetTcpAccept(server。neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
“Accepting client connection: %s”, server。neterr);
return;
}
serverLog(LL_VERBOSE,“Accepted %s:%d”, cip, cport);
acceptCommonHandler(cfd,0,cip);
}
}
anetTcpAccept 函式中呼叫的就是我們上面說的 anetGenericAccept 函數了。
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;
if (sa。ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&sa;
if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
if (port) *port = ntohs(s->sin_port);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
if (port) *port = ntohs(s->sin6_port);
}
return fd;
}
至此,這段流程總算連起來了,在 acceptTcpHandler 上加個斷點,然後重新執行一下 redis-server ,再開個 redis-cli 去連線 redis-server 。看看是否能觸發該斷點,如果能觸發該斷點,說明我們的分析是正確的。
經驗證,確實觸發了該斷點。
在 acceptTcpHandler 中成功接受新連線後,產生客戶端 fd ,然後呼叫 acceptCommonHandler 函式,在該函式中呼叫 createClient 函式,在 createClient 函式中先將客戶端 fd 設定成非阻塞的,然後將該 fd 關聯到 EPFD 上去,同時記錄到整個程式的 aeEventLoop 物件上。
注意:這裡客戶端 fd 繫結到 EPFD 上時也只關注可讀事件。將無關的程式碼去掉,然後抽出我們關注的部分,整理後如下( 位於 networking。c 檔案中 ):
client *createClient(int fd) {
//將客戶端 fd 設定成非阻塞的
anetNonBlock(NULL,fd);
//啟用 tcp NoDelay 選項
anetEnableTcpNoDelay(NULL,fd);
//根據配置,決定是否啟動 tcpkeepalive 選項
if (server。tcpkeepalive)
anetKeepAlive(NULL,fd,server。tcpkeepalive);
//將客戶端 fd 繫結到 epfd,同時記錄到 aeEventLoop 上,關注的事件為 AE_READABLE,回撥函式為
//readQueryFromClient
aeCreateFileEvent(server。el,fd,AE_READABLE, readQueryFromClient, c) == AE_ERR
return c;
}
2、如何處理 fd 可讀事件
客戶端 fd 觸發可讀事件後,回撥函式是 readQueryFromClient 。該函式實現如下( 位於 networking。c 檔案中):
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
client *c = (client*) privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN;
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
int remaining = (unsigned)(c->bulklen+2)-sdslen(c->querybuf);
if (remaining < readlen) readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
return;
} else {
serverLog(LL_VERBOSE, “Reading from client: %s”,strerror(errno));
freeClient(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, “Client closed connection”);
freeClient(c);
return;
} else if (c->flags & CLIENT_MASTER) {
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server。unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server。stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server。client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,“Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)”, ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
}
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
} else {
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server。slaves,
c->pending_querybuf, applied);
sdsrange(c->pending_querybuf,applied,-1);
}
}
}
給這個函式加個斷點,然後重新執行下 redis-server ,再啟動一個客戶端,然後嘗試給伺服器傳送一個命令“set hello world”。但是在我們實際除錯的時候會發現。只要 redis-cli 一連線成功,GDB 就觸發該斷點,此時並沒有傳送我們預想的命令。我們單步除錯 readQueryFromClient 函式,將收到的資料打印出來,得到如下字串:
(gdb) p c->querybuf
$8 = (sds) 0x7ffff09b8685 “*1\r\n$7\r\nCOMMAND\r\n”
c → querybuf 是什麼呢?這裡 c 的型別是 client 結構體,它是上文中連線接收成功後產生的新客戶端 fd 繫結回撥函式時產生的、並傳遞給 readQueryFromClient 函式的引數。我們可以在 server。h 中找到它的定義:
* With multiplexing we need to take per-client state。
* Clients are taken in a linked list。 */
typedef struct client {
uint64_t id; /* Client incremental unique ID。 */
int fd; /* Client socket。 */
redisDb *db; /* Pointer to currently SELECTed DB。 */
robj *name; /* As set by CLIENT SETNAME。 */
sds querybuf; /* Buffer we use to accumulate client queries。 */
//省略掉部分欄位
} client;
client 實際上是儲存每個客戶端連線資訊的物件,其 fd 欄位就是當前連線的 fd,querybuf 欄位就是當前連線的接收緩衝區,也就是說每個新客戶端連線都會產生這樣一個物件。從 fd 上收取資料後就儲存在這個 querybuf 欄位中。
我們貼一下完整的 createClient 函式的程式碼:
client *createClient(int fd) {
//略
}
[實戰] Redis 網路通訊模組原始碼分析(3)
接著上一課的內容繼續分析。
1、redis-server 接收到客戶端的第一條命令
redis-cli 給 redis-server 傳送的第一條資料是 *1\r\n$7\r\nCOMMAND\r\n 。我們來看下對於這條資料如何處理,單步除錯一下 readQueryFromClient 呼叫 read 函式收取完資料,接著繼續處理 c→querybuf 的程式碼即可。經實際跟蹤除錯,呼叫的是 processInputBuffer 函式,位於 networking。c 檔案中:
/* This function is called every time, in the client structure ’c‘, there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process。 */
void processInputBuffer(client *c) {
server。current_client = c;
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
/* Return if clients are paused。 */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
/* Immediately abort if the client is in the middle of something。 */
if (c->flags & CLIENT_BLOCKED) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client。 Make sure to not let the reply grow after
* this flag has been set (i。e。 don’t process more commands)。
*
* The same applies for clients we want to terminate ASAP。 */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown。 */
if (!c->reqtype) {
if (c->querybuf[0] == ‘*’) {
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic(“Unknown request type”);
}
/* Multibulk processing could see a <= 0 length。 */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed。 */
if (processCommand(c) == C_OK) {
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master。 */
c->reploff = c->read_reploff - sdslen(c->querybuf);
}
/* Don‘t reset the client structure for clients blocked in a
* module blocking command, so that the reply callback will
* still be able to access the client argv and argc field。
* The client will be reset in unblockClientFromModule()。 */
if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE)
resetClient(c);
}
/* freeMemoryIfNeeded may flush slave output buffers。 This may
* result into a slave, that may be the active client, to be
* freed。 */
if (server。current_client == NULL) break;
}
}
server。current_client = NULL;
}
processInputBuffer 先判斷接收到的字串是不是以星號( * )開頭,這裡是以星號開頭,然後設定 client 物件的 reqtype 欄位值為 PROTO_REQ_MULTIBULK 型別,接著呼叫 processMultibulkBuffer 函式繼續處理剩餘的字串。處理後的字串被解析成 redis 命令,記錄在 client 物件的 argc 和 argv 兩個欄位中,前者記錄當前命令的數目,後者儲存的是命令對應結構體物件的地址。這些命令的相關內容不是我們本課程的關注點,不再贅述。
命令解析完成以後,從 processMultibulkBuffer 函式返回,在 processCommand 函式中處理剛才記錄在 client 物件 argv 欄位中的命令。
//為了與原始碼保持一致,程式碼縮排未調整
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed。 */
if (processCommand(c) == C_OK) {
//省略部分程式碼
}
}
在 processCommand 函式中處理命令,流程大致如下:
(1)先判斷是不是 quit 命令,如果是,則往傳送緩衝區中新增一條應答命令( 應答 redis 客戶端 ),並給當前 client 物件設定 CLIENT_CLOSE_AFTER_REPLY 標誌,這個標誌見名知意,即應答完畢後關閉連線。
(2)如果不是 quit 命令,則使用 lookupCommand 函式從全域性命令字典表中查詢相應的命令,如果出錯,則向傳送緩衝區中添加出錯應答。出錯不是指程式邏輯出錯,有可能是客戶端傳送的非法命令。如果找到相應的命令,則執行命令後新增應答。
int processCommand(client *c) {
/* The QUIT command is handled separately。 Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc。 */
if (!strcasecmp(c->argv[0]->ptr,“quit”)) {
addReply(c,shared。ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth。 */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
addReplyErrorFormat(c,“unknown command ’%s‘”,
(char*)c->argv[0]->ptr);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,“wrong number of arguments for ’%s‘ command”,
c->cmd->name);
return C_OK;
}
//。。。省略部分程式碼
}
全域性字典表是前面介紹的 server 全域性變數(型別是 redisServer)的一個欄位 commands 。
struct redisServer {
/* General */
pid_t pid; /* Main process pid。 */
//無關欄位省略
dict *commands; /* Command table */
//無關欄位省略
}
至於這個全域性字典表在哪裡初始化以及相關的資料結構型別,由於與本課程主題無關,這裡就不分析了。
下面重點探究如何將應答命令(包括出錯的應答)新增到傳送緩衝區去。我們以新增一個“ok”命令為例:
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
/* This is an important place where we can avoid copy-on-write
* when there is a saving child running, avoiding touching the
* refcount field of the object if it’s not needed。
*
* If the encoding is RAW and there is room in the static buffer
* we‘ll be able to send the object to the client without
* messing with its page。 */
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* Optimization: if there is room in the static buffer for 32 bytes
* (more than the max chars a 64 bit integer can take as string) we
* avoid decoding the object and go for the lower level approach。 */
if (listLength(c->reply) == 0 && (sizeof(c->buf) - c->bufpos) >= 32) {
char buf[32];
int len;
len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) == C_OK)
return;
/* else。。。 continue with the normal code path, but should never
* happen actually since we verified there is room。 */
}
obj = getDecodedObject(obj);
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
decrRefCount(obj);
} else {
serverPanic(“Wrong obj->encoding in addReply()”);
}
}
addReply 函式中有兩個關鍵的地方,一個是 prepareClientToWrite 函式呼叫,另外一個是 _addReplyToBuffer 函式呼叫。先來看 prepareClientToWrite ,這個函式中有這樣一段程式碼:
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/* Here instead of installing the write handler, we just flag the
* client and put it into a list of clients that have something
* to write to the socket。 This way before re-entering the event
* loop, we can try to directly write to the client sockets avoiding
* a system call。 We’ll only really install the write handler if
* we‘ll not be able to write the whole reply at once。 */
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server。clients_pending_write,c);
}
這段程式碼先判斷髮送緩衝區中是否還有未傳送的應答命令——透過判斷 client 物件的 bufpos 欄位( int 型 )和 reply 欄位( 這是一個連結串列 )的長度是否大於 0 。
/* Return true if the specified client has pending reply buffers to write to
* the socket。 */
int clientHasPendingReplies(client *c) {
return c->bufpos || listLength(c->reply);
}
如果當前 client 物件不是處於 CLIENT_PENDING_WRITE 狀態,且在傳送緩衝區沒有剩餘資料,則給該 client 物件設定 CLIENT_PENDING_WRITE 標誌,並將當前 client 物件新增到全域性 server 物件的名叫 clients_pending_write 連結串列中去。這個連結串列中存的是所有有資料要傳送的 client 物件,注意和上面說的 reply 連結串列區分開來。
關於 CLIENT_PENDING_WRITE 標誌,redis 解釋是:
Client has output to send but a write handler is yet not installed
翻譯成中文就是:一個有資料需要傳送,但是還沒有註冊可寫事件的 client 物件。
下面討論 _addReplyToBuffer 函式,位於 networking。c 檔案中。
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer。 */
if (listLength(c->reply) > 0) return C_ERR;
/* Check that the buffer has enough space available for this string。 */
if (len > available) return C_ERR;
memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
}
在這個函式中再次確保了 client 物件的 reply 連結串列長度不能大於 0( if 判斷,如果不滿足條件,則退出該函式 )。reply 連結串列儲存的是待發送的應答命令。應答命令被儲存在 client 物件的 buf 欄位中,其長度被記錄在 bufpos 欄位中。buf 欄位是一個固定大小的位元組陣列:
typedef struct client {
uint64_t id; /* Client incremental unique ID。 */
int fd; /* Client socket。 */
redisDb *db; /* Pointer to currently SELECTed DB。 */
robj *name; /* As set by CLIENT SETNAME。 */
sds querybuf; /* Buffer we use to accumulate client queries。 */
sds pending_querybuf; /* If this is a master, this buffer represents the
yet not applied replication stream that we
are receiving from the master。 */
//省略部分欄位。。。
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
PROTO_REPLY_CHUNK_BYTES 在 redis 中的定義是 16*1024 ,也就是說應答命令資料包最長是 16k 。
回到我們上面提的命令:*1\r\n$7\r\nCOMMAND\r\n ,透過 lookupCommand 解析之後得到 command 命令,在 GDB 中顯示如下:
2345 c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
(gdb) n
2346 if (!c->cmd) {
(gdb) p c->cmd
$23 = (struct redisCommand *) 0x742db0
(gdb) p *c->cmd
$24 = {name = 0x4fda67 “command”, proc = 0x42d920
keystep = 0, microseconds = 1088, calls = 1}