Heim >Datenbank >MySQL-Tutorial >[置顶] python脚本sqlite3模块的应用

[置顶] python脚本sqlite3模块的应用

WBOY
WBOYOriginal
2016-06-07 14:50:351399Durchsuche

#!/usr/bin/python # -*- coding:utf-8 -*- import sqlite3 import os class SQLTest: '''sqlite数据库接口''' def __init__(self,path='',verbose=False): self.verbose = verbose self.path = path if os.path.isfile(path): self.conn = sqlite3.connect(p

#!/usr/bin/python
# -*- coding:utf-8 -*-
import sqlite3
import os


class SQLTest:
    '''sqlite数据库接口'''
    def __init__(self,path='',verbose=False):
        self.verbose = verbose
        self.path = path
        if os.path.isfile(path):
            self.conn = sqlite3.connect(path)
            if self.verbose:
                print('硬盘上面:[{}].format(path)')
        else:
            self.conn = sqlite3.connect(':memory:')
            if self.verbose:
                print('内存上面:[:memory:]')
    
    def setverbose(self,b):
        self.verbose = b
        
    def createtable(self,sql):
        '''创建数据库表'''
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            self.conn.commit()
            if self.verbose:
                print('创建数据库表成功!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
    
    def querytable(self):
        sql = 'SELECT name FROM sqlite_master WHERE type="table" ORDER BY name'
        cu = self.conn.cursor()
        cu.execute(sql)
        return cu.fetchall()
    
    def renametable(self,table,newtable):
        if table is not None and talbe !='':
            sql = 'ALTER TABLE %s RENAME TO "%s" ' % (table,newtable)
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
    
    def insert(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                if self.verbose:
                    print('插入数据库表成功!')
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))    
            
    def fetchall(self,sql):
        if sql is not None and sql != '':
            cu = self.conn.cursor()
            if self.verbose:
                print('执行sql:[{}]'.format(sql))
            cu.execute(sql)
            return cu.fetchall()
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def fetchone(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                cu.execute(sql,(data,))
                return cu.fetchall()
            else:
                print('The [{}] is None!'.format(data))
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def updata(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def rowcount(self,table):
        sql = 'select count (*) from "%s"' % table
        cu = self.conn.cursor()
        r = cu.execute(sql)
        return (r.fetchone()[0])
    
    def delete(self,sql,data):
        if sql is not None and sql != '':
            if data is not None:
                cu = self.conn.cursor()
                for d in data:
                    cu.execute(sql,d)
                    self.conn.commit()
                self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
                    
    def droptable(self,table):
        if table is not None and table != '':
            sql = 'DROP TABLE IF EXISTS ' + table
            cu = self.conn.cursor()
            cu.execute(sql)
            self.conn.commit()
            print('delete table sucess!')
            self.close_all(cu)
        else:
            print('The [{}] is empty or equal None!'.format(sql))
            
    def close_all(self,cu):
        try:
            if cu is not None:
                cu.close()
        finally:
            if cu is not None:
                cu.close()
                
# function
def drop_table_test(sql,table):
    '''删除数据库表测试'''
    print('删除数据库表测试 ...')
    db.droptable(table)


def create_table_test(sql):
    '''创建数据库表测试'''
    print('创建数据库表测试 ...')
    create_table_sql = '''CREATE TABLE IF NOT EXISTS 'table1'(
                        'id' integer(32) NOT NULL,
                        'name' nvarchar(128) NOT NULL,
                        PRIMARY KEY('id')
                        )'''
    sql.createtable(create_table_sql)


def insert_test(sql):
    '''插入数据测试 ...'''
    print('插入数据测试 ...')
    insert_sql = 'INSERT INTO table1 values(?,?)'
    data = [(1,'aaa'),(2,'bbb')]
    sql.insert(insert_sql,data)


def fetchall_test(sql):
    '''查询所有数据'''
    print('查询所有数据 ...')
    fetchall_sql = 'SELECT * FROM table1'
    r = sql.fetchall(fetchall_sql)
    for e in range(len(r)):
        print(r[e])


def fetchone_test(sql):
    '''查询一条数据'''
    print('查询一条数据 ...')
    fetchall_sql = 'SELECT * FROM table1 WHERE id = ?'
    data = 1
    r = sql.fetchone(fetchall_sql,data)
    for e in range(len(r)):
        print(r[e])


def query_test(sql):
    '''有几个表'''
    print('有几个表 ...')
    print(sql.query_table(db))
    
def update_test(sql):
    '''更新数据'''
    print('更新数据 ...')
    update_sql = 'UPDATE table1 SET name = ? WHERE id = ?'
    data = [('TestA',1),('TestB',2)]
    sql.updata(update_sql,data)


def delete_test(db):
    '''删除数据'''
    print('删除数据 ...')
    delete_sql = 'DELETE FROM table1 WHERE name = ? and id = ?'
    data = [('TestA',1)]
    sql.delete(delete_sql,data)
    
# self test
if __name__ == '__main__':  
    sql = SQLTest(verbose=True)
    #创建数据库表
    create_table_test(sql)
    #插入数据
    insert_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #查询一条数据
    fetchone_test(sql)
    #更新数据
    update_test(sql)
    #查询多条数据
    fetchall_test(sql)
    #删除一条数据
    delete_test(sql)
    print(sql.rowcount('table1'))
    #查询多条数据
    fetchall_test(sql)

    

#==================================================================================

测试结果

    
    
    
Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn
Vorheriger Artikel:ElasticSearch使用Nächster Artikel:MySql修改数据库编码为utf8