Rb (redis blaster), a Python library that implements non-replicated sharding for Redis

WBOY
2023-04-11 19:27:27

Rb (redis blaster) is a library that implements non-replicated sharding for Redis. It implements a custom routing system on top of the Python redis library that lets you automatically target different servers without having to manually route requests to individual nodes.

It does not implement all the functionality of Redis, nor does it attempt to. You can always get a client connected to a specific host, but for the most part rb assumes your operations are limited to basic key/value operations that can be routed to different nodes automatically.

What you can do:

  • Automatically target hosts for single-key operations.
  • Execute commands against all or a subset of the nodes.
  • Do all of that in parallel.

Installation

rb is available on PyPI and can be installed from there:

$ pip install rb

Configuration

Getting started with rb is very simple. If you have used py-redis before, you will feel right at home. The main difference is that instead of connecting to a single host, you configure a cluster that connects to multiple hosts:

from rb import Cluster

cluster = Cluster(hosts={
    0: {'port': 6379},
    1: {'port': 6380},
    2: {'port': 6381},
    3: {'port': 6382},
    4: {'port': 6379},
    5: {'port': 6380},
    6: {'port': 6381},
    7: {'port': 6382},
}, host_defaults={
    'host': '127.0.0.1',
})

In this case we set up 8 nodes on four different server processes on the same host. The hosts parameter is a mapping of the hosts to connect to. The dictionary key is the host ID (an integer) and the value is a dictionary of parameters. host_defaults is a dictionary of optional defaults that is filled in for all hosts. This is useful if you want to share common defaults that would otherwise be repeated (in this case, all hosts connect to localhost).
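How host_defaults combines with the per-host parameters can be pictured with a small stand-alone sketch. It does not use rb itself: apply_host_defaults is a hypothetical helper, and it assumes that a value given explicitly for a host takes precedence over the shared default.

```python
def apply_host_defaults(hosts, host_defaults=None):
    """Return a new mapping where each host config is filled in from the defaults."""
    merged = {}
    for host_id, params in hosts.items():
        config = dict(host_defaults or {})  # start from the shared defaults
        config.update(params)               # assumption: per-host values win
        merged[host_id] = config
    return merged


hosts = {
    0: {'port': 6379},
    1: {'port': 6380, 'host': '10.0.0.5'},  # overrides the default host
}
resolved = apply_host_defaults(hosts, {'host': '127.0.0.1'})
for host_id in sorted(resolved):
    print(host_id, resolved[host_id])
```

Host 0 picks up the default address, while host 1 keeps the address it was given explicitly.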

In the default configuration, PartitionRouter is used for routing.

Routing

Now that the cluster is built, we can use Cluster.get_routing_client() to get a redis client that will automatically route each command to the correct redis node:

client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
    results[key] = client.get(key)

This client works much like a standard py-redis StrictRedis client, the main difference being that it can only execute commands that involve a single key.

This basic operation runs in series, however. What makes rb useful is that it can automatically build redis pipelines and send queries to many hosts in parallel. That changes the usage slightly, because the value is no longer immediately available:

results = {}
with cluster.map() as client:
    for key in keys_to_look_up:
        results[key] = client.get(key)

While it looks similar so far, a Promise object is stored in the result dictionary instead of the actual value. When the map context manager exits, the commands are guaranteed to have been executed, and you can access the Promise.value attribute to get the value:

for key, promise in results.items():
    print('%s: %s' % (key, promise.value))

If you want to send a command to all participating hosts (for example, to flush the database), you can use the Cluster.all() method:

with cluster.all() as client:
    client.flushdb()

If you do this, the promise value is a dictionary with the host ID as the key and the result as the value. For example:

with cluster.all() as client:
    results = client.info()
for host_id, info in results.value.items():
    print('host %s is running %s' % (host_id, info['os']))

To explicitly target specific hosts, you can use Cluster.fanout(), which accepts a list of host IDs to send the command to.

API

This is a complete reference to the public API. Note that this library extends the Python redis library, so some of these classes have more functionality than is documented here; for that, consult the py-redis documentation.

Cluster

class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

The cluster is the core object behind rb. It holds the connection pools to the individual nodes and can be shared in a central location while the application is running.

Basic example of a cluster on four redis instances with default router:

cluster = Cluster(hosts={
    0: {'port': 6379},
    1: {'port': 6380},
    2: {'port': 6381},
    3: {'port': 6382},
}, host_defaults={
    'host': '127.0.0.1',
})

hosts is a dictionary that maps host ID numbers to configuration parameters. The parameters correspond to the signature of the add_host() function. Defaults for these parameters are taken from host_defaults. To override the pool class, use the pool_cls and pool_options parameters. The same applies to the router via router_cls and router_options. The pool options are useful for setting socket timeouts and similar parameters.

  • add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)

Add a new host to the cluster. This is only really useful for unit tests, as typically hosts are added via the constructor and it's unlikely to make sense to change after the first time the cluster is used.

  • all(timeout=None, max_concurrency=64, auto_batch=True)

Fanout to all hosts. Otherwise identical to fanout().

Example:

with cluster.all() as client:
    client.flushdb()

  • disconnect_pools()

Disconnects all connections from the internal pools.

  • execute_commands(mapping, *args, **kwargs)

Concurrently executes a sequence of commands on the Redis cluster, each associated with a routing key, and returns a new mapping in which the values are lists of results that correspond to the commands at the same positions. For example:

>>> cluster.execute_commands({
...     'foo': [
...         ('PING',),
...         ('TIME',),
...     ],
...     'bar': [
...         ('CLIENT', 'GETNAME'),
...     ],
... })
{'bar': [<Promise None>],
 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}

Commands that are instances of redis.client.Script are first checked for existence on the target node and loaded there if necessary before execution, and they can be interleaved with other commands:

>>> from redis.client import Script
>>> TestScript = Script(None, 'return {KEYS, ARGV}')
>>> cluster.execute_commands({
...     'foo': [
...         (TestScript, ('key:1', 'key:2'), range(0, 3)),
...     ],
...     'bar': [
...         (TestScript, ('key:3', 'key:4'), range(3, 6)),
...     ],
... })
{'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}

Internally, FanoutClient is used to issue commands.

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)

A shortcut context manager for getting a routing client, beginning a fanout operation and joining over the result.

In the context manager, the client available is a FanoutClient. Example usage:

with cluster.fanout(hosts='all') as client:
    client.flushdb()

  • get_local_client(host_id)

Returns a localized client for a specific host ID. This client works like a regular Python redis client and returns results immediately.

  • get_local_client_for_key(key)

Similar to get_local_client(), but returns the client for the host that the router says the key is routed to.

  • get_pool_for_host(host_id)

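Returns the connection pool for the given host.

This connection pool is used by the redis clients to make sure they do not have to reconnect constantly. If you want to use a custom redis client, you can pass this in as the connection pool manually.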

  • get_router()

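Returns the router for the cluster. If the cluster is reconfigured, the router is recreated. Usually you do not need to interact with the router yourself, because the cluster's routing client does that automatically.

This returns an instance of BaseRouter.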

  • get_routing_client(auto_batch=True)

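Returns a routing client. This client automatically routes requests to the individual hosts. It is thread-safe and can be used much like a host-local client, but it refuses to execute commands that cannot be routed directly to a single node.

By default, the routing client tries to batch eligible commands into their batched equivalents. For example, multiple GET commands routed to the same node can end up merged into a single MGET command. This behavior can be disabled by setting auto_batch to False. That is useful for debugging, because MONITOR will then more accurately reflect the commands issued in code.

See RoutingClient for details.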
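The auto-batching idea for the routing client (several GET commands bound for the same node merged into one MGET) can be illustrated without rb. The sketch below is not rb's implementation: batch_gets is a hypothetical helper, and route stands in for whatever function maps a key to a host ID.

```python
from collections import defaultdict


def batch_gets(keys, route):
    """Group GET keys by target host and build one command tuple per host.

    `route` is any callable mapping a key to a host ID (an assumption of
    this sketch; in rb the router plays that role).
    """
    by_host = defaultdict(list)
    for key in keys:
        by_host[route(key)].append(key)

    commands = {}
    for host_id, host_keys in by_host.items():
        if len(host_keys) == 1:
            commands[host_id] = ('GET', host_keys[0])       # nothing to merge
        else:
            commands[host_id] = ('MGET',) + tuple(host_keys)  # merged batch
    return commands


# Toy routing rule for the demonstration only: key length modulo 2.
commands = batch_gets(['a', 'bb', 'c', 'dd', 'eee'], lambda key: len(key) % 2)
for host_id in sorted(commands):
    print(host_id, commands[host_id])
```

With batching, each host receives a single command instead of one round trip per key, which is also why MONITOR output differs from the calls made in code.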

  • map(timeout=None, max_concurrency=64, auto_batch=True)

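A shortcut context manager for getting a routing client, beginning a map operation and joining over the result. max_concurrency defines how many outstanding parallel queries can exist before an implicit join takes place.

In the context manager, the client available is a MappingClient. Example usage:

results = {}
with cluster.map() as client:
    for key in keys_to_fetch:
        results[key] = client.get(key)
for key, promise in results.items():
    print('%s => %s' % (key, promise.value))

  • remove_host(host_id)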

Removes a host from the cluster. This is only really useful for unit tests.

Clients

class rb.RoutingClient(cluster, auto_batch=True)

A client that can route to individual targets.

For the parameters, see Cluster.get_routing_client().

  • execute_command(*args, **options)

Executes a command and returns the parsed response.

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)

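Returns a context manager for a map operation that fans out to manually specified hosts instead of using the routing system. This can be used, for instance, to empty the database on all hosts. The context manager returns a FanoutClient. Example usage:

with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
    results = client.info()
for host_id, info in results.value.items():
    print('%s -> %s' % (host_id, info['os']))

The promise returned accumulates all results in a dictionary keyed by host_id.

The hosts parameter is a list of host_ids, or the string 'all' to send the commands to all hosts.

The fanout API needs to be used with great care, as it can cause a lot of damage when keys are written to hosts that do not expect them.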

  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)

Returns a thread-unsafe fanout client.

Returns an instance of FanoutClient.

  • get_mapping_client(max_concurrency=64, auto_batch=None)

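Returns a thread-unsafe mapping client. This client works similarly to a redis pipeline and returns eventual result objects. It needs to be joined on to work properly. Instead of using it directly, you should use the map() context manager, which joins automatically.

Returns an instance of MappingClient.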

  • map(timeout=None, max_concurrency=64, auto_batch=None)

Returns a context manager for a map operation. This runs multiple queries in parallel and then joins at the end to collect all results.

In the context manager, the client available is a MappingClient. Example usage:

results = {}
with cluster.map() as client:
    for key in keys_to_fetch:
        results[key] = client.get(key)
for key, promise in results.items():
    print('%s => %s' % (key, promise.value))

class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

The routing client uses the cluster's router to automatically target an individual node based on the key of the redis command being executed.

See Cluster.map() for parameters.

  • cancel()

Cancel all outstanding requests.

  • execute_command(*args, **options)

Executes a command and returns the parsed response.

  • join(timeout=None)

Waits for all outstanding responses to come back or for the timeout to be hit.

  • mget(keys, *args)

Returns a list of values ordered identically to keys.

  • mset(*args, **kwargs)

Sets keys and values based on a mapping. The mapping is a dictionary of key/value pairs. Both keys and values should be strings or types that can be converted to a string via str().

class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

This works similarly to MappingClient, but instead of using the router to locate the host, it sends the commands to all manually specified hosts.

The results are accumulated in the dictionary keyed by host_id.

See Cluster.fanout() for parameters.

  • execute_command(*args, **options)

Executes a command and returns the parsed response.

  • target(hosts)

Temporarily retargets the client for one call. This is useful when a single call must go to only a subset of hosts.

  • target_key(key)

Temporarily retargets the client for one call so that it is routed specifically to the one host that the given key routes to. In that case the result on the promise is just that one host's value instead of a dictionary.

New in version 1.3.

Promise

class rb.Promise

A Promise object that attempts to mirror the ES6 API for Promise objects. Unlike ES6's Promise, this Promise also provides direct access to the underlying value, and it has slightly different static method names because this Promise can be resolved externally.

  • static all(iterable_or_dict)

Returns a promise that is resolved once all passed promises have resolved. You can pass either a list or a dictionary of promises.

  • done(on_success=None, on_failure=None)

Append some callbacks to the Promise and return the Promise.

  • is_pending

True if the promise is still pending, False otherwise.

  • is_rejected

True if the promise was rejected, False otherwise.

  • is_resolved

True if the promise was resolved, False otherwise.

  • reason

The reason for this promise if it was rejected.

  • reject(reason)

Reject the promise with the given reason.

  • static rejected(reason)

Create a promise object that is rejected with a specific value.

  • resolve(value)

Resolve the promise with the given value.

  • static resolved(value)

Create a promise object that resolves with a specific value.

  • then(success=None, failure=None)

Utility method to add success and/or failure callbacks to the promise. It also returns another promise in the process.

  • value

The value held by this promise if it resolves.
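To make the externally resolvable promise more concrete, here is a toy stand-alone sketch. SketchPromise is a hypothetical class written for this article; it only borrows the method and attribute names listed above (resolve, reject, done, value, reason, is_pending and so on) and is not rb's actual implementation.

```python
class SketchPromise(object):
    """Toy promise that is resolved or rejected from the outside."""

    def __init__(self):
        self.value = None
        self.reason = None
        self._state = 'pending'
        self._callbacks = []

    @property
    def is_pending(self):
        return self._state == 'pending'

    @property
    def is_resolved(self):
        return self._state == 'resolved'

    @property
    def is_rejected(self):
        return self._state == 'rejected'

    def resolve(self, value):
        if not self.is_pending:
            raise AssertionError('promise is no longer pending')
        self.value = value
        self._state = 'resolved'
        for on_success, _ in self._callbacks:
            if on_success is not None:
                on_success(value)

    def reject(self, reason):
        if not self.is_pending:
            raise AssertionError('promise is no longer pending')
        self.reason = reason
        self._state = 'rejected'
        for _, on_failure in self._callbacks:
            if on_failure is not None:
                on_failure(reason)

    def done(self, on_success=None, on_failure=None):
        """Attach callbacks and return this same promise."""
        if self.is_pending:
            self._callbacks.append((on_success, on_failure))
        elif self.is_resolved and on_success is not None:
            on_success(self.value)
        elif self.is_rejected and on_failure is not None:
            on_failure(self.reason)
        return self


seen = []
promise = SketchPromise().done(seen.append)
print(promise.is_pending)   # nothing has been delivered yet
promise.resolve('bar')      # e.g. what a client would do once a reply arrives
print(promise.value, seen)
```

This mirrors the usage pattern from the routing section: the caller holds on to the promise while commands are in flight and reads value only after the join has happened.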

Routers

class rb.BaseRouter(cluster)

The base class for all routers. If you want to implement a custom router, this is the class to subclass.

  • cluster

A reference back to the Cluster this router belongs to.

  • get_host_for_command(command, args)

Returns the host on which this command should be executed.

  • get_host_for_key(key)

Performs routing and returns the host_id of the target.

Subclasses need to implement this.

  • get_key(command, args)

Returns the key that the command operates on.
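The subclassing contract can be sketched without rb installed. In the example below, SketchBaseRouter is a stand-in for rb.BaseRouter that copies only the cluster attribute and the get_host_for_key() hook, and PrefixRouter with its prefix_to_host and default_host parameters is entirely hypothetical. With the real library you would subclass rb.BaseRouter and pass the class through router_cls.

```python
class SketchBaseRouter(object):
    """Stand-in for rb.BaseRouter so the example runs on its own."""

    def __init__(self, cluster):
        self.cluster = cluster

    def get_host_for_key(self, key):
        raise NotImplementedError()


class PrefixRouter(SketchBaseRouter):
    """Hypothetical router: chooses the host from the key part before ':'."""

    def __init__(self, cluster, prefix_to_host, default_host=0):
        super(PrefixRouter, self).__init__(cluster)
        self.prefix_to_host = dict(prefix_to_host)
        self.default_host = default_host

    def get_host_for_key(self, key):
        prefix = key.split(':', 1)[0]
        return self.prefix_to_host.get(prefix, self.default_host)


# No real cluster is needed for the demonstration, so None is passed.
router = PrefixRouter(None, {'user': 0, 'session': 1})
for key in ('user:42', 'session:abc', 'misc'):
    print(key, '->', router.get_host_for_key(key))
```

The only method a custom router has to supply in this sketch is get_host_for_key(), which matches the note that subclasses need to implement it.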

class rb.ConsistentHashingRouter(cluster)

A router that returns the host_id based on a consistent hashing algorithm. The consistent hashing algorithm only works if a key argument is provided.

This router requires the hosts to be gapless, which means that the IDs of N hosts range from 0 to N-1.

  • get_host_for_key(key)

Performs routing and returns the host_id of the target.

Subclasses need to implement this.
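For readers unfamiliar with consistent hashing, the general technique can be sketched as a hash ring. This is a generic illustration and not rb's own implementation: HashRing, points_per_host and the use of MD5 are all assumptions made for the example.

```python
import bisect
import hashlib


class HashRing(object):
    """Generic consistent-hash ring; each host owns several ring points."""

    def __init__(self, host_ids, points_per_host=64):
        ring = []
        for host_id in host_ids:
            for i in range(points_per_host):
                ring.append((self._hash('%s:%d' % (host_id, i)), host_id))
        ring.sort()
        self._ring = ring
        self._points = [point for point, _ in ring]

    @staticmethod
    def _hash(value):
        digest = hashlib.md5(value.encode('utf-8')).digest()
        return int.from_bytes(digest[:4], 'big')

    def get_host_for_key(self, key):
        # First ring point clockwise from the key's hash, wrapping around.
        index = bisect.bisect(self._points, self._hash(key)) % len(self._ring)
        return self._ring[index][1]


keys = ['key:%d' % i for i in range(500)]
before = HashRing(range(4))
after = HashRing(range(5))   # one host added
moved = [k for k in keys
         if before.get_host_for_key(k) != after.get_host_for_key(k)]
print('%d of %d keys changed host' % (len(moved), len(keys)))
```

The point of the technique is that adding a host only moves keys onto the new host, while every other key stays where it was.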

class rb.PartitionRouter(cluster)

A simple router that routes each command to a single node based solely on crc32(key) % node_count.

This router requires the hosts to be gapless, which means that the IDs of N hosts range from 0 to N-1.

  • get_host_for_key(key)

Performs routing and returns the host_id of the target.

Subclasses need to implement this.
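The crc32 % node_count rule is small enough to show directly. The function below is a minimal stand-alone sketch of that rule using the standard library's zlib.crc32; the name partition_host_for_key and the UTF-8 encoding of string keys are assumptions of this example, not a copy of rb's code.

```python
import zlib


def partition_host_for_key(key, node_count):
    """Map a key to a host ID in the range 0..node_count-1 via CRC32."""
    if isinstance(key, str):
        key = key.encode('utf-8')  # assumption: string keys are UTF-8 encoded
    # Mask to an unsigned 32-bit value so the result is never negative.
    return (zlib.crc32(key) & 0xffffffff) % node_count


for key in ('user:1', 'user:2', 'session:abc'):
    print(key, '->', partition_host_for_key(key, 8))
```

Because the result depends on node_count, the host IDs have to run from 0 to N-1 without gaps, and changing the number of hosts changes where most keys land.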

exception rb.UnroutableCommand

Raised if a command was issued that cannot be routed through the router to a single host.

Testing

class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

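The test setup is a convenient way to spawn multiple redis servers for testing and to shut them down automatically. This can be used as a context manager to automatically terminate the clients.

  • rb.testing.make_test_cluster(*args, **kwargs)

A convenient shortcut that creates a test setup and then a cluster from it. This must be used as a context manager:

from rb.testing import make_test_cluster

with make_test_cluster() as cluster:
    ...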
