Rb(redis blaster),一個為 redis 實現 non-replicated 分片的 python 庫
- 2022 年 4 月 7 日
- 筆記
Rb,redis blaster
,是一個為 redis
實現非複製分片(non-replicated sharding)
的庫。它在 python redis
之上實現了一個自定義路由系統,允許您自動定位不同的伺服器,而無需手動將請求路由到各個節點。
它沒有實現 redis
的所有功能,也沒有嘗試這樣做。 您可以隨時將客戶端連接到特定主機,但大多數情況下假設您的操作僅限於可以自動路由到不同節點的基本 key/value
操作。
你可以做什麼:
- 自動針對主機進行單
key
操作 - 對所有或部分節點執行命令
- 並行執行所有這些
安裝
rb
在 PyPI
上可用,可以從那裡安裝:
$ pip install rb
配置
開始使用 rb
非常簡單。如果您之前一直在使用 py-redis
,您會感到賓至如歸。 主要區別在於,不是連接到單個主機,而是將 cluster
配置為連接到多個:
from rb import Cluster
cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
4: {'port': 6379},
5: {'port': 6380},
6: {'port': 6381},
7: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})
在這種情況下,我們在同一主機上的四個不同伺服器進程上設置了 8
個節點。hosts
參數是要連接的主機的映射。 字典的 key
是 host ID
(整數),值是參數字典。host_defaults
是為所有主機填寫的可選默認值字典。 如果您想共享一些重複的常見默認值(在這種情況下,所有主機都連接到 localhost
),這很有用。
在默認配置中,PartitionRouter
用於路由。
路由
現在集群已經構建好了,我們可以使用 Cluster.get_routing_client()
來獲取一個 redis
客戶端,它會為每個命令自動路由到正確的 redis
節點:
client = cluster.get_routing_client()
results = {}
for key in keys_to_look_up:
results[key] = client.get(key)
該客戶端的工作原理與標準的 pyredis StrictClient
非常相似,主要區別在於它只能執行只涉及一個 key
的命令。
然而,這個基本操作是串聯運行的。使 rb
有用的是它可以自動構建 redis
管道並將查詢並行發送到許多主機。但是,這會稍微改變用法,因為現在該值無法立即使用:
results = {}
with cluster.map() as client:
for key in keys_to_look_up:
results[key] = client.get(key)
雖然到目前為止看起來很相似,但不是將實際值存儲在 result
字典中,而是存儲 Promise
對象。當 map context manager
結束時,它們保證已經被執行,您可以訪問 Promise.value
屬性來獲取值:
for key, promise in results.iteritems():
print '%s: %s' % (key, promise.value)
如果要向所有參與的主機發送命令(例如刪除資料庫),可以使用 Cluster.all()
方法:
with cluster.all() as client:
client.flushdb()
如果你這樣做,promise
值是一個字典,其中 host ID
作為 key
,結果作為 value
。舉個例子:
with cluster.all() as client:
results = client.info()
for host_id, info in results.iteritems():
print 'host %s is running %s' % (host_id, info['os'])
要明確針對某些主機,您可以使用 Cluster.fanout()
接受要將命令發送到 host ID
列表。
API
這是公共 API 的完整參考。請注意,此庫擴展了 Python redis
庫,因此其中一些類具有更多功能,您需要查閱 py-redis
庫。
Cluster
class rb.Cluster(hosts, host_defaults=None, pool_cls=None, pool_options=None, router_cls=None, router_options=None)
cluster
是 rb
背後的核心對象。 它保存到各個節點的連接池,並且可以在應用程式運行期間在中央位置共享。
具有默認 router
的四個 redis
實例上的集群的基本示例:
cluster = Cluster(hosts={
0: {'port': 6379},
1: {'port': 6380},
2: {'port': 6381},
3: {'port': 6382},
}, host_defaults={
'host': '127.0.0.1',
})
hosts
是一個主機字典,它將 host ID
數量映射到配置參數。參數對應於 add_host()
函數的簽名。這些參數的默認值是從 host_defaults
中提取的。要覆蓋 pool
類,可以使用 pool_cls
和 pool_options
參數。這同樣適用於 router
的 router_cls
和 router_options
。pool
選項對於設置 socket
超時和類似參數很有用。
add_host(host_id=None, host='localhost', port=6379, unix_socket_path=None, db=0, password=None, ssl=False, ssl_options=None)
- 將新主機添加到集群。 這僅對單元測試真正有用,因為通常主機是通過構造函數添加的,並且在第一次使用集群後進行更改不太可能有意義。
all(timeout=None, max_concurrency=64, auto_batch=True)
- 扇出到所有主機。其他方面與
fanout()
完全一樣。 - 例子:
with cluster.all() as client: client.flushdb()
- 扇出到所有主機。其他方面與
disconnect_pools()
- 斷開與內部池的所有連接。
execute_commands(mapping, *args, **kwargs)
-
同時在
Redis
集群上執行與路由key
關聯的一系列命令,返回一個新映射,其中值是與同一位置的命令對應的結果列表。例如:>>> cluster.execute_commands({ ... 'foo': [ ... ('PING',), ... ('TIME',), ... ], ... 'bar': [ ... ('CLIENT', 'GETNAME'), ... ], ... }) {'bar': [<Promise None>], 'foo': [<Promise True>, <Promise (1454446079, 418404)>]}
-
作為
redis.client.Script
實例的命令將首先檢查它們在目標節點上的存在,然後在執行之前載入到目標上,並且可以與其他命令交錯:>>> from redis.client import Script >>> TestScript = Script(None, 'return {KEYS, ARGV}') >>> cluster.execute_commands({ ... 'foo': [ ... (TestScript, ('key:1', 'key:2'), range(0, 3)), ... ], ... 'bar': [ ... (TestScript, ('key:3', 'key:4'), range(3, 6)), ... ], ... }) {'bar': [<Promise [['key:3', 'key:4'], ['3', '4', '5']]>], 'foo': [<Promise [['key:1', 'key:2'], ['0', '1', '2']]>]}
在內部,
FanoutClient
用於發出命令。
-
fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=True)
- 用於獲取路由客戶端、開始扇出操作並
join
結果的快捷上下文管理器。 - 在上下文管理器中,可用的客戶端是
FanoutClient
。示例用法:with cluster.fanout(hosts='all') as client: client.flushdb()
- 用於獲取路由客戶端、開始扇出操作並
get_local_client(host_id)
- 返回特定主機
ID
的本地化client
。這個client
就像一個普通的Python redis
客戶端一樣工作,並立即返回結果。
- 返回特定主機
get_local_client_for_key(key)
- 類似於
get_local_client_for_key()
但根據router
所說的key
目的地返回client
。
- 類似於
get_pool_for_host(host_id)
- 返回給定主機的連接池。
- redis 客戶端使用此連接池來確保它不必不斷地重新連接。如果要使用自定義 redis 客戶端,可以手動將其作為連接池傳入。
get_router()
- 返回
cluster
的router
。如果cluster
重新配置,router
將被重新創建。 通常,您不需要自己與router
交互,因為集群的路由客戶端會自動執行此操作。 - 這將返回
BaseRouter
的一個實例。
- 返回
get_routing_client(auto_batch=True)
- 返回一個路由客戶端。該客戶端能夠自動將請求路由到各個主機。 它是執行緒安全的,可以類似於主機本地客戶端使用,但它會拒絕執行無法直接路由到單個節點的命令。
- 路由客戶端的默認行為是嘗試將符合條件的命令批處理成批處理版本。 例如,路由到同一節點的多個
GET
命令最終可以合併為一個MGET
命令。可以通過將auto_batch
設置為False
來禁用此行為。這對於調試很有用,因為MONITOR
將更準確地反映程式碼中發出的命令。 - 有關詳細資訊,請參閱
RoutingClient
。
map(timeout=None, max_concurrency=64, auto_batch=True)
- 用於獲取路由客戶端、開始映射操作並
join
結果的快捷上下文管理器。max_concurrency
定義在隱式連接發生之前可以存在多少未完成的並行查詢。 - 在上下文管理器中,可用的客戶端是
MappingClient
。示例用法:results = {} with cluster.map() as client: for key in keys_to_fetch: results[key] = client.get(key) for key, promise in results.iteritems(): print '%s => %s' % (key, promise.value)
- 用於獲取路由客戶端、開始映射操作並
remove_host(host_id)
- 從
client
中刪除host
。這僅對單元測試真正有用。
- 從
Clients
class rb.RoutingClient(cluster, auto_batch=True)
可以路由到單個目標的客戶端。
有關參數,請參見 Cluster.get_routing_client()
。
execute_command(*args, **options)
- 執行命令並返回解析後的響應
fanout(hosts=None, timeout=None, max_concurrency=64, auto_batch=None)
- 返回映射操作的
context manager
,該操作扇出到手動指定的主機,而不是使用路由系統。 例如,這可用於清空所有主機上的資料庫。context manager
返回一個FanoutClient
。 示例用法:with cluster.fanout(hosts=[0, 1, 2, 3]) as client: results = client.info() for host_id, info in results.value.iteritems(): print '%s -> %s' % (host_id, info['is'])
- 返回的
promise
將所有結果累積到由host_id
鍵入的字典中。 hosts
參數是一個host_id
列表,或者是字元串'all'
,用於將命令發送到所有主機。fanout API
需要非常小心地使用,因為當key
被寫入不期望它們的主機時,它可能會造成很多損壞。
- 返回映射操作的
get_fanout_client(hosts, max_concurrency=64, auto_batch=None)
- 返回執行緒不安全的扇出客戶端。
- 返回
FanoutClient
的實例。
get_mapping_client(max_concurrency=64, auto_batch=None)
- 返回一個執行緒不安全的映射客戶端。此客戶端的工作方式類似於
redis
管道並返回最終結果對象。它需要join
才能正常工作。您應該使用自動join
的map()
上下文管理器,而不是直接使用它。 - 返回
MappingClient
的一個實例。
- 返回一個執行緒不安全的映射客戶端。此客戶端的工作方式類似於
map(timeout=None, max_concurrency=64, auto_batch=None)
- 返回映射操作的
context manager
。 這會並行運行多個查詢,然後最後join
以收集所有結果。 - 在上下文管理器中,可用的客戶端是
MappingClient
。示例用法:results = {} with cluster.map() as client: for key in keys_to_fetch: results[key] = client.get(key) for key, promise in results.iteritems(): print '%s => %s' % (key, promise.value)
- 返回映射操作的
class rb.MappingClient(connection_pool, max_concurrency=None, auto_batch=True)
路由客戶端使用 cluster
的 router
根據執行的 redis
命令的 key
自動定位單個節點。
有關參數,請參見 Cluster.map()
。
cancel()
- 取消所有未完成的請求。
execute_command(*args, **options)
- 執行命令並返回解析後的響應
join(timeout=None)
- 等待所有未完成的響應返回或超時
mget(keys, *args)
- 返回與
key
順序相同的值列表
- 返回與
mset(*args, **kwargs)
- 根據映射設置
key/value
。映射是key/value
對的字典。key
和value
都應該是可以通過str()
轉換為string
的字元串或類型。
- 根據映射設置
class rb.FanoutClient(hosts, connection_pool, max_concurrency=None, auto_batch=True)
這與 MappingClient
的工作方式相似,但它不是使用 router
來定位主機,而是將命令發送到所有手動指定的主機。
結果累積在由 host_id
鍵入的字典中。
有關參數,請參見 Cluster.fanout()
。
execute_command(*args, **options)
- 執行命令並返回解析後的響應
target(hosts)
- 為一次調用臨時重新定位
client
。當必須為一次調用處理主機subset
時,這很有用。
- 為一次調用臨時重新定位
target_key(key)
- 臨時重新定位客戶端以進行一次調用,以專門路由到給定
key
路由到的一台主機。 在這種情況下,promise
的結果只是一個主機的值而不是字典。 1.3
版中的新功能。
- 臨時重新定位客戶端以進行一次調用,以專門路由到給定
Promise
class rb.Promise
一個嘗試為 Promise
對象鏡像 ES6 API
的 Promise
對象。與 ES6
的 Promise
不同,這個 Promise
也直接提供對底層值的訪問,並且它有一些稍微不同的靜態方法名稱,因為這個 Promise
可以在外部解析。
static all(iterable_or_dict)
- 當所有傳遞的
promise
都解決時,promise
就解決了。你可以傳遞一個promise
列表或一個promise
字典。
- 當所有傳遞的
done(on_success=None, on_failure=None)
- 將一些回調附加到
Promise
並返回Promise
。
- 將一些回調附加到
is_pending
- 如果
promise
仍然等待,則為True
,否則為False
。
- 如果
is_rejected
- 如果
promise
被拒絕,則為True
,否則為False
。
- 如果
is_resolved
- 如果
promise
已解決,則為True
,否則為False
。
- 如果
reason
- 如果它被拒絕,這個
promise
的原因。
- 如果它被拒絕,這個
reject(reason)
- 以給定的理由拒絕
promise
。
- 以給定的理由拒絕
static rejected(reason)
- 創建一個以特定值被拒絕的
promise
對象。
- 創建一個以特定值被拒絕的
resolve(value)
- 用給定的值解決
promise
。
- 用給定的值解決
static resolved(value)
- 創建一個以特定值解析的
promise
對象。
- 創建一個以特定值解析的
then(success=None, failure=None)
- 向
Promise
添加成功和/或失敗回調的實用方法,該方法還將在此過程中返回另一個Promise
。
- 向
value
- 如果它被解決,這個
promise
所持有的值。
- 如果它被解決,這個
Routers
class rb.BaseRouter(cluster)
所有路由的基類。如果你想實現一個自定義路由,這就是你的子類。
cluster
- 引用回此
router
所屬的Cluster
。
- 引用回此
get_host_for_command(command, args)
- 返回應執行此命令的主機。
get_host_for_key(key)
- 執行路由並返回目標的
host_id
。 - 子類需要實現這一點。
- 執行路由並返回目標的
get_key(command, args)
- 返回命令操作的
key
。
- 返回命令操作的
class rb.ConsistentHashingRouter(cluster)
基於一致哈希演算法返回 host_id
的 router
。 一致的哈希演算法僅在提供 key
參數時才有效。
該 router
要求主機是無間隙的,這意味著 N
台主機的 ID
範圍從 0
到 N-1
。
get_host_for_key(key)
- 執行路由並返回目標的
host_id
。 - 子類需要實現這一點。
- 執行路由並返回目標的
class rb.PartitionRouter(cluster)
一個簡單的 router
,僅根據簡單的 crc32 % node_count
設置將命令單獨路由到單個節點。
該 router
要求主機是無間隙的,這意味著 N
台主機的 ID
範圍從 0
到 N-1
。
- get_host_for_key(key)
- 執行路由並返回目標的
host_id
。 - 子類需要實現這一點。
- 執行路由並返回目標的
exception rb.UnroutableCommand
如果發出的命令無法通過 router
路由到單個主機,則引發。
Testing
class rb.testing.TestSetup(servers=4, databases_each=8, server_executable='redis-server')
測試設置是生成多個 redis
伺服器進行測試並自動關閉它們的便捷方式。 這可以用作 context manager
來自動終止客戶端。
rb.testing.make_test_cluster(*args, **kwargs)
- 用於創建測試設置然後從中創建
cluster
的便捷快捷方式。這必須用作context manager
:from rb.testing import make_test_cluster with make_test_cluster() as cluster: ...
- 用於創建測試設置然後從中創建