Python 的 ORM 框架 SQLAlchemy
ORM(Object Relational Mapping,对象关系映射):Object ↔ Table,即通过 Object 去操纵数据表
新建单表 create_table.py
# create_table.py -- defines the single-table ``User`` model and creates it.
from sqlalchemy.ext.declarative import declarative_base  # declarative ORM base factory
from sqlalchemy import Column, Integer, String  # ORM column / data types
from sqlalchemy import create_engine  # builds the database connection

# Base class that every ORM model in this module inherits from.
Base = declarative_base()


class User(Base):
    """ORM model mapped to the ``user`` table."""

    __tablename__ = 'user'

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20))


# NOTE: this runs at import time, so importing ``engine`` / ``User`` from this
# module also connects to the database and creates the table.
engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')

# Create, in the engine's database, the tables of all models inheriting Base.
Base.metadata.create_all(engine)
单表的增删改查 curd_one_table.py
# curd_one_table.py -- single-table CRUD (create / retrieve / update / delete).
from sqlalchemy.orm import sessionmaker

from create_table import engine, User

# Session factory bound to the engine; calling it opens a session.
Session = sessionmaker(engine)
db_session = Session()

# ---- Create: INSERT INTO user (name) VALUES ('May') ----
user_1 = User(name='May')
db_session.add(user_1)
# Inserts, updates and deletes must all be committed.
db_session.commit()
# The same session object is reused below after each close().
db_session.close()

# ---- Bulk create ----
db_session.add_all([User(name='Sandy'), User(name='Sheldon')])
db_session.commit()
db_session.close()

# ---- Retrieve: SELECT * FROM user WHERE ... ----
res1 = db_session.query(User).all()
print(res1)  # a list of User objects

# Conditional queries. first() returns None when no row matches, so guard
# before reading attributes instead of crashing with AttributeError.
res2 = db_session.query(User).filter(User.id > 2).first()
if res2 is not None:
    print(res2.name)

res3 = db_session.query(User).filter_by(id=3).first()
if res3 is not None:
    print(res3.name)

# ---- Update: UPDATE user SET name='ryan' WHERE id=1 ----
res4 = db_session.query(User).filter(User.id == 1).update({'name': 'ryan'})
db_session.commit()
db_session.close()

# ---- Delete: DELETE FROM user WHERE id=5 ----
res5 = db_session.query(User).filter(User.id == 5).delete()
db_session.commit()
db_session.close()
ForeignKey的创建 create_ForeignKey_table.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine
# Base class that every ORM model in this module inherits from.
Base = declarative_base()


class School(Base):
    """ORM model mapped to the ``school`` table."""

    __tablename__ = 'school'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))


class Student(Base):
    """ORM model mapped to the ``student`` table; each student references one school."""

    __tablename__ = 'student'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    # Foreign-key column linking to school.id. The ForeignKey argument is
    # '<table name>.<column>' (the __tablename__), not the Python class name.
    school_id = Column(Integer, ForeignKey('school.id'))
    # ORM-level relationship to School; ``backref`` names the reverse
    # attribute that becomes available on School (School.sch2stu).
    stu2sch = relationship('School', backref='sch2stu')


# NOTE: this runs at import time, so importing names from this module also
# connects to the database and creates the tables.
engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')
Base.metadata.create_all(engine)
ForeignKey的增删改查 curd_foreign_key_table.py
# curd_foreign_key_table.py -- CRUD across the School / Student foreign key.
from sqlalchemy.orm import sessionmaker

from create_ForeignKey_table import engine, School, Student

Session = sessionmaker(engine)
db_session = Session()

# ---- Create ----
# 1. Manual way: insert the school first, then reference its id explicitly.
school_obj = School(name='Beijing_college')
db_session.add(school_obj)
db_session.commit()
# school_obj.id is available on the instance after the commit, so no
# separate lookup query by name is needed.
student_obj = Student(name='001', school_id=school_obj.id)
db_session.add(student_obj)
db_session.commit()
db_session.close()

# 2. Forward relationship: create the student together with its school.
student2 = Student(name='002', stu2sch=School(name='Shanghai_college'))
db_session.add(student2)
db_session.commit()
db_session.close()

# 3. Reverse relationship: attach students through the backref on School.
school_3 = School(name='Guangzhou_college')
school_3.sch2stu = [Student(name='003'), Student(name='004')]
db_session.add(school_3)
db_session.commit()
db_session.close()

# ---- Retrieve ----
# Forward query.
# Filtering with ``Student.stu2sch.id > 2`` raises an error: the class-level
# relationship attribute does not expose the related columns (only loaded
# instances do). NOTE(review): use a join or ``Student.stu2sch.has(...)``
# when filtering on the related School is needed.
student_list = db_session.query(Student).all()
for student in student_list:
    print(student.id, student.name, student.stu2sch.name)

# Reverse query.
school_list = db_session.query(School).all()
for school in school_list:
    # e.g. [<create_ForeignKey_table.Student object at 0x...>]
    print(school.sch2stu)
    for student in school.sch2stu:
        print(school.name, student.name)

# ---- Update: move student '001' to the school with id 3 ----
school_obj = db_session.query(School).filter(School.id == 3).first()
# first() returns None when the school does not exist; skip the update then.
if school_obj is not None:
    res = db_session.query(Student).filter(Student.name == '001').update(
        {Student.school_id: school_obj.id}
    )
    db_session.commit()
    student_obj = db_session.query(Student).filter(Student.name == '001').first()
    if student_obj is not None:
        # Prints the new school_id followed by that school's name.
        print(student_obj.school_id, student_obj.stu2sch.name)
db_session.close()

# ---- Delete: remove every student of the school with id 3 ----
sch = db_session.query(School).filter(School.id == 3).first()
if sch is not None:
    db_session.query(Student).filter(Student.school_id == sch.id).delete()
    db_session.commit()
db_session.close()
Many2Many的创建 create_M2M_table.py
# create_M2M_table.py -- many-to-many models: Boy <-> Girl through ``hotel``.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship

# Base class that every ORM model in this module inherits from.
Base = declarative_base()


class Boy(Base):
    """ORM model mapped to the ``boy`` table."""

    __tablename__ = 'boy'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))

    # Many-to-many relationship routed through the 'hotel' table;
    # ``backref`` names the reverse attribute on Girl (Girl.girl2boy).
    boy2girl = relationship('Girl', secondary='hotel', backref='girl2boy')


class Girl(Base):
    """ORM model mapped to the ``girl`` table."""

    __tablename__ = 'girl'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))


class Hotel(Base):
    """Association table holding one (boy_id, girl_id) pair per row."""

    __tablename__ = 'hotel'

    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    boy_id = Column(Integer, ForeignKey('boy.id'))
    girl_id = Column(Integer, ForeignKey('girl.id'))


# NOTE: this runs at import time, so importing names from this module also
# connects to the database and creates the tables.
engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')
Base.metadata.create_all(engine)
Many2Many的增删改查 curd_m2m_tables.py
# curd_m2m_tables.py -- create and query rows across the many-to-many relation.
from sqlalchemy.orm import sessionmaker

from create_M2M_table import engine, Boy, Girl

Session = sessionmaker(engine)
db_session = Session()

# ---- Create ----
# Forward relationship: the related objects are passed as a list.
boy_1 = Boy(name='Sheldon', boy2girl=[Girl(name='Amy')])
db_session.add(boy_1)
db_session.commit()
db_session.close()

# Reverse relationship: assign through the backref on Girl.
girl_1 = Girl(name='Penny')
girl_1.girl2boy = [Boy(name='Leonard')]
db_session.add(girl_1)
db_session.commit()
db_session.close()

# ---- Retrieve ----
boy_list = db_session.query(Boy).all()
for boy in boy_list:
    for girl in boy.boy2girl:
        print(boy.name, girl.name)

# Close the session once the final query is done, matching the other sections.
db_session.close()
更多查询操作, 待补充
# More query operations on the single-table ``User`` model.
from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_, or_, text
from sqlalchemy.sql import func

from create_table import User, engine

Session = sessionmaker(engine)
db_session = Session()

# 1. and_ / or_
res = db_session.query(User).filter(or_(User.id > 2, User.name == 'May')).all()
res2 = db_session.query(User).filter(and_(User.id >= 1, User.name == 'May')).all()
for row in res2:
    print(row.name)

# 2. Column alias via label()
res3 = db_session.query(User.name.label('username'), User.id).all()
print(res3)  # e.g. [('ryan', 1), ('May', 2), ('Sandy', 3), ('Sheldon', 4)]
for row in res3:
    # Output recorded by the author:
    # <class 'sqlalchemy.util._collections.result'>
    # NOTE(review): the exact class name may differ between SQLAlchemy versions.
    print(type(row))
    print(row.username, row.id)

# 3. Textual filter with bound parameters, ordered by id descending
res4 = (
    db_session.query(User)
    .filter(text('id<:value and name=:name'))
    .params(value=3, name='May')
    .order_by(User.id.desc())
    .all()
)

# 4. Raw SQL query
# The table name must match User.__tablename__ ('user'); MySQL table names
# are case-sensitive on case-sensitive file systems, so 'User' can fail there.
res5 = (
    db_session.query(User)
    .from_statement(text("SELECT * FROM user where name=:name"))
    .params(name='May')
    .all()
)

# 5. between
res6 = db_session.query(User).filter(User.id.between(2, 3)).all()

# 6. in_, and its negation with ~ (NOT IN)
res7 = db_session.query(User).filter(User.id.in_([3, 5, 7])).all()
res8 = db_session.query(User).filter(~User.id.in_([3, 5, 7])).all()

# 7. Subquery used as the IN list
res9 = db_session.query(User).filter(
    User.id.in_(db_session.query(User.id).filter(User.id > 2))
).all()

# 8. LIKE with wildcards
res10 = db_session.query(User).filter(User.name.like('M%')).all()

# 9. Limit / offset via slicing
res11 = db_session.query(User)[1:3]

# 10. Grouping
# TODO: still to be written up (``func`` is imported above for this section).

db_session.close()