Maison > Questions et réponses > le corps du texte
from ccnet.sync_client import SyncClient
import Queue
class ClientPool(object):
"""ccnet client pool."""
def __init__(self, conf_dir, pool_size=5):
"""
:param conf_dir: the ccnet configuration directory
:param pool_size:
"""
self.conf_dir = conf_dir
self.pool_size = pool_size
self._pool = Queue.Queue(pool_size)
def _create_client(self):
client = SyncClient(self.conf_dir)
client.req_ids = {}
client.connect_daemon()
return client
def get_client(self):
try:
client = self._pool.get(False)
except:
client = self._create_client()
return client
def return_client(self, client):
try:
self._pool.put(client, False)
except Queue.Full:
pass
class SyncClient(Client):
'''sync mode client'''
def __init__(self, config_dir):
Client.__init__(self, config_dir)
self._req_id = _REQ_ID_START
self.mq_req_id = -1
def disconnect_daemon(self):
if self.is_connected():
try:
self._connfd.close()
except:
pass
def read_response(self):
packet = read_packet(self._connfd)
if packet.header.ptype != CCNET_MSG_RESPONSE:
raise RuntimeError('Invalid Response')
code, code_msg, content = parse_response(packet.body)
return Response(code, code_msg, content)
def send_cmd(self, cmd):
req_id = self.get_request_id()
self.send_request(req_id, 'receive-cmd')
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
cmd += '\000'
self.send_update(req_id, '200', '', cmd)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('Failed to send-cmd: %s %s' % (resp.code, resp.code_msg))
self.send_update(req_id, SC_PROC_DONE, SS_PROC_DONE, '')
def prepare_recv_message(self, msg_type):
request = 'mq-server %s' % msg_type
req_id = self.get_request_id()
self.send_request(req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def receive_message(self):
resp = self.read_response()
# the message from ccnet daemon has the trailing null byte included
msg = message_from_string(resp.content[:-1])
return msg
def prepare_send_message(self):
request = 'mq-server'
mq_req_id = self.get_request_id()
self.send_request(mq_req_id, request)
resp = self.read_response()
if resp.code != '200':
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
self.mq_req_id = mq_req_id
def send_message(self, msg_type, content):
if self.mq_req_id == -1:
self.prepare_send_message()
msg = gen_inner_message_string(self.peer_id, msg_type, content)
self.send_update(self.mq_req_id, "300", '', msg)
resp = self.read_response()
if resp.code != '200':
self.mq_req_id = -1
raise RuntimeError('bad response: %s %s' % (resp.code, resp.code_msg))
def register_service_sync(self, service, group):
'''Mainly used by a program to register a dummy service to ensure only
single instance of that program is running
'''
cmd = 'register-service %s %s' % (service, group)
self.send_cmd(cmd)
巴扎黑2017-04-17 18:02:48
Queue
J'utilise rarement cette structure de données, mais j'ai vérifié la documentation de Python
:
Le module Queue implémente des files d'attente multi-producteurs et multi-consommateurs. Il est particulièrement utile dans la programmation threadée lorsque les informations doivent être échangées en toute sécurité entre plusieurs threads. La classe Queue de ce module implémente toute la sémantique de verrouillage requise. dépend de la disponibilité de la prise en charge des threads en Python ; voir le module de threading.
Le document indique qu'il s'agit d'une file d'attente thread-safe, elle peut donc être utilisée dans un environnement de course avec plusieurs producteurs et plusieurs consommateurs, ce qui est courant dans la programmation multithread.
C'est ce que j'aime Queue
Parlons maintenant du code dans la question. Il ressort clairement du nom que ce code est un 客户端的池
Lorsque vous avez besoin d'obtenir un client, appelez la méthode get_client()
et elle renverra un SyncClient
Le. Le client vous est remis. Lorsque vous avez terminé, n'oubliez pas d'appeler : return_client(client)
Échangez-le sinon, une grande vague de clients sera générée ici. sera problématique.
C’est à peu près tout, je ne sais pas si celui qui pose la question comprend.
迷茫2017-04-17 18:02:48
La file d'attente est une file d'attente thread-safe. Quant au code donné, @yylucifer l'a expliqué plus clairement. Il s'agit en fait d'un pool de clients réutilisable. Cette utilisation est très courante. blockconnectionpool one est fondamentalement la même idée.redis-py