search
HomeDatabaseMysql Tutorialredis源代码分析25–VM(下)

redis源代码分析25–VM(下)

Jun 07, 2016 pm 04:26 PM
redisintroduceanalyzesource codethread

这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea

这一节介绍下redis中的多线程机制。

先看看多线程换出的机制。

serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。

static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
    iojob *j;
    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    j = zmalloc(sizeof(*j));
    j->type = REDIS_IOJOB_PREPARE_SWAP;
    j->db = db;
    j->key = key;
    j->val = val;
    incrRefCount(val);
    j->canceled = 0;
    j->thread = (pthread_t) -1;
    key->storage = REDIS_VM_SWAPPING;
    lockThreadedIO();
    queueIOJob(j);
    unlockThreadedIO();
    return REDIS_OK;
}

vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。

/* This function must be called while with threaded IO locked */
static void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads 
<p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p>
<pre class="brush:php;toolbar:false">
static void spawnIOThread(void) {
    pthread_t thread;
    sigset_t mask, omask;
    int err;
    sigemptyset(&mask);
    sigaddset(&mask,SIGCHLD);
    sigaddset(&mask,SIGHUP);
    sigaddset(&mask,SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
            strerror(err));
        usleep(1000000);
    }
    pthread_sigmask(SIG_SETMASK, &omask, NULL);
    server.io_active_threads++;
}

IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

static void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
                (long) pthread_self());
            server.io_active_threads--;
            unlockThreadedIO();
            return NULL;
        }
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            FILE *fp = fopen("/dev/null","w+");
            j->pages = rdbSavedObjectPages(j->val,fp);
            fclose(fp);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
        /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();
        /* Signal the main thread there is new stuff to process */
        assert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
            int mask)
{
    char buf[1];
    int retval, processed = 0, toprocess = -1, trytoswap = 1;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    if (privdata != NULL) trytoswap = 0; /* check the comments above... */
    /* For every byte we read in the read side of the pipe, there is one
     * I/O job completed to process. */
    while((retval = read(fd,buf,1)) == 1) {
        iojob *j;
        listNode *ln;
        robj *key;
        struct dictEntry *de;
        redisLog(REDIS_DEBUG,"Processing I/O completed job");
        /* Get the processed element (the oldest one) */
        lockThreadedIO();
        assert(listLength(server.io_processed) != 0);
        if (toprocess == -1) {
            toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
            if (toprocess value;
        listDelNode(server.io_processed,ln);
        unlockThreadedIO();
        /* If this job is marked as canceled, just ignore it */
        if (j->canceled) {
            freeIOJob(j);
            continue;
        }
        /* Post process it in the main thread, as there are things we
         * can do just here to avoid race conditions and/or invasive locks */
        redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
        de = dictFind(j->db->dict,j->key);
        assert(de != NULL);
        key = dictGetEntryKey(de);
        if (j->type == REDIS_IOJOB_LOAD) {
            redisDb *db;
            /* Key loaded, bring it at home */
            key->storage = REDIS_VM_MEMORY;
            key->vm.atime = server.unixtime;
            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
            redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
                (unsigned char*) key->ptr);
            server.vm_stats_swapped_objects--;
            server.vm_stats_swapins++;
            dictGetEntryVal(de) = j->val;
            incrRefCount(j->val);
            db = j->db;
            freeIOJob(j);
            /* Handle clients waiting for this key to be loaded. */
            handleClientsBlockedOnSwappedKey(db,key);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            /* Now we know the amount of pages required to swap this object.
             * Let's find some space for it, and queue this task again
             * rebranded as REDIS_IOJOB_DO_SWAP. */
            if (!vmCanSwapOut() ||
                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
            {
                /* Ooops... no space or we can't swap as there is
                 * a fork()ed Redis trying to save stuff on disk. */
                freeIOJob(j);
                key->storage = REDIS_VM_MEMORY; /* undo operation */
            } else {
                /* Note that we need to mark this pages as used now,
                 * if the job will be canceled, we'll mark them as freed
                 * again. */
                vmMarkPagesUsed(j->page,j->pages);
                j->type = REDIS_IOJOB_DO_SWAP;
                lockThreadedIO();
                queueIOJob(j);
                unlockThreadedIO();
            }
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            robj *val;
            /* Key swapped. We can finally free some memory. */
            if (key->storage != REDIS_VM_SWAPPING) {
                printf("key->storage: %d\n",key->storage);
                printf("key->name: %s\n",(char*)key->ptr);
                printf("key->refcount: %d\n",key->refcount);
                printf("val: %p\n",(void*)j->val);
                printf("val->type: %d\n",j->val->type);
                printf("val->ptr: %s\n",(char*)j->val->ptr);
            }
            redisAssert(key->storage == REDIS_VM_SWAPPING);
            val = dictGetEntryVal(de);
            key->vm.page = j->page;
            key->vm.usedpages = j->pages;
            key->storage = REDIS_VM_SWAPPED;
            key->vtype = j->val->type;
            decrRefCount(val); /* Deallocate the object from memory. */
            dictGetEntryVal(de) = NULL;
            redisLog(REDIS_DEBUG,
                "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                (unsigned char*) key->ptr,
                (unsigned long long) j->page, (unsigned long long) j->pages);
            server.vm_stats_swapped_objects++;
            server.vm_stats_swapouts++;
            freeIOJob(j);
            /* Put a few more swap requests in queue if we are still
             * out of memory */
            if (trytoswap && vmCanSwapOut() &&
                zmalloc_used_memory() > server.vm_max_memory)
            {
                int more = 1;
                while(more) {
                    lockThreadedIO();
                    more = listLength(server.io_newjobs) 
    <p class="copyright">
        原文地址:redis源代码分析25–VM(下), 感谢原作者分享。
    </p>
    
    


Statement
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
What Are the Limitations of Using Views in MySQL?What Are the Limitations of Using Views in MySQL?May 14, 2025 am 12:10 AM

MySQLviewshavelimitations:1)Theydon'tsupportallSQLoperations,restrictingdatamanipulationthroughviewswithjoinsorsubqueries.2)Theycanimpactperformance,especiallywithcomplexqueriesorlargedatasets.3)Viewsdon'tstoredata,potentiallyleadingtooutdatedinforma

Securing Your MySQL Database: Adding Users and Granting PrivilegesSecuring Your MySQL Database: Adding Users and Granting PrivilegesMay 14, 2025 am 12:09 AM

ProperusermanagementinMySQLiscrucialforenhancingsecurityandensuringefficientdatabaseoperation.1)UseCREATEUSERtoaddusers,specifyingconnectionsourcewith@'localhost'or@'%'.2)GrantspecificprivilegeswithGRANT,usingleastprivilegeprincipletominimizerisks.3)

What Factors Influence the Number of Triggers I Can Use in MySQL?What Factors Influence the Number of Triggers I Can Use in MySQL?May 14, 2025 am 12:08 AM

MySQLdoesn'timposeahardlimitontriggers,butpracticalfactorsdeterminetheireffectiveuse:1)Serverconfigurationimpactstriggermanagement;2)Complextriggersincreasesystemload;3)Largertablesslowtriggerperformance;4)Highconcurrencycancausetriggercontention;5)M

MySQL: Is it safe to store BLOB?MySQL: Is it safe to store BLOB?May 14, 2025 am 12:07 AM

Yes,it'ssafetostoreBLOBdatainMySQL,butconsiderthesefactors:1)StorageSpace:BLOBscanconsumesignificantspace,potentiallyincreasingcostsandslowingperformance.2)Performance:LargerrowsizesduetoBLOBsmayslowdownqueries.3)BackupandRecovery:Theseprocessescanbe

MySQL: Adding a user through a PHP web interfaceMySQL: Adding a user through a PHP web interfaceMay 14, 2025 am 12:04 AM

Adding MySQL users through the PHP web interface can use MySQLi extensions. The steps are as follows: 1. Connect to the MySQL database and use the MySQLi extension. 2. Create a user, use the CREATEUSER statement, and use the PASSWORD() function to encrypt the password. 3. Prevent SQL injection and use the mysqli_real_escape_string() function to process user input. 4. Assign permissions to new users and use the GRANT statement.

MySQL: BLOB and other no-sql storage, what are the differences?MySQL: BLOB and other no-sql storage, what are the differences?May 13, 2025 am 12:14 AM

MySQL'sBLOBissuitableforstoringbinarydatawithinarelationaldatabase,whileNoSQLoptionslikeMongoDB,Redis,andCassandraofferflexible,scalablesolutionsforunstructureddata.BLOBissimplerbutcanslowdownperformancewithlargedata;NoSQLprovidesbetterscalabilityand

MySQL Add User: Syntax, Options, and Security Best PracticesMySQL Add User: Syntax, Options, and Security Best PracticesMay 13, 2025 am 12:12 AM

ToaddauserinMySQL,use:CREATEUSER'username'@'host'IDENTIFIEDBY'password';Here'showtodoitsecurely:1)Choosethehostcarefullytocontrolaccess.2)SetresourcelimitswithoptionslikeMAX_QUERIES_PER_HOUR.3)Usestrong,uniquepasswords.4)EnforceSSL/TLSconnectionswith

MySQL: How to avoid String Data Types common mistakes?MySQL: How to avoid String Data Types common mistakes?May 13, 2025 am 12:09 AM

ToavoidcommonmistakeswithstringdatatypesinMySQL,understandstringtypenuances,choosetherighttype,andmanageencodingandcollationsettingseffectively.1)UseCHARforfixed-lengthstrings,VARCHARforvariable-length,andTEXT/BLOBforlargerdata.2)Setcorrectcharacters

See all articles

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

Video Face Swap

Video Face Swap

Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Linux new version

SublimeText3 Linux new version

SublimeText3 Linux latest version

MantisBT

MantisBT

Mantis is an easy-to-deploy web-based defect tracking tool designed to aid in product defect tracking. It requires PHP, MySQL and a web server. Check out our demo and hosting services.

SAP NetWeaver Server Adapter for Eclipse

SAP NetWeaver Server Adapter for Eclipse

Integrate Eclipse with SAP NetWeaver application server.