Node文件下载服务并发问题优化(数据库方式)

2024-01-03 | 1,062浏览 | 0评论 | 标签:无

前言

上篇《记录Node服务cpu 100%优化过程》末尾提到的文件下载服务并发控制问题,今天再简单地总结一下自己的处理方法,简单易上手。

需求背景

我的《油管评论下载器》工具中,涉及到批量请求第三方接口,并将结果生成excel供下载的功能,之前为了省时,做成了同步。随着用户量的增加,生成的文件已经过万,服务器CPU时而占满100%,节流的工作不能再拖了。向java同事请教过他们处理并发的方式,主要用到各种缓存和锁。考虑到实现难度和应用场景,最后决定采用这种相对简单的数据库方式。

技术结构

后端:Node.js(express) + mysql + pm2
前端:Vue + Element-ui + nginx

思路

将每个下载任务在数据库中生成一条记录,并定义一个状态字段(进行中、排队中、已完成...),每次只有一个"进行中",其余都为“排队中”,完成之后标记为“已完成”。设置一个定时器,轮询剩下的任务。

具体步骤:

一、更改表结构:

每条下载记录额外添加3个字段:
status (执行状态,varchar,枚举有:WAITING/DOING/SUCCESS/FAILED/CANCEL/EXPIRED...)
task_conditions (执行条件,用于保存具体的任务内容,json格式,varchar);
task_create_time (任务生成时间,用于排序和展示,datetime);

Node文件下载服务并发问题优化(数据库方式)

二、核心代码

注意以下代码仅供思路参考,可能并不能直接运行。

// express路由
const router = require('express').Router();
router.post('/api/createTask',createTask);

// 生成任务
const createTask = async(req,res,next)=>{
    const { code, order, videoId } = req.body;
    // 指定条件查询任务列表
    const taskList = await queryTaskList({ videoId, code });
    const task_conditions = JSON.stringify({ order,videoId});
    const time = dayjs().format('YYYY/MM/DD HH:mm:ss');
    if(taskList.length){
      //任务存在,更新条件并继续等待 (updateRecord为更新数据行方法,略)
      await updateRecord({code,videoId,status:'WAITING',task_conditions})
    }else{
      //任务不存在,则新建任务
      await updateRecord({code,videoId,status:'WAITING',task_conditions,task_create_time:time})
    }

    // 立即执行任务
    await handleTask();

    // 告知前端当前排队量
    const waitList = await queryTaskList({ status: ["WAITING"] });

    // 接口回调
    res.send({ code: 1, data: { waitLen:waitList.length||0 }, msg: '' })
}

// 查询任务列表
const queryTaskList = async ({ status = [], videoId }) => {

  // 按条件查询
  const statusArray = status.map((str) => `"${str}"`)
  const where_status = status.length ? `AND status IN (${statusArray})` : ``;
  const where_video_id = videoId ? `AND video_id = '${videoId}'` : ``;

  // _query为mysql查询方法,略
  const data = await _query(`SELECT video_id,task_conditions FROM task_table WHERE 1=1 ${where_status} ${where_code} ${where_video_id} ORDER BY task_create_time ASC`);
  return data;
}

// 处理任务
const handleTask = async ()=>{
 const doingList = await queryTaskList({ status: ["DOING"] });
  // 并发量限制为 <= 1
  if (doingList.length <= 0) {
    const waitList = await queryTaskList({ status: ["WAITING"] });
    if (waitList.length > 0) {
      const task = waitList[0];
      let { task_conditions } = task;

      // 如果没有执行条件,状态更新为失败
      if (!task_conditions) {
        await updateRecord({
          code: task.code,
          video_id: task.video_id,
          status: 'FAILED',
        });
      } else {
        // 变更状态
        await updateRecord({
          code: task.code,
          video_id: task.video_id,
          status: 'DOING',
        });

        task_conditions = JSON.parse(task_conditions);
        // 执行具体任务
        // TODO...
      }
    }
  }
}

// 定时轮询任务
(async () => {
  // 每隔5秒扫描一次任务列表
  setInterval(() => {
    handleTask()
  }, 5000);
})()

效果演示

Node文件下载服务并发问题优化(数据库方式)

总结

项目在本地自测通过,上线后持续观察了一周,运行良好,CPU也没再因这个原因而占满100%。

(本篇完。有疑问欢迎留言探讨)

留言:

*

* (方便回复通知)

打赏
编辑代码 运行结果
退出