首頁 >後端開發 >Python教學 >Rb(redis blaster),一個為 Redis 實作 non-replicated 分片的 Python 函式庫

Rb(redis blaster),一個為 Redis 實作 non-replicated 分片的 Python 函式庫

WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB
WBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWBOYWB轉載
2023-04-11 19:27:271756瀏覽

Rb,redis blaster,是一個為 redis 實作非複製分片(non-replicated sharding)的函式庫。它在 python redis 之上實作了一個自訂路由系統,讓您可以自動定位不同的伺服器,而無需手動將請求路由到各個節點。

它沒有實作 redis 的所有功能,也沒有嘗試這樣做。您可以隨時將客戶端連接到特定主機,但大多數情況下假設您的操作僅限於可以自動路由到不同節點的基本 key/value 操作。

你可以做什麼:

  • 自動針對主機進行單一 key 操作。
  • 對所有或部分節點執行指令
  • 並行執行所有這些

安裝

rb 在 PyPI 上可用,可以從那裡安裝:

$ pip install rb

配置

#開始使用 rb 非常簡單。如果您之前一直在使用 py-redis,您會感到賓至如歸。主要區別在於,不是連接到單一主機,而是將cluster 配置為連接到多個:

rom rb import Cluster

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
4: {'port': 6379},
5: {'port': 6380},
6: {'port': 6381},
7: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

在這種情況下,我們在同一主機上的四個不同伺服器進程上設定了8 個節點。 hosts 參數是要連接的主機的對應。字典的 key 是 host ID(整數),值是參數字典。 host_defaults 是為所有主機填入的選用預設值字典。如果您想要共用一些重複的常見預設值(在這種情況下,所有主機都連接到 localhost),這很有用。

在預設設定中,PartitionRouter 用於路由。

路由

現在叢集已經建置好了,我們可以使用Cluster.get_routing_client() 來取得一個redis 用戶端,它會為每個指令自動路由到正確的redis 節點:

client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
results[key] = client.get(key)

該客戶端的工作原理與標準的pyredis StrictClient 非常相似,主要區別在於它只能執行只涉及一個key 的命令。

然而,這個基本操作是串聯運行的。使 rb 有用的是它可以自動建立 redis 管道並將查詢並行發送到許多主機。但是,這會稍微改變用法,因為現在該值無法立即使用:

results = {}
with cluster.map() as client:
for key in keys_to_look_up:
results[key] = client.get(key)

雖然到目前為止看起來很相似,但不是將實際值儲存在 result 字典中,而是儲存 Promise 物件。當map context manager 結束時,它們保證已經被執行,您可以存取Promise.value 屬性來取得值:

for key, promise in results.iteritems():
print '%s: %s' % (key, promise.value)

如果要向所有參與的主機發送命令(例如刪除資料庫),可以使用Cluster. all() 方法:

with cluster.all() as client:
client.flushdb()

如果你這樣做,promise 值是一個字典,其中host ID 作為key,結果作為value。舉個例子:

with cluster.all() as client:
results = client.info()
for host_id, info in results.iteritems():
print 'host %s is running %s' % (host_id, info['os'])

要明確針對某些主機,您可以使用 Cluster.fanout() 接受要將指令傳送到 host ID 清單。

API

這是公共 API 的完整參考。請注意,此庫擴展了 Python redis 庫,因此其中一些類別具有更多功能,您需要查閱 py-redis 庫。

Cluster

class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)

cluster 是rb 背後的核心對象。它保存到各個節點的連接池,並且可以在應用程式運行期間在中央位置共享。

具有預設 router 的四個 redis 實例上的叢集的基本範例:

cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})

hosts 是主機字典,它將 host ID 數量對應到設定參數。參數對應於 add_host() 函數的簽章。這些參數的預設值是從 host_defaults 中提取的。若要覆寫 pool 類,可以使用 pool_cls 和 pool_options 參數。這也適用於 router 的 router_cls 和 router_options。 pool 選項對於設定 socket 逾時和類似參數很有用。

  • add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)##word
#將新主機新增至叢集。這僅對單元測試真正有用,因為通常主機是透過建構函式添加的,並且在第一次使用叢集後進行更改不太可能有意義。

    all(timeout=None, max_concurrency=64, auto_batch=True)
扇出到所有主機。其他方面與 fanout() 完全一樣。

範例:

with cluster.all() as client:
client.flushdb()

    disconnect_pools()
中斷與內部池的所有連線。

    execute_commands(mapping, *args, **kwargs)
同時在Redis 叢集上執行與路由key 關聯的一系列命令,傳回一個新映射,其中值是與相同位置的命令對應的結果清單。例如:

>>> cluster.execute_commands({
... 'foo': [
... ('PING',),
... ('TIME',),
... ],
... 'bar': [
... ('CLIENT', 'GETNAME'),
... ],
... })
{'bar': [<Promise None>],
 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}

    作為redis.client.Script 實例的命令將首先檢查它們在目標節點上的存在,然後在執行之前載入到目標上,並且可以與其他命令交錯:
  • >>> from redis.client import Script
    >>> TestScript = Script(None, 'return {KEYS, ARGV}')
    >>> cluster.execute_commands({
    ... 'foo': [
    ... (TestScript, ('key:1', 'key:2'), range(0, 3)),
    ... ],
    ... 'bar': [
    ... (TestScript, ('key:3', 'key:4'), range(3, 6)),
    ... ],
    ... })
    {'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>],
     'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}
    
在內部,FanoutClient用來發出指令。

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始扇出操作并 join 结果的快捷上下文管理器。

在上下文管理器中,可用的客户端是 FanoutClient。示例用法:

with cluster.fanout(hosts='all') as client: client.flushdb()
get_local_client(host_id)
  • get_local_client(host_id)

返回特定主机 ID 的本地化 client。这个 client 就像一个普通的 Python redis 客户端一样工作,并立即返回结果。

  • get_local_client_for_key(key)

类似于 get_local_client_for_key() 但根据 router 所说的 key 目的地返回 client。

  • get_pool_for_host(host_id)

返回给定主机的连接池。

redis 客户端使用此连接池来确保它不必不断地重新连接。如果要使用自定义 redis 客户端,可以手动将其作为连接池传入。

  • get_router()

返回 cluster 的 router 。如果 cluster 重新配置,router 将被重新创建。通常,您不需要自己与 router 交互,因为集群的路由客户端会自动执行此操作。

这将返回 BaseRouter 的一个实例。

  • get_routing_client(auto_batch=True)

返回一个路由客户端。该客户端能够自动将请求路由到各个主机。它是线程安全的,可以类似于主机本地客户端使用,但它会拒绝执行无法直接路由到单个节点的命令。

路由客户端的默认行为是尝试将符合条件的命令批处理成批处理版本。例如,路由到同一节点的多个 GET 命令最终可以合并为一个 MGET 命令。可以通过将 auto_batch 设置为 False 来禁用此行为。这对于调试很有用,因为 MONITOR 将更准确地反映代码中发出的命令。

有关详细信息,请参阅 RoutingClient。

  • map(timeout=None, max_concurrency=64, auto_batch=True)

用于获取路由客户端、开始映射操作并 join 结果的快捷上下文管理器。max_concurrency 定义在隐式连接发生之前可以存在多少未完成的并行查询。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)
  • remove_host(host_id)

从 client 中删除 host。这仅对单元测试真正有用。

Clients

class rb.RoutingClient(cluster, auto_batch=True)

可以路由到单个目标的客户端。

有关参数,请参见 Cluster.get_routing_client()。

  • execute_command(*args, **options)

执行命令并返回解析后的响应

  • fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager,该操作扇出到手动指定的主机,而不是使用路由系统。例如,这可用于清空所有主机上的数据库。context manager 返回一个 FanoutClient。示例用法:

with cluster.fanout(hosts=[0, 1, 2, 3]) as client:
results = client.info()
for host_id, info in results.value.iteritems():
print '%s -> %s' % (host_id, info['is'])

返回的 promise 将所有结果累积到由 host_id 键入的字典中。

hosts 参数是一个 host_id 列表,或者是字符串 'all' ,用于将命令发送到所有主机。

fanout API 需要非常小心地使用,因为当 key 被写入不期望它们的主机时,它可能会造成很多损坏。

  • get_fanout_client(hosts, max_concurrency=64, auto_batch=None)

返回线程不安全的扇出客户端。

返回 FanoutClient 的实例。

  • get_mapping_client(max_concurrency=64, auto_batch=None)

返回一个线程不安全的映射客户端。此客户端的工作方式类似于 redis 管道并返回最终结果对象。它需要 join 才能正常工作。您应该使用自动 join 的 map() 上下文管理器,而不是直接使用它。

返回 MappingClient 的一个实例。

  • map(timeout=None, max_concurrency=64, auto_batch=None)

返回映射操作的 context manager。这会并行运行多个查询,然后最后 join 以收集所有结果。

在上下文管理器中,可用的客户端是 MappingClient。示例用法:

results = {}
with cluster.map() as client:
for key in keys_to_fetch:
results[key] = client.get(key)
for key, promise in results.iteritems():
print '%s => %s' % (key, promise.value)

class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)

路由客户端使用 cluster 的 router 根据执行的 redis 命令的 key 自动定位单个节点。

有關參數,請參閱 Cluster.map()。

  • cancel()

取消所有未完成的請求。

  • execute_command(*args, **options)

#執行指令並傳回解析後的回應

  • join(timeout=None)

等待所有未完成的回應傳回或逾時

  • mget(keys, *args)

傳回與key 順序相同的值列表

  • mset(*args, **kwargs)

#根據映射設定key/value。映射是 key/value 對的字典。 key 和 value 都應該是可以透過 str() 轉換為 string 的字串或型別。

class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)

這與MappingClient 的工作方式相似,但它不是使用router 來定位主機,而是將指令傳送到所有手動指定的主機。

結果累積在由 host_id 鍵入的字典中。

有關參數,請參閱 Cluster.fanout()。

  • execute_command(*args, **options)

#執行指令並傳回解析後的回應

  • target(hosts)

為一次呼叫暫時重新定位client。當必須為一次呼叫處理主機 subset 時,這很有用。

  • target_key(key)

暫時重新定位客戶端以進行一次調用,以專門路由到給定 key 路由到的一台主機。在這種情況下,promise 的結果只是一個主機的值而不是字典。

1.3 版中的新功能。

Promise

class rb.Promise

一個嘗試為 Promise 物件鏡像 ES6 API 的 Promise 物件。與 ES6 的 Promise 不同,這個 Promise 也直接提供對底層值的訪問,並且它有一些稍微不同的靜態方法名稱,因為這個 Promise 可以在外部解析。

  • static all(iterable_or_dict)

當所有傳遞的 promise 都解決時,promise 就解決了。你可以傳遞一個 promise 列表或一個 promise 字典。

  • done(on_success=None, on_failure=None)

將一些回呼附加到 Promise 並傳回 Promise。

  • is_pending

如果 promise 仍然等待,則為 True,否則為 False。

  • is_rejected

如果 promise 被拒絕,則為 True,否則為 False。

  • is_resolved

如果 promise 已解決,則為 True,否則為 False。

  • reason

如果它被拒絕,這個 promise 的原因。

  • reject(reason)

以給定的理由拒絕 promise。

  • static rejected(reason)

建立一個以特定值被拒絕的 promise 物件。

  • resolve(value)

用給定的值解 promise。

  • static resolved(value)

建立一個以特定值解析的 promise 物件。

  • then(success=None, failure=None)

向Promise 添加成功和/或失敗回呼的實用方法,該方法還將在此過程中返回另一個Promise。

  • value

如果它被解決,這個 promise 所持有的值。

Routers

class rb.BaseRouter(cluster)

所有路由的基底類別。如果你想實作一個自訂路由,這就是你的子類別。

  • cluster

引用回此 router 所屬的 Cluster。

  • get_host_for_command(command, args)

傳回應執行此指令的主機。

  • get_host_for_key(key)

執行路由並傳回目標的 host_id。

子類別需要實現這一點。

  • get_key(command, args)

傳回指令動作的 key。

class rb.ConsistentHashingRouter(cluster)

基於一致雜湊演算法傳回 host_id 的 router。一致的雜湊演算法僅在提供 key 參數時才有效。

該 router 要求主機是無間隙的,這表示 N 台主機的 ID 範圍從 0 到 N-1。

  • get_host_for_key(key)

執行路由並傳回目標的 host_id。

子類別需要實現這一點。

class rb.PartitionRouter(cluster)

一個簡單的 router,只根據簡單的 crc32 % node_count 設定將指令單獨路由到單一節點。

該 router 要求主機是無間隙的,這表示 N 台主機的 ID 範圍從 0 到 N-1。

  • get_host_for_key(key)

执行路由并返回目标的 host_id。

子类需要实现这一点。

exception rb.UnroutableCommand

如果发出的命令无法通过 router 路由到单个主机,则引发。

Testing

class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')

测试设置是生成多个 redis 服务器进行测试并自动关闭它们的便捷方式。这可以用作 context manager 来自动终止客户端。

  • rb.testing.make_test_cluster(*args, **kwargs)

用于创建测试设置然后从中创建 cluster 的便捷快捷方式。这必须用作 context manager:

from rb.testing import make_test_cluster
with make_test_cluster() as cluster:
...

以上是Rb(redis blaster),一個為 Redis 實作 non-replicated 分片的 Python 函式庫的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:51cto.com。如有侵權,請聯絡admin@php.cn刪除