Maison >base de données >tutoriel mysql >[置顶] python脚本sqlite3模块的应用

[置顶] python脚本sqlite3模块的应用

WBOY
WBOYoriginal
2016-06-07 14:50:351420parcourir

#!/usr/bin/python # -*- coding:utf-8 -*- import sqlite3 import os class SQLTest: '''sqlite数据库接口''' def __init__(self,path='',verbose=False): self.verbose = verbose self.path = path if os.path.isfile(path): self.conn = sqlite3.connect(p

#!/usr/bin/python
# -*- coding:utf-8 -*-
import sqlite3
import os


class SQLTest:
    '''sqlite数据库接口'''
    def __init__(self,path='',verbose=False):
        self.verbose = verbose
        self.path = path
        if os.path.isfile(path):
            self.conn = sqlite3.connect(path)
            if self.verbose:
                print('硬盘上面:[{}].format(path)')
        else:
            self.conn = sqlite3.connect(':memory:')
            if self.verbose:
                print('内存上面:[:memory:]')
    
    def setverbose(self,b):
        self.verbose = b
        
    def createtable(self,sql):
        '''创建数据库表'''
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            self.conn.commit()
            if self.verbose:
                print('创建数据库表成功!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
    
    def querytable(self):
        sql = 'SELECT name FROM sqlite_master WHERE type="table" ORDER BY name'
        cu = self.conn.cursor()
        cu.execute(sql)
        return cu.fetchall()
    
    def renametable(self,table,newtable):
        if table is not None and talbe !='':
            sql = 'ALTER TABLE %s RENAME TO "%s" ' % (table,newtable)
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
    
    def insert(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                if self.verbose:
                    print('插入数据库表成功!')
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
            
    def fetchall(self,sql):
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            return cu.fetchall()
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def fetchone(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                cu.execute(sql,(data,))
                return cu.fetchall()
            else:
                print('The [{}] is None!'.format(data))
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def updata(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def rowcount(self,table):
        sql = 'select count (*) from "%s"' % table
        cu = self.conn.cursor()
        r = cu.execute(sql)
        return (r.fetchone()[0])
    
    def delete(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
                    
    def droptable(self,table):
        if table is not None and table != '':
            sql = 'DROP TABLE IF EXISTS ' + table
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def close_all(self,cu):
        try:
            if cu is not None:
                cu.close()
        finally:
            if cu is not None:
                cu.close()
                
# function
def drop_table_test(sql,table):
    '''删除数据库表测试'''
    print('删除数据库表测试 ...')
    db.droptable(table)


def create_table_test(sql):
    '''创建数据库表测试'''
    print('创建数据库表测试 ...')
    create_table_sql = '''CREATE TABLE IF NOT EXISTS 'table1'(
                        'id' integer(32) NOT NULL,
                        'name' nvarchar(128) NOT NULL,
                        PRIMARY KEY('id')
                        )'''
    sql.createtable(create_table_sql)


def insert_test(sql):
    '''插入数据测试 ...'''
    print('插入数据测试 ...')
    insert_sql = 'INSERT INTO table1 values(?,?)'
    data = [(1,'aaa'),(2,'bbb')]
    sql.insert(insert_sql,data)


def fetchall_test(sql):
    '''查询所有数据'''
    print('查询所有数据 ...')
    fetchall_sql = 'SELECT * FROM table1'
    r = sql.fetchall(fetchall_sql)
    for e in range(len(r)):
        print(r[e])


def fetchone_test(sql):
    '''查询一条数据'''
    print('查询一条数据 ...')
    fetchall_sql = 'SELECT * FROM table1 WHERE id = ?'
    data = 1
    r = sql.fetchone(fetchall_sql,data)
    for e in range(len(r)):
        print(r[e])


def query_test(sql):
    '''有几个表'''
    print('有几个表 ...')
    print(sql.query_table(db))
    
def update_test(sql):
    '''更新数据'''
    print('更新数据 ...')
    update_sql = 'UPDATE table1 SET name = ? WHERE id = ?'
    data = [('TestA',1),('TestB',2)]
    sql.updata(update_sql,data)


def delete_test(db):
    '''删除数据'''
    print('删除数据 ...')
    delete_sql = 'DELETE FROM table1 WHERE name = ? and id = ?'
    data = [('TestA',1)]
    sql.delete(delete_sql,data)
    
# self test
if __name__ == '__main__':  
    sql = SQLTest(verbose=True)
    #创建数据库表
    create_table_test(sql)
    #插入数据
    insert_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #查询一条数据
    fetchone_test(sql)
    #更新数据
    update_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #删除一条数据
    delete_test(sql)
    print(sql.rowcount('table1'))
    #查询多条数据
    fetchall_test(sql)

    

#==================================================================================

测试结果

    
    
    
Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Article précédent:ElasticSearch使用Article suivant:MySql修改数据库编码为utf8