站長資訊網
        最全最豐富的資訊網站

        一文了解Redis源碼設計剖析之事件處理

        本篇文章給大家帶來了關于redis的相關問題,其中主要介紹了關于事件處理示例的相關內容,包括了redis事件介紹、事件的抽象以及事件的實現等內容,下面一起來看一下,希望對大家有幫助。

        一文了解Redis源碼設計剖析之事件處理

        千萬級數據并發如何處理?進入學習

        推薦學習:Redis視頻教程

        1. Redis事件介紹

        Redis服務器是一個事件驅動程序,所謂事件驅動就是輸入一條命令并且按下回車,然后消息被組裝成Redis協議的格式發送給Redis服務器,這個時候就會產生一個事件,Redis服務器會接收改命令,處理該命令和發送回復,而當我們沒有與服務器進行交互時,服務器就會處于阻塞等待狀態,它會讓出CPU然后進入睡眠狀態,當事件觸發時,就會被操作系統喚醒.

        而Redis服務器需要處理以下兩類事件:

        文件事件:Redis 服務器通過套接字與客戶端(或者其他Redis服務器)進行連接,而文件事件就是服務器對套接字操作的抽象. 服務器與客戶端(或者其他服務器)的通信會產生相應的文件事件,而服務器則通過監聽并處理這些事件來完成一系列網絡通信操作.

        時間事件:Redis 服務器中的一些操作(比如serverCron函數)需要在給定的時間點執行,而時間事件就是服務器對這類定時操作的抽象.

        2. 事件的抽象

        Redis把文件事件時間事件分別抽象成一個數據結構來管理.

        2.1 文件事件結構

        typedef struct aeFileEvent {     // 文件時間類型:AE_NONE,AE_READABLE,AE_WRITABLE     int mask;     // 可讀處理函數     aeFileProc *rfileProc;     // 可寫處理函數     aeFileProc *wfileProc;     // 客戶端傳入的數據     void *clientData; } aeFileEvent;  //文件事件
        登錄后復制

        其中rfileProcwfileProc成員分別為兩個函數指針,他們的原型為:

        typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
        登錄后復制

        該函數是回調函數,如果當前文件事件所指定的事件類型發生時,則會調用對應的回調函數來處理該事件.

        當事件就緒的時候,我們需要知道文件事件的文件描述符還有事件類型才能對于鎖定該事件,因此定義了aeFiredEvent結構統一管理:

        typedef struct aeFiredEvent {     // 就緒事件的文件描述符     int fd;     // 就緒事件類型:AE_NONE,AE_READABLE,AE_WRITABLE     int mask; } aeFiredEvent; //就緒事件
        登錄后復制

        文件事件的類型:

        #define AE_NONE 0           //未設置 #define AE_READABLE 1       //事件可讀 #define AE_WRITABLE 2       //事件可寫
        登錄后復制

        2.2 時間事件結構

        typedef struct aeTimeEvent {     // 時間事件的id     long long id;     // 時間事件到達的時間的秒數     long when_sec; /* seconds */     // 時間事件到達的時間的毫秒數     long when_ms; /* milliseconds */     // 時間事件處理函數     aeTimeProc *timeProc;     // 時間事件終結函數     aeEventFinalizerProc *finalizerProc;     // 客戶端傳入的數據     void *clientData;     // 指向下一個時間事件     struct aeTimeEvent *next; } aeTimeEvent;  //時間事件
        登錄后復制

        可以看出,時間事件的結構就是一個鏈表的節點,因為struct aeTimeEvent *next是指向下一個時間事件的指針.

        和文件事件一樣,當時間事件所指定的事件發生時,也會調用對應的回調函數,結構成員timeProcfinalizerProc都是回調函數,函數原型如下:

        typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
        登錄后復制

        雖然對文件事件和時間事件都做了抽象,Redis仍然需要對事件做一個整體的抽象,用來描述一個事件的狀態. 也就是下面要介紹的事件狀態結構:aeEventLoop.

        2.3 事件狀態結構

        typedef struct aeEventLoop {     // 當前已注冊的最大的文件描述符     int maxfd;   /* highest file descriptor currently registered */     // 文件描述符監聽集合的大小     int setsize; /* max number of file descriptors tracked */     // 下一個時間事件的ID     long long timeEventNextId;     // 最后一次執行事件的時間     time_t lastTime;     /* Used to detect system clock skew */     // 注冊的文件事件表     aeFileEvent *events; /* Registered events */     // 已就緒的文件事件表     aeFiredEvent *fired; /* Fired events */     // 時間事件的頭節點指針     aeTimeEvent *timeEventHead;     // 事件處理開關     int stop;     // 多路復用庫的事件狀態數據     void *apidata; /* This is used for polling API specific data */     // 執行處理事件之前的函數     aeBeforeSleepProc *beforesleep; } aeEventLoop;  //事件輪詢的狀態結構
        登錄后復制

        aeEventLoop結構保存了一個void *類型的萬能指針apidata,用來保存輪詢事件的狀態,也就是保存底層調用的多路復用庫的事件狀態.

        RedisI/O多路復用程序的所有功能都是通過包裝常見的selectepollevportkqueue這些I/O多路復用函數庫來實現的,每個I/O多路復用函數庫在Redis源碼中都對應著一個單獨的文件,比如ae_select.c、ae_epoll.c等等.

        他們在編譯階段,會根據不同的系統選擇性能最高的一個多路復用庫作為Redis的多路復用程序實現,而且所有庫的API都是相同的,這就可以讓Redis多路復用程序的底層可以互換.

        下面是具體選擇庫的源碼:

        // IO復用的選擇,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c" #ifdef HAVE_EVPORT #include "ae_evport.c" #else     #ifdef HAVE_EPOLL     #include "ae_epoll.c"     #else         #ifdef HAVE_KQUEUE         #include "ae_kqueue.c"         #else         #include "ae_select.c"         #endif     #endif #endif
        登錄后復制

        也可以通過命令INFO server來查看當前使用的是哪個多路復用庫:

        一文了解Redis源碼設計剖析之事件處理

        可以看到Linux下默認使用的是epoll多路復用庫,那么apidata保存的就是epoll模型的事件狀態結構,它在ae_epoll.c源文件中:

        typedef struct aeApiState {     // epoll事件的文件描述符     int epfd;     // 事件表     struct epoll_event *events; } aeApiState;   // 事件的狀態
        登錄后復制

        epoll模型的struct epoll_event結構中定義著epoll事件的類型,比如EPOLLIN、EPOLLOUT等等,但是Redis的文件結構aeFileEvent中也在mask中定義了自己的事件類型,例如:AE_READABLE、AE_WRITABLE等等,于是就需要實現一個中間層將兩者的事件類型相聯系起來,這就是之前提到的ae_epoll.c文件中實現的相同的API:

        // 創建一個epoll實例,保存到eventLoop中 static int aeApiCreate(aeEventLoop *eventLoop) // 調整事件表的大小 static int aeApiResize(aeEventLoop *eventLoop, int setsize)   // 釋放epoll實例和事件表空間 static void aeApiFree(aeEventLoop *eventLoop) // 在epfd標識的事件表上注冊fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) // 在epfd標識的事件表上注刪除fd的事件 static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) // 等待所監聽文件描述符上有事件發生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) // 返回正在使用的IO多路復用庫的名字 static char *aeApiName(void)
        登錄后復制

        這些API會講epoll的底層函數封裝起來,Redis實現事件時,只需要調用這些接口即可.

        我們以下面兩個API的源碼舉例:

        aeApiAddEvent

        該函數會向Redis事件狀態結構aeEventLoop的事件表event注冊一個事件,對應的是epoll_ctl函數.

        // 在epfd標識的事件表上注冊fd的事件 static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {     aeApiState *state = eventLoop->apidata;     struct epoll_event ee = {0};     // EPOLL_CTL_ADD,向epfd注冊fd的上的event     // EPOLL_CTL_MOD,修改fd已注冊的event     // #define AE_NONE 0           //未設置     // #define AE_READABLE 1       //事件可讀     // #define AE_WRITABLE 2       //事件可寫     // 判斷fd事件的操作,如果沒有設置事件,則進行關聯mask類型事件,否則進行修改     int op = eventLoop->events[fd].mask == AE_NONE ?             EPOLL_CTL_ADD : EPOLL_CTL_MOD;     // struct epoll_event {     //      uint32_t     events;      /* Epoll events */     //      epoll_data_t data;        /* User data variable */     // };     ee.events = 0;     // 如果是修改事件,合并之前的事件類型     mask |= eventLoop->events[fd].mask; /* Merge old events */     // 根據mask映射epoll的事件類型     if (mask & AE_READABLE) ee.events |= EPOLLIN;   //讀事件     if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //寫事件     ee.data.fd = fd;    //設置事件所從屬的目標文件描述符     // 將ee事件注冊到epoll中     if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;     return 0; }
        登錄后復制

        aeApiPoll

        等待所監聽文件描述符上有事件發生,對應著底層的epoll_wait函數.

        // 等待所監聽文件描述符上有事件發生 static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {     aeApiState *state = eventLoop->apidata;     int retval, numevents = 0;     // 監聽事件表上是否有事件發生     retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,             tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);     // 至少有一個就緒的事件     if (retval > 0) {         int j;         numevents = retval;         // 遍歷就緒的事件表,將其加入到eventLoop的就緒事件表中         for (j = 0; j < numevents; j++) {             int mask = 0;             struct epoll_event *e = state->events+j;             // 根據就緒的事件類型,設置mask             if (e->events & EPOLLIN) mask |= AE_READABLE;             if (e->events & EPOLLOUT) mask |= AE_WRITABLE;             if (e->events & EPOLLERR) mask |= AE_WRITABLE;             if (e->events & EPOLLHUP) mask |= AE_WRITABLE;             // 添加到就緒事件表中             eventLoop->fired[j].fd = e->data.fd;             eventLoop->fired[j].mask = mask;         }     }     // 返回就緒的事件個數     return numevents; }
        登錄后復制

        3. 事件的實現

        事件的所有源碼都定義在ae.c源文件中,先從aeMain函數說起.

        // 事件輪詢的主函數 void aeMain(aeEventLoop *eventLoop) {     eventLoop->stop = 0;     // 一直處理事件     while (!eventLoop->stop) {         // 執行處理事件之前的函數         if (eventLoop->beforesleep != NULL)             eventLoop->beforesleep(eventLoop);         //處理到時的時間事件和就緒的文件事件         aeProcessEvents(eventLoop, AE_ALL_EVENTS);     } }
        登錄后復制

        可以看到,如果服務器一直處理事件,那么就是一個死循環,而一個最典型的事件驅動,就是一個死循環. 在循環中,程序會調用處理事件的函數aeProcessEvents(),它的參數是一個事件狀態結構aeEventLoopAE_ALL_EVENTS.

        事件類型的宏定義,在ae.h頭文件中:

        #define AE_FILE_EVENTS 1                                //文件事件 #define AE_TIME_EVENTS 2                                //時間事件 #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //文件和時間事件 #define AE_DONT_WAIT 4
        登錄后復制

        // 處理到時的時間事件和就緒的文件事件 // 如果flags = 0,函數什么都不做,直接返回 // 如果flags設置了 AE_ALL_EVENTS ,則執行所有類型的事件 // 如果flags設置了 AE_FILE_EVENTS ,則執行文件事件 // 如果flags設置了 AE_TIME_EVENTS ,則執行時間事件 // 如果flags設置了 AE_DONT_WAIT ,那么函數處理完事件后直接返回,不阻塞等待 // 函數返回執行的事件個數 int aeProcessEvents(aeEventLoop *eventLoop, int flags) {     int processed = 0, numevents;     // 如果什么事件都沒有設置則直接返回     if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;     // 請注意,既然我們要處理時間事件,即使沒有要處理的文件事件,我們仍要調用select(),以便在下一次事件準備啟動之前進行休眠     // 當前還沒有要處理的文件事件,或者設置了時間事件但是沒有設置不阻塞標識     if (eventLoop->maxfd != -1 ||         ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {         int j;         aeTimeEvent *shortest = NULL;         struct timeval tv, *tvp;         // 如果設置了時間事件而沒有設置不阻塞標識         if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))             // 獲取最近到時的時間事件             shortest = aeSearchNearestTimer(eventLoop);         // 獲取到了最早到時的時間事件         if (shortest) {             long now_sec, now_ms;             // 獲取當前時間             aeGetTime(&now_sec, &now_ms);             tvp = &tv;             // 等待該時間事件到時所需要的時長             long long ms =                 (shortest->when_sec - now_sec)*1000 +                 shortest->when_ms - now_ms;             // 如果沒到時             if (ms > 0) {                 // 保存時長到tvp中                 tvp->tv_sec = ms/1000;                 tvp->tv_usec = (ms % 1000)*1000;             // 如果已經到時,則將tvp的時間設置為0             } else {                 tvp->tv_sec = 0;                 tvp->tv_usec = 0;             }         // 沒有獲取到了最早到時的時間事件,時間事件鏈表為空         } else {             // 如果設置了不阻塞標識             if (flags & AE_DONT_WAIT) {                 // 將tvp的時間設置為0,就不會阻塞                 tv.tv_sec = tv.tv_usec = 0;                 tvp = &tv;             } else {                 // 阻塞到第一個時間事件的到來                 /* Otherwise we can block */                 tvp = NULL; /* wait forever */             }         }         // 等待所監聽文件描述符上有事件發生         // 如果tvp為NULL,則阻塞在此,否則等待tvp設置阻塞的時間,就會有時間事件到時         // 返回了就緒文件事件的個數         numevents = aeApiPoll(eventLoop, tvp);         // 遍歷就緒文件事件表         for (j = 0; j < numevents; j++) {             // 獲取就緒文件事件的地址             aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];             // 獲取就緒文件事件的類型,文件描述符             int mask = eventLoop->fired[j].mask;             int fd = eventLoop->fired[j].fd;             int rfired = 0;             // 如果是文件可讀事件發生             if (fe->mask & mask & AE_READABLE) {                 // 設置讀事件標識 且 調用讀事件方法處理讀事件                 rfired = 1;                 fe->rfileProc(eventLoop,fd,fe->clientData,mask);             }             // 如果是文件可寫事件發生             if (fe->mask & mask & AE_WRITABLE) {                 // 讀寫事件的執行發法不同,則執行寫事件,避免重復執行相同的方法                 if (!rfired || fe->wfileProc != fe->rfileProc)                     fe->wfileProc(eventLoop,fd,fe->clientData,mask);             }             processed++;    //執行的事件次數加1         }     }     /* Check time events */     // 執行時間事件     if (flags & AE_TIME_EVENTS)         processed += processTimeEvents(eventLoop);     return processed; /* return the number of processed file/time events */ }
        登錄后復制

        Redis服務器在沒有被事件觸發時,如果沒有設置AE_DONT_WAIT標識,就會開始阻塞等待. 但是它不會死等待,因為還需要處理時間事件,所以在調用aeApiPoll進行監聽之前,會先從時間事件表中獲取一個最近到達的時間,根據需要等待的時間構建一個struct timeval tv, *tvp結構的變量,這個變量保存著服務器阻塞等待文件事件的最長時間,一旦時間到達而沒有觸發文件事件aeApiPoll函數就會停止阻塞,進而調用processTimeEvents函數處理時間事件.

        如果在阻塞等待的最長時間之間,觸發了文件事件,就會先執行文件事件,后執行時間事件,因此處理時間事件通常比預設的會晚一點.

        而執行文件事件rfileProcwfileProc也是調用了回調函數,Redis將文件事件的處理分為了好幾種,用于處理不同的網絡通信需求:

        • acceptTcpHandler:用于accept client的connect.
        • acceptUnixHandler:用于accept client的本地connect.
        • sendReplyToClient:用于向client發送命令回復.
        • readQueryFromClient:用于讀入client發送的請求.

        然后我們來看一下獲取最快達到時間事件的函數aeSearchNearestTimer實現:

        // 尋找第一個快到時的時間事件 // 這個操作是有用的知道有多少時間可以選擇該事件設置為不用推遲任何事件的睡眠中。 // 如果事件鏈表沒有時間將返回NULL。 static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {     // 時間事件頭節點地址     aeTimeEvent *te = eventLoop->timeEventHead;     aeTimeEvent *nearest = NULL;     // 遍歷所有的時間事件     while(te) {         // 尋找第一個快到時的時間事件,保存到nearest中         if (!nearest || te->when_sec < nearest->when_sec ||                 (te->when_sec == nearest->when_sec &&                  te->when_ms < nearest->when_ms))             nearest = te;         te = te->next;     }     return nearest; }
        登錄后復制

        該函數就是遍歷時間事件鏈表,然后找到最小值.

        我們重點看執行時間事件的函數processTimeEvents函數的實現:

        // 執行時間事件 static int processTimeEvents(aeEventLoop *eventLoop) {     int processed = 0;     aeTimeEvent *te, *prev;     long long maxId;     time_t now = time(NULL);     // 這里嘗試發現時間混亂的情況,上一次處理事件的時間比當前時間還要大     // 重置最近一次處理事件的時間     if (now < eventLoop->lastTime) {         te = eventLoop->timeEventHead;         while(te) {             te->when_sec = 0;             te = te->next;         }     }     // 設置上一次時間事件處理的時間為當前時間     eventLoop->lastTime = now;     prev = NULL;     te = eventLoop->timeEventHead;     maxId = eventLoop->timeEventNextId-1;   //當前時間事件表中的最大ID     // 遍歷時間事件鏈表     while(te) {         long now_sec, now_ms;         long long id;         /* Remove events scheduled for deletion. */         // 如果時間事件已被刪除了         if (te->id == AE_DELETED_EVENT_ID) {             aeTimeEvent *next = te->next;             // 從事件鏈表中刪除事件的節點             if (prev == NULL)                 eventLoop->timeEventHead = te->next;             else                 prev->next = te->next;             // 調用時間事件終結方法清除該事件             if (te->finalizerProc)                 te->finalizerProc(eventLoop, te->clientData);             zfree(te);             te = next;             continue;         }         // 確保我們不處理在此迭代中由時間事件創建的時間事件. 請注意,此檢查目前無效:我們總是在頭節點添加新的計時器,但是如果我們更改實施細節,則該檢查可能會再次有用:我們將其保留在未來的防御         if (te->id > maxId) {             te = te->next;             continue;         }         // 獲取當前時間         aeGetTime(&now_sec, &now_ms);         // 找到已經到時的時間事件         if (now_sec > te->when_sec ||             (now_sec == te->when_sec && now_ms >= te->when_ms))         {             int retval;             id = te->id;             // 調用時間事件處理方法             retval = te->timeProc(eventLoop, id, te->clientData);             // 時間事件次數加1             processed++;             // 如果不是定時事件,則繼續設置它的到時時間             if (retval != AE_NOMORE) {                 aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);             // 如果是定時時間,則retval為-1,則將其時間事件刪除,惰性刪除             } else {                 te->id = AE_DELETED_EVENT_ID;             }         }         // 更新前驅節點指針和后繼節點指針         prev = te;         te = te->next;     }     return processed;   //返回執行事件的次數 }
        登錄后復制

        如果時間事件不存在,則就調用finalizerProc指向的回調函數,刪除當前的時間事件. 如果存在,就調用timeProc指向的回調函數處理時間事件. Redis的時間事件分為兩類:

        • 定時事件:讓一段程序在指定的時間后執行一次.
        • 周期性事件:讓一段程序每隔指定的時間后執行一次.

        如果當前的時間事件是周期性,那么就會在將時間周期添加到周期事件的到時時間中. 如果是定時事件,則將該時間事件刪除.

        推薦學習:Redis視頻教程

        贊(0)
        分享到: 更多 (0)
        網站地圖   滬ICP備18035694號-2    滬公網安備31011702889846號
        主站蜘蛛池模板: 亚洲国产精品ⅴa在线观看| 日韩精品无码一区二区三区| 无码AⅤ精品一区二区三区| 国产精品久久久久久搜索| 香港aa三级久久三级老师2021国产三级精品三级在 | 亚洲国产精品一区二区九九| 日本精品在线视频| 无码精品国产VA在线观看 | 久久婷婷国产综合精品| 九九精品在线视频| 亚洲精品免费视频| 成人国产精品免费视频| 亚洲精品国产电影| jizz国产精品| 亚洲欧美日韩精品永久在线| 国产精品高清一区二区人妖| 久久精品国产第一区二区三区| 永久无码精品三区在线4| 亚洲国产精品第一区二区三区| 国产在线观看一区精品| 国产精品免费久久久久久久久 | 欧美日韩精品久久久免费观看| 国产亚洲精品不卡在线| 国产精品国产三级国产AⅤ| 91精品国产综合久久四虎久久无码一级| 精品国产_亚洲人成在线高清| 国产成人精品一区二区秒拍| 久久夜色精品国产噜噜麻豆| 亚洲日韩精品射精日| 亚洲精品成人a在线观看| 久久久久无码精品国产app| 国内精品伊人久久久久网站| 国产精品亚洲欧美大片在线看| 国产精品va久久久久久久| 97精品国产高清自在线看超| 中文字幕一区二区精品区| 3级黄性日本午夜精品| 99久久婷婷国产综合精品草原| 国产精品1024视频| 国产三级精品三级在专区| 精品乱子伦一区二区三区|