首页 >后端开发 >Python教程 >Python 封装DBUtils 和pymysql实例

Python 封装DBUtils 和pymysql实例

零下一度
零下一度原创
2017-07-23 14:13:034275浏览

  之前一篇Python 封装DBUtils 和pymysql 中写过一个basedao.py,最近几天又重新整理了下思绪,优化了下 basedao.py,目前支持的方法还不多,后续会进行改进、添加。

  主要功能:

    1.查询单个对象:

      所需参数:表名,过滤条件

    2.查询多个对象:
      所需参数:表名,过滤条件

    3.按主键查询:
      所需参数:表名,值

    4.分页查询:
      所需参数:表名,页码,每页记录数,过滤条件

  具体代码如下:  

  1 import json, os, sys, time  2   3 import pymysql  4 from DBUtils import PooledDB  5   6 class BaseDao(object):  7     """  8     简便的数据库操作基类  9     """ 10     __config = {}                   # 数据库连接配置 11     __conn = None                   # 数据库连接 12     __cursor = None                 # 数据库游标 13     __database = None               # 用于临时村塾查询数据库 14     __tableName = None              # 用于临时存储查询表名 15     __fields = []                   # 用于临时存储查询表的字段列表 16     __primaryKey_dict = {}          # 用于存储配置中的数据库中所有表的主键 17  18     def __init__(self, creator=pymysql, host="localhost", user=None, password="", database=None, port=3306, charset="utf8"): 19         if host is None: 20             raise Exception("Parameter [host] is None.") 21         if user is None: 22             raise Exception("Parameter [user] is None.") 23         if password is None: 24             raise Exception("Parameter [password] is None.") 25         if database is None: 26             raise Exception("Parameter [database] is None.") 27         if port is None: 28             raise Exception("Parameter [port] is None.") 29         self.__config = dict({ 30             "creator" : creator, "charset":charset, 31             "host":host, "port":port, 
 32             "user":user, "password":password, "database":database 33         }) 34         self.__conn = PooledDB.connect(**self.__config) 35         self.__cursor = self.__conn.cursor() 36         self.__database = self.__config["database"] 37         self.__init_primaryKey() 38         print(get_time(), "数据库连接初始化成功。") 39          40     def __del__(self): 41         '重写类被清除时调用的方法' 42         if self.__cursor: 43             self.__cursor.close() 44             print(get_time(), "游标关闭") 45         if self.__conn: 46             self.__conn.close() 47             print(get_time(), "连接关闭") 48  49     def select_one(self, tableName=None, filters={}): 50         ''' 51         查询单个对象 52         @tableName 表名 53         @filters 过滤条件 54         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 55         ''' 56         self.__check_params(tableName) 57         sql = self.__query_util(filters) 58         self.__cursor.execute(sql) 59         result = self.__cursor.fetchone() 60         return self.__parse_result(result) 61  62     def select_pk(self, tableName=None, primaryKey=None): 63         ''' 64         按主键查询 65         @tableName 表名 66         @primaryKey 主键值 67         ''' 68         self.__check_params(tableName) 69         filters = {} 70         filters.setdefault(str(self.__primaryKey_dict[tableName]), primaryKey) 71         sql = self.__query_util(filters) 72         self.__cursor.execute(sql) 73         result = self.__cursor.fetchone() 74         return self.__parse_result(result) 75          76     def select_all(self, tableName=None, filters={}): 77         ''' 78         查询所有 79         @tableName 表名 80         @filters 过滤条件 81         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value 82         ''' 83         self.__check_params(tableName) 84         sql = self.__query_util(filters) 85         self.__cursor.execute(sql) 86         results = self.__cursor.fetchall() 87         return self.__parse_results(results) 88  89     def count(self, tableName=None): 90         ''' 91         统计记录数 92         ''' 93         self.__check_params(tableName) 94         sql = "SELECT count(*) FROM %s"%(self.__tableName) 95         self.__cursor.execute(sql) 96         result = self.__cursor.fetchone() 97         return result[0] 98  99     def select_page(self, tableName=None, pageNum=1, limit=10, filters={}):100         '''101         分页查询102         @tableName 表名103         @return 返回字典集合,集合中以表字段作为 key,字段值作为 value104         '''105         self.__check_params(tableName)106         totalCount = self.count()107         if totalCount / limit == 0 :108             totalPage = totalCount / limit109         else:110             totalPage = totalCount // limit + 1111         if pageNum > totalPage:112             print("最大页数为%d"%totalPage)113             pageNum = totalPage114         elif pageNum < 1:115             print("页数不能小于1")116             pageNum = 1117         beginindex = (pageNum-1) * limit118         filters.setdefault("_limit_", (beginindex, limit))119         sql = self.__query_util(filters)120         self.__cursor.execute(sql)121         results = self.__cursor.fetchall()122         return self.__parse_results(results)123 124     def __parse_result(self, result):125         &#39;用于解析单个查询结果,返回字典对象&#39;126         obj = {}127         for k,v in zip(self.__fields, result):128             obj[k] = v129         return obj130 131     def __parse_results(self, results):132         &#39;用于解析多个查询结果,返回字典列表对象&#39;133         objs = []134         for result in results:135             obj = self.__parse_result(result)136             objs.append(obj)137         return objs138 139     def __init_primaryKey(self):140         &#39;根据配置中的数据库读取该数据库中所有表的主键集合&#39;141         sql = """SELECT TABLE_NAME, COLUMN_NAME142                 FROM  Information_schema.columns143                 WHERE COLUMN_KEY=&#39;PRI&#39; AND TABLE_SCHEMA=&#39;%s&#39;"""%(self.__database)144         self.__cursor.execute(sql)145         results = self.__cursor.fetchall()146         for result in results:147             self.__primaryKey_dict[result[0]] = result[1]148 149     def __query_fields(self, tableName=None, database=None):150         &#39;查询表的字段列表, 将查询出来的字段列表存入 __fields 中&#39;151         sql = """SELECT column_name152                 FROM  Information_schema.columns153                 WHERE table_Name = &#39;%s&#39; AND TABLE_SCHEMA=&#39;%s&#39;"""%(tableName, database)154         self.__cursor.execute(sql)155         fields_tuple = self.__cursor.fetchall()156         self.__fields = [fields[0] for fields in fields_tuple]157 158     def __query_util(self, filters=None):159         """160         SQL 语句拼接方法161         @filters 过滤条件162         """163         sql = r&#39;SELECT #{FIELDS} FROM #{TABLE_NAME} WHERE 1=1 #{FILTERS}&#39;164         # 拼接查询表165         sql = sql.replace("#{TABLE_NAME}", self.__tableName)166         # 拼接查询字段167         self.__query_fields(self.__tableName, self.__database)168         FIELDS = ""169         for field in self.__fields:170             FIELDS += field + ", "171         FIELDS = FIELDS[0: len(FIELDS)-2]172         sql = sql.replace("#{FIELDS}", FIELDS)173         # 拼接查询条件(待优化)174         if filters is None:175             sql = sql.replace("#{FILTERS}", "")176         else:177             FILTERS =  ""178             if not isinstance(filters, dict):179                 raise Exception("Parameter [filters] must be dict type. ")180             isPage = False181             if filters.get("_limit_"):182                 isPage = True183                 beginindex, limit = filters.get("_limit_")184             for k, v in filters.items():185                 if k.startswith("_in_"):                # 拼接 in186                     FILTERS += "AND %s IN (" %(k[4:])187                     values = v.split(",")188                     for value in values:189                         FILTERS += "%s,"%value190                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "191                 elif k.startswith("_nein_"):            # 拼接 not in192                     FILTERS += "AND %s NOT IN (" %(k[4:])193                     values = v.split(",")194                     for value in values:195                         FILTERS += "%s,"%value196                     FILTERS = FILTERS[0:len(FILTERS)-1] + ") "197                 elif k.startswith("_like_"):            # 拼接 like198                     FILTERS += "AND %s like &#39;%%%s%%&#39; " %(k[6:], v)199                 elif k.startswith("_ne_"):              # 拼接不等于200                     FILTERS += "AND %s != &#39;%s&#39; " %(k[4:], v)201                 elif k.startswith("_lt_"):              # 拼接小于202                     FILTERS += "AND %s < &#39;%s&#39; " %(k[4:], v)203                 elif k.startswith("_le_"):              # 拼接小于等于204                     FILTERS += "AND %s <= &#39;%s&#39; " %(k[4:], v)205                 elif k.startswith("_gt_"):              # 拼接大于206                     FILTERS += "AND %s > '%s' " %(k[4:], v)207                 elif k.startswith("_ge_"):              # 拼接大于等于208                     FILTERS += "AND %s >= '%s' " %(k[4:], v)209                 elif k in self.__fields:                # 拼接等于210                     FILTERS += "AND %s = '%s' "%(k, v)211             sql = sql.replace("#{FILTERS}", FILTERS)212             if isPage:213                 sql += "LIMIT %d,%d"%(beginindex, limit)214 215         print(get_time(), sql)216         return sql217 218     def __check_params(self, tableName):219         '''220         检查参数221         '''222         if tableName:223             self.__tableName = tableName224         else:225             if self.__tableName is None:226                 raise Exception("Parameter [tableName] is None.")227 228 def get_time():229     return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())230 231 if __name__ == "__main__":232     config = {233         # "creator": pymysql,234         # "host" : "127.0.0.1", 235         "user" : "root", 
236         "password" : "root",237         "database" : "test", 
238         # "port" : 3306,239         # "charset" : 'utf8'240     }241     base = BaseDao(**config)242     ########################################################################243     # user = base.select_one("user")244     # print(user)245     ########################################################################246     # users = base.select_all("user")247     # print(users)248     ########################################################################249     # filter1 = {250     #     "sex":0,251     #     "_in_id":"1,2,3,4,5",252     #     "_like_name":"zhang",253     #     "_ne_name":"wangwu"254     # }255     # user_filters = base.select_all(tableName="user", filters=filter1)256     # print(user_filters)257     ########################################################################258     # menu = base.select_one(tableName="menu")259     # print(menu)260     ########################################################################261     # user_pk = base.select_pk("user", 2)262     # print(user_pk)263     ########################################################################264     # filter2 = {265     #     "_in_id":"1,2,3,4",266     #     "_like_name":"test"267     # }268     # user_limit = base.select_page("user", 2, 10, filter2)  #未实现269     # print(user_limit)270     ########################################################################
View Code

  代码中已经给出了几个具体示例,大家可以参考使用。  

以上是Python 封装DBUtils 和pymysql实例的详细内容。更多信息请关注PHP中文网其他相关文章!

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn