Heim  >  Artikel  >  Backend-Entwicklung  >  Python-Operation SQL

Python-Operation SQL

高洛峰
高洛峰Original
2016-12-02 17:10:384391Durchsuche

pymsql ist ein Modul zum Betrieb von MySQL in Python

1. Laden Sie

pip3 install pymysql

herunter und installieren Sie es 2. Vorgang verwenden

1. SQL ausführen

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
# 创建连接
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
# 创建游标
cursor = conn.cursor()
  
# 执行SQL,并返回收影响行数
effect_row = cursor.execute("update hosts set host = '1.1.1.2'")
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.execute("update hosts set host = '1.1.1.2' where nid > %s", (1,))
  
# 执行SQL,并返回受影响行数
#effect_row = cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
  
  
# 提交,不然无法保存新建或者修改的数据
conn.commit()
  
# 关闭游标
cursor.close()
# 关闭连接
conn.close()

2. Neu erstellte Daten mit Auto-Inkrement-ID abrufen

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.executemany("insert into hosts(host,color_id)values(%s,%s)", [("1.1.1.11",1),("1.1.1.11",2)])
conn.commit()
cursor.close()
conn.close()
  
# 获取最新自增ID
new_id = cursor.lastrowid

Abfragedaten abrufen

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
cursor = conn.cursor()
cursor.execute("select * from hosts")
  
# 获取第一行数据
row_1 = cursor.fetchone()
  
# 获取前n行数据
# row_2 = cursor.fetchmany(3)
# 获取所有数据
# row_3 = cursor.fetchall()
  
conn.commit()
cursor.close()
conn.close()

Hinweis: Gehen Sie beim Abrufen von Daten der Reihe nach vor. Sie können die Cursorposition mit „cursor.scroll(num, mode)“ verschieben, z. B.:

cursor.scroll(1, mode). ='relative') # Relativ zur aktuellen Positionsbewegung

cursor.scroll(2,mode='absolute') # Relative absolute Positionsbewegung

Der Abrufdatentyp Die erhaltenen Daten sind Vorfahrentypen oder Wörterbuchtypen, nämlich:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
import pymysql
  
conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='t1')
  
# 游标设置为字典类型
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
r = cursor.execute("call p1()")
  
result = cursor.fetchone()
  
conn.commit()
cursor.close()
conn.close()

SQLAlchemy

SQLAlchemy ist ein ORM-Framework unter der Programmiersprache Python die Datenbank-API und verwendet die relationale Objektzuordnung für Datenbankoperationen, kurz gesagt: Konvertieren Sie das Objekt in SQL, verwenden Sie dann die Daten-API, um die SQL auszuführen und die Ausführungsergebnisse zu erhalten.

Installation:

pip3 install SQLAlchemy

Python-Operation SQL

SQLAlchemy selbst kann die Datenbank nicht betreiben, es muss sich auf Plug-Ins von Drittanbietern verlassen -ins wie pymsql. Dialekt wird verwendet, um mit der Daten-API zu kommunizieren und verschiedene Datenbank-APIs gemäß verschiedenen Konfigurationsdateien aufzurufen, um Vorgänge in der Datenbank durchzuführen, wie zum Beispiel:

MySQL-Python
    mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
   
pymysql
    mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
   
MySQL-Connector
    mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
   
cx_Oracle
    oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

1. Interne Verarbeitung

Use Engine /ConnectionPooling/Dialect führt Datenbankoperationen durch. Die Engine verwendet ConnectionPooling, um eine Verbindung zur Datenbank herzustellen, und führt dann SQL-Anweisungen über Dialect aus.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy import create_engine
  
  
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (&#39;1.1.1.22&#39;, 3)"
# )
  
# 新插入行自增ID
# cur.lastrowid
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[(&#39;1.1.1.22&#39;, 3),(&#39;1.1.1.221&#39;, 3),]
# )
  
  
# 执行SQL
# cur = engine.execute(
#     "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)",
#     host=&#39;1.1.1.99&#39;, color_id=3
# )
  
# 执行SQL
# cur = engine.execute(&#39;select * from hosts&#39;)
# 获取第一行数据
# cur.fetchone()
# 获取第n行数据
# cur.fetchmany(3)
# 获取所有数据
# cur.fetchall()

2. Verwendung von ORM-Funktionen

Verwenden Sie alle Komponenten von ORM/Schematyp/SQL-Ausdruckssprache/Engine/ConnectionPooling/Dialekt, um Daten zu bearbeiten. Erstellen Sie Objekte basierend auf Klassen, konvertieren Sie Objekte in SQL und führen Sie SQL aus.

1. Tabelle erstellen

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
 
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
 
Base = declarative_base()
 
# 创建单表
class Users(Base):
    __tablename__ = &#39;users&#39;
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))
 
    __table_args__ = (
    UniqueConstraint(&#39;id&#39;, &#39;name&#39;, name=&#39;uix_id_name&#39;),
        Index(&#39;ix_id_name&#39;, &#39;name&#39;, &#39;extra&#39;),
    )
 
 
# 一对多
class Favor(Base):
    __tablename__ = &#39;favor&#39;
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default=&#39;red&#39;, unique=True)
 
 
class Person(Base):
    __tablename__ = &#39;person&#39;
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
 
 
# 多对多
class Group(Base):
    __tablename__ = &#39;group&#39;
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
 
 
class Server(Base):
    __tablename__ = &#39;server&#39;
 
    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)
 
 
class ServerToGroup(Base):
    __tablename__ = &#39;servertogroup&#39;
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey(&#39;server.id&#39;))
    group_id = Column(Integer, ForeignKey(&#39;group.id&#39;))
 
 
def init_db():
    Base.metadata.create_all(engine)
 
 
def drop_db():
    Base.metadata.drop_all(engine)
注:设置外检的另一种方式 ForeignKeyConstraint([&#39;other_id&#39;], [&#39;othertable.other_id&#39;])

2. Operationstabelle

#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine

engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)

Base = declarative_base()

# 创建单表
class Users(Base):
    __tablename__ = &#39;users&#39;
    id = Column(Integer, primary_key=True)
    name = Column(String(32))
    extra = Column(String(16))

    __table_args__ = (
    UniqueConstraint(&#39;id&#39;, &#39;name&#39;, name=&#39;uix_id_name&#39;),
        Index(&#39;ix_id_name&#39;, &#39;name&#39;, &#39;extra&#39;),
    )

    def __repr__(self):
        return "%s-%s" %(self.id, self.name)

# 一对多
class Favor(Base):
    __tablename__ = &#39;favor&#39;
    nid = Column(Integer, primary_key=True)
    caption = Column(String(50), default=&#39;red&#39;, unique=True)

    def __repr__(self):
        return "%s-%s" %(self.nid, self.caption)

class Person(Base):
    __tablename__ = &#39;person&#39;
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    favor_id = Column(Integer, ForeignKey("favor.nid"))
    # 与生成表结构无关,仅用于查询方便
    favor = relationship("Favor", backref=&#39;pers&#39;)

# 多对多
class ServerToGroup(Base):
    __tablename__ = &#39;servertogroup&#39;
    nid = Column(Integer, primary_key=True, autoincrement=True)
    server_id = Column(Integer, ForeignKey(&#39;server.id&#39;))
    group_id = Column(Integer, ForeignKey(&#39;group.id&#39;))
    group = relationship("Group", backref=&#39;s2g&#39;)
    server = relationship("Server", backref=&#39;s2g&#39;)

class Group(Base):
    __tablename__ = &#39;group&#39;
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    port = Column(Integer, default=22)
    # group = relationship(&#39;Group&#39;,secondary=ServerToGroup,backref=&#39;host_list&#39;)


class Server(Base):
    __tablename__ = &#39;server&#39;

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)




def init_db():
    Base.metadata.create_all(engine)


def drop_db():
    Base.metadata.drop_all(engine)


Session = sessionmaker(bind=engine)
session = Session()
obj = Users(name="alex0", extra=&#39;sb&#39;)
session.add(obj)
session.add_all([
    Users(name="alex1", extra=&#39;sb&#39;),
    Users(name="alex2", extra=&#39;sb&#39;),
])
session.commit()
 session.query(Users).filter(Users.id > 2).delete()
 session.commit()
session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()
 ret = session.query(Users).all()
 ret = session.query(Users.name, Users.extra).all()
 ret = session.query(Users).filter_by(name=&#39;alex&#39;).all()
 ret = session.query(Users).filter_by(name=&#39;alex&#39;).first()
ret = session.query(Users).filter_by(name=&#39;alex&#39;).all()
ret = session.query(Users).filter(Users.id > 1, Users.name == &#39;eric&#39;).all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == &#39;eric&#39;).all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name=&#39;eric&#39;))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == &#39;eric&#39;)).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == &#39;eric&#39;)).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == &#39;eric&#39;, Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like(&#39;e%&#39;)).all()
ret = session.query(Users).filter(~Users.name.like(&#39;e%&#39;)).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()


Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn