淘先锋技术网

首页 1 2 3 4 5 6 7

Python 的 ORM 框架 SQLAlchemy

Object Relation Mapping
Object - Table 通过 Object 去操纵数据表

 

新建单表 create_table.py

from sqlalchemy.ext.declarative import declarative_base  # orm 官宣基类
from sqlalchemy import Column, Integer, String  # orm 数据类型字段
from sqlalchemy import create_engine  # 创建数据库连接

# 常见ORM 模型基类
Base = declarative_base()


class User(Base):
    __tablename__ = 'user'
    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(20))


engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')

# 去engine 数据库中创建所有继承Base 的orm 对象
Base.metadata.create_all(engine)

 

单表的增删改查 curd_one_table.py

from sqlalchemy.orm import sessionmaker

from create_table import engine, User

# crud 增删改查 : create/ retrieve/ update/ delete


# 创建会话, 打开数据库连接(通过engine)
Session = sessionmaker(engine)
# 打开会话窗口
db_session = Session()


# 增 insert into 表名(字段1, 字段2) values(值1, 值2)
user_1 = User(name='May')
db_session.add(user_1)
# 增 删 改 都要 commit
db_session.commit()
db_session.close()
# 批量增加
db_session.add_all([User(name='Sandy'), User(name='Sheldon')])
db_session.commit()
db_session.close()


# 查 select * from 表名 where ....
res1 = db_session.query(User).all()
print(res1)  # 列表套对象
# 带条件的查询 filter
res2 = db_session.query(User).filter(User.id > 2).first()
print(res2.name)
res3 = db_session.query(User).filter_by(id = 3).first()
print(res3.name)


# 改 update user set name=Ryan where id=1
res4 = db_session.query(User).filter(User.id == 1).update({'name': 'ryan'})
db_session.commit()
db_session.close()


# 删 delete from user where ...
res5 = db_session.query(User).filter(User.id == 5).delete()
db_session.commit()
db_session.close()

 

ForeignKey的创建 create_ForeignKey_table.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Integer, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import create_engine

Base = declarative_base()


class School(Base):
__tablename__ = 'school'
id = Column(Integer, primary_key=True)
name = Column(String(20))


class Student(Base):
__tablename__ = 'student'
id = Column(Integer, primary_key=True)
name = Column(String(20))
# 关联字段, 让school_id 和school的id 字段关联, 注意一定是foreignkey一定是表名
school_id = Column(Integer, ForeignKey('school.id'))
# 将school和student 创建关系, backref 是反向关联字段
stu2sch = relationship('School', backref='sch2stu')


engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')
Base.metadata.create_all(engine)

 

ForeignKey的增删改查 curd_foreign_key_table.py

from sqlalchemy.orm import sessionmaker

from create_ForeignKey_table import engine, School, Student

Session = sessionmaker(engine)
db_session = Session()


#
# 1.笨方法
school_obj = School(name='Beijing_college')
db_session.add(school_obj)
db_session.commit()
school_id = db_session.query(School).filter(School.name == 'Beijing_college').first().id
student_obj = Student(name='001', school_id=school_obj.id)
db_session.add(student_obj)
db_session.commit()
db_session.close()
# 2. 正向 relationship
student2 = Student(name='002', stu2sch=School(name='Shanghai_college'))
db_session.add(student2)
db_session.commit()
db_session.close()
# 3. 反向 relationship
school_3 = School(name='Guangzhou_college')
school_3.sch2stu = [Student(name='003'), Student(name='004')]
db_session.add(school_3)
db_session.commit()
db_session.close()


#
# 正向查询
# res1 = db_session.query(Student).filter(Student.stu2sch.id > 2).all()
# 报错. 类 没有 stu2sch 这个属性, 但是 实例化出来的对象可以
student_list = db_session.query(Student).all()
for student in student_list:
    print(student.id, student.name, student.stu2sch.name)
# 反向查询
school_list = db_session.query(School).all()
for school in school_list:
    print(school.sch2stu)  # [<create_ForeignKey_table.Student object at 0x0000009742531B70>]
    for student in school.sch2stu:
        print(school.name, student.name)


#
school_obj = db_session.query(School).filter(School.id == 3).first()
res = db_session.query(Student).filter(Student.name == '001').update({Student.school_id: school_obj.id})
db_session.commit()
student_obj = db_session.query(Student).filter(Student.name == '001').first()
print(student_obj.school_id, student_obj.stu2sch.name)  # 3 Shanghai_college
db_session.close()


#
sch = db_session.query(School).filter(School.id == 3).first()
db_session.query(Student).filter(Student.school_id == sch.id).delete()
db_session.commit()
db_session.close()

 

Many2Many的创建 create_M2M_table.py

from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy import create_engine
from sqlalchemy.orm import relationship

Base = declarative_base()


class Boy(Base):
    __tablename__ = 'boy'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    # 创建关联
    boy2girl = relationship('Girl', secondary='hotel', backref='girl2boy')


class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))


class Hotel(Base):
    __tablename__ = 'hotel'
    id = Column(Integer, primary_key=True)
    name = Column(String(20))
    boy_id = Column(Integer, ForeignKey('boy.id'))
    girl_id = Column(Integer, ForeignKey('girl.id'))


engine = create_engine('mysql+pymysql://root:@127.0.0.1:3306/day127sqlalchemy?charset=utf8')
Base.metadata.create_all(engine)

 

Many2Many的增删改查 curd_m2m_tables.py

from sqlalchemy.orm import sessionmaker

from create_M2M_table import engine, Boy, Girl

Session = sessionmaker(engine)
db_session = Session()


#
# relationship 正向
boy_1 = Boy(name='Sheldon', boy2girl=[Girl(name='Amy')])  # 注意此处为列表
db_session.add(boy_1)
db_session.commit()
db_session.close()
# relationship 反向
girl_1 = Girl(name='Penny')
girl_1.girl2boy = [Boy(name='Leonard')]
db_session.add(girl_1)
db_session.commit()
db_session.close()


#
boy_list = db_session.query(Boy).all()
for boy in boy_list:
    for girl in boy.boy2girl:
        print(boy.name, girl.name)

 

 

更多查询操作, 待补充

from sqlalchemy.orm import sessionmaker
from sqlalchemy import and_, or_, text
from sqlalchemy.sql import func

from create_table import User, engine

Session = sessionmaker(engine)
db_session = Session()

# 1. and_, or_
res = db_session.query(User).filter(or_(User.id > 2, User.name == 'May')).all()
res2 = db_session.query(User).filter(and_(User.id>= 1, User.name == 'May')).all()
for row in res2:
    print(row.name)

# 2. 别名
res3 = db_session.query(User.name.label('username'), User.id).all()
print(res3)  # [('ryan', 1), ('May', 2), ('Sandy', 3), ('Sheldon', 4)]
for row in res3:
    print(type(row))  # <class 'sqlalchemy.util._collections.result'>
    print(row.username, row.id)

# 3. 字符串匹配方式筛选条件 并使用 order_by进行 降序, 排序
res4 = db_session.query(User).filter(text('id<:value and name=:name')).params(value=3, name='May').\
    order_by(User.id.desc()).all()

# # 4. 原生SQL查询
res5 = db_session.query(User).from_statement(text("SELECT * FROM User where name=:name")).\
    params(name='May').all()

# 5. between
res6 = db_session.query(User).filter(User.id.between(2,3)).all()

# 6. in_, ~in_取反,只能和in组合
res7 = db_session.query(User).filter(User.id.in_([3, 5, 7])).all()
res8 = db_session.query(User).filter(~User.id.in_([3, 5, 7])).all()

# 7. 子查询
res9 = db_session.query(User).filter(User.id.in_(db_session.query(User.id).filter(User.id > 2))).all()

# 8. like 通配符查询
res10 = db_session.query(User).filter(User.name.like('M%')).all()

# 9. 限制
res11 = db_session.query(User)[1:3]

# 10. 分组
# 待整理

db_session.close()

 

转载于:https://www.cnblogs.com/amber-liu/p/10380975.html