検索

redis源代码分析25–VM(下)

Jun 07, 2016 pm 04:26 PM
redis導入分析するソースコード

这一节介绍下redis中的多线程机制。 先看看多线程换出的机制。 serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThrea

这一节介绍下redis中的多线程机制。

先看看多线程换出的机制。

serverCron函数中调用 vmSwapOneObjectThreaded开始多线程方式换出value,vmSwapOneObjectThreaded会调用 vmSwapOneObject(参看上一节的解释),而vmSwapOneObject最终会调用vmSwapObjectThreaded。

static int vmSwapObjectThreaded(robj *key, robj *val, redisDb *db) {
    iojob *j;
    assert(key->storage == REDIS_VM_MEMORY);
    assert(key->refcount == 1);
    j = zmalloc(sizeof(*j));
    j->type = REDIS_IOJOB_PREPARE_SWAP;
    j->db = db;
    j->key = key;
    j->val = val;
    incrRefCount(val);
    j->canceled = 0;
    j->thread = (pthread_t) -1;
    key->storage = REDIS_VM_SWAPPING;
    lockThreadedIO();
    queueIOJob(j);
    unlockThreadedIO();
    return REDIS_OK;
}

vmSwapObjectThreaded 会创建一个类型为REDIS_IOJOB_PREPARE_SWAP的job,然后使用queueIOJob来排队。而queueIOJob所做的主要工作就是就是将新job加入到server.io_newjobs,并在创建的线程数还没超过配置值时,创建新的线程。

/* This function must be called while with threaded IO locked */
static void queueIOJob(iojob *j) {
    redisLog(REDIS_DEBUG,"Queued IO Job %p type %d about key '%s'\n",
        (void*)j, j->type, (char*)j->key->ptr);
    listAddNodeTail(server.io_newjobs,j);
    if (server.io_active_threads 
<p>从spawnIOThread中可以知道,新线程的入口点是IOThreadEntryPoint。</p>
<pre class="brush:php;toolbar:false">
static void spawnIOThread(void) {
    pthread_t thread;
    sigset_t mask, omask;
    int err;
    sigemptyset(&mask);
    sigaddset(&mask,SIGCHLD);
    sigaddset(&mask,SIGHUP);
    sigaddset(&mask,SIGPIPE);
    pthread_sigmask(SIG_SETMASK, &mask, &omask);
    while ((err = pthread_create(&thread,&server.io_threads_attr,IOThreadEntryPoint,NULL)) != 0) {
        redisLog(REDIS_WARNING,"Unable to spawn an I/O thread: %s",
            strerror(err));
        usleep(1000000);
    }
    pthread_sigmask(SIG_SETMASK, &omask, NULL);
    server.io_active_threads++;
}

IOThreadEntryPoint会将io_newjobs中的job移入server.io_processing,然后在做完job类型的工作后(加载value/计算value所需交换页数/换出value),将job从server.io_processing移入io_processed中。然后往 server.io_ready_pipe_write所在的管道(io_ready_pipe_read、io_ready_pipe_write组成管道的两端)写入一个字节,让睡眠中的vmThreadedIOCompletedJob继续运行,该函数会做些后续工作。

static void *IOThreadEntryPoint(void *arg) {
    iojob *j;
    listNode *ln;
    REDIS_NOTUSED(arg);
    pthread_detach(pthread_self());
    while(1) {
        /* Get a new job to process */
        lockThreadedIO();
        if (listLength(server.io_newjobs) == 0) {
            /* No new jobs in queue, exit. */
            redisLog(REDIS_DEBUG,"Thread %ld exiting, nothing to do",
                (long) pthread_self());
            server.io_active_threads--;
            unlockThreadedIO();
            return NULL;
        }
        ln = listFirst(server.io_newjobs);
        j = ln->value;
        listDelNode(server.io_newjobs,ln);
        /* Add the job in the processing queue */
        j->thread = pthread_self();
        listAddNodeTail(server.io_processing,j);
        ln = listLast(server.io_processing); /* We use ln later to remove it */
        unlockThreadedIO();
        redisLog(REDIS_DEBUG,"Thread %ld got a new job (type %d): %p about key '%s'",
            (long) pthread_self(), j->type, (void*)j, (char*)j->key->ptr);
        /* Process the Job */
        if (j->type == REDIS_IOJOB_LOAD) {
            j->val = vmReadObjectFromSwap(j->page,j->key->vtype);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            FILE *fp = fopen("/dev/null","w+");
            j->pages = rdbSavedObjectPages(j->val,fp);
            fclose(fp);
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            if (vmWriteObjectOnSwap(j->val,j->page) == REDIS_ERR)
                j->canceled = 1;
        }
        /* Done: insert the job into the processed queue */
        redisLog(REDIS_DEBUG,"Thread %ld completed the job: %p (key %s)",
            (long) pthread_self(), (void*)j, (char*)j->key->ptr);
        lockThreadedIO();
        listDelNode(server.io_processing,ln);
        listAddNodeTail(server.io_processed,j);
        unlockThreadedIO();
        /* Signal the main thread there is new stuff to process */
        assert(write(server.io_ready_pipe_write,"x",1) == 1);
    }
    return NULL; /* never reached */
}
static void vmThreadedIOCompletedJob(aeEventLoop *el, int fd, void *privdata,
            int mask)
{
    char buf[1];
    int retval, processed = 0, toprocess = -1, trytoswap = 1;
    REDIS_NOTUSED(el);
    REDIS_NOTUSED(mask);
    REDIS_NOTUSED(privdata);
    if (privdata != NULL) trytoswap = 0; /* check the comments above... */
    /* For every byte we read in the read side of the pipe, there is one
     * I/O job completed to process. */
    while((retval = read(fd,buf,1)) == 1) {
        iojob *j;
        listNode *ln;
        robj *key;
        struct dictEntry *de;
        redisLog(REDIS_DEBUG,"Processing I/O completed job");
        /* Get the processed element (the oldest one) */
        lockThreadedIO();
        assert(listLength(server.io_processed) != 0);
        if (toprocess == -1) {
            toprocess = (listLength(server.io_processed)*REDIS_MAX_COMPLETED_JOBS_PROCESSED)/100;
            if (toprocess value;
        listDelNode(server.io_processed,ln);
        unlockThreadedIO();
        /* If this job is marked as canceled, just ignore it */
        if (j->canceled) {
            freeIOJob(j);
            continue;
        }
        /* Post process it in the main thread, as there are things we
         * can do just here to avoid race conditions and/or invasive locks */
        redisLog(REDIS_DEBUG,"Job %p type: %d, key at %p (%s) refcount: %d\n", (void*) j, j->type, (void*)j->key, (char*)j->key->ptr, j->key->refcount);
        de = dictFind(j->db->dict,j->key);
        assert(de != NULL);
        key = dictGetEntryKey(de);
        if (j->type == REDIS_IOJOB_LOAD) {
            redisDb *db;
            /* Key loaded, bring it at home */
            key->storage = REDIS_VM_MEMORY;
            key->vm.atime = server.unixtime;
            vmMarkPagesFree(key->vm.page,key->vm.usedpages);
            redisLog(REDIS_DEBUG, "VM: object %s loaded from disk (threaded)",
                (unsigned char*) key->ptr);
            server.vm_stats_swapped_objects--;
            server.vm_stats_swapins++;
            dictGetEntryVal(de) = j->val;
            incrRefCount(j->val);
            db = j->db;
            freeIOJob(j);
            /* Handle clients waiting for this key to be loaded. */
            handleClientsBlockedOnSwappedKey(db,key);
        } else if (j->type == REDIS_IOJOB_PREPARE_SWAP) {
            /* Now we know the amount of pages required to swap this object.
             * Let's find some space for it, and queue this task again
             * rebranded as REDIS_IOJOB_DO_SWAP. */
            if (!vmCanSwapOut() ||
                vmFindContiguousPages(&j->page,j->pages) == REDIS_ERR)
            {
                /* Ooops... no space or we can't swap as there is
                 * a fork()ed Redis trying to save stuff on disk. */
                freeIOJob(j);
                key->storage = REDIS_VM_MEMORY; /* undo operation */
            } else {
                /* Note that we need to mark this pages as used now,
                 * if the job will be canceled, we'll mark them as freed
                 * again. */
                vmMarkPagesUsed(j->page,j->pages);
                j->type = REDIS_IOJOB_DO_SWAP;
                lockThreadedIO();
                queueIOJob(j);
                unlockThreadedIO();
            }
        } else if (j->type == REDIS_IOJOB_DO_SWAP) {
            robj *val;
            /* Key swapped. We can finally free some memory. */
            if (key->storage != REDIS_VM_SWAPPING) {
                printf("key->storage: %d\n",key->storage);
                printf("key->name: %s\n",(char*)key->ptr);
                printf("key->refcount: %d\n",key->refcount);
                printf("val: %p\n",(void*)j->val);
                printf("val->type: %d\n",j->val->type);
                printf("val->ptr: %s\n",(char*)j->val->ptr);
            }
            redisAssert(key->storage == REDIS_VM_SWAPPING);
            val = dictGetEntryVal(de);
            key->vm.page = j->page;
            key->vm.usedpages = j->pages;
            key->storage = REDIS_VM_SWAPPED;
            key->vtype = j->val->type;
            decrRefCount(val); /* Deallocate the object from memory. */
            dictGetEntryVal(de) = NULL;
            redisLog(REDIS_DEBUG,
                "VM: object %s swapped out at %lld (%lld pages) (threaded)",
                (unsigned char*) key->ptr,
                (unsigned long long) j->page, (unsigned long long) j->pages);
            server.vm_stats_swapped_objects++;
            server.vm_stats_swapouts++;
            freeIOJob(j);
            /* Put a few more swap requests in queue if we are still
             * out of memory */
            if (trytoswap && vmCanSwapOut() &&
                zmalloc_used_memory() > server.vm_max_memory)
            {
                int more = 1;
                while(more) {
                    lockThreadedIO();
                    more = listLength(server.io_newjobs) 
    <p class="copyright">
        原文地址:redis源代码分析25–VM(下), 感谢原作者分享。
    </p>
    
    


声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
MySQLはSQLiteとどのように違いますか?MySQLはSQLiteとどのように違いますか?Apr 24, 2025 am 12:12 AM

MySQLとSQLiteの主な違いは、設計コンセプトと使用法のシナリオです。1。MySQLは、大規模なアプリケーションとエンタープライズレベルのソリューションに適しており、高性能と高い並行性をサポートしています。 2。SQLiteは、モバイルアプリケーションとデスクトップソフトウェアに適しており、軽量で埋め込みやすいです。

MySQLのインデックスとは何ですか?また、パフォーマンスをどのように改善しますか?MySQLのインデックスとは何ですか?また、パフォーマンスをどのように改善しますか?Apr 24, 2025 am 12:09 AM

MySQLのインデックスは、データの取得をスピードアップするために使用されるデータベーステーブル内の1つ以上の列の順序付けられた構造です。 1)インデックスは、スキャンされたデータの量を減らすことにより、クエリ速度を改善します。 2)B-Tree Indexは、バランスの取れたツリー構造を使用します。これは、範囲クエリとソートに適しています。 3)CreateIndexステートメントを使用して、createIndexidx_customer_idonorders(customer_id)などのインデックスを作成します。 4)Composite Indexesは、createIndexIDX_CUSTOMER_ORDERONORDERS(Customer_Id、Order_date)などのマルチコラムクエリを最適化できます。 5)説明を使用してクエリ計画を分析し、回避します

データの一貫性を確保するために、MySQLでトランザクションを使用する方法を説明します。データの一貫性を確保するために、MySQLでトランザクションを使用する方法を説明します。Apr 24, 2025 am 12:09 AM

MySQLでトランザクションを使用すると、データの一貫性が保証されます。 1)StartTransactionを介してトランザクションを開始し、SQL操作を実行して、コミットまたはロールバックで送信します。 2)SavePointを使用してSave Pointを設定して、部分的なロールバックを許可します。 3)パフォーマンスの最適化の提案には、トランザクション時間の短縮、大規模なクエリの回避、分離レベルの使用が合理的に含まれます。

どのシナリオでMySQLよりもPostgreSQLを選択できますか?どのシナリオでMySQLよりもPostgreSQLを選択できますか?Apr 24, 2025 am 12:07 AM

MySQLの代わりにPostgreSQLが選択されるシナリオには、1)複雑なクエリと高度なSQL関数、2)厳格なデータの整合性と酸コンプライアンス、3)高度な空間関数が必要、4)大規模なデータセットを処理するときに高いパフォーマンスが必要です。 PostgreSQLは、これらの側面でうまく機能し、複雑なデータ処理と高いデータの整合性を必要とするプロジェクトに適しています。

MySQLデータベースをどのように保護できますか?MySQLデータベースをどのように保護できますか?Apr 24, 2025 am 12:04 AM

MySQLデータベースのセキュリティは、以下の測定を通じて達成できます。1。ユーザー許可管理:CreateUSERおよびGrantコマンドを通じてアクセス権を厳密に制御します。 2。暗号化された送信:SSL/TLSを構成して、データ送信セキュリティを確保します。 3.データベースのバックアップとリカバリ:MySQLDUMPまたはMySQLPumpを使用して、定期的にデータをバックアップします。 4.高度なセキュリティポリシー:ファイアウォールを使用してアクセスを制限し、監査ロギング操作を有効にします。 5。パフォーマンスの最適化とベストプラクティス:インデックス作成とクエリの最適化と定期的なメンテナンスを通じて、安全性とパフォーマンスの両方を考慮に入れます。

MySQLのパフォーマンスを監視するために使用できるツールは何ですか?MySQLのパフォーマンスを監視するために使用できるツールは何ですか?Apr 23, 2025 am 12:21 AM

MySQLのパフォーマンスを効果的に監視する方法は? MySqladmin、ShowGlobalStatus、PerconAmonitoring and Management(PMM)、MySQL EnterpriseMonitorなどのツールを使用します。 1. mysqladminを使用して、接続の数を表示します。 2。showglobalstatusを使用して、クエリ番号を表示します。 3.PMMは、詳細なパフォーマンスデータとグラフィカルインターフェイスを提供します。 4.mysqlenterprisemonitorは、豊富な監視機能とアラームメカニズムを提供します。

MySQLはSQL Serverとどのように違いますか?MySQLはSQL Serverとどのように違いますか?Apr 23, 2025 am 12:20 AM

MySQLとSQLServerの違いは次のとおりです。1)MySQLはオープンソースであり、Webおよび埋め込みシステムに適しています。2)SQLServerはMicrosoftの商用製品であり、エンタープライズレベルのアプリケーションに適しています。ストレージエンジン、パフォーマンスの最適化、アプリケーションシナリオの2つには大きな違いがあります。選択するときは、プロジェクトのサイズと将来のスケーラビリティを考慮する必要があります。

どのシナリオでMySQLよりもSQL Serverを選択できますか?どのシナリオでMySQLよりもSQL Serverを選択できますか?Apr 23, 2025 am 12:20 AM

高可用性、高度なセキュリティ、優れた統合を必要とするエンタープライズレベルのアプリケーションシナリオでは、MySQLの代わりにSQLServerを選択する必要があります。 1)SQLServerは、高可用性や高度なセキュリティなどのエンタープライズレベルの機能を提供します。 2)VisualStudioやPowerbiなどのMicrosoftエコシステムと密接に統合されています。 3)SQLSERVERは、パフォーマンスの最適化に優れた機能を果たし、メモリが最適化されたテーブルと列ストレージインデックスをサポートします。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

MantisBT

MantisBT

Mantis は、製品の欠陥追跡を支援するために設計された、導入が簡単な Web ベースの欠陥追跡ツールです。 PHP、MySQL、Web サーバーが必要です。デモおよびホスティング サービスをチェックしてください。

EditPlus 中国語クラック版

EditPlus 中国語クラック版

サイズが小さく、構文の強調表示、コード プロンプト機能はサポートされていません

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)