Heim  >  Artikel  >  Datenbank  >  Redis数据类型与指令详解之集合(t_set)

Redis数据类型与指令详解之集合(t_set)

WBOY
WBOYOriginal
2016-06-07 15:23:121442Durchsuche

集合编码方式 Redis 集合(set)使用REDIS_ENCONDING_INT与REDIS_ENCONDING_HT两种编码方式 1、REDIS_ENCONDING_INT: intset.c/intset.h 2、REDIS_ENCONDING_HT: dict.c/dict.h 第一个添加到集合的元素,决定了创建集合时所使用的编码:如果第一个元素可以表示

集合编码方式

Redis 集合(set)使用REDIS_ENCONDING_INT与REDIS_ENCONDING_HT两种编码方式
1、REDIS_ENCONDING_INT: intset.c/intset.h
2、REDIS_ENCONDING_HT: dict.c/dict.h
第一个添加到集合的元素,决定了创建集合时所使用的编码:如果第一个元素可以表示为 long long 类型值(也即是,它是一个整数),那么集合的初始编码为 REDIS_ENCODING_INTSET ;否则,集合的初始编码为 REDIS_ENCODING_HT 。
编码切换:如果一个集合使用 REDIS_ENCODING_INTSET 编码,那么当以下任何一个条件被满足时,这个集合会被转换成 REDIS_ENCODING_HT 编码:
1、 intset 保存的整数值个数超过 server.set_max_intset_entries (默认值为 512 )
2、试图往集合里添加一个新元素,并且这个元素不能被表示为 long long 类型

集合指令实现

SADD

指令格式: SADD key member [member...]
将一个或多个member元素加入到集合key当中,由于集合成员不能重复,已经存在于集合key中的member元素将被忽略。
如果key不存在,则创建一个包含被添加的member元素的新集合。
如果key不是集合类型(REDIS_SET)时,则操作出错,redis返回一个错误。
时间复杂度:O(N)
void saddCommand(redisClient *c) {
    robj *set;
    int j, added = 0;

    set = lookupKeyWrite(c->db,c->argv[1]);//写查找数据库中名为c->argv[1]的集合
    if (set == NULL) {//集合不存在
        set = setTypeCreate(c->argv[2]);//创建一个新的集合
        dbAdd(c->db,c->argv[1],set);//将该集合添加到数据库中,dict中的key就是集合的名称,value就是集合元素
    } else {
        if (set->type != REDIS_SET) {//判断是否是集合类型
            addReply(c,shared.wrongtypeerr);
            return;
        }
    }

    for (j = 2; j < c->argc; j++) {//添加集合元素
        c->argv[j] = tryObjectEncoding(c->argv[j]);//尝试使用整型存储数据
        if (setTypeAdd(set,c->argv[j])) added++;
    }
    if (added) {
        signalModifiedKey(c->db,c->argv[1]);//通知数据库哪些键key变化了,把变化的key存储到watched_keys中,只在事务操作时才用的着
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[1],c->db->id);//暂不知道啥用途
    }
    server.dirty += added;//数据库中数据变化的数目
    addReplyLongLong(c,added);
}

lookupKeyWrite函数在object.c文件中,用来在数据库中查找指定key的value值。
setTypeCreate函数在创建一个新的Redis_Set时,根据添加的元素类型为整型还是字符串会创建不同的存储数据结构 

//创建一个集合对象,如果value是整型,那么使用intset,否则使用dict
robj *setTypeCreate(robj *value) {
    if (isObjectRepresentableAsLongLong(value,NULL) == REDIS_OK)
        return createIntsetObject();//intset
    return createSetObject();//dict
}
robj *createSetObject(void) {
    dict *d = dictCreate(&setDictType,NULL);
    robj *o = createObject(REDIS_SET,d);
    o->encoding = REDIS_ENCODING_HT;
    return o;
}

robj *createIntsetObject(void) {
    intset *is = intsetNew();
    robj *o = createObject(REDIS_SET,is);
    o->encoding = REDIS_ENCODING_INTSET;
    return o;
}
robj *createObject(int type, void *ptr) {
    robj *o = zmalloc(sizeof(*o));
    o->type = type;
    o->encoding = REDIS_ENCODING_RAW;
    o->ptr = ptr;
    o->refcount = 1;

    /* Set the LRU to the current lruclock (minutes resolution). */
    o->lru = server.lruclock;
    return o;
}

SCARD

SCARD key
返回集合key中元素的个数
时间复杂度:O(1)
简单根据集合的编码类型:如果是HT编码,那么直接通过dictSize函数得到字典中元素的个数;如果是intset编码,那么直接通过intsetLen函数得到结果.
 

SISMEMBER

SISMEMBER key member
判断member元素是否集合key的中的元素
时间复杂度: O(1)
简单根据集合的编码类型:如果是HT编码,那么直接通过dictFind函数查找字典中是否存在该member;如果是intset编码,那么直接通过intsetFind函数查找是否存在该member.

SMEMBERS

SMEMBERS key
返回集合key中的所有元素,不存在的key被视为空集合。
时间复杂度: O(N)

SMOVE

SMOVE source destination member
将member元素从集合source转移到集合destination,如果集合source中不存在member元素,那么SMOVE指令不执行任何操作,返回0。若存在member元素,将member元素从集合source中删除,并添加到集合destination,如果集合destination中已存在member元素,那么仅仅从集合source中删除元素member。
void smoveCommand(redisClient *c) {//将member元素从source集合移动到destination集合
    robj *srcset, *dstset, *ele;
    srcset = lookupKeyWrite(c->db,c->argv[1]);//从数据库中查找集合source
    dstset = lookupKeyWrite(c->db,c->argv[2]);
    ele = c->argv[3] = tryObjectEncoding(c->argv[3]);//member元素

    /* If the source key does not exist return 0 */
    if (srcset == NULL) {
        addReply(c,shared.czero);
        return;
    }

    /* If the source key has the wrong type, or the destination key
     * is set and has the wrong type, return with an error. */
    if (checkType(c,srcset,REDIS_SET) ||
        (dstset && checkType(c,dstset,REDIS_SET))) return;//类型检查

    /* If srcset and dstset are equal, SMOVE is a no-op */
    if (srcset == dstset) {//source与dest相同
        addReply(c,shared.cone);
        return;
    }

    /* If the element cannot be removed from the src set, return 0. */
    if (!setTypeRemove(srcset,ele)) {//从源集合中删除member元素
        addReply(c,shared.czero);
        return;
    }
    notifyKeyspaceEvent(REDIS_NOTIFY_SET,"srem",c->argv[1],c->db->id);

    /* Remove the src set from the database when empty */
    if (setTypeSize(srcset) == 0) {//移除member元素后,源集合为空,删除
        dbDelete(c->db,c->argv[1]);
        notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",c->argv[1],c->db->id);
    }
    signalModifiedKey(c->db,c->argv[1]);
    signalModifiedKey(c->db,c->argv[2]);
    server.dirty++;

    /* Create the destination set when it doesn&#39;t exist */
    if (!dstset) {//目标集合不存在,则新建
        dstset = setTypeCreate(ele);
        dbAdd(c->db,c->argv[2],dstset);
    }

    /* An extra key has changed when ele was successfully added to dstset */
    if (setTypeAdd(dstset,ele)) {//添加member元素到目标集合
        server.dirty++;
        notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sadd",c->argv[2],c->db->id);
    }
    addReply(c,shared.cone);
}

SPOP

SPOP key
移除并返回集合key中的一个随机元素。
时间复杂度:O(1)
由于是随机删除一个元素,那么对复制与AOF肯定有影响,因此该操作后,需要将该指令改变为SREM即将该元素删除,参考rewriteClientCommandVector函数。
 

SRANDMEMBER

SRANDMEMBER key [count]
[count]参数可选,如果没有count,那么返回集合中的一个随机元素。
如果count为正数,且小于集合元素个数,那么命令返回一个包含count个元素的数组,数组中的元素各不相同,如果count大于等于集合基数,那么返回整个集合;如果 count 为负数,那么命令返回一个数组,数组中的元素可能会重复出现多次,而数组的长度为count的绝对值。

SREM

SREM key member [member ...]
移除集合key中的一个或多个member元素,不存在的member元素会被忽略。

集合求交算法

集合求交的指令有两个:SINTER与SINTERSTORE

SINTER key [key ...]
返回所有给定集合的交集,不存在的 key 被视为空集,当给定集合当中有一个空集时,结果也为空集.
时间复杂度: O(N * M),N 为给定集合中元素数目最小的集合,M 为给定集合的个数
SINTERSTORE destination key [key ...]
与SINTER指令类似,不同的仅仅将结果集存储到目标集合destination中,如果集合destination已存在,那么将其覆盖

void sinterGenericCommand(redisClient *c, robj **setkeys, unsigned long setnum, robj *dstkey) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;//迭代器
    robj *eleobj, *dstset = NULL;
    int64_t intobj;
    void *replylen = NULL;
    unsigned long j, cardinality = 0;
    int encoding;

    for (j = 0; j < setnum; j++) {//得到所有的集合
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {//任何一个集合不存在,那么总的交集就为空
            zfree(sets);
            if (dstkey) {
                if (dbDelete(c->db,dstkey)) {
                    signalModifiedKey(c->db,dstkey);
                    server.dirty++;
                }
                addReply(c,shared.czero);
            } else {
                addReply(c,shared.emptymultibulk);
            }
            return;
        }
        if (checkType(c,setobj,REDIS_SET)) {//类型检查
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }
    /* Sort sets from the smallest to largest, this will improve our
     * algorithm&#39;s performance */
    //按照集合元素个数从小到大排序
    qsort(sets,setnum,sizeof(robj*),qsortCompareSetsByCardinality);

    /* The first thing we should output is the total number of elements...
     * since this is a multi-bulk write, but at this stage we don&#39;t know
     * the intersection set size, so we use a trick, append an empty object
     * to the output list and save the pointer to later modify it with the
     * right length */
    if (!dstkey) {
        replylen = addDeferredMultiBulkLength(c);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with an empty set inside */
        dstset = createIntsetObject();
    }

    /* Iterate all the elements of the first (smallest) set, and test
     * the element against all the other sets, if at least one set does
     * not include the element it is discarded */
    /**
        求多个集合交集的算法思想:
        首先按照集合元素个数对集合进行qsort,然后遍历排序后的第一个集合中的元素,查看该元素在
        其他集合中是否存在,如果在其他集合中都存在,那么该元素为一个结果
    */
    si = setTypeInitIterator(sets[0]);
    while((encoding = setTypeNext(si,&eleobj,&intobj)) != -1) {
        for (j = 1; j < setnum; j++) {
            if (sets[j] == sets[0]) continue;//这段代码没意义啊
            if (encoding == REDIS_ENCODING_INTSET) {//intset
                /* intset with intset is simple... and fast */
                //集合sets[j]编码为intset
                if (sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,intobj))//在集合sets[j]中没有找到集合sets[0]的intobj
                {
                    break;
                /* in order to compare an integer with an object we
                 * have to use the generic function, creating an object
                 * for this */
                } else if (sets[j]->encoding == REDIS_ENCODING_HT) {//集合sets[j]编码为HT,sets[0]为INTSET
                    eleobj = createStringObjectFromLongLong(intobj);//将sets[0]中的intobj转换为sds
                    if (!setTypeIsMember(sets[j],eleobj)) {//如果eleobj不在集合sets[j]中
                        decrRefCount(eleobj);
                        break;
                    }
                    decrRefCount(eleobj);
                }
            } else if (encoding == REDIS_ENCODING_HT) {//HT
                /* Optimization... if the source object is integer
                 * encoded AND the target set is an intset, we can get
                 * a much faster path. */
                if (eleobj->encoding == REDIS_ENCODING_INT &&
                    sets[j]->encoding == REDIS_ENCODING_INTSET &&
                    !intsetFind((intset*)sets[j]->ptr,(long)eleobj->ptr))
                {
                    break;
                /* else... object to object check is easy as we use the
                 * type agnostic API here. */
                } else if (!setTypeIsMember(sets[j],eleobj)) {
                    break;
                }
            }
        }

        /* Only take action when all sets contain the member */
        if (j == setnum) {
            if (!dstkey) {
                if (encoding == REDIS_ENCODING_HT)
                    addReplyBulk(c,eleobj);
                else
                    addReplyBulkLongLong(c,intobj);
                cardinality++;
            } else {//添加到临时目标集合
                if (encoding == REDIS_ENCODING_INTSET) {
                    eleobj = createStringObjectFromLongLong(intobj);
                    setTypeAdd(dstset,eleobj);
                    decrRefCount(eleobj);
                } else {
                    setTypeAdd(dstset,eleobj);
                }
            }
        }
    }
    setTypeReleaseIterator(si);

    if (dstkey) {
        /* Store the resulting set into the target, if the intersection
         * is not an empty set. */
        int deleted = dbDelete(c->db,dstkey);//覆盖原来的目标集合
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,"sinterstore",
                dstkey,c->db->id);
        } else {//空集
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    } else {
        setDeferredMultiBulkLength(c,replylen,cardinality);
    }
    zfree(sets);
}

/*SINTER key [key...]*/
void sinterCommand(redisClient *c) {//计算所有给定集合的交集
    sinterGenericCommand(c,c->argv+1,c->argc-1,NULL);
}

/*SINTER destination key [key...]*/
void sinterstoreCommand(redisClient *c) {//计算所有给定集合的交集,但存储在集合destination中
    sinterGenericCommand(c,c->argv+2,c->argc-2,c->argv[1]);
}

集合求差算法

集合求差有两个指令:SDIFF与SDIFFSTORE

SDIFF key [key ...]
返回所有给定集合之间的差集,不存在的集合key视为空集。
时间复杂度: O(N),N 是所有给定集合的成员数量之和。
SDIFFSTORE destination key [key ...]
与SDIFF 类似,但它将差集的结果保存到集合destination,如果集合destination已经存在,则将其覆盖。

集合求并算法

集合求并有两个指令:SUNION与SUNIONSTORE

SUNION key [key ...]
返回所有给定集合的并集,不存在的集合key被视为空集。
时间复杂度: O(N),N 是所有给定集合的成员数量之和。
SUNIONSTORE destination key [key ...]
与SUNION类似,但它将并集结果保存到集合destination,如果集合destination已经存在,则将其覆盖。

#define REDIS_OP_UNION 0
#define REDIS_OP_DIFF 1
#define REDIS_OP_INTER 2

void sunionDiffGenericCommand(redisClient *c, robj **setkeys, int setnum, robj *dstkey, int op) {
    robj **sets = zmalloc(sizeof(robj*)*setnum);
    setTypeIterator *si;
    robj *ele, *dstset = NULL;
    int j, cardinality = 0;
    int diff_algo = 1;

    for (j = 0; j < setnum; j++) {//取出所有集合
        robj *setobj = dstkey ?
            lookupKeyWrite(c->db,setkeys[j]) :
            lookupKeyRead(c->db,setkeys[j]);
        if (!setobj) {
            sets[j] = NULL;
            continue;
        }
        if (checkType(c,setobj,REDIS_SET)) {
            zfree(sets);
            return;
        }
        sets[j] = setobj;
    }

    /* Select what DIFF algorithm to use.
     *
     * Algorithm 1 is O(N*M) where N is the size of the element first set
     * and M the total number of sets.
     *
     * Algorithm 2 is O(N) where N is the total number of elements in all
     * the sets.
     *
     * We compute what is the best bet with the current input here. */
    //对于SDIFF指令选择最优算法
    if (op == REDIS_OP_DIFF && sets[0]) {
        long long algo_one_work = 0, algo_two_work = 0;

        for (j = 0; j < setnum; j++) {
            if (sets[j] == NULL) continue;

            algo_one_work += setTypeSize(sets[0]);
            algo_two_work += setTypeSize(sets[j]);
        }

        /* Algorithm 1 has better constant times and performs less operations
         * if there are elements in common. Give it some advantage. */
        algo_one_work /= 2;//算法1可能不需要全部比较,因此除2来降低常数时间
        diff_algo = (algo_one_work <= algo_two_work) ? 1 : 2;

        if (diff_algo == 1 && setnum > 1) {
            /* With algorithm 1 it is better to order the sets to subtract
             * by decreasing size, so that we are more likely to find
             * duplicated elements ASAP. */
            //对sets[1]至sets[setnum-1]按照集合元素的个数从大到小排序
            qsort(sets+1,setnum-1,sizeof(robj*),
                qsortCompareSetsByRevCardinality);
        }
    }

    /* We need a temp set object to store our union. If the dstkey
     * is not NULL (that is, we are inside an SUNIONSTORE operation) then
     * this set object will be the resulting object to set into the target key*/
    dstset = createIntsetObject();

    if (op == REDIS_OP_UNION) {//并集操作,把所有元素不重复的操作即可
        /* Union is trivial, just add every element of every set to the
         * temporary set. */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                // 已有的元素不会被计数
                if (setTypeAdd(dstset,ele)) cardinality++;
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);
        }
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 1) {//选择算法1
        /* DIFF Algorithm 1:
         *
         * We perform the diff by iterating all the elements of the first set,
         * and only adding it to the target set if the element does not exist
         * into all the other sets.
         *
         * This way we perform at max N*M operations, where N is the size of
         * the first set, and M the number of sets. */
        /** 遍历 sets[0] ,对于其中的每个元素ele,
         只有ele在set[1]至set[setnum-1]的每个集合中均不存在,该元素ele才是一个结果
         算法复杂度: O(MlogM) + O(sum(size(sets[0]) * size(sets[j]))) j = [1,setnum-1]
                     M = setnum - 1
         */
        si = setTypeInitIterator(sets[0]);
        while((ele = setTypeNextObject(si)) != NULL) {
            for (j = 1; j < setnum; j++) {
                if (!sets[j]) continue; /* no key is an empty set. *///空集合
                if (setTypeIsMember(sets[j],ele)) break;
            }
            if (j == setnum) {
                /* There is no other set with this element. Add it. */
                setTypeAdd(dstset,ele);
                cardinality++;
            }
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
    } else if (op == REDIS_OP_DIFF && sets[0] && diff_algo == 2) {//选择算法2
        /* DIFF Algorithm 2:
         *
         * Add all the elements of the first set to the auxiliary set.
         * Then remove all the elements of all the next sets from it.
         *
         * This is O(N) where N is the sum of all the elements in every
         * set. */
        /**将 sets[0] 的所有元素保存到临时目标集合dstset中
           遍历set[1]至set[setnum-1]的每个集合,如果被遍历集合和 dstset 有相同的元素,
           那么从dstset中删除那个元素
           算法复杂度:O(sum(size(sets[j]))) j = [0,setnum-1]
         */
        for (j = 0; j < setnum; j++) {
            if (!sets[j]) continue; /* non existing keys are like empty sets */

            si = setTypeInitIterator(sets[j]);
            while((ele = setTypeNextObject(si)) != NULL) {
                if (j == 0) {
                    if (setTypeAdd(dstset,ele)) cardinality++;
                } else {
                    if (setTypeRemove(dstset,ele)) cardinality--;
                }
                decrRefCount(ele);
            }
            setTypeReleaseIterator(si);

            /* Exit if result set is empty as any additional removal
             * of elements will have no effect. */
            if (cardinality == 0) break;
        }
    }

    /* Output the content of the resulting set, if not in STORE mode */
    if (!dstkey) {
        addReplyMultiBulkLen(c,cardinality);
        si = setTypeInitIterator(dstset);
        while((ele = setTypeNextObject(si)) != NULL) {
            addReplyBulk(c,ele);
            decrRefCount(ele);
        }
        setTypeReleaseIterator(si);
        decrRefCount(dstset);
    } else {
        /* If we have a target key where to store the resulting set
         * create this key with the result set inside */
        int deleted = dbDelete(c->db,dstkey);//dstkey已存在直接删除
        if (setTypeSize(dstset) > 0) {
            dbAdd(c->db,dstkey,dstset);
            addReplyLongLong(c,setTypeSize(dstset));
            notifyKeyspaceEvent(REDIS_NOTIFY_SET,
                op == REDIS_OP_UNION ? "sunionstore" : "sdiffstore",
                dstkey,c->db->id);
        } else {
            decrRefCount(dstset);
            addReply(c,shared.czero);
            if (deleted)
                notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
                    dstkey,c->db->id);
        }
        signalModifiedKey(c->db,dstkey);
        server.dirty++;
    }
    zfree(sets);
}

/*SUNION key [key..]*/
void sunionCommand(redisClient *c) {//计算所有给定集合的并集
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_UNION);
}

/*SUNIONSTORE destination key [key..]*/
void sunionstoreCommand(redisClient *c) {//计算所有给定集合的并集,当将结果存储在destination中
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_UNION);
}

/*SDIFF key [key...]*/
void sdiffCommand(redisClient *c) {//计算第一个集合与另外所有集合的差集
    sunionDiffGenericCommand(c,c->argv+1,c->argc-1,NULL,REDIS_OP_DIFF);
}

/*SDIFFSTORE destination key [key...]*/
void sdiffstoreCommand(redisClient *c) {//与SDIFF类似,但将结果存储在destination中
    sunionDiffGenericCommand(c,c->argv+2,c->argc-2,c->argv[1],REDIS_OP_DIFF);
}

小结

集合是Redis中重要的数据类型,其存储使用intset与hash table(dict)两种数据结构,集合的所有指令都比较简单易懂,集合求差算法的两种优化方式可以学习。

集合所有指令的注解http://redis.io/commands#set

感谢黄健宏(huangz1990)的Redis设计与实现及其他对Redis2.6源码的相关注释对我在研究Redis2.8源码方面的帮助。

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn