from contextlib import contextmanager
from sqlalchemy import create_engine, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String
from sqlalchemy.orm import relationship, scoped_session
from sqlalchemy.orm import sessionmaker
def create_session(engine):
"""封装Session创建过程 此函数只在应用初始化时调用一次即可"""
# 创建Session工厂 SessionFactory()就可以创建出session对象, 但是该对象没有实现线程隔离
SessionFactory = sessionmaker(bind=engine)
# 生成线程隔离的session对象(每个线程取自己的session)
session = scoped_session(SessionFactory)
# ps: 其实返回的session是scoped_session类型对象, 再通过session()才会创建Session对象, 不过scoped_session实现了代理功能, 进行数据操作时, 会自动创建/获取实现了线程隔离的Session对象并执行操作
return session
# 创建模型基类
Base = declarative_base()
# 创建数据库引擎
# 细节1 sqlalchemy默认实现了连接池功能, 并且可以自动重连(pool_size 连接池中的连接数, max_overflow 超出连接池的额外连接数)
# 细节2 如果数据库使用utf-8支持中文, 则设置连接地址时需要标明 ?charset=utf8 否则报错
engine = create_engine('mysql://root:mysql@127.0.0.1:3306/test27?charset=utf8', echo=False, pool_size=5, max_overflow=10)
# 创建会话
session = create_session(engine)
class User(Base):
__tablename__ = 't_user'
id = Column(Integer, primary_key=True)
name = Column(String(20), unique=True)
addresses = relationship('Address')
class Address(Base):
__tablename__ = 't_address'
id = Column(Integer, primary_key=True)
detail = Column(String(20))
user_id = Column(Integer, ForeignKey('t_user.id'))
@contextmanager # 装饰器形式的上下文管理器
def session_scope():
"""使用上下文管理器对session和事务操作进行封装"""
try:
yield session
session.commit()
except:
session.rollback()
raise
finally:
session.remove() # session工作完成, 销毁session, 释放内存
def index():
"""模拟视图函数"""
with session_scope() as session: # 获取session对象, 代码块执行完会自动提交 并 销毁session
"""增加数据"""
user1 = User(name='zs')
session.add(user1)
session.flush()
adr1 = Address(detail="中关村3号", user_id=user1.id)
adr2 = Address(detail="华强北5号", user_id=user1.id)
session.add_all([adr1, adr2])
"""查询数据"""
# ret = session.query(User, Address).join(Address, User.id == Address.user_id).filter(User.name == 'zs').all()
# for user, adr in ret:
# print(user.name, adr.detail)
"""执行原生SQL"""
# data = session.execute('select * from t_user')
# print(type(data))
# row = data.fetchone() # 取第一条
# print(row.id) # 取主键
# print(row.name) # 取字段
# rows = data.fetchall() # 取所有数据
# for row in rows:
# print(row.id, row.name)
# print(data.rowcount) # 取条数
if __name__ == '__main__':
# 删除所有表
Base.metadata.drop_all(engine)
# 创建所有表
Base.metadata.create_all(engine)
index()
(wjun0) |