今天在写zabbix storm job监控脚本的时候用到了python的redis模块,之前也有用过,但是没有过多的了解,今天看了下相关的api和源码,看到有ConnectionPool的实现,这里简单说下。
在ConnectionPool之前,如果需要连接redis,我都是用StrictRedis这个类,在源码中可以看到这个类的具体解释:
redis.StrictRedis Implementation of the Redis protocol.This abstract class provides a Python interface to all Redis commands and an
implementation of the Redis protocol.Connection and Pipeline derive from this, implementing how the commands are sent and received to the Redis server
使用的方法:
r=redis.StrictRedis(host=xxxx, port=xxxx, db=xxxx) r.xxxx()
有了ConnectionPool这个类之后,可以使用如下方法
pool = redis.ConnectionPool(host=xxx, port=xxx, db=xxxx) r = redis.Redis(connection_pool=pool)
这里Redis是StrictRedis的子类
简单分析如下:
在StrictRedis类的__init__方法中,可以初始化connection_pool这个参数,其对应的是一个ConnectionPool的对象:
class StrictRedis(object): ........ def __init__(self, host='localhost', port=6379, db=0, password=None, socket_timeout=None, socket_connect_timeout=None, socket_keepalive=None, socket_keepalive_options=None, connection_pool=None, unix_socket_path=None, encoding='utf-8', encoding_errors='strict', charset=None, errors=None, decode_responses=False, retry_on_timeout=False, ssl=False, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None, ssl_ca_certs=None): if not connection_pool: .......... connection_pool = ConnectionPool(**kwargs) self.connection_pool = connection_pool
在StrictRedis的实例执行具体的命令时会调用execute_command方法,这里可以看到具体实现是从连接池中获取一个具体的连接,然后执行命令,完成后释放连接:
# COMMAND EXECUTION AND PROTOCOL PARSING def execute_command(self, *args, **options): "Execute a command and return a parsed response" pool = self.connection_pool command_name = args[0] connection = pool.get_connection(command_name, **options) #调用ConnectionPool.get_connection方法获取一个连接 try: connection.send_command(*args) #命令执行,这里为Connection.send_command return self.parse_response(connection, command_name, **options) except (ConnectionError, TimeoutError) as e: connection.disconnect() if not connection.retry_on_timeout and isinstance(e, TimeoutError): raise connection.send_command(*args) return self.parse_response(connection, command_name, **options) finally: pool.release(connection) #调用ConnectionPool.release释放连接
在来看看ConnectionPool类:
class ConnectionPool(object): ........... def __init__(self, connection_class=Connection, max_connections=None, **connection_kwargs): #类初始化时调用构造函数 max_connections = max_connections or 2 ** 31 if not isinstance(max_connections, (int, long)) or max_connections < 0: #判断输入的max_connections是否合法 raise ValueError('"max_connections" must be a positive integer') self.connection_class = connection_class #设置对应的参数 self.connection_kwargs = connection_kwargs self.max_connections = max_connections self.reset() #初始化ConnectionPool 时的reset操作 def reset(self): self.pid = os.getpid() self._created_connections = 0 #已经创建的连接的计数器 self._available_connections = [] #声明一个空的数组,用来存放可用的连接 self._in_use_connections = set() #声明一个空的集合,用来存放已经在用的连接 self._check_lock = threading.Lock() ....... def get_connection(self, command_name, *keys, **options): #在连接池中获取连接的方法 "Get a connection from the pool" self._checkpid() try: connection = self._available_connections.pop() #获取并删除代表连接的元素,在第一次获取connectiong时,因为_available_connections是一个空的数组, 会直接调用make_connection方法 except IndexError: connection = self.make_connection() self._in_use_connections.add(connection) #向代表正在使用的连接的集合中添加元素 return connection def make_connection(self): #在_available_connections数组为空时获取连接调用的方法 "Create a new connection" if self._created_connections >= self.max_connections: #判断创建的连接是否已经达到最大限制,max_connections可以通过参数初始化 raise ConnectionError("Too many connections") self._created_connections += 1 #把代表已经创建的连接的数值+1 return self.connection_class(**self.connection_kwargs) #返回有效的连接,默认为Connection(**self.connection_kwargs) def release(self, connection): #释放连接,链接并没有断开,只是存在链接池中 "Releases the connection back to the pool" self._checkpid() if connection.pid != self.pid: return self._in_use_connections.remove(connection) #从集合中删除元素 self._available_connections.append(connection) #并添加到_available_connections 的数组中 def disconnect(self): #断开所有连接池中的链接 "Disconnects all connections in the pool" all_conns = chain(self._available_connections, self._in_use_connections) for connection in all_conns: connection.disconnect()
execute_command最终调用的是Connection.send_command方法,关闭链接为 Connection.disconnect方法,而Connection类的实现:
class Connection(object): "Manages TCP communication to and from a Redis server" def __del__(self): #对象删除时的操作,调用disconnect释放连接 try: self.disconnect() except Exception: pass
核心的链接建立方法是通过socket模块实现:
def _connect(self): err = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): family, socktype, proto, canonname, socket_address = res sock = None try: sock = socket.socket(family, socktype, proto) # TCP_NODELAY sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # TCP_KEEPALIVE if self.socket_keepalive: #构造函数中默认 socket_keepalive=False,因此这里默认为短连接 sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) for k, v in iteritems(self.socket_keepalive_options): sock.setsockopt(socket.SOL_TCP, k, v) # set the socket_connect_timeout before we connect sock.settimeout(self.socket_connect_timeout) #构造函数中默认socket_connect_timeout=None,即连接为blocking的模式 # connect sock.connect(socket_address) # set the socket_timeout now that we're connected sock.settimeout(self.socket_timeout) #构造函数中默认socket_timeout=None return sock except socket.error as _: err = _ if sock is not None: sock.close() .....
关闭链接的方法:
def disconnect(self): "Disconnects from the Redis server" self._parser.on_disconnect() if self._sock is None: return try: self._sock.shutdown(socket.SHUT_RDWR) #先shutdown再close self._sock.close() except socket.error: pass self._sock = None
可以小结如下
1)默认情况下每创建一个Redis实例都会构造出一个ConnectionPool实例,每一次访问redis都会从这个连接池得到一个连接,操作完成后会把该连接放回连接池(连接并没有释放),可以构造一个统一的ConnectionPool,在创建Redis实例时,可以将该ConnectionPool传入,那么后续的操作会从给定的ConnectionPool获得连接,不会再重复创建ConnectionPool。
2)默认情况下没有设置keepalive和timeout,建立的连接是blocking模式的短连接。
3)不考虑底层tcp的情况下,连接池中的连接会在ConnectionPool.disconnect中统一销毁。

2小时内可以学会Python的基本编程概念和技能。1.学习变量和数据类型,2.掌握控制流(条件语句和循环),3.理解函数的定义和使用,4.通过简单示例和代码片段快速上手Python编程。

Python在web开发、数据科学、机器学习、自动化和脚本编写等领域有广泛应用。1)在web开发中,Django和Flask框架简化了开发过程。2)数据科学和机器学习领域,NumPy、Pandas、Scikit-learn和TensorFlow库提供了强大支持。3)自动化和脚本编写方面,Python适用于自动化测试和系统管理等任务。

两小时内可以学到Python的基础知识。1.学习变量和数据类型,2.掌握控制结构如if语句和循环,3.了解函数的定义和使用。这些将帮助你开始编写简单的Python程序。

如何在10小时内教计算机小白编程基础?如果你只有10个小时来教计算机小白一些编程知识,你会选择教些什么�...

使用FiddlerEverywhere进行中间人读取时如何避免被检测到当你使用FiddlerEverywhere...

Python3.6环境下加载Pickle文件报错:ModuleNotFoundError:Nomodulenamed...

如何解决jieba分词在景区评论分析中的问题?当我们在进行景区评论分析时,往往会使用jieba分词工具来处理文�...

如何使用正则表达式匹配到第一个闭合标签就停止?在处理HTML或其他标记语言时,常常需要使用正则表达式来�...


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

WebStorm Mac版
好用的JavaScript开发工具

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SecLists
SecLists是最终安全测试人员的伙伴。它是一个包含各种类型列表的集合,这些列表在安全评估过程中经常使用,都在一个地方。SecLists通过方便地提供安全测试人员可能需要的所有列表,帮助提高安全测试的效率和生产力。列表类型包括用户名、密码、URL、模糊测试有效载荷、敏感数据模式、Web shell等等。测试人员只需将此存储库拉到新的测试机上,他就可以访问到所需的每种类型的列表。

VSCode Windows 64位 下载
微软推出的免费、功能强大的一款IDE编辑器

Atom编辑器mac版下载
最流行的的开源编辑器