ホームページ  >  記事  >  バックエンド開発  >  Rb (redis blaster)、Redis の非レプリケート シャーディングを実装する Python ライブラリ

Rb (redis blaster)、Redis の非レプリケート シャーディングを実装する Python ライブラリ

WBOY
WBOY転載
2023-04-11 19:27:271643ブラウズ

Rb (redis blaster) は、redis の非レプリケート シャーディングを実装するライブラリです。 Python Redis 上にカスタム ルーティング システムが実装されているため、リクエストを個々のノードに手動でルーティングしなくても、自動的にさまざまなサーバーをターゲットにすることができます。

redis のすべての機能を実装しているわけではありませんし、実装しようともしません。クライアントはいつでも特定のホストに接続できますが、ほとんどの操作は、別のノードに自動的にルーティングできる基本的なキー/値操作に限定されていると想定されています。

できること:

  • ホスト上で単一キー操作を自動的に実行します。
  • コマンドをノードのすべてまたは一部で実行します。
  • これらの をすべて並行して実行します。

インストール

rb は PyPI で利用可能で、そこからインストールできます:

$ pip install rb

Configuration

rb の使用を開始するには、とてもシンプルです。以前に py-redis を使用したことがある場合は、すぐに使いこなせるでしょう。主な違いは、クラスターが単一のホストに接続するのではなく、複数のホストに接続するように構成されていることです。

rom rb import Cluster

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
4: {'port': 6379},
5: {'port': 6380},
6: {'port': 6381},
7: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

この場合、同じホスト ノード上の 4 つの異なるサーバー プロセスに 8 つがセットアップされています。 hosts パラメータは、接続先のホストのマップです。辞書のキーはホスト ID (整数)、値はパラメータ辞書です。 host_defaults は、すべてのホストに入力されるオプションのデフォルトのディクショナリです。これは、繰り返されるいくつかの共通のデフォルトを共有する場合に便利です (この場合、すべてのホストがローカルホストに接続します)。

デフォルト構成では、ルーティングに PartitionRouter が使用されます。

ルーティング

クラスターが構築されたので、Cluster.get_routing_client() を使用して、各コマンドを正しい Redis ノードに自動的にルーティングする Redis クライアントを取得できます。

client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
results[key] = client.get(key)

このクライアントは、標準の pyredis StrictClient と非常によく似た動作をしますが、主な違いは、1 つのキーのみを含むコマンドのみを実行できることです。

ただし、この基本操作は連続して実行されます。 rb が便利なのは、redis パイプラインを自動的に構築し、クエリを多数のホストに並行して送信できることです。ただし、値がすぐに利用できないため、使用法が少し変わります。

results = {}
with cluster.map() as client:
for key in keys_to_look_up:
results[key] = client.get(key)

ここまでは似ていますが、実際の値を結果ディクショナリに保存する代わりに、Promise オブジェクトが保存されます。マップ コンテキスト マネージャーが終了すると、実行されたことが保証され、Promise.value プロパティにアクセスして値を取得できます。

for key, promise in results.iteritems():
print '%s: %s' % (key, promise.value)

参加しているすべてのホストにコマンドを送信する場合 (ドロップなど)データベース)、Cluster.all() メソッドを使用できます:

with cluster.all() as client:
client.flushdb()

これを実行すると、promise 値はホスト ID をキー、結果を値とするディクショナリになります。例:

with cluster.all() as client:
results = client.info()
for host_id, info in results.iteritems():
print 'host %s is running %s' % (host_id, info['os'])

特定のホストを明示的にターゲットにするには、Cluster.fanout() を使用して、コマンドの送信先となるホスト ID のリストを受け入れることができます。

API

これはパブリック API への完全なリファレンスです。このライブラリは Python redis ライブラリを拡張するため、一部のクラスにはより多くの機能があることに注意してください。py-redis ライブラリをチェックアウトする必要があります。

Cluster

class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

クラスタは rb の背後にありますコアオブジェクト。これは個々のノードの接続プールに保存され、アプリケーションの実行中に中央の場所で共有できます。

デフォルト ルーターを備えた 4 つの Redis インスタンス上のクラスターの基本的な例:

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

hosts は、ホスト ID 番号を構成パラメーターにマップするホスト ディクショナリです。パラメータは add_host() 関数のシグネチャに対応します。これらのパラメータのデフォルト値は host_defaults から取得されます。プール クラスをオーバーライドするには、pool_cls パラメーターと pool_options パラメーターを使用します。同じことがルーターの router_cls と router_options にも当てはまります。プール オプションは、ソケット タイムアウトや同様のパラメータを設定するのに役立ちます。

  • add_host(host_id=なし、host='localhost'、port=6379、unix_socket_path=なし、db=0、パスワード=なし、ssl=False、ssl_options=なし)

新しいホストをクラスターに追加します。通常、ホストはコンストラクターを介して追加され、クラスターを初めて使用した後に変更することは意味がありにくいため、これは単体テストの場合にのみ役立ちます。

  • all(timeout=None、max_concurrency=64、auto_batch=True)

すべてのホストにファンアウトします。それ以外は fanout() と同じです。

例:

with cluster.all() as client:
client.flushdb()
  • disconnect_pools()

内部プールへのすべての接続を切断します。

  • execute_commands(mapping, *args, **kwargs)

Redis クラスター上のルーティング キーに関連付けられた一連のコマンドを同時に実行し、新しいマッピング。値は、同じ場所にあるコマンドに対応する結果のリストです。例:

>>> cluster.execute_commands({
... 'foo': [
... ('PING',),
... ('TIME',),
... ],
... 'bar': [
... ('CLIENT', 'GETNAME'),
... ],
... })
{'bar': [<Promise None>],
 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}
  • redis.client.Script のインスタンスであるコマンドは、まずターゲット ノード上に存在するかどうかを確認し、次に実行前にターゲットにロードされ、他のコマンドとインターリーブできます。 :
>>> from redis.client import Script
>>> TestScript = Script(None, 'return {KEYS, ARGV}')
>>> cluster.execute_commands({
... 'foo': [
... (TestScript, ('key:1', 'key:2'), range(0, 3)),
... ],
... 'bar': [
... (TestScript, ('key:3', 'key:4'), range(3, 6)),
... ],
... })
{'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}

内部的には、FanoutClient を使用してコマンドを発行します。

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始扇出操作并 join 结果的快捷上下文管理器。

在上下文管理器中,可用的客户端是 FanoutClient。示例用法:

with cluster.fanout(hosts='all') as client: client.flushdb()
get_local_client(host_id)
  • get_local_client(host_id)

返回特定主机 ID 的本地化 client。这个 client 就像一个普通的 Python redis 客户端一样工作,并立即返回结果。

  • get_local_client_for_key(key)

类似于 get_local_client_for_key() 但根据 router 所说的 key 目的地返回 client。

  • get_pool_for_host(host_id)

返回给定主机的连接池。

redis 客户端使用此连接池来确保它不必不断地重新连接。如果要使用自定义 redis 客户端,可以手动将其作为连接池传入。

  • get_router()

返回 cluster 的 router 。如果 cluster 重新配置,router 将被重新创建。通常,您不需要自己与 router 交互,因为集群的路由客户端会自动执行此操作。

这将返回 BaseRouter 的一个实例。

  • get_routing_client(auto_batch=True)

返回一个路由客户端。该客户端能够自动将请求路由到各个主机。它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。

路由客户端的默认行为是尝试将符合条件的命令批处理成批处理版本。例如,路由到同一节点的多个 GET 命令最终可以合并为一个 MGET 命令。可以通过将 auto_batch 设置为 False 来禁用此行为。这对于调试很有用,因为 MONITOR 将更准确地反映代码中发出的命令。

有关详细信息,请参阅 RoutingClient。

  • map(timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始映射操作并 join 结果的快捷上下文管理器。max_concurrency 定义在隐式连接发生之前可以存在多少未完成的并行查询。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)
  • remove_host(host_id)

从 client 中删除 host。这仅对单元测试真正有用。

Clients

class rb.RoutingClient(cluster, auto_batch=True)

可以路由到单个目标的客户端。

有关参数,请参见 Cluster.get_routing_client()。

  • execute_command(*args, **options)

执行命令并返回解析后的响应

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager,该操作扇出到手动指定的主机,而不是使用路由系统。例如,这可用于清空所有主机上的数据库。context manager 返回一个 FanoutClient。示例用法:

with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
results = client.info()
for host_id, info in results.value.iteritems():
print '%s -> %s' % (host_id, info['is'])

返回的 promise 将所有结果累积到由 host_id 键入的字典中。

hosts 参数是一个 host_id 列表,或者是字符串 'all' ,用于将命令发送到所有主机。

fanout API 需要非常小心地使用,因为当 key 被写入不期望它们的主机时,它可能会造成很多损坏。

  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)

返回线程不安全的扇出客户端。

返回 FanoutClient 的实例。

  • get_mapping_client(max_concurrency=64, auto_batch=None)

返回一个线程不安全的映射客户端。此客户端的工作方式类似于 redis 管道并返回最终结果对象。它需要 join 才能正常工作。您应该使用自动 join 的 map() 上下文管理器,而不是直接使用它。

返回 MappingClient 的一个实例。

  • map(timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager。这会并行运行多个查询,然后最后 join 以收集所有结果。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)

class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

路由客户端使用 cluster 的 router 根据执行的 redis 命令的 key 自动定位单个节点。

パラメータについては、「Cluster.map()」を参照してください。

  • cancel()

未処理のリクエストをすべてキャンセルします。

  • execute_command(*args, **options)

コマンドを実行し、解析された応答を返します

  • join(timeout=None)

すべての未処理の応答が返されるかタイムアウトになるまで待機します

  • mget(keys, *args)

同じ値を返しますキーとして並べる List

  • mset(*args, **kwargs)

マッピングに従ってキー/値を設定します。マップはキーと値のペアの辞書です。キーと値は両方とも文字列、または str() を介して文字列に変換できる型である必要があります。

class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

これは MappingClient と同様に機能しますが、ルーターを使用してホストを見つけるのではなく、次のコマンドを使用します。手動で指定したすべてのホストに送信されます。

結果は、host_id をキーとする辞書に蓄積されます。

パラメータについては、「Cluster.fanout()」を参照してください。

  • execute_command(*args, **options)

コマンドを実行し、解析された応答を返します

  • target(hosts)

通話のためにクライアントを一時的に移動します。これは、単一の呼び出しに対してホストのサブセットを処理する必要がある場合に便利です。

  • target_key(key)

クライアントを一時的に再配置して、指定されたキーがルーティングされる 1 つのホストに特別にルーティングされる呼び出しを行います。この場合、Promise の結果は辞書ではなく単なるホスト値です。

バージョン 1.3 の新機能。

Promise

class rb.Promise

Promise オブジェクトの ES6 API をミラーリングしようとする Promise オブジェクト。 ES6 の Promise とは異なり、この Promise は基になる値への直接アクセスも提供し、この Promise は外部で解決できるため、静的メソッド名が若干異なります。

  • static all(iterable_or_dict)

Promise は、渡されたすべての Promise が解決されたときに解決されます。 Promise のリストまたは Promise の辞書を渡すことができます。

  • done(on_success=None, on_failure=None)

Promise にいくつかのコールバックを追加し、Promise を返します。

  • is_pending

Promise がまだ保留中の場合は True、それ以外の場合は False。

  • is_rejected

Promise が拒否された場合は True、それ以外の場合は False。

  • is_resolved

Promise が解決された場合は True、そうでない場合は False。

  • #reason

この約束が拒否された場合のその理由。

  • reject(reason)

指定された理由で約束を拒否します。

  • static requested(reason)

特定の値で拒否される Promise オブジェクトを作成します。

  • resolve(value)

指定された値で Promise を解決します。

  • staticsolved(value)

特定の値で解決する Promise オブジェクトを作成します。

  • then(success=None, Failure=None)

成功および/または失敗のコールバックを Promise に追加するユーティリティ メソッド。これもプロセス中に返されます。もう一つの約束。

  • #value

この Promise が解決された場合に保持される値。

Routers

class rb.BaseRouter(cluster)

すべてのルートの基本クラス。カスタム ルートを実装する場合は、これがサブクラスになります。

  • cluster

このルーターが属するクラスターへの参照。

  • get_host_for_command(command, args)

このコマンドを実行するホストを返します。

  • get_host_for_key(key)

ルーティングを実行し、ターゲットの host_id を返します。

サブクラスはこれを実装する必要があります。

  • get_key(command, args)

コマンド操作のキーを返します。

class rb.ConsistentHashingRouter(cluster)

コンシステント ハッシュ アルゴリズムに基づいて host_id のルーターを返します。一貫したハッシュ アルゴリズムは、key パラメーターが指定された場合にのみ機能します。

このルーターでは、ホストがギャップレスである必要があります。これは、N 個のホストの ID の範囲が 0 から N-1 であることを意味します。

  • get_host_for_key(key)

ルーティングを実行し、ターゲットの host_id を返します。

サブクラスはこれを実装する必要があります。

class rb.PartitionRouter(cluster)

単純な crc32 % node_count 設定のみに基づいてコマンドを単一ノードに個別にルーティングする単純なルーター。

このルーターでは、ホストがギャップレスである必要があります。これは、N 個のホストの ID の範囲が 0 から N-1 であることを意味します。

  • get_host_for_key(key)

执行路由并返回目标的 host_id。

子类需要实现这一点。

exception rb.UnroutableCommand

如果发出的命令无法通过 router 路由到单个主机,则引发。

Testing

class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

测试设置是生成多个 redis 服务器进行测试并自动关闭它们的便捷方式。这可以用作 context manager 来自动终止客户端。

  • rb.testing.make_test_cluster(*args, **kwargs)

用于创建测试设置然后从中创建 cluster 的便捷快捷方式。这必须用作 context manager:

from rb.testing import make_test_cluster
with make_test_cluster() as cluster:
...

以上がRb (redis blaster)、Redis の非レプリケート シャーディングを実装する Python ライブラリの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

声明:
この記事は51cto.comで複製されています。侵害がある場合は、admin@php.cn までご連絡ください。