MySQL单表千万级数据处理的思路分享

吾爱主题 阅读:138 2024-04-02 08:03:04 评论:0

项目背景

在处理过程中,今天上午需要更新A字段,下午爬虫组完成了规格书或图片的爬取又需要更新图片和规格书字段,由于单表千万级深度翻页会导致处理速度越来越慢。

?
1 select a,b,c from db.tb limit 10000 offset 9000000

但是时间是有限的,是否有更好的方法去解决这种问题呢?

改进思路

是否有可以不需要深度翻页也可以进行数据更新的凭据?
是的,利用自增id列

观察数据特征

此单表有自增id列且为主键,根据索引列查询数据和更新数据是最理想的途径。

?
1 2 select a,b, c from db.tb where id=9999999; update db.tb set a=x where id=9999999;

多进程处理

每个进程处理一定id范围内的数据,这样既避免的深度翻页又可以同时多进程处理数据。
提高数据查询速度的同时也提高了数据处理速度。
下面是我编写的任务分配函数,供参考:

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 def mission_handler(all_missions, worker_mission_size):      """      根据总任务数和每个worker的任务数计算出任务列表, 任务列表元素为(任务开始id, 任务结束id)。      例: 总任务数100个,每个worker的任务数40, 那么任务列表为:[(1, 40), (41, 80), (81, 100)]      :param all_missions: 总任务数      :param worker_mission_size: 每个worker的最大任务数      :return: [(start_id, end_id), (start_id, end_id), ...]      """      worker_mission_ids = []      current_id = 0      while current_id < = all_missions:          start_id = all_missions if current_id + 1 > = all_missions else current_id + 1          end_id = all_missions if current_id + worker_mission_size > = all_missions else current_id + worker_mission_size          if start_id = = end_id:              if worker_mission_ids[ - 1 ][ 1 ] = = start_id:                  break          worker_mission_ids.append((start_id, end_id))          current_id + = worker_mission_size        return worker_mission_ids

假设单表id最大值为100, 然后我们希望每个进程处理20个id,那么任务列表将为:

?
1 2 >>> mission_handler( 100 , 40 ) [( 1 , 40 ), ( 41 , 80 ), ( 81 , 100 )]

那么,
进程1将只需要处理id between 1 to 40的数据;
进程2将只需要处理id between 41 to 80的数据;
进程3将只需要处理id between 81 to 100的数据。

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from concurrent.futures import ProcessPoolExecutor     def main():      # 自增id最大值      max_id = 30000000      # 单worker处理数据量      worker_mission_size = 1000000      # 使用多进程进行处理      missions = mission_handler(max_id, worker_mission_size)      workers = []      executor = ProcessPoolExecutor()      for idx, mission in enumerate (missions):          start_id, end_id = mission          workers.append(executor.submit(data_handler, start_id, end_id, idx))     def data_handler(start_id, end_id, worker_id):      pass

思路总结

  1. 避免深度翻页进而使用自增id进行查询数据和数据
  2. 使用多进程处理数据

数据处理技巧

记录处理成功与处理失败的数据id,以便后续跟进处理

?
1 2 # 用另外一张表记录处理状态 insert into db.tb_handle_status(row_id, success) values (999, 0);

循环体内进行异常捕获,避免程序异常退出

?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def data_handler(start_id, end_id, worker_id):      # 数据连接      conn, cursor = mysql()      current_id = start_id          try :              while current_id < = end_id:                  try :                      # TODO 数据处理代码                      pass                    except Exception as e:                      # TODO 记录处理结果                      # 数据移动到下一条                      current_id + = 1                      continue                  else :                      # 无异常,继续处理下一条数据                      current_id + = 1          except Exception as e:              return 'worker_id({}): result({})' . format (worker_id, False )          finally :              # 数据库资源释放              cursor.close()              conn.close()            return 'worker_id({}): result({})' . format (worker_id, True )

更新数据库数据尽量使用批量提交

?
1 2 3 4 5 6 7 8 sql = """update db.tb set a=%s, b=%s where id=%s""" values = [              ( 'a_value' , 'b_value' , 9999 ),              ( 'a_value' , 'b_value' , 9998 ),              ...           ] # 批量提交,减少网络io以及锁获取频率 cursor.executemany(sql, values)

以上就是MySQL单表千万级数据处理的思路分享的详细内容,更多关于MySQL单表千万级数据处理的资料请关注服务器之家其它相关文章!

原文链接:https://juejin.cn/post/6969527649995587615

可以去百度分享获取分享代码输入这里。
声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

【腾讯云】云服务器产品特惠热卖中
搜索
标签列表
    关注我们

    了解等多精彩内容