Maison  >  Article  >  développement back-end  >  Rb (redis blaster), une bibliothèque Python qui implémente le partitionnement non répliqué pour Redis

Rb (redis blaster), une bibliothèque Python qui implémente le partitionnement non répliqué pour Redis

WBOY
WBOYavant
2023-04-11 19:27:271696parcourir

Rb, redis blaster, est une bibliothèque qui implémente le partitionnement non répliqué pour Redis. Il implémente un système de routage personnalisé au-dessus de Python Redis, vous permettant de cibler automatiquement différents serveurs sans avoir à acheminer manuellement les requêtes vers des nœuds individuels.

Il n'implémente pas toutes les fonctionnalités de Redis et ne tente pas de le faire. Vous pouvez toujours connecter un client à un hôte spécifique, mais la plupart supposent que vos opérations sont limitées à des opérations clé/valeur de base qui peuvent être automatiquement acheminées vers différents nœuds.

Ce que vous pouvez faire :

  • Effectuer automatiquement des opérations à une seule touche sur l'hôte.
  • Exécuter des commandes sur tout ou partie des nœuds.
  • Exécutez tout cela en parallèle.

Installation

rb est disponible sur PyPI et peut être installé à partir de là :

$ pip install rb

Configuration

Démarrer avec rb est très simple. Si vous avez déjà utilisé py-redis, vous vous sentirez comme chez vous. La principale différence est qu'au lieu de se connecter à un seul hôte, le cluster est configuré pour se connecter à plusieurs :

rom rb import Cluster

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
4: {'port': 6379},
5: {'port': 6380},
6: {'port': 6381},
7: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

Dans ce cas, nous avons 8 nœuds configurés sur quatre processus serveur différents sur le même hôte. Le paramètre hosts est une carte des hôtes auxquels se connecter. La clé du dictionnaire est l'ID d'hôte (entier) et la valeur est le dictionnaire de paramètres. host_defaults est un dictionnaire de valeurs par défaut facultatives renseignées pour tous les hôtes. Ceci est utile si vous souhaitez partager certaines valeurs par défaut communes qui sont répétées (dans ce cas, tous les hôtes se connectent à localhost).

Dans la configuration par défaut, PartitionRouter est utilisé pour le routage.

Routing

Maintenant que le cluster est construit, nous pouvons utiliser Cluster.get_routing_client() pour obtenir un client Redis qui acheminera automatiquement chaque commande vers le bon nœud Redis :

client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
results[key] = client.get(key)

Le client fonctionne de la même manière que le pyredis StrictClient standard est très similaire, la principale différence est qu'il ne peut exécuter que des commandes impliquant une seule touche.

Cependant, cette opération de base s'exécute en série. Ce qui rend rb utile, c'est qu'il peut créer automatiquement des pipelines Redis et envoyer des requêtes à de nombreux hôtes en parallèle. Cependant, cela modifie légèrement l'utilisation, car désormais la valeur n'est pas immédiatement disponible :

results = {}
with cluster.map() as client:
for key in keys_to_look_up:
results[key] = client.get(key)

Bien que cela semble similaire jusqu'à présent, au lieu de stocker la valeur réelle dans le dictionnaire de résultats, un objet Promise est stocké. Lorsque le gestionnaire de contexte de carte se termine, elles sont garanties d'avoir été exécutées, vous pouvez accéder à la propriété Promise.value pour obtenir la valeur :

for key, promise in results.iteritems():
print '%s: %s' % (key, promise.value)

Si vous souhaitez envoyer une commande à tous les hôtes participants (par exemple supprimer une base de données), vous peut utiliser la méthode Cluster.all() :

with cluster.all() as client:
client.flushdb()

Si vous faites cela, la valeur promise est un dictionnaire avec l'ID d'hôte comme clé et le résultat comme valeur. Par exemple :

with cluster.all() as client:
results = client.info()
for host_id, info in results.iteritems():
print 'host %s is running %s' % (host_id, info['os'])

Pour cibler explicitement certains hôtes, vous pouvez utiliser Cluster.fanout() pour accepter une liste d'ID d'hôte auxquels envoyer la commande.

API

Ceci est une référence complète de l'API publique. Notez que cette bibliothèque étend la bibliothèque Python redis, donc certaines classes ont plus de fonctionnalités, vous devrez consulter la bibliothèque py-redis.

Cluster

class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

cluster est l'objet principal derrière rb. Il est enregistré dans le pool de connexions de chaque nœud et peut être partagé dans un emplacement central pendant l'exécution de l'application.

Exemple de base d'un cluster sur quatre instances Redis avec routeur par défaut :

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

hosts est un dictionnaire d'hôtes qui mappe les numéros d'ID d'hôte aux paramètres de configuration. Les paramètres correspondent à la signature de la fonction add_host(). Les valeurs par défaut de ces paramètres sont extraites de host_defaults. Pour remplacer la classe pool, utilisez les paramètres pool_cls et pool_options. La même chose s'applique aux router_cls et router_options du routeur. L'option pool est utile pour définir les délais d'attente des sockets et des paramètres similaires.

  • add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)

Ajoutez un nouvel hôte au cluster. Ceci n'est vraiment utile que pour les tests unitaires, car généralement les hôtes sont ajoutés via le constructeur et il est peu probable qu'il soit logique de les modifier après la première utilisation du cluster.

  • all(timeout=None, max_concurrency=64, auto_batch=True)

fanout vers tous les hôtes. Sinon identique à fanout().

Exemple :

with cluster.all() as client:
client.flushdb()
  • disconnect_pools()

Déconnecte toutes les connexions aux pools internes.

  • execute_commands(mapping, *args, **kwargs)

Exécute simultanément une série de commandes associées à la clé de routage sur le cluster Redis, renvoyant un nouveau mappage où la valeur est une liste de résultats correspondant à la commande à le même endroit. Par exemple :

>>> cluster.execute_commands({
... 'foo': [
... ('PING',),
... ('TIME',),
... ],
... 'bar': [
... ('CLIENT', 'GETNAME'),
... ],
... })
{'bar': [<Promise None>],
 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}
  • Les commandes qui sont des instances de redis.client.Script vérifieront d'abord leur existence sur le nœud cible, puis seront chargées sur la cible avant l'exécution, et pourront être entrelacées avec d'autres commandes :
>>> from redis.client import Script
>>> TestScript = Script(None, 'return {KEYS, ARGV}')
>>> cluster.execute_commands({
... 'foo': [
... (TestScript, ('key:1', 'key:2'), range(0, 3)),
... ],
... 'bar': [
... (TestScript, ('key:3', 'key:4'), range(3, 6)),
... ],
... })
{'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}

En interne , FanoutClient Utilisé pour émettre des commandes.

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始扇出操作并 join 结果的快捷上下文管理器。

在上下文管理器中,可用的客户端是 FanoutClient。示例用法:

with cluster.fanout(hosts='all') as client: client.flushdb()
get_local_client(host_id)
  • get_local_client(host_id)

返回特定主机 ID 的本地化 client。这个 client 就像一个普通的 Python redis 客户端一样工作,并立即返回结果。

  • get_local_client_for_key(key)

类似于 get_local_client_for_key() 但根据 router 所说的 key 目的地返回 client。

  • get_pool_for_host(host_id)

返回给定主机的连接池。

redis 客户端使用此连接池来确保它不必不断地重新连接。如果要使用自定义 redis 客户端,可以手动将其作为连接池传入。

  • get_router()

返回 cluster 的 router 。如果 cluster 重新配置,router 将被重新创建。通常,您不需要自己与 router 交互,因为集群的路由客户端会自动执行此操作。

这将返回 BaseRouter 的一个实例。

  • get_routing_client(auto_batch=True)

返回一个路由客户端。该客户端能够自动将请求路由到各个主机。它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。

路由客户端的默认行为是尝试将符合条件的命令批处理成批处理版本。例如,路由到同一节点的多个 GET 命令最终可以合并为一个 MGET 命令。可以通过将 auto_batch 设置为 False 来禁用此行为。这对于调试很有用,因为 MONITOR 将更准确地反映代码中发出的命令。

有关详细信息,请参阅 RoutingClient。

  • map(timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始映射操作并 join 结果的快捷上下文管理器。max_concurrency 定义在隐式连接发生之前可以存在多少未完成的并行查询。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)
  • remove_host(host_id)

从 client 中删除 host。这仅对单元测试真正有用。

Clients

class rb.RoutingClient(cluster, auto_batch=True)

可以路由到单个目标的客户端。

有关参数,请参见 Cluster.get_routing_client()。

  • execute_command(*args, **options)

执行命令并返回解析后的响应

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager,该操作扇出到手动指定的主机,而不是使用路由系统。例如,这可用于清空所有主机上的数据库。context manager 返回一个 FanoutClient。示例用法:

with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
results = client.info()
for host_id, info in results.value.iteritems():
print '%s -> %s' % (host_id, info['is'])

返回的 promise 将所有结果累积到由 host_id 键入的字典中。

hosts 参数是一个 host_id 列表,或者是字符串 'all' ,用于将命令发送到所有主机。

fanout API 需要非常小心地使用,因为当 key 被写入不期望它们的主机时,它可能会造成很多损坏。

  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)

返回线程不安全的扇出客户端。

返回 FanoutClient 的实例。

  • get_mapping_client(max_concurrency=64, auto_batch=None)

返回一个线程不安全的映射客户端。此客户端的工作方式类似于 redis 管道并返回最终结果对象。它需要 join 才能正常工作。您应该使用自动 join 的 map() 上下文管理器,而不是直接使用它。

返回 MappingClient 的一个实例。

  • map(timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager。这会并行运行多个查询,然后最后 join 以收集所有结果。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)

class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

路由客户端使用 cluster 的 router 根据执行的 redis 命令的 key 自动定位单个节点。

Voir Cluster.map() pour les paramètres.

  • cancel()

Annulez toutes les demandes en attente.

  • execute_command(*args, **options)

Exécuter la commande et renvoyer la réponse analysée

  • join(timeout=None)

Attendez que toutes les réponses en attente reviennent ou expirent

  • mget(keys , *args)

Renvoyer une liste de valeurs dans le même ordre que la clé

  • mset(*args, **kwargs)

Définir la clé/valeur en fonction du mappage. Une carte est un dictionnaire de paires clé/valeur. La clé et la valeur doivent être des chaînes ou des types convertibles en chaîne via str().

class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

Cela fonctionne de la même manière que MappingClient, mais au lieu d'utiliser un routeur pour localiser l'hôte, il envoie la commande à tous les hôtes spécifiés manuellement.

Les résultats sont accumulés dans un dictionnaire saisi par host_id.

Voir Cluster.fanout() pour les paramètres.

  • execute_command(*args, **options)

Exécutez la commande et renvoyez la réponse analysée

  • target(hosts)

Déplacez temporairement le client pour un appel. Ceci est utile lorsqu'un sous-ensemble d'hôtes doit être traité pour un seul appel.

  • target_key(key)

Déplace temporairement le client pour passer un appel spécifiquement acheminé vers un hôte vers lequel la clé donnée est acheminée. Dans ce cas, le résultat de la promesse est simplement une valeur hôte plutôt qu'un dictionnaire.

Nouvelles fonctionnalités de la version 1.3.

Promise

class rb.Promise

Un objet Promise qui tente de refléter l'API ES6 pour les objets Promise. Contrairement à la promesse d'ES6, cette promesse fournit également un accès direct à la valeur sous-jacente et ses noms de méthodes statiques sont légèrement différents, car cette promesse peut être résolue en externe.

  • static all(iterable_or_dict)

Une promesse est résolue lorsque toutes les promesses passées ont été résolues. Vous pouvez transmettre une liste de promesses ou un dictionnaire de promesses.

  • done(on_success=None, on_failure=None)

Ajoutez quelques rappels à la promesse et renvoyez la promesse.

  • is_ending

Vrai si la promesse est toujours en attente, False sinon.

  • is_rejected

Vrai si la promesse a été rejetée, False sinon.

  • is_resolved

Vrai si la promesse a été résolue, Faux sinon.

  • raison

La raison de cette promesse si elle a été rejetée.

  • rejeter (raison)

Rejeter une promesse avec la raison donnée.

  • statique rejeté (raison)

Crée un objet de promesse qui est rejeté avec une valeur spécifique.

  • resolve(value)

Résolvez la promesse avec la valeur donnée.

  • statique résolu (valeur)

Crée un objet de promesse qui se résout avec une valeur spécifique.

  • then(success=None, failed=None)

Méthode utilitaire pour ajouter des rappels de réussite et/ou d'échec à une promesse, qui renverra également une autre promesse dans le processus.

  • valeur

La valeur détenue par cette promesse si elle se résout.

Routers

class rb.BaseRouter(cluster)

La classe de base pour tous les itinéraires. Si vous souhaitez implémenter un itinéraire personnalisé, il s'agit de votre sous-classe.

  • cluster

fait référence au cluster auquel appartient ce routeur.

  • get_host_for_command(command, args)

Renvoie l'hôte sur lequel cette commande doit être exécutée.

  • get_host_for_key(key)

Exécutez le routage et renvoyez l'identifiant d'hôte de la cible.

Les sous-classes doivent implémenter cela.

  • get_key(command, args)

Renvoie la clé de l'opération de commande.

class rb.ConsistentHashingRouter(cluster)

Renvoie le routeur de host_id en fonction de l'algorithme de hachage cohérent. Les algorithmes de hachage cohérents ne fonctionnent que lorsque le paramètre key est fourni.

Le routeur nécessite que les hôtes soient sans interruption, ce qui signifie que les identifiants de N hôtes vont de 0 à N-1.

  • get_host_for_key(key)

Exécutez le routage et renvoyez l'identifiant d'hôte de la cible.

Les sous-classes doivent implémenter cela.

class rb.PartitionRouter(cluster)

Un routeur simple qui achemine les commandes individuellement vers un seul nœud basé uniquement sur un simple paramètre crc32 % node_count.

Le routeur nécessite que les hôtes soient sans interruption, ce qui signifie que les identifiants de N hôtes vont de 0 à N-1.

  • get_host_for_key(key)

执行路由并返回目标的 host_id。

子类需要实现这一点。

exception rb.UnroutableCommand

如果发出的命令无法通过 router 路由到单个主机,则引发。

Testing

class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

测试设置是生成多个 redis 服务器进行测试并自动关闭它们的便捷方式。这可以用作 context manager 来自动终止客户端。

  • rb.testing.make_test_cluster(*args, **kwargs)

用于创建测试设置然后从中创建 cluster 的便捷快捷方式。这必须用作 context manager:

from rb.testing import make_test_cluster
with make_test_cluster() as cluster:
...

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer