基于sqlalchemy对mysql实现增删改查操作

吾爱主题 阅读:194 2024-04-05 16:21:49 评论:0

需求场景:

老大让我利用爬虫爬取的数据写到或更新到mysql数据库中,百度了两种方法

1 是使用pymysql连接mysql,通过操作原生的sql语句进行增删改查数据;

2 是使用sqlalchemy连接mysql,通过ORM模型建表并操作数据库,不需要写原生的sql语句,相对简单些;

以下就是本次使用sqlalchemy的经验之谈。

实现流程:连接数据库》通过模型类创建表》建立会话》执行创建表语句》通过会话进行增删改查

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 from sqlalchemy import exists, Column, Integer, String, ForeignKey, exists from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker   # 创建的数据库引擎 engine = create_engine( "mysql+pymysql://user:pwd@ip/数据库名?charset=utf8" )   #创建session类型 DBSession = sessionmaker(bind = engine)   # 实例化官宣模型 - Base 就是 ORM 模型 Base = declarative_base()     # 创建服务单表 class ServiceOrder(Base):    __tablename__ = 'serviceOrderTable'    id = Column(Integer, primary_key = True , autoincrement = True )    serviceOrderId = Column(String( 32 ), nullable = False , index = True , comment = '服务单ID' )    serviceDesc = Column(String( 268 ), comment = '服务说明' )    oneLevelName = Column(String( 32 ), comment = 'C类别' )    twoLevelName = Column(String( 32 ), comment = 'T子类' )    threeLevelName = Column(String( 32 ), comment = 'I项目' )    fourLevelName = Column(String( 32 ), comment = 'S子项' )    transferTimes = Column(String( 32 ), comment = '转派次数' )    overDueStatus = Column(String( 32 ), comment = '过期状态' )    serviceTimeLimit = Column(String( 32 ), comment = '服务时限' )    serTimeLimitTypeName = Column(String( 16 ), comment = '时限类型'    # 一对多:    # serviceWorkOrder = relationship("ServiceWorkOrder", backref="serviceorder")     # 多对一:多个服务工单可以属于服务单 class ServiceWorkOrder(Base):    __tablename__ = 'serviceWorkOrderTable'    id = Column(Integer, primary_key = True , autoincrement = True )    serviceWorkOrderId = Column(String( 32 ), nullable = False , index = True , comment = '服务工单ID' )    workOrderName = Column(String( 268 ), comment = '工单名称' )    fromId = Column(String( 32 ), comment = '服务单ID' )    createUserSectionName = Column(String( 32 ), comment = '创建人室' )    createUserName = Column(String( 32 ), comment = '创建人' )    handlerName = Column(String( 32 ), comment = '处理人' )    statusName = Column(String( 32 ), comment = '工单状态' )    createTime = Column(String( 32 ), comment = '创建时间' )    # “多”的一方的book表是通过外键关联到user表的:    # serviceOrder_id = Column(Integer, ForeignKey('serviceOrderTable.id'))   # 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库 def init_db():    Base.metadata.create_all(engine)   def drop_db():    Base.metadata.drop_all(engine)   def insert_update():    # all_needed_data_lists 是需要插入数据库的数据 格式[{key: value, ... }, { }, { }...]    for item in all_needed_data_lists:      ServiceOrderRow = ServiceOrder(serviceOrderId = item[ 'serviceOrderId' ],                      serviceDesc = item[ 'serviceDesc' ],                      oneLevelName = item[ 'oneLevelName' ],                      twoLevelName = item[ 'twoLevelName' ],                      threeLevelName = item[ 'threeLevelName' ],                      fourLevelName = item[ 'fourLevelName' ],                      transferTimes = item[ 'transferTimes' ],                      overDueStatus = item[ 'overDueStatus' ],                      serviceTimeLimit = item[ 'serviceTimeLimit' ],                      serTimeLimitTypeName = item[ 'serTimeLimitTypeName' ],                      )      try :        # 利用exists判断目标对象是否存在,返回True或Faults        it_exists = session.query(            exists().where(ServiceOrder.serviceOrderId = = item[ 'serviceOrderId' ] )          ).scalar()      except Exception as e:        self .log.error(e)        break      try :        # 如果不存在,进行新增;存在的话就更新现存的数据        if not it_exists:          session.add(ServiceOrderRow)        else :          session.query(ServiceOrder). filter (ServiceOrder.serviceOrderId = = item[ 'serviceOrderId' ])\            .update(item)      except Exception as e:        self .log.error(e)        break    try :      session.commit()      self .log.info( '数据更新成功!' )    except :      session.rollback()      self .log.info( '数据更新失败!' )   if __name__ = = "__main__" :    # 创建数据库 如果数据库已存在 则不会创建 会根据库名直接连接已有的库    init_db()    # 创建session对象,进行增删改查:    session = DBSession()    # 利用session 增 改数据 记得提交    insert_update()

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持服务器之家。

原文链接:https://www.cnblogs.com/We612/p/12105135.html

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容