Rumah >pangkalan data >tutorial mysql >redis源代码分析25–VM(下)
这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea
这一节介绍下redis中的多线程机制。
先看看多线程换出的机制。
serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。
static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) { iojob *j; assert(key->storage == REDIS_VM_MEMORY); assert(key->refcount == 1); j = zmalloc(sizeof(*j)); j->type = REDIS_IOJOB_PREPARE_SWAP; j->db = db; j->key = key; j->val = val; incrRefCount(val); j->canceled = 0; j->thread = (pthread_t) -1; key->storage = REDIS_VM_SWAPPING; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); return REDIS_OK; }
vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。
/* This function must be called while with threaded IO locked */ static void queueIOJob(iojob *j) { redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n", (void*)j, j->type, (char*)j->key->ptr); listAddNodeTail(server.io_newjobs,j); if (server.io_active_threads <p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p> <pre class="brush:php;toolbar:false"> static void spawnIOThread(void) { pthread_t thread; sigset_t mask, omask; int err; sigemptyset(&mask); sigaddset(&mask,SIGCHLD); sigaddset(&mask,SIGHUP); sigaddset(&mask,SIGPIPE); pthread_sigmask(SIG_SETMASK, &mask, &omask); while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) { redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s", strerror(err)); usleep(1000000); } pthread_sigmask(SIG_SETMASK, &omask, NULL); server.io_active_threads++; }
IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。
static void *IOThreadEntryPoint(void *arg) { iojob *j; listNode *ln; REDIS_NOTUSED(arg); pthread_detach(pthread_self()); while(1) { /* Get a new job to process */ lockThreadedIO(); if (listLength(server.io_newjobs) == 0) { /* No new jobs in queue, exit. */ redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do", (long) pthread_self()); server.io_active_threads--; unlockThreadedIO(); return NULL; } ln = listFirst(server.io_newjobs); j = ln->value; listDelNode(server.io_newjobs,ln); /* Add the job in the processing queue */ j->thread = pthread_self(); listAddNodeTail(server.io_processing,j); ln = listLast(server.io_processing); /* We use ln later to remove it */ unlockThreadedIO(); redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'", (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr); /* Process the Job */ if (j->type == REDIS_IOJOB_LOAD) { j->val = vmReadObjectFromSwap(j->page,j->key->vtype); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { FILE *fp = fopen("/dev/null","w+"); j->pages = rdbSavedObjectPages(j->val,fp); fclose(fp); } else if (j->type == REDIS_IOJOB_DO_SWAP) { if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR) j->canceled = 1; } /* Done: insert the job into the processed queue */ redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)", (long) pthread_self(), (void*)j, (char*)j->key->ptr); lockThreadedIO(); listDelNode(server.io_processing,ln); listAddNodeTail(server.io_processed,j); unlockThreadedIO(); /* Signal the main thread there is new stuff to process */ assert(write(server.io_ready_pipe_write,"x",1) == 1); } return NULL; /* never reached */ } static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[1]; int retval, processed = 0, toprocess = -1, trytoswap = 1; REDIS_NOTUSED(el); REDIS_NOTUSED(mask); REDIS_NOTUSED(privdata); if (privdata != NULL) trytoswap = 0; /* check the comments above... */ /* For every byte we read in the read side of the pipe, there is one * I/O job completed to process. */ while((retval = read(fd,buf,1)) == 1) { iojob *j; listNode *ln; robj *key; struct dictEntry *de; redisLog(REDIS_DEBUG,"Processing I/O completed job"); /* Get the processed element (the oldest one) */ lockThreadedIO(); assert(listLength(server.io_processed) != 0); if (toprocess == -1) { toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100; if (toprocess value; listDelNode(server.io_processed,ln); unlockThreadedIO(); /* If this job is marked as canceled, just ignore it */ if (j->canceled) { freeIOJob(j); continue; } /* Post process it in the main thread, as there are things we * can do just here to avoid race conditions and/or invasive locks */ redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount); de = dictFind(j->db->dict,j->key); assert(de != NULL); key = dictGetEntryKey(de); if (j->type == REDIS_IOJOB_LOAD) { redisDb *db; /* Key loaded, bring it at home */ key->storage = REDIS_VM_MEMORY; key->vm.atime = server.unixtime; vmMarkPagesFree(key->vm.page,key->vm.usedpages); redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)", (unsigned char*) key->ptr); server.vm_stats_swapped_objects--; server.vm_stats_swapins++; dictGetEntryVal(de) = j->val; incrRefCount(j->val); db = j->db; freeIOJob(j); /* Handle clients waiting for this key to be loaded. */ handleClientsBlockedOnSwappedKey(db,key); } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) { /* Now we know the amount of pages required to swap this object. * Let's find some space for it, and queue this task again * rebranded as REDIS_IOJOB_DO_SWAP. */ if (!vmCanSwapOut() || vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR) { /* Ooops... no space or we can't swap as there is * a fork()ed Redis trying to save stuff on disk. */ freeIOJob(j); key->storage = REDIS_VM_MEMORY; /* undo operation */ } else { /* Note that we need to mark this pages as used now, * if the job will be canceled, we'll mark them as freed * again. */ vmMarkPagesUsed(j->page,j->pages); j->type = REDIS_IOJOB_DO_SWAP; lockThreadedIO(); queueIOJob(j); unlockThreadedIO(); } } else if (j->type == REDIS_IOJOB_DO_SWAP) { robj *val; /* Key swapped. We can finally free some memory. */ if (key->storage != REDIS_VM_SWAPPING) { printf("key->storage: %d\n",key->storage); printf("key->name: %s\n",(char*)key->ptr); printf("key->refcount: %d\n",key->refcount); printf("val: %p\n",(void*)j->val); printf("val->type: %d\n",j->val->type); printf("val->ptr: %s\n",(char*)j->val->ptr); } redisAssert(key->storage == REDIS_VM_SWAPPING); val = dictGetEntryVal(de); key->vm.page = j->page; key->vm.usedpages = j->pages; key->storage = REDIS_VM_SWAPPED; key->vtype = j->val->type; decrRefCount(val); /* Deallocate the object from memory. */ dictGetEntryVal(de) = NULL; redisLog(REDIS_DEBUG, "VM: object %s swapped out at %lld (%lld pages) (threaded)", (unsigned char*) key->ptr, (unsigned long long) j->page, (unsigned long long) j->pages); server.vm_stats_swapped_objects++; server.vm_stats_swapouts++; freeIOJob(j); /* Put a few more swap requests in queue if we are still * out of memory */ if (trytoswap && vmCanSwapOut() && zmalloc_used_memory() > server.vm_max_memory) { int more = 1; while(more) { lockThreadedIO(); more = listLength(server.io_newjobs) <p class="copyright"> 原文地址:redis源代码分析25–VM(下), 感谢原作者分享。 </p>