之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。
主要功能:
1.查询单个对象:
所需参数:表名,过滤条件
2.查询多个对象:
所需参数:表名,过滤条件
3.按主键查询:
所需参数:表名,值
4.分页查询:
所需参数:表名,页码,每页记录数,过滤条件
具体代码如下:
1 import json, os, sys, time 2 3 import pymysql 4 from DBUtils import PooledDB 5 6 class BaseDao(object): 7 """ 8 简便的数据库操作基类 9 """ 10 __config = {} # 数据库连接配置 11 __conn = None # 数据库连接 12 __cursor = None # 数据库游标 13 __database = None # 用于临时村塾查询数据库 14 __tableName = None # 用于临时存储查询表名 15 __fields = [] # 用于临时存储查询表的字段列表 16 __primaryKey_dict = {} # 用于存储配置中的数据库中所有表的主键 17 18 def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 19 if host is None: 20 raise Exception("Parameter [host] is None.") 21 if user is None: 22 raise Exception("Parameter [user] is None.") 23 if password is None: 24 raise Exception("Parameter [password] is None.") 25 if database is None: 26 raise Exception("Parameter [database] is None.") 27 if port is None: 28 raise Exception("Parameter [port] is None.") 29 self.__config = dict({ 30 "creator" : creator, "charset":charset, 31 "host":host, "port":port, 32 "user":user, "password":password, "database":database 33 }) 34 self.__conn = PooledDB.connect(**self.__config) 35 self.__cursor = self.__conn.cursor() 36 self.__database = self.__config["database"] 37 self.__init_primaryKey() 38 print(get_time(), "数据库连接初始化成功。") 39 40 def __del__(self): 41 '重写类被清除时调用的方法' 42 if self.__cursor: 43 self.__cursor.close() 44 print(get_time(), "游标关闭") 45 if self.__conn: 46 self.__conn.close() 47 print(get_time(), "连接关闭") 48 49 def select_one(self, tableName=None, filters={}): 50 ''' 51 查询单个对象 52 @tableName 表名 53 @filters 过滤条件 54 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 55 ''' 56 self.__check_params(tableName) 57 sql = self.__query_util(filters) 58 self.__cursor.execute(sql) 59 result = self.__cursor.fetchone() 60 return self.__parse_result(result) 61 62 def select_pk(self, tableName=None, primaryKey=None): 63 ''' 64 按主键查询 65 @tableName 表名 66 @primaryKey 主键值 67 ''' 68 self.__check_params(tableName) 69 filters = {} 70 filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 71 sql = self.__query_util(filters) 72 self.__cursor.execute(sql) 73 result = self.__cursor.fetchone() 74 return self.__parse_result(result) 75 76 def select_all(self, tableName=None, filters={}): 77 ''' 78 查询所有 79 @tableName 表名 80 @filters 过滤条件 81 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 82 ''' 83 self.__check_params(tableName) 84 sql = self.__query_util(filters) 85 self.__cursor.execute(sql) 86 results = self.__cursor.fetchall() 87 return self.__parse_results(results) 88 89 def count(self, tableName=None): 90 ''' 91 统计记录数 92 ''' 93 self.__check_params(tableName) 94 sql = "SELECT count(*) FROM %s"%(self.__tableName) 95 self.__cursor.execute(sql) 96 result = self.__cursor.fetchone() 97 return result[0] 98 99 def select_page(self, tableName=None, pageNum=1, limit=10, filters={}):100 '''101 分页查询102 @tableName 表名103 @return 返回字典集合,集合中以表字段作为 key,字段值作为 value104 '''105 self.__check_params(tableName)106 totalCount = self.count()107 if totalCount / limit == 0 :108 totalPage = totalCount / limit109 else:110 totalPage = totalCount // limit + 1111 if pageNum > totalPage:112 print("最大页数为%d"%totalPage)113 pageNum = totalPage114 elif pageNum < 1:115 print("页数不能小于1")116 pageNum = 1117 beginindex = (pageNum-1) * limit118 filters.setdefault("_limit_", (beginindex, limit))119 sql = self.__query_util(filters)120 self.__cursor.execute(sql)121 results = self.__cursor.fetchall()122 return self.__parse_results(results)123 124 def __parse_result(self, result):125 '用于解析单个查询结果,返回字典对象'126 obj = {}127 for k,v in zip(self.__fields, result):128 obj[k] = v129 return obj130 131 def __parse_results(self, results):132 '用于解析多个查询结果,返回字典列表对象'133 objs = []134 for result in results:135 obj = self.__parse_result(result)136 objs.append(obj)137 return objs138 139 def __init_primaryKey(self):140 '根据配置中的数据库读取该数据库中所有表的主键集合'141 sql = """SELECT TABLE_NAME, COLUMN_NAME142 FROM Information_schema.columns143 WHERE COLUMN_KEY='PRI' AND TABLE_SCHEMA='%s'"""%(self.__database)144 self.__cursor.execute(sql)145 results = self.__cursor.fetchall()146 for result in results:147 self.__primaryKey_dict[result[0]] = result[1]148 149 def __query_fields(self, tableName=None, database=None):150 '查询表的字段列表, 将查询出来的字段列表存入 __fields 中'151 sql = """SELECT column_name152 FROM Information_schema.columns153 WHERE table_Name = '%s' AND TABLE_SCHEMA='%s'"""%(tableName, database)154 self.__cursor.execute(sql)155 fields_tuple = self.__cursor.fetchall()156 self.__fields = [fields[0] for fields in fields_tuple]157 158 def __query_util(self, filters=None):159 """160 SQL 语句拼接方法161 @filters 过滤条件162 """163 sql = r'SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}'164 # 拼接查询表165 sql = sql.replace("#{TABLE_NAME}", self.__tableName)166 # 拼接查询字段167 self.__query_fields(self.__tableName, self.__database)168 FIELDS = ""169 for field in self.__fields:170 FIELDS += field + ", "171 FIELDS = FIELDS[0: len(FIELDS)-2]172 sql = sql.replace("#{FIELDS}", FIELDS)173 # 拼接查询条件(待优化)174 if filters is None:175 sql = sql.replace("#{FILTERS}", "")176 else:177 FILTERS = ""178 if not isinstance(filters, dict):179 raise Exception("Parameter [filters] must be dict type. ")180 isPage = False181 if filters.get("_limit_"):182 isPage = True183 beginindex, limit = filters.get("_limit_")184 for k, v in filters.items():185 if k.startswith("_in_"): # 拼接 in186 FILTERS += "AND %s IN (" %(k[4:])187 values = v.split(",")188 for value in values:189 FILTERS += "%s,"%value190 FILTERS = FILTERS[0:len(FILTERS)-1] + ") "191 elif k.startswith("_nein_"): # 拼接 not in192 FILTERS += "AND %s NOT IN (" %(k[4:])193 values = v.split(",")194 for value in values:195 FILTERS += "%s,"%value196 FILTERS = FILTERS[0:len(FILTERS)-1] + ") "197 elif k.startswith("_like_"): # 拼接 like198 FILTERS += "AND %s like '%%%s%%' " %(k[6:], v)199 elif k.startswith("_ne_"): # 拼接不等于200 FILTERS += "AND %s != '%s' " %(k[4:], v)201 elif k.startswith("_lt_"): # 拼接小于202 FILTERS += "AND %s < '%s' " %(k[4:], v)203 elif k.startswith("_le_"): # 拼接小于等于204 FILTERS += "AND %s <= '%s' " %(k[4:], v)205 elif k.startswith("_gt_"): # 拼接大于206 FILTERS += "AND %s > '%s' " %(k[4:], v)207 elif k.startswith("_ge_"): # 拼接大于等于208 FILTERS += "AND %s >= '%s' " %(k[4:], v)209 elif k in self.__fields: # 拼接等于210 FILTERS += "AND %s = '%s' "%(k, v)211 sql = sql.replace("#{FILTERS}", FILTERS)212 if isPage:213 sql += "LIMIT %d,%d"%(beginindex, limit)214 215 print(get_time(), sql)216 return sql217 218 def __check_params(self, tableName):219 '''220 检查参数221 '''222 if tableName:223 self.__tableName = tableName224 else:225 if self.__tableName is None:226 raise Exception("Parameter [tableName] is None.")227 228 def get_time():229 return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())230 231 if __name__ == "__main__":232 config = {233 # "creator": pymysql,234 # "host" : "127.0.0.1", 235 "user" : "root", 236 "password" : "root",237 "database" : "test", 238 # "port" : 3306,239 # "charset" : 'utf8'240 }241 base = BaseDao(**config)242 ########################################################################243 # user = base.select_one("user")244 # print(user)245 ########################################################################246 # users = base.select_all("user")247 # print(users)248 ########################################################################249 # filter1 = {250 # "sex":0,251 # "_in_id":"1,2,3,4,5",252 # "_like_name":"zhang",253 # "_ne_name":"wangwu"254 # }255 # user_filters = base.select_all(tableName="user", filters=filter1)256 # print(user_filters)257 ########################################################################258 # menu = base.select_one(tableName="menu")259 # print(menu)260 ########################################################################261 # user_pk = base.select_pk("user", 2)262 # print(user_pk)263 ########################################################################264 # filter2 = {265 # "_in_id":"1,2,3,4",266 # "_like_name":"test"267 # }268 # user_limit = base.select_page("user", 2, 10, filter2) #未实现269 # print(user_limit)270 ########################################################################
代码中已经给出了几个具体示例,大家可以参考使用。
以上是Python 封装DBUtils 和pymysql实例的详细内容。更多信息请关注PHP中文网其他相关文章!

Python和C 各有优势,选择应基于项目需求。1)Python适合快速开发和数据处理,因其简洁语法和动态类型。2)C 适用于高性能和系统编程,因其静态类型和手动内存管理。

选择Python还是C 取决于项目需求:1)如果需要快速开发、数据处理和原型设计,选择Python;2)如果需要高性能、低延迟和接近硬件的控制,选择C 。

通过每天投入2小时的Python学习,可以有效提升编程技能。1.学习新知识:阅读文档或观看教程。2.实践:编写代码和完成练习。3.复习:巩固所学内容。4.项目实践:应用所学于实际项目中。这样的结构化学习计划能帮助你系统掌握Python并实现职业目标。

在两小时内高效学习Python的方法包括:1.回顾基础知识,确保熟悉Python的安装和基本语法;2.理解Python的核心概念,如变量、列表、函数等;3.通过使用示例掌握基本和高级用法;4.学习常见错误与调试技巧;5.应用性能优化与最佳实践,如使用列表推导式和遵循PEP8风格指南。

Python适合初学者和数据科学,C 适用于系统编程和游戏开发。1.Python简洁易用,适用于数据科学和Web开发。2.C 提供高性能和控制力,适用于游戏开发和系统编程。选择应基于项目需求和个人兴趣。

Python更适合数据科学和快速开发,C 更适合高性能和系统编程。1.Python语法简洁,易于学习,适用于数据处理和科学计算。2.C 语法复杂,但性能优越,常用于游戏开发和系统编程。

每天投入两小时学习Python是可行的。1.学习新知识:用一小时学习新概念,如列表和字典。2.实践和练习:用一小时进行编程练习,如编写小程序。通过合理规划和坚持不懈,你可以在短时间内掌握Python的核心概念。

Python更易学且易用,C 则更强大但复杂。1.Python语法简洁,适合初学者,动态类型和自动内存管理使其易用,但可能导致运行时错误。2.C 提供低级控制和高级特性,适合高性能应用,但学习门槛高,需手动管理内存和类型安全。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

Video Face Swap
使用我们完全免费的人工智能换脸工具轻松在任何视频中换脸!

热门文章

热工具

记事本++7.3.1
好用且免费的代码编辑器

DVWA
Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

螳螂BT
Mantis是一个易于部署的基于Web的缺陷跟踪工具,用于帮助产品缺陷跟踪。它需要PHP、MySQL和一个Web服务器。请查看我们的演示和托管服务。

SublimeText3汉化版
中文版,非常好用

mPDF
mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),