Maison  >  Article  >  base de données  >  Comment créer rapidement un cluster Redis en utilisant Python

Comment créer rapidement un cluster Redis en utilisant Python

WBOY
WBOYavant
2023-05-26 15:56:36944parcourir

 Redis Communication Protocol

 Les principaux points sont répertoriés pour faciliter la compréhension du programme suivant.

Redis surveille les connexions entrantes sur le port TCP 6379 (port par défaut, qui peut être modifié dans la configuration). Chaque commande Redis ou donnée transmise entre le client et le serveur se termine par rn.

Répondre (un protocole que le serveur peut récupérer auprès du client)

Redis répond aux commandes avec différents types de réponse. Il peut vérifier le type de réponse à partir du premier octet envoyé par le serveur :

* Avec une réponse sur une seule ligne (réponse d'état), le premier octet de la réponse sera “+»#🎜 🎜#

 * Message d'erreur, le premier octet de la réponse sera "-"

 * Nombre entier, le premier octet de la réponse sera ":" ;

# 🎜🎜# * Pour les réponses par lots, le premier octet de la réponse sera "$”

" * Pour les réponses par lots multiples, le premier octet de la réponse sera “*»#🎜 🎜#

 Chaînes en masse

Les réponses en masse sont utilisées par le serveur pour renvoyer une seule chaîne binaire sécurisée.

C: GET mykey

S: $6rnfoobarrn

Le serveur envoie la première ligne de réponse, qui commence par "$" et est suivie par Le nombre réel d'octets à envoyer, suivi du CRLF, puis des données réelles, suivi de 2 octets de données supplémentaires pour le CRLF final. La séquence exacte envoyée par le serveur est la suivante :

 "$6rnfoobarrn"

Si la valeur demandée n'existe pas, la réponse batch utilisera la valeur spéciale -1 comme la longueur des données, par exemple :

C: GET nonexistingkey

S: $-1

Lorsque l'objet demandé n'existe pas, le client L'API de la bibliothèque ne renverra pas la chaîne de caractères nulle, un objet vide sera renvoyé. Par exemple : la bibliothèque Ruby renvoie "nil", tandis que la bibliothèque C renvoie NULL (ou définit l'indicateur spécifié dans l'objet de réponse), etc.

 Binary

Pour faire simple, binaire signifie qu'il contiendra, donc lorsque le langage C est en cours de traitement, vous ne pouvez pas utiliser de fonctions str, telles que strlen, strcpy, etc., car ils servent à déterminer la fin de la chaîne.

cluster redis

Super simple à construire un cluster redis

Le site officiel présente également comment créer un cluster redis C'est plus difficile à réaliser. essayez car centos6 est utilisé 5. Cela peut être mieux si vous utilisez le centos le plus récent.

Le partage des données du cluster Redis

Le cluster Redis n'utilise pas de hachage cohérent, mais introduit le concept d'emplacement de hachage

Cluster Redis Il y en a 16384. Emplacements de hachage. Chaque clé est vérifiée par CRC16 et le modulo 16384 est utilisé pour déterminer quel emplacement placer. Chaque nœud du cluster est responsable d'une partie de l'emplacement de hachage. Par exemple, si le cluster actuel a 3 nœuds, alors : 🎜🎜#

 * Le nœud A contient les emplacements de hachage 0 à 5500.

 * Le nœud B contient les emplacements de hachage 5501 à 11000.

 * Le nœud C contient les emplacements de hachage 11 001 à 16384.

Cette structure facilite l'ajout ou la suppression de nœuds. Par exemple, si je souhaite ajouter un nouveau nœud D, je dois obtenir des emplacements des nœuds A, B et C. vers D. Si je souhaite supprimer le nœud A, je dois déplacer les emplacements de A vers les nœuds B et C, puis supprimer le nœud A sans aucun emplacement du cluster puisque l'emplacement de hachage est supprimé d'un nœud en passant à un autre. Le nœud n'arrêtera pas le service, donc l'ajout, la suppression ou la modification du nombre d'emplacements de hachage d'un nœud n'entraînera pas l'indisponibilité du cluster

 Clients dans le protocole du cluster Redis Fin et côté serveur #🎜. 🎜#

Dans le cluster Redis, les nœuds sont responsables du stockage des données et de l'enregistrement de l'état du cluster (y compris le mappage des valeurs clés vers les nœuds corrects). Les nœuds de cluster peuvent également découvrir automatiquement d'autres nœuds, détecter les nœuds qui ne fonctionnent pas correctement et élire des nœuds maîtres parmi les nœuds esclaves si nécessaire.

Pour réaliser ces tâches, tous les nœuds du cluster communiquent via une connexion TCP (bus TCP ?) et un protocole binaire (connexion cluster, bus cluster). Chaque nœud est connecté à tous les autres nœuds du cluster via un bus de cluster. Les nœuds utilisent un protocole Gossip pour diffuser les informations du cluster, qui peuvent : découvrir de nouveaux nœuds, envoyer des paquets ping (utilisés pour garantir que tous les nœuds fonctionnent correctement) et envoyer des messages de cluster lorsque certaines situations se produisent. Les connexions de cluster sont également utilisées pour publier ou s'abonner à des messages dans le cluster.

Étant donné que les nœuds du cluster ne peuvent pas proxy les requêtes, le client redirigera la commande vers d'autres nœuds lors de la réception des erreurs de redirection -MOVED et -ASK. Théoriquement, le client est libre d'envoyer des requêtes à tous les nœuds du cluster et de rediriger les requêtes vers d'autres nœuds si nécessaire, de sorte que le client n'a pas besoin de sauvegarder l'état du cluster. Cependant, le client peut mettre en cache la relation de mappage entre les valeurs clés et les nœuds, ce qui peut améliorer considérablement l'efficacité de l'exécution des commandes.

 -MOVED

En termes simples, lorsque -MOVED est renvoyé, c'est que le client se connecte au nœud A et demande de traiter la clé, mais en fait la clé est en fait sur le nœud B, il renvoie donc -MOVED L'accord est le suivant : -MOVED 3999 127.0.0.1:6381

.

Pas besoin de considérer la situation de -ASK.

  Client Redis implémenté en langage C

 Le code est le suivant :

#include <string.h>#include <sys/socket.h>#include <arpa/inet.h>#include <errno.h>#include <fcntl.h>#include <netdb.h>#include <sys/poll.h>#include <unistd.h>#include <sys/types.h>#include <stdlib.h>#include <stdio.h>ssize_t sock_write_loop( int fd, const void *vptr, size_t n )
{
    size_t nleft = 0;
    ssize_t nwritten = 0;const char *ptr;

    ptr = (char *) vptr;
    nleft = n;while( nleft > 0 )
    {if( (nwritten = write(fd, ptr, nleft) ) <= 0 )
        {if( errno == EINTR )
            {
                nwritten = 0;  //再次调用write            }else{return -5;
            }
        }
        nleft = nleft - nwritten;
        ptr = ptr + nwritten;
    }return(n);
}int sock_read_wait( int fd, int timeout )
{struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLIN;
    pfd.revents = 0;

    timeout *= 1000;for (;;) 
    {switch( poll(&pfd, 1, timeout) ) 
        {case -1:if( errno != EINTR ) 
                {return (-2);
                }continue;case 0:
                errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLIN )return (0);elsereturn (-3);
        }
    }

}

ssize_t sock_read_tmo( int fd, void *vptr, size_t len, int timeout )
{   if( timeout > 0 && sock_read_wait(fd, timeout) < 0 )return (-1);elsereturn (read(fd, vptr, len));

}int sock_connect_nore(const char *IPaddr , int port , int timeout)
{   // char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )
    {return -1;
    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif(pHost == NULL) ) 
                {
                close(sock_fd);return -1;
                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);return -1;
                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);return -1;
                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);                    return -1;
                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);        return -1;
    }return sock_fd;
}int sock_connect(const char *IPaddr , int port , int timeout)
{char temp[4096];int sock_fd = 0, n = 0, errcode = 0;struct sockaddr_in servaddr;if( IPaddr == NULL )
    {return -1;
    }if( (sock_fd = socket(AF_INET, SOCK_STREAM, 0) ) < 0 )
    {return -1;
    }

    memset(&servaddr, 0, sizeof(servaddr));
    servaddr.sin_family = AF_INET;
    servaddr.sin_port   = htons(port);//changed by navy 2003.3.3 for support domain addr//if( (servaddr.sin_addr.s_addr = inet_addr(IPaddr) ) == -1 )if( (errcode = inet_pton(AF_INET, IPaddr, &servaddr.sin_addr) ) <= 0 )
    {//added by navy 2003.3.31 for support domain addrstruct hostent* pHost = NULL, host;char sBuf[2048], sHostIp[17];int h_errnop = 0;

        memset(&host, 0, sizeof(host));
        memset(sBuf, 0, sizeof(sBuf));
        memset(sHostIp, 0 , sizeof(sHostIp));
        pHost = &host;

#ifdef _SOLARIS_PLAT//solarisif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &h_errnop) == NULL) || 
#else//linuxif( (gethostbyname_r(IPaddr, pHost, sBuf, sizeof(sBuf), &pHost, &h_errnop) != 0) || 
#endif(pHost == NULL) ) 
                {
                close(sock_fd);return -1;
                }if( pHost->h_addrtype != AF_INET && pHost->h_addrtype != AF_INET6 )
                {
                close(sock_fd);return -1;
                }//目前仅取第一个IP地址if( (inet_ntop(pHost->h_addrtype, *(pHost->h_addr_list), sHostIp, sizeof(sHostIp)) ) == NULL )
                {
                    close(sock_fd);return -1;
                }                if( (errcode = inet_pton(AF_INET, sHostIp, &servaddr.sin_addr) ) <= 0 )
                {
                    close(sock_fd);                    return -1;
                }//end added by navy 2003.3.31    }if( (errcode = sock_timed_connect(sock_fd, (struct sockaddr *)&servaddr, sizeof(servaddr), timeout) ) < 0 )
    {
        close(sock_fd);        return -1;
    }

    n = sock_read_tmo(sock_fd, temp, 4096, timeout);//一般错误if( n <= 0 ) 
    {
        close(sock_fd);
        
        sock_fd = -1;
    }return sock_fd;
}int sock_non_blocking(int fd, int on)
{int     flags;if ((flags = fcntl(fd, F_GETFL, 0)) < 0){return -10;
    }if (fcntl(fd, F_SETFL, on ? flags | O_NONBLOCK : flags & ~O_NONBLOCK) < 0){return -10;
    }return 0;
}int sock_write_wait(int fd, int timeout)
{struct pollfd pfd;

    pfd.fd = fd;
    pfd.events = POLLOUT;
    pfd.revents = 0;

    timeout *= 1000;for (;;) 
    {switch( poll(&pfd, 1, timeout) ) 
        {case -1:if( errno != EINTR ) 
            {return (-2);
            }continue;case 0:
            errno = ETIMEDOUT;return (-1);default:if( pfd.revents & POLLOUT )return (0);elsereturn (-3);
        }
    }

}int sock_timed_connect(int sock, struct sockaddr * sa, int len, int timeout)
{int error = 0;
    socklen_t error_len;

    sock_non_blocking(sock, 1);if( connect(sock, sa, len) == 0 )
    {
        sock_non_blocking(sock, 0);return (0);
    }if( errno != EINPROGRESS )
    {
        sock_non_blocking(sock, 0);return (-1);
    }/* * A connection is in progress. Wait for a limited amount of time for
     * something to happen. If nothing happens, report an error.     */if( sock_write_wait(sock, timeout) != 0)
    {
        sock_non_blocking(sock, 0);return (-2);
    }/* * Something happened. Some Solaris 2 versions have getsockopt() itself
     * return the error, instead of returning it via the parameter list.     */error = 0;
    error_len = sizeof(error);if( getsockopt(sock, SOL_SOCKET, SO_ERROR, (char *) &error, &error_len) != 0 )
    {
        sock_non_blocking(sock, 0);return (-3);
    }if( error ) 
    {
        errno = error;
        sock_non_blocking(sock, 0);return (-4);
    }

    sock_non_blocking(sock, 0);/* * No problems.     */return (0);

}static int check_ip_in_list(const char *ip, char *iplist)
{        char *token = NULL;char *saveptr = NULL;
    token = strtok_r(iplist, ",", &saveptr);while(token != NULL)
    {        char *ptmp = NULL;                        char *ip_mask = strtok_r(token, "/", &ptmp);if(!ip_mask)                    return -1;                     char *ip_bit = strtok_r(NULL, "/", &ptmp);        if(ip_bit)
        {int mask_bit = atoi(ip_bit);if(mask_bit < 0 || mask_bit >32)continue;

            unsigned long addr[4] = { 0 };
            sscanf( ip_mask, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl1 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            sscanf( ip, "%lu.%lu.%lu.%lu", addr, addr + 1, addr + 2, addr + 3 );
            unsigned long vl2 = addr[0] << 24 | addr[1] << 16 | addr[2] << 8 | addr[3];

            vl1 = ( vl1 >> ( 32 - mask_bit ) );
            vl2 = ( vl2 >> ( 32 - mask_bit ) );if( vl1 == vl2 )                        return 1;                          
        }else{if(strcmp(ip,ip_mask) == 0)            return 1;                            
        }                    

        token = strtok_r(NULL, ",", &saveptr);                
    }        return 0;
}static int check_ip_in_redis(const char *redis_host, const char *ip,const char *rq_pro)
{char buf[128];int loops = 0;    

    strcpy(buf, redis_host);    do{
        loops ++;char *ptmp = NULL;char *host = strtok_r(buf, ":", &ptmp);if(!host) return -1;char *s_port = strtok_r(NULL, ":", &ptmp);if(!s_port) return -1;int port = atoi(s_port);char respone[40] = {0};int sock_fd = -1;if((sock_fd = sock_connect_nore(host, port, 5))<0)return -1;if(sock_write_loop(sock_fd, rq_pro, strlen(rq_pro)) != strlen(rq_pro))
        {
            close(sock_fd);return -1;
        }if(sock_read_tmo(sock_fd, respone, sizeof(respone)-1, 5)<=0)
        {
            close(sock_fd);return -1;
        }        if(strncmp(":0", respone, 2) == 0)
        {
            close(sock_fd);return 0;
        }            else if(strncmp(":1", respone, 2) == 0)
        {
            close(sock_fd);return 1;
        }            else if(strncmp("$", respone, 1) == 0)
        {                                    int data_size = 0;   int ret = 0;char *data_line = strstr(respone,"rn");if(!data_line)
            {
                close(sock_fd);return -1;
            }
            data_line = data_line+2;

            data_size = atoi(respone+1);if(data_size == -1)
            {
                close(sock_fd);return 0;
            }if(strlen(data_line) == data_size+2)
            {
                printf("line = %d, data_line = %sn",__LINE__,data_line);
                ret=check_ip_in_list(ip, data_line);
                close(sock_fd);return ret;
            }char *data = calloc(data_size+3,1);if(!data)
            {
                close(sock_fd);return -1;
            }
            strcpy(data,data_line);int read_size = strlen(data);int left_size = data_size + 2 - read_size;while(left_size > 0)
            {int nread = sock_read_tmo(sock_fd, data+read_size, left_size, 5);if(nread<=0)
                {free(data);
                    close(sock_fd);            return -1;
                }
                read_size += nread;
                left_size -= nread;
            }
            close(sock_fd);
            printf("line = %d, data = %sn",__LINE__,data);
            ret=check_ip_in_list(ip, data);free(data);return ret;
        }            else if(strncmp("-MOVED", respone, 6) == 0)
        {
            close(sock_fd);char *p = strchr(respone, &#39; &#39;);if(p == NULL)return -1;

            p = strchr(p+1, &#39; &#39;);if(p == NULL)return -1;

            strcpy(buf, p+1);
        }else{
            close(sock_fd);return -1;
        }            
        
    }while(loops < 2);return -1;
}int main(int argc,char *argv[])
{if(argc != 2)
    {
        printf("please input ipn");return -1;
    }     const char *redis_ip = "127.0.0.1:7002";const char *domain = "test.com";char exist_pro[128] = {0};char get_pro[128] = {0};    
    snprintf(exist_pro,sizeof(exist_pro),"EXISTS test|%s|%srn",domain,"127.0.0.1");        
    snprintf(get_pro,sizeof(get_pro),"GET test_%srn",domain);int loops = 0;int ret = 0;do{
        loops ++;
        ret = check_ip_in_redis(redis_ip, argv[1],exist_pro);if(ret == 0)
            ret = check_ip_in_redis(redis_ip, argv[1],get_pro);
    }while(loops < 3 && ret < 0);

    printf("line = %d, ret = %dn",__LINE__,ret);return ret;
}

c_redis_cli.c

 Cela dépend principalement de la fonction check_ip_in_redis, et les autres sont de l'encapsulation de socket.

 Python implémente le client Redis

#!/usr/bin/pythonimport sys  
import socketdef main(argv):if(len(argv) != 3):print "please input domain ip!"returnhost = "192.168.188.47"   
    port = 7002while 1:
        s = socket.socket()                
        s.connect((host, port))
        
        cmd = &#39;set %s_white_ip %srn&#39; % (argv[1],argv[2])        
        s.send(cmd)
        res = s.recv(32)
        s.close()        
    if res[0] == "+":print "set domain white  ip suc!"return    elif res[0:6] == "-MOVED":
            list = res.split(" ")
            ip_list = list[2].split(":")            
            host = ip_list[0]    
            port = int(ip_list[1])                            else:print "set domain white  ip error!"return                               if __name__ == "__main__":
    main(sys.argv)

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer