Home  >  Article  >  Database  >  redis源代码分析16–阻塞式命令

redis源代码分析16–阻塞式命令

WBOY
WBOYOriginal
2016-06-07 16:26:131447browse

redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。 这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置REDIS_BLOCKED标

redis现在只支持对list的阻塞式操作,相关的两个命令是brpop和blpop。

这两个命令在list中有元素时,跟普通的pop没有区别,弹出list的一个元素,然后返回。但在list没有元素时,会为redisClient设置REDIS_BLOCKED标志,然后client阻塞(设置REDIS_BLOCKED标志的redisClient会一直阻塞,参考命令处理章节),一直到新元素加入时(push操作的处理函数pushGenericCommand),才会返回。

这两个命令设置的处理函数brpopCommand和blpopCommand都会调用blockingPopGenericCommand。该函数在检查list中有元素后,会调用非阻塞的popGenericCommand来弹出一个元素,否则调用blockForKeys来处理阻塞的情况。

/* Blocking RPOP/LPOP */
static void blockingPopGenericCommand(redisClient *c, int where) {
    robj *o;
    long long lltimeout;
    time_t timeout;
    int j;
    /* Make sure timeout is an integer value */
    if (getLongLongFromObjectOrReply(c,c->argv[c->argc-1],&lltimeout,
            "timeout is not an integer") != REDIS_OK) return;
    /* Make sure the timeout is not negative */
    if (lltimeout argc-1; j++) {
        o = lookupKeyWrite(c->db,c->argv[j]);
        if (o != NULL) {
            if (o->type != REDIS_LIST) {
                addReply(c,shared.wrongtypeerr);
                return;
            } else {
                list *list = o->ptr;
                if (listLength(list) != 0) {
                    /* If the list contains elements fall back to the usual
                     * non-blocking POP operation */
                    robj *argv[2], **orig_argv;
                    int orig_argc;
                    /* We need to alter the command arguments before to call
                     * popGenericCommand() as the command takes a single key. */
                    orig_argv = c->argv;
                    orig_argc = c->argc;
                    argv[1] = c->argv[j];
                    c->argv = argv;
                    c->argc = 2;
                    /* Also the return value is different, we need to output
                     * the multi bulk reply header and the key name. The
                     * "real" command will add the last element (the value)
                     * for us. If this souds like an hack to you it's just
                     * because it is... */
                    addReplySds(c,sdsnew("*2\r\n"));
                    addReplyBulk(c,argv[1]);
                    popGenericCommand(c,where);
                    /* Fix the client structure with the original stuff */
                    c->argv = orig_argv;
                    c->argc = orig_argc;
                    return;
                }
            }
        }
    }
    /* If we are inside a MULTI/EXEC and the list is empty the only thing
     * we can do is treating it as a timeout (even with timeout 0). */
    if (c->flags & REDIS_MULTI) {
        addReply(c,shared.nullmultibulk);
        return;
    }
    /* If the list is empty or the key does not exists we must block */
    timeout = lltimeout;
    if (timeout > 0) timeout += time(NULL);
    blockForKeys(c,c->argv+1,c->argc-2,timeout);
}

blockForKeys会在db->blockingkeys记下client和等待的key的对应关系,然后给client设置REDIS_BLOCKED标志,这样client就一直阻塞了。

static void blockForKeys(redisClient *c, robj **keys, int numkeys, time_t timeout) {
    dictEntry *de;
    list *l;
    int j;
    ---
    if (c->fd blockingkeys = zmalloc(sizeof(robj*)*numkeys);
    c->blockingkeysnum = numkeys;
    c->blockingto = timeout;
    for (j = 0; j  keys */
        c->blockingkeys[j] = keys[j];
        incrRefCount(keys[j]);
        /* And in the other "side", to map keys -> clients */
        de = dictFind(c->db->blockingkeys,keys[j]);
        if (de == NULL) {
            int retval;
            /* For every key we take a list of clients blocked for it */
            l = listCreate();
            retval = dictAdd(c->db->blockingkeys,keys[j],l);
            incrRefCount(keys[j]);
            assert(retval == DICT_OK);
        } else {
            l = dictGetEntryVal(de);
        }
        listAddNodeTail(l,c);
    }
    /* Mark the client as a blocked client */
    c->flags |= REDIS_BLOCKED;
    server.blpop_blocked_clients++;
}

等待的client会一直阻塞,直到有push操作,此时会调用unblockClientWaitingData来解除client的阻塞。

/* Unblock a client that's waiting in a blocking operation such as BLPOP */
// 减少对所阻塞对象的引用
static void unblockClientWaitingData(redisClient *c) {
    dictEntry *de;
    list *l;
    int j;
    assert(c->blockingkeys != NULL);
    /* The client may wait for multiple keys, so unblock it for every key. */
    for (j = 0; j blockingkeysnum; j++) {
        /* Remove this client from the list of clients waiting for this key. */
        de = dictFind(c->db->blockingkeys,c->blockingkeys[j]);
        assert(de != NULL);
        l = dictGetEntryVal(de);
        listDelNode(l,listSearchKey(l,c));
        /* If the list is empty we need to remove it to avoid wasting memory */
        if (listLength(l) == 0)
            dictDelete(c->db->blockingkeys,c->blockingkeys[j]);
        decrRefCount(c->blockingkeys[j]);
    }
    /* Cleanup the client structure */
    zfree(c->blockingkeys);
    c->blockingkeys = NULL;
    c->flags &= (~REDIS_BLOCKED);
    server.blpop_blocked_clients--;
    /* We want to process data if there is some command waiting
     * in the input buffer. Note that this is safe even if
     * unblockClientWaitingData() gets called from freeClient() because
     * freeClient() will be smart enough to call this function
     * *after* c->querybuf was set to NULL. */
    if (c->querybuf && sdslen(c->querybuf) > 0) processInputBuffer(c);
}
Statement:
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Previous article:ChukWa入门1Next article:Hadoop Hama项目–BSP模型的实现