[toc]

redis基本知识

redis对象robj

robj 定义

redis是一个键值(key-value)型的非关系型内存数据库,key只能是字符串,value可以是字符串、列表、集合、有序集合、哈希和stream,这些数据类型都用结构体robj表示。redis的对象定义如下:

/* Generic wrapper for a Redis value. 'type' tells which data type the value
 * is (one of the OBJ_* constants), 'encoding' tells which internal data
 * structure currently backs it (one of the OBJ_ENCODING_* constants), and
 * 'ptr' references that underlying data. */
typedef struct redisObject {
    unsigned type:4;        /* OBJ_* type; 4 bits, so at most 16 types. */
    unsigned encoding:4;    /* OBJ_ENCODING_* value; also 4 bits wide. */
    unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
                            * LFU data (least significant 8 bits frequency
                            * and most significant 16 bits access time). */
    int refcount;           /* Reference count. NOTE(review): presumably the
                             * object is released when it drops to zero;
                             * confirm in object.c. */
    void *ptr;              /* Pointer to the underlying data structure. */
} robj;

robj type定义

robj中type表示类型,定义如下:

/* A redis object, that is a type able to hold a string / list / set */

/* The actual Redis Object */
/* Values stored in robj.type. That field is 4 bits wide, so at most 16
 * distinct types can be represented. */
#define OBJ_STRING 0    /* String object. */
#define OBJ_LIST 1      /* List object. */
#define OBJ_SET 2       /* Set object. */
#define OBJ_ZSET 3      /* Sorted set object. */
#define OBJ_HASH 4      /* Hash object. */

/* The "module" object type is a special one that signals that the object
 * is one directly managed by a Redis module. In this case the value points
 * to a moduleValue struct, which contains the object value (which is only
 * handled by the module itself) and the RedisModuleType struct which lists
 * function pointers in order to serialize, deserialize, AOF-rewrite and
 * free the object.
 *
 * Inside the RDB file, module types are encoded as OBJ_MODULE followed
 * by a 64 bit module type ID, which has a 54 bits module-specific signature
 * in order to dispatch the loading to the right module, plus a 10 bits
 * encoding version. */
#define OBJ_MODULE 5    /* Module object. */

/* Type added in Redis 5. */
#define OBJ_STREAM 6    /* Stream object.  */

robj encoding定义

同一种类型的robj对象,底层采用的数据结构可能不同,结构体中的encoding字段表示当前对象正在使用哪种数据结构,定义如下:

/* Objects encoding. Some kind of objects like Strings and Hashes can be
 * internally represented in multiple ways. The 'encoding' field of the object
 * is set to one of this fields for this object. */
/* robj.encoding is a 4-bit field, so these values must stay below 16. */
#define OBJ_ENCODING_RAW 0     /* Raw representation */
#define OBJ_ENCODING_INT 1     /* Encoded as integer */
#define OBJ_ENCODING_HT 2      /* Encoded as hash table */
/* OBJ_ENCODING_ZIPMAP and OBJ_ENCODING_LINKEDLIST are no longer used. */
#define OBJ_ENCODING_ZIPMAP 3  /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4 /* No longer used: old list encoding. */

#define OBJ_ENCODING_ZIPLIST 5 /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6  /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7  /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8  /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9 /* Encoded as linked list of ziplists */

/* Encoding added in Redis 5. */
#define OBJ_ENCODING_STREAM 10 /* Encoded as a radix tree of listpacks  */

redis客户端

定义如下:

/* State the server keeps for one client connection. Several fields are only
 * meaningful when the connection is a replication master or a slave, as
 * noted in the per-field comments. */
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    int fd;                 /* Client socket. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd;  /* cmd: presumably the command being
                                            processed; lastcmd: last command
                                            executed. */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags;              /* Client flags: CLIENT_* macros. */
    int authenticated;      /* When requirepass is non-NULL. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: REPLCONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (PSUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */

    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

redis server端

定义如下:

/* Global server state (abridged: each "..." marks fields omitted from these
 * notes, so this excerpt does not compile as-is). */
struct redisServer {
    /* General */
    pid_t pid;                  /* Main process pid. */
    char *configfile;           /* Absolute config file path, or NULL */
   ...
    redisDb *db;
    dict *commands;             /* Command table */
    dict *orig_commands;        /* Command table before command renaming. */
    aeEventLoop *el;
    ...
    /* Networking */
    int port;                   /* TCP listening port */
    ...
    /* Fast pointers to often looked up command */
    struct redisCommand *delCommand, *multiCommand, *lpushCommand,
                        *lpopCommand, *rpopCommand, *zpopminCommand,
                        *zpopmaxCommand, *sremCommand, *execCommand,
                        *expireCommand, *pexpireCommand, *xclaimCommand,
                        *xgroupCommand;
    /* Fields used only for stats */
   ...
};

事件

redis采用Reactor事件模型

  • Handles:表示操作系统管理的资源,可以理解为fd。

  • Synchronous Event Demultiplexer:同步事件分离器,阻塞等待Handles上的事件发生。

  • Initiation Dispatcher:初始分派器,负责添加Event handler(事件处理器)、删除Event handler,以及把事件分派给对应的Event handler。

    也就是说,Synchronous Event Demultiplexer负责等待新事件发生,事件发生时通知Initiation Dispatcher,然后Initiation Dispatcher调用event handler处理事件。

  • Event Handler :事件处理器的接口

  • Concrete Event Handler :事件处理器的实际实现,并且绑定了一个Handle。由于在实际情况中,我们往往不止一种事件处理器,因此这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态类似。

以上各子模块之间协作的步骤描述如下:

  1. 我们注册Concrete Event Handler到Initiation Dispatcher中。

  2. Initiation Dispatcher调用每一个Event Handler的get_handle接口获取其绑定的Handle。

  3. Initiation Dispatcher调用handle_events开始事件处理循环。在这里,Initiation Dispatcher会把步骤2获取的所有Handle收集起来,使用Synchronous Event Demultiplexer等待这些Handle上的事件发生。

  4. 当某个(或某几个)Handle的事件发生时,Synchronous Event Demultiplexer通知Initiation Dispatcher。

  5. Initiation Dispatcher根据发生事件的Handle找出对应的Handler。

  6. Initiation Dispatcher调用Handler的handle_event方法处理事件。

时序图如下:

reactor mode 4

以上内容 转自:

https://www.cnblogs.com/gavanwanggw/p/7259473.html

事件循环

redis的事件分为文件事件和时间事件两种,它们都封装在结构体aeEventLoop中。

aeEventLoop结构如下:

/* State of an event based program */
/* Central state of one event loop: the registered file events, the events
 * reported ready by the last poll, the time-event list and the state of the
 * I/O multiplexing backend. */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */

    /* Array of registered file events; aeProcessEvents looks an entry up by
     * fd (events[fired[j].fd]). */
    aeFileEvent *events; /* Registered events */

    /* Events reported ready by the last aeApiPoll call (fd + AE_* mask). */
    aeFiredEvent *fired; /* Fired events */

    /* Head node of the time-event linked list. */
    aeTimeEvent *timeEventHead;

    /* Set to non-zero to make aeMain leave its loop. */
    int stop;

    /* Backend-specific state of the I/O multiplexing API (for epoll this is
     * an aeApiState). Only the epoll backend is covered in these notes. */
    void *apidata; /* This is used for polling API specific data */

    /* Called at the start of every aeMain iteration, before polling. */
    aeBeforeSleepProc *beforesleep;
    /* Called right after the poll returns, when aeProcessEvents is given
     * AE_CALL_AFTER_SLEEP (it reuses the aeBeforeSleepProc type). */
    aeBeforeSleepProc *aftersleep;
} aeEventLoop;

事件循环代码如下:

/* Flags understood by aeProcessEvents(). */
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8
/* Drive the event loop until eventLoop->stop becomes non-zero.
 * Every iteration first runs the optional beforesleep hook, then processes
 * both file and time events and asks aeProcessEvents to run the aftersleep
 * hook once the poll returns. */
void aeMain(aeEventLoop *eventLoop) {
    const int loopFlags = AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP;

    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, loopFlags);
    }
}
src/ae.c:412 
aeProcessEvents

/* After sleep callback. */
/* Excerpt from aeProcessEvents, executed right after the poll returns.
 * '&' binds tighter than '&&', so the test reads:
 * (aftersleep != NULL) && (flags & AE_CALL_AFTER_SLEEP). */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
    eventLoop->aftersleep(eventLoop);

在循环时传入了AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP标志位,可以看出每一轮事件循环都会处理时间事件和文件事件,并在阻塞等待返回后调用aftersleep回调。

文件事件

redis客户端通过socket与服务端交互,所以这里的文件事件就是socket的读写事件。redis使用IO多路复用来等待这些事件(对应Reactor模型中的Synchronous Event Demultiplexer),下面简单介绍一下epoll。

epoll

epoll的相关API只有三个:

  • int epoll_create(int size):size原本是提示内核需要监听的fd数量,但从Linux 2.6.8开始内核采用动态分配,会忽略此参数,只要求它大于0。为了兼容低版本内核,一般仍会传入一个正数,redis传入的是1024。

  • int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event):注册、修改或删除要监听的fd及其事件,参数说明见下文。

  • int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout):等待事件的产生,类似于select()调用。
    - events:用来从内核取回就绪事件的集合。
    - maxevents:告诉内核events数组有多大,必须大于0。它与epoll_create()的size参数无关,redis传入的是eventLoop->setsize。
    - timeout:超时时间,单位为毫秒。0表示立即返回,-1表示一直阻塞,直到有事件发生。
    - 返回值:需要处理的事件数目。返回0表示已超时,返回-1表示出错(同时设置errno)。

epoll_ctl参数:

  1. epfd : 函数 epoll_create 执行成功后返回的fd。
  2. op: 需要进行的操作

    /* Valid opcodes ( "op" parameter ) to issue to epoll_ctl().  */
    /* Register an fd together with the events to watch. */
    #define EPOLL_CTL_ADD 1    /* Add a file descriptor to the interface.  */
    /* Stop watching an fd. */
    #define EPOLL_CTL_DEL 2    /* Remove a file descriptor from the interface.  */
    /* Change the watched events of an already registered fd. */
    #define EPOLL_CTL_MOD 3    /* Change file descriptor epoll_event structure.  */
    
    3. fd:需要监听的文件描述符

    4. event:需要监控的事件,比较常用的是EPOLLIN和EPOLLOUT。events可以是以下几个宏的组合:
      EPOLLIN:表示对应的文件描述符可以读(包括对端SOCKET正常关闭);
      EPOLLOUT:表示对应的文件描述符可以写;
      EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
      EPOLLERR:表示对应的文件描述符发生错误;
      EPOLLHUP:表示对应的文件描述符被挂断(EPOLLERR和EPOLLHUP无需主动设置,epoll_wait总会报告这两种事件);
      EPOLLET:将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的;
      EPOLLONESHOT:只监听一次事件。监听完这次事件之后,如果还需要继续监听这个socket,需要用EPOLL_CTL_MOD重新设置该fd的事件。

redis ae模块的封装

static int aeApiCreate(aeEventLoop *eventLoop) 
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) 
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) 
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask) 
static int aeApiResize(aeEventLoop *eventLoop, int setsize)

aeApiCreate就是对epoll_create的封装,aeApiAddEvent,aeApiDelEvent是对epoll_ctl的封装,aeApiPoll是对epoll_wait的封装

  • eventLoop: 事件循环,结构上面已有描述
  • mask or delmask:要添加或者删除的事件类型,详见下面的mask定义
  • fd:文件描述符
  • tvp:文件事件的阻塞超时时间

mask定义:

/* File event masks: passed to aeApiAddEvent/aeApiDelEvent and stored in
 * the fired[].mask entries filled by aeApiPoll. */
// No event registered.
#define AE_NONE 0       /* No events registered. */
// Readable event.
#define AE_READABLE 1   /* Fire when descriptor is readable. */
// Writable event.
#define AE_WRITABLE 2   /* Fire when descriptor is writable. */
/* Modifier used together with AE_WRITABLE; see the description on the
 * right. NOTE(review): an earlier note here claimed it is only referenced
 * from cluster.c (i.e. cluster-only); that was not verified, grep the
 * sources before relying on it. */
#define AE_BARRIER 4    /* With WRITABLE, never fire the event if the
                           READABLE event already fired in the same event
                           loop iteration. Useful when you want to persist
                           things to disk before sending replies, and want
                           to do that in a group fashion. */

分析一下aeApiPoll函数:

/* Wait for file events on the epoll instance and translate them into AE_*
 * masks stored in eventLoop->fired[0..n-1].
 *
 * tvp: maximum time to block, or NULL to block until an event arrives.
 * Returns the number of ready fds written to eventLoop->fired (0 on timeout
 * or when epoll_wait fails). */
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* Block until an event fires or the timeout expires. The microsecond
     * part is rounded UP to whole milliseconds: truncating it would turn a
     * sub-millisecond timeout into 0, so epoll_wait would return at once and
     * the event loop would spin until the next timer is actually due. */
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + (tvp->tv_usec + 999)/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        /* Convert every ready epoll event into Redis' own AE_* mask. */
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            /* epoll always reports EPOLLERR/EPOLLHUP, even when they were not
             * requested. Flag them as both readable and writable so that an
             * fd registered only for reading still gets its handler run and
             * can notice the error. NOTE(review): relies on aeProcessEvents
             * dispatching only the handlers in the fd's registered mask;
             * confirm in ae.c. */
            if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;

            /* Record the ready fd for aeProcessEvents to dispatch. */
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

aeApiPoll会把获取到的就绪事件保存在eventLoop的fired数组里,然后由aeProcessEvents遍历并处理这些事件:

/* Abridged excerpt of aeProcessEvents. Each "..." marks code omitted from
 * these notes; the '}' just before "Check time events" closes a block that
 * was opened in the omitted part, so the excerpt does not compile as-is.
 * It polls for ready file events, runs the aftersleep hook when
 * AE_CALL_AFTER_SLEEP is set, walks the fired array, then processes time
 * events when AE_TIME_EVENTS is set. Returns the number of file + time
 * events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
        int processed = 0, numevents;

        ...

        // Fetch all ready file events.
        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            // Walk the fired array and handle each ready fd.
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            ...

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

时间事件

server启动

命令处理

调试

调试工具使用的是JetBrains家的CLion,不得不说,真香。CLion基于CMake构建,详见 调试工具

附录

results matching ""

    No results matching ""