>데이터 베이스 >MySQL 튜토리얼 >redis源代码分析24–VM(中)

redis源代码分析24–VM(中)

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB원래의
2016-06-07 16:26:181246검색

VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。 这一节主要介绍阻塞方式。 redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 v

VM根据value换进换出的策略又有两种使用方式:阻塞方式和多线程方式(server.vm_max_threads == 0为阻塞方式)。

这一节主要介绍阻塞方式。

redis 启动重建db(aof方式或者快照方式)时,可能会因为内存限制将某些value换出到磁盘,此时只使用阻塞方式换出 value(vmSwapOneObjectBlocking函数)。除此之外,redis只在serverCron函数(该函数事件处理章节分析过)中换出value。我们来看看serverCron中的处理代码,阻塞方式使用函数vmSwapOneObjectBlocking换出value,多线程方式使用函数vmSwapOneObjectThreaded换出value。

static int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    ---
    /* Swap a few keys on disk if we are over the memory limit and VM
     * is enbled. Try to free objects from the free list first. */
    if (vmCanSwapOut()) {
        while (server.vm_enabled && zmalloc_used_memory() >
                server.vm_max_memory)
        {
            ---
            if (tryFreeOneObjectFromFreelist() == REDIS_OK) continue;
            retval = (server.vm_max_threads == 0) ?
                        vmSwapOneObjectBlocking() :
                        vmSwapOneObjectThreaded();
            ---
        }
    }
    ---
    return 100;
}

无论是阻塞方式vmSwapOneObjectBlocking换出value,还是多线程方式vmSwapOneObjectThreaded换出value,最终都调用vmSwapOneObject(调用参数不一样)来换出value。

vmSwapOneObject会对每个db,随机选择5项,计算它的swappability,然后如果是多线程方式,则调用vmSwapObjectThreaded来换出value,否则使用vmSwapObjectBlocking换出value。

static int vmSwapOneObject(int usethreads) {
    int j, i;
    struct dictEntry *best = NULL;
    double best_swappability = 0;
    redisDb *best_db = NULL;
    robj *key, *val;
    for (j = 0; j dict) == 0) continue;
        for (i = 0; i dict);
            key = dictGetEntryKey(de);
            val = dictGetEntryVal(de);
            /* Only swap objects that are currently in memory.
             *
             * Also don't swap shared objects if threaded VM is on, as we
             * try to ensure that the main thread does not touch the
             * object while the I/O thread is using it, but we can't
             * control other keys without adding additional mutex. */
            if (key->storage != REDIS_VM_MEMORY ||
                (server.vm_max_threads != 0 && val->refcount != 1)) {
                if (maxtries) i--; /* don't count this try */
                continue;
            }
            val->vm.atime = key->vm.atime; /* atime is updated on key object */
            swappability = computeObjectSwappability(val);
            if (!best || swappability > best_swappability) {
                best = de;
                best_swappability = swappability;
                best_db = db;
            }
        }
    }
    if (best == NULL) return REDIS_ERR;
    key = dictGetEntryKey(best);
    val = dictGetEntryVal(best);
    redisLog(REDIS_DEBUG,"Key with best swappability: %s, %f",
        key->ptr, best_swappability);
    /* Unshare the key if needed */
    if (key->refcount > 1) {
        robj *newkey = dupStringObject(key);
        decrRefCount(key);
        key = dictGetEntryKey(best) = newkey;
    }
    /* Swap it */
    if (usethreads) {
        vmSwapObjectThreaded(key,val,best_db);
        return REDIS_OK;
    } else {
        if (vmSwapObjectBlocking(key,val) == REDIS_OK) {
            dictGetEntryVal(best) = NULL;
            return REDIS_OK;
        } else {
            return REDIS_ERR;
        }
    }
}

vmSwapObjectBlocking会在计算所需的交换页后,阻塞性的将value写到vm文件中(函数vmWriteObjectOnSwap),最后标记相应vm页为已使用。

static int vmSwapObjectBlocking(robj *key, robj *val) {
    off_t pages = rdbSavedObjectPages(val,NULL);
    off_t page;
    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    if (vmFindContiguousPages(&page,pages) == REDIS_ERR) return REDIS_ERR;
    if (vmWriteObjectOnSwap(val,page) == REDIS_ERR) return REDIS_ERR;
    key->vm.page = page;
    key->vm.usedpages = pages;
    key->storage = REDIS_VM_SWAPPED;
    key->vtype = val->type;
    decrRefCount(val); /* Deallocate the object from memory. */
    vmMarkPagesUsed(page,pages);
    redisLog(REDIS_DEBUG,"VM: object %s swapped out at %lld (%lld pages)",
        (unsigned char*) key->ptr,
        (unsigned long long) page, (unsigned long long) pages);
    server.vm_stats_swapped_objects++;
    server.vm_stats_swapouts++;
    return REDIS_OK;
}

对于value的加载,如果是多线程方式,会使用blockClientOnSwappedKeys提前加载,但阻塞方式则只有到相应命令执行时才会加载。最终无论是阻塞方式还是多线程方式,都会调用lookupKey来查找key是否在内存中,若不在,则使用vmLoadObject加载value,该函数是阻塞式的读入value。

static robj *lookupKey(redisDb *db, robj *key) {
    dictEntry *de = dictFind(db->dict,key);
    if (de) {
        robj *key = dictGetEntryKey(de);
        robj *val = dictGetEntryVal(de);
        if (server.vm_enabled) {
            if (key->storage == REDIS_VM_MEMORY ||
                key->storage == REDIS_VM_SWAPPING)
            {
                /* If we were swapping the object out, stop it, this key
                 * was requested. */
                if (key->storage == REDIS_VM_SWAPPING)
                    vmCancelThreadedIOJob(key);
                /* Update the access time of the key for the aging algorithm. */
                key->vm.atime = server.unixtime;
            } else {
                int notify = (key->storage == REDIS_VM_LOADING);
                /* Our value was swapped on disk. Bring it at home. */
                redisAssert(val == NULL);
                val = vmLoadObject(key);
                dictGetEntryVal(de) = val;
                /* Clients blocked by the VM subsystem may be waiting for
                 * this key... */
                if (notify) handleClientsBlockedOnSwappedKey(db,key);
            }
        }
        return val;
    } else {
        return NULL;
    }
}
성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.