ホームページ  >  記事  >  データベース  >  Python を使用して Redis クラスターをすばやく構築する方法

Python を使用して Redis クラスターをすばやく構築する方法

WBOY
WBOY転載
2023-05-26 15:56:36944ブラウズ

Redis 通信プロトコル

次のプログラムを理解しやすくするために、主要なポイントを列挙します。

Redis は、TCP ポート 6379 (デフォルトのポート、構成で変更可能) で受信接続を監視します。クライアントとサーバー間で送信されるすべての Redis コマンドまたはデータは rn で終わります。

Reply (サーバーがクライアントから回復できるプロトコル)

Redis は、さまざまな応答タイプでコマンドに応答します。サーバーによって送信された最初のバイトから始まる応答タイプをチェックする場合があります:

* 単一行の応答 (ステータス応答) の場合、応答の最初のバイトは “ ”

## になります。 # * エラー メッセージ、応答の最初のバイトは “-”

* 整数、応答の最初のバイトは “:”

* バッチ応答、返信の最初のバイトは “$”

になります * 複数のバッチ返信、返信の最初のバイトは “*”

になります 一括文字列

一括応答は、単一のバイナリの安全な文字列を返すためにサーバーによって使用されます。

C: GET mykey

S: $6rnfoobarrn

サーバーは、「$」で始まり、実際に送信されるバイトが続く応答の最初の行を送信します。番号の後に CRLF が続き、実際のデータが送信され、その後に最後の CRLF 用の 2 バイトの追加データが送信されます。サーバーによって送信される正確なシーケンスは次のとおりです: "$6rnfoobarrn"

要求された値が存在しない場合、バッチ応答ではデータ長として特別な値 -1 が使用されます。例:

C: GET nonexistingkey

S: $-1

要求されたオブジェクトが存在しない場合、クライアント ライブラリ API は空の文字列ではなく、空のオブジェクトを返します。たとえば、Ruby ライブラリは「nil」を返しますが、C ライブラリは NULL を返します (または応答オブジェクトに指定されたフラグを設定します)。

バイナリ

バイナリとは簡単に言うと「含まれる」という意味なので、C言語で処理する場合はstrlenやstrcpyなどのstr関数が使用されるため使用できません。文字列を判定する。

redis クラスター

超簡単な redis クラスター構築方法

公式サイトでも redis クラスターの構築方法が紹介されていますが、centos6.5 を使っているので試すのは面倒です. 新しい centos を使用する場合は、より良いかもしれません。

Redis クラスターのデータシャーディング

Redis クラスターは一貫したハッシュを使用しませんが、ハッシュ スロットの概念を導入します。

Redis クラスターには 16384 個のハッシュ スロットがあり、各キーがチェックされますCRC16 チェックを通過した後、どのスロットに配置するかを決定するモジュロ 16384。クラスター内の各ノードは、ハッシュ スロットの一部を担当します。たとえば、現在のクラスターに 3 つのノードがある場合、:

*ノード A にはハッシュ スロット 0 ~ 5500 が含まれます。

* ノード B にはハッシュ スロット 5501 ~ 11000 が含まれます。

* ノード C にはハッシュ スロット 11001 ~ 16384 が含まれます。

この構造は次のようになります。ノードの追加や削除は簡単です。たとえば、新しいノード D を追加したい場合は、ノード A、B、C から D にスロットを取得する必要があります。ノード A を削除したい場合は、 A のスロットがノード B と C に移動され、その後、スロットのない A ノードがクラスターから削除されます。ハッシュ スロットをあるノードから別のノードに移動しても、何を追加してもサービスは停止しません。ノードのハッシュ スロットの数によってクラスターが使用できなくなることはありません。

Redis クラスター プロトコルのクライアント側とサーバー側

Redis クラスターでは、ノードがデータの保存と記録を担当します。クラスターのステータス (正しいノードへのキー値のマッピングを含む)。クラスター ノードは、他のノードを自動的に検出したり、正しく動作していないノードを検出したり、必要に応じてスレーブ ノードからマスター ノードを選択したりすることもできます。

これらのタスクを実行するために、すべてのクラスター ノードは TCP 接続 (TCP バス?) およびバイナリ プロトコル (クラスター接続、クラスター バス) を介して通信します。各ノードは、クラスター バスを介してクラスター内の他のすべてのノードに接続されます。ノードはゴシップ プロトコルを使用してクラスター情報を広めます。これにより、新しいノードの検出、ping パケットの送信 (すべてのノードが適切に動作していることを確認するために使用)、および特定の状況が発生した場合のクラスター メッセージの送信が可能になります。クラスター接続は、クラスター内のメッセージをパブリッシュまたはサブスクライブするためにも使用されます。

クラスター ノードはリクエストをプロキシできないため、クライアントはリダイレクト エラー -MOVED および -ASK を受信するとコマンドを他のノードにリダイレクトします。理論的には、クライアントはクラスター内のすべてのノードにリクエストを自由に送信し、必要に応じてリクエストを他のノードにリダイレクトできるため、クライアントはクラスターの状態を保存する必要がありません。ただし、クライアントはキー値とノード間のマッピング関係をキャッシュできるため、コマンド実行の効率が大幅に向上します。

-MOVED

簡単に言うと、-MOVED が返されると、クライアントはノード A に接続してキーの処理を要求しますが、実際にはキーは実際にはノード B にあるため、-MOVEDプロトコルは次のとおりです:- MOVED 3999 127.0.0.1:6381

-ASK の状況を考慮する必要はありません。

C 言語による redis クライアントの実装

コードは次のとおりです:

#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n )
{
    size_t nleft = 0;
    ssize_t nwritten = 0;const char *ptr;

    ptr = (char *) vptr;
    nleft = n;while( nleft > 0 )
    {if( (nwritten = write(fd, ptr, nleft) ) <= 0 )
        {if( errno == EINTR )
            {
                nwritten = 0;  //再次调用write            }else{return -5;
            }
        }
        nleft = nleft - nwritten;
        ptr = ptr + nwritten;
    }return(n);
}int sock_read_wait( int fd, int timeout )
{struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    timeout *= 1000;for (;;) 
    {switch( poll(&pfd, 1, timeout) ) 
        {case -1:if( errno != EINTR ) 
                {return (-2);
                }continue;case 0:
                errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3);
        }
    }

}

ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout )
{   if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len));

}int sock_connect_nore(const char *IPaddr , int port , int timeout)
{   // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )
    {return -1;
    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif(pHost == NULL) ) 
                {
                close(sock_fd);return -1;
                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);return -1;
                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);return -1;
                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);                    return -1;
                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);        return -1;
    }return sock_fd;
}int sock_connect(const char *IPaddr , int port , int timeout)
{char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )
    {return -1;
    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif(pHost == NULL) ) 
                {
                close(sock_fd);return -1;
                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);return -1;
                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);return -1;
                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);                    return -1;
                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);        return -1;
    }

    n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般错误if( n <= 0 ) 
    {
        close(sock_fd);
        
        sock_fd = -1;
    }return sock_fd;
}int sock_non_blocking(int fd, int on)
{int     flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10;
    }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10;
    }return 0;
}int sock_write_wait(int fd, int timeout)
{struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;

    timeout *= 1000;for (;;) 
    {switch( poll(&pfd, 1, timeout) ) 
        {case -1:if( errno != EINTR ) 
            {return (-2);
            }continue;case 0:
            errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3);
        }
    }

}int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout)
{int error = 0;
    socklen_t error_len;

    sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 )
    {
        sock_non_blocking(sock, 0);return (0);
    }if( errno != EINPROGRESS )
    {
        sock_non_blocking(sock, 0);return (-1);
    }/* * A connection is in progress. Wait for a limited amount of time for
     * something to happen. If nothing happens, report an error.     */if( sock_write_wait(sock, timeout) != 0)
    {
        sock_non_blocking(sock, 0);return (-2);
    }/* * Something happened. Some Solaris 2 versions have getsockopt() itself
     * return the error, instead of returning it via the parameter list.     */error = 0;
    error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 )
    {
        sock_non_blocking(sock, 0);return (-3);
    }if( error ) 
    {
        errno = error;
        sock_non_blocking(sock, 0);return (-4);
    }

    sock_non_blocking(sock, 0);/* * No problems.     */return (0);

}static int check_ip_in_list(const char *ip, char *iplist)
{        char *token = NULL;char *saveptr = NULL;
    token = strtok_r(iplist, ",", &saveptr);while(token != NULL)
    {        char *ptmp = NULL;                        char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask)                    return -1;                     char *ip_bit = strtok_r(NULL, "/", &ptmp);        if(ip_bit)
        {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue;

            unsigned long addr[4] = { 0 };
            sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            vl1 = ( vl1 >> ( 32 - mask_bit ) );
            vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 )                        return 1;                          
        }else{if(strcmp(ip,ip_mask) == 0)            return 1;                            
        }                    

        token = strtok_r(NULL, ",", &saveptr);                
    }        return 0;
}static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro)
{char buf[128];int loops = 0;    

    strcpy(buf, redis_host);    do{
        loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))
        {
            close(sock_fd);return -1;
        }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0)
        {
            close(sock_fd);return -1;
        }        if(strncmp(":0", respone, 2) == 0)
        {
            close(sock_fd);return 0;
        }            else if(strncmp(":1", respone, 2) == 0)
        {
            close(sock_fd);return 1;
        }            else if(strncmp("$", respone, 1) == 0)
        {                                    int data_size = 0;   int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line)
            {
                close(sock_fd);return -1;
            }
            data_line = data_line+2;

            data_size = atoi(respone+1);if(data_size == -1)
            {
                close(sock_fd);return 0;
            }if(strlen(data_line) == data_size+2)
            {
                printf("line = %d, data_line = %sn",__LINE__,data_line);
                ret=check_ip_in_list(ip, data_line);
                close(sock_fd);return ret;
            }char *data = calloc(data_size+3,1);if(!data)
            {
                close(sock_fd);return -1;
            }
            strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0)
            {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0)
                {free(data);
                    close(sock_fd);            return -1;
                }
                read_size += nread;
                left_size -= nread;
            }
            close(sock_fd);
            printf("line = %d, data = %sn",__LINE__,data);
            ret=check_ip_in_list(ip, data);free(data);return ret;
        }            else if(strncmp("-MOVED", respone, 6) == 0)
        {
            close(sock_fd);char *p = strchr(respone, &#39; &#39;);if(p == NULL)return -1;

            p = strchr(p+1, &#39; &#39;);if(p == NULL)return -1;

            strcpy(buf, p+1);
        }else{
            close(sock_fd);return -1;
        }            
        
    }while(loops < 2);return -1;
}int main(int argc,char *argv[])
{if(argc != 2)
    {
        printf("please input ipn");return -1;
    }     const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0};    
    snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1");        
    snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{
        loops ++;
        ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0)
            ret = check_ip_in_redis(redis_ip, argv[1],get_pro);
    }while(loops < 3 && ret < 0);

    printf("line = %d, ret = %dn",__LINE__,ret);return ret;
}

c_redis_cli.c

主に check_ip_in_redis 関数に注目し、その他は次のとおりです。すべてのソケットのカプセル化。

Python は Redis クライアントを実装します

#!/usr/bin/pythonimport sys  
import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47"   
    port = 7002while 1:
        s = socket.socket()                
        s.connect((host, port))
        
        cmd = &#39;set %s_white_ip %srn&#39; % (argv[1],argv[2])        
        s.send(cmd)
        res = s.recv(32)
        s.close()        
    if res[0] == "+":print "set domain white  ip suc!"return    elif res[0:6] == "-MOVED":
            list = res.split(" ")
            ip_list = list[2].split(":")            
            host = ip_list[0]    
            port = int(ip_list[1])                            else:print "set domain white  ip error!"return                               if __name__ == "__main__":
    main(sys.argv)

以上がPython を使用して Redis クラスターをすばやく構築する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事はyisu.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。