正在加载文档...
文档内容较大,正在处理中,请稍候
正在加载文档...
文档内容较大,正在处理中,请稍候
本文档详细说明了后端服务中定时任务系统的实现和使用方法,包括任务调度器、任务配置、失败重试机制等内容。
node-schedule 的定时任务管理器TaskScheduler 是一个单例类,提供以下核心功能:
支持两种任务类型:
任务配置位于 config/base/schedules.js:
module.exports = {
// 数据库健康检查:每5分钟执行一次
databaseHealthCheck: {
cron: "*/5 * * * *", // Cron 表达式
description: "数据库连接健康检查",
},
// Redis健康检查:每5分钟执行一次
redisHealthCheck: {
cron: "*/5 * * * *",
description: "检查Redis连接状态",
},
// SSE心跳检查:每60秒执行一次
sseHeartbeat: {
cron: 60000, // 毫秒数(间隔任务)
description: "SSE连接心跳检查和清理",
},
// Token清理:通过环境变量配置
tokenCleanup: {
// cron: '', // 通过env配置
description: "过期Token清理",
},
// 验证码清理:每60秒执行一次
captchaCleanup: {
cron: 60000,
description: "过期验证码清理",
},
};Cron 表达式格式:秒 分 时 日 月 周
| 字段 | 允许值 | 特殊字符 |
|---|---|---|
| 秒 | 0-59 | , - * / |
| 分 | 0-59 | , - * / |
| 时 | 0-23 | , - * / |
| 日 | 1-31 | , - * ? / L W |
| 月 | 1-12 | , - * / |
| 周 | 0-7 | , - * ? / L # |
常用示例:
"*/5 * * * *"; // 每5分钟执行一次
"0 */1 * * *"; // 每小时执行一次
"0 0 * * *"; // 每天0点执行
"0 0 0 * * 0"; // 每周日0点执行
"0 0 0 1 * *"; // 每月1号0点执行
"0 0 2 * * *"; // 每天凌晨2点执行部分任务可以通过环境变量配置执行频率:
开发环境(config/env/development.js):
schedules: {
tokenCleanup: {
cron: "*/5 * * * *", // 开发环境:每5分钟清理一次
},
},生产环境(config/env/production.js):
schedules: {
tokenCleanup: {
cron: "0 * * * *", // 生产环境:每小时清理一次
},
},使用 addFromConfig() 方法从配置文件添加任务:
const scheduler = require("./src/infra/scheduler");
// 从配置中添加任务
scheduler.addFromConfig("databaseHealthCheck", async () => {
try {
const ok = await checkDatabaseConnection();
if (!ok) {
logger.warn("⚠️ 数据库健康检查失败");
}
} catch (error) {
logger.error("数据库健康检查出错:", { error });
}
});使用 addTask() 方法直接添加任务:
const scheduler = require("./src/infra/scheduler");
// 方式1:使用 Cron 表达式
scheduler.addTask(
"myTask",
"*/10 * * * *", // 每10分钟执行一次
async () => {
console.log("执行任务");
},
{
description: "我的定时任务",
}
);
// 方式2:使用间隔时间(毫秒)
scheduler.addTask(
"myIntervalTask",
60000, // 每60秒执行一次
async () => {
console.log("执行间隔任务");
}
);// 移除单个任务
scheduler.removeTask("myTask");
// 移除所有任务
scheduler.removeAllTasks();// 获取所有任务状态
const tasks = scheduler.getAllTasks();
console.log(tasks);
// 输出示例:
// [
// {
// taskId: "databaseHealthCheck",
// status: { failed: false }
// },
// {
// taskId: "myTask",
// status: {
// failed: true,
// failureCount: 3,
// lastError: "Connection timeout",
// nextAttempt: "2024-12-19T10:30:00.000Z"
// }
// }
// ]任务执行失败时,调度器会自动记录失败信息:
当任务连续失败 3 次后,调度器会自动暂停任务:
Math.min(300000, Math.pow(2, failureCount - 1) * 10000)任务执行
↓
执行失败
↓
记录失败次数 +1
↓
失败次数 < 3?
├─ 是 → 记录错误日志,下次继续执行
└─ 否 → 计算冷却时间,暂停任务
↓
冷却时间结束
↓
自动恢复执行任务执行成功后,失败计数会自动重置:
// 任务执行成功
await taskFunction();
this.resetTaskFailureCount(taskId); // 自动重置失败计数任务ID:databaseHealthCheck
执行频率:每 5 分钟
功能:检查数据库连接状态
scheduler.addFromConfig("databaseHealthCheck", async () => {
try {
const ok = await checkDatabaseConnection();
if (!ok) {
logger.warn("⚠️ 数据库健康检查失败,请检查数据库服务状态");
}
} catch (error) {
logger.error("数据库健康检查出错:", { error });
}
});任务ID:redisHealthCheck
执行频率:每 5 分钟
功能:检查 Redis 连接状态(仅在 Redis 缓存模式下启用)
if (cacheType === "redis" && config.redis?.host && config.redis?.port) {
scheduler.addFromConfig("redisHealthCheck", async () => {
try {
const connected = await performRedisHealthCheck();
if (!connected) {
logger.debug("Redis连接不可用,确认已切换到内存缓存");
}
} catch (error) {
logger.debug("Redis健康检查出错(已预期),系统应自动使用内存缓存:", {
error: error,
});
}
});
}任务ID:sseHeartbeat
执行频率:每 60 秒
功能:检查 SSE 连接心跳,清理非活跃连接
scheduler.addFromConfig("sseHeartbeat", () => {
sseManager.checkHeartbeats();
sseManager.cleanupInactiveConnections();
});任务ID:tokenCleanup
执行频率:
scheduler.addFromConfig("tokenCleanup", async () => {
try {
await authController.cleanupExpiredTokens();
} catch (error) {
logger.error("❌ Token清理任务失败", { error });
}
});任务ID:captchaCleanup
执行频率:每 60 秒
功能:清理过期的验证码(配置中定义,实际使用需在代码中初始化)
任务初始化在 index.js 的 initScheduledTasks() 函数中:
// 20. 定时任务初始化函数
const initScheduledTasks = async () => {
logger.info("⏰ 初始化定时任务...");
// 启动时清理过期 Token
try {
const cleanedCount = await authController.cleanupExpiredTokens();
if (isDev && cleanedCount > 0) {
logger.info(`🚀 启动时清理了 ${cleanedCount} 个过期token`);
}
} catch (error) {
logger.error("启动时清理失败", { error });
}
// 添加数据库健康检查
scheduler.addFromConfig("databaseHealthCheck", async () => {
// ...
});
// 添加 Redis 健康检查(如果启用)
if (cacheType === "redis") {
scheduler.addFromConfig("redisHealthCheck", async () => {
// ...
});
}
// 添加 SSE 心跳检查
scheduler.addFromConfig("sseHeartbeat", () => {
// ...
});
// 添加 Token 清理
scheduler.addFromConfig("tokenCleanup", async () => {
// ...
});
logger.info("✅ 所有定时任务初始化完成");
};任务在服务器启动后自动初始化:
app.listen(envconfig.port, envconfig.host, async () => {
// ... 其他初始化代码
// 步骤2: 初始化定时任务系统
await initScheduledTasks();
});服务关闭时,调度器会自动清理所有任务:
// 在 scheduler.js 中
shutdown() {
this.removeAllTasks();
logger.debug("🛑 定时任务管理器已关闭");
}建议在应用退出时调用 shutdown():
process.on("SIGINT", async () => {
scheduler.shutdown();
// ... 其他清理代码
process.exit(0);
});好的实践:
scheduler.addFromConfig("myTask", async () => {
try {
// 执行任务逻辑
const result = await doSomething();
logger.info("任务执行成功", { result });
} catch (error) {
// 捕获异常,避免影响调度器
logger.error("任务执行失败", { error: error.message });
// 不抛出异常,让调度器记录失败即可
}
});不好的实践:
scheduler.addFromConfig("myTask", async () => {
// ❌ 未捕获异常,可能导致调度器崩溃
await doSomething();
// ❌ 未记录日志,难以排查问题
processData();
});getAllTasks() 返回的任务状态可能原因:
解决方案:
// 检查任务状态
const tasks = scheduler.getAllTasks();
console.log(tasks);
// 检查任务配置
const config = require("./config");
console.log(config.schedules);原因:任务内部抛出了未捕获的异常
解决方案:在任务内部捕获所有异常:
scheduler.addTask("myTask", "*/5 * * * *", async () => {
try {
await doSomething();
} catch (error) {
logger.error("任务执行失败", { error });
// 不抛出异常,让调度器记录失败
}
});方案1:移除旧任务,添加新任务
scheduler.removeTask("myTask");
scheduler.addTask("myTask", "*/10 * * * *", taskFunction);方案2:修改配置文件后重启服务(推荐用于生产环境)
解决方案:
| 方法 | 说明 | 参数 | 返回值 |
|---|---|---|---|
addTask(taskId, cronExpression, taskFunction, options) |
添加定时任务 | taskId: 任务IDcronExpression: Cron表达式或毫秒数taskFunction: 任务函数options: 选项对象 |
Job 对象 |
addFromConfig(taskKey, taskFunction, options) |
从配置添加任务 | taskKey: 配置键taskFunction: 任务函数options: 选项对象 |
Job 对象或 null |
removeTask(taskId) |
移除任务 | taskId: 任务ID |
void |
removeAllTasks() |
移除所有任务 | - | void |
getAllTasks() |
获取所有任务状态 | - | 任务状态数组 |
shutdown() |
关闭调度器 | - | void |
interface TaskStatus {
taskId: string;
status: {
failed: boolean;
failureCount?: number;
lastError?: string;
nextAttempt?: string; // ISO 时间字符串或 "Now"
};
}scheduler.addFromConfig("dataCleanup", async () => {
try {
// 清理 30 天前的监控数据
const daysToKeep = 30;
await monitorModel.cleanExpiredData(daysToKeep);
logger.info("监控数据清理完成");
} catch (error) {
logger.error("数据清理失败", { error });
}
});scheduler.addTask(
"dailyReport",
"0 0 1 * * *", // 每天凌晨1点执行
async () => {
try {
const report = await generateDailyReport();
await saveReport(report);
logger.info("日报生成完成");
} catch (error) {
logger.error("日报生成失败", { error });
}
}
);scheduler.addTask(
"cacheRefresh",
300000, // 每5分钟执行一次
async () => {
try {
await refreshUserCache();
await refreshMenuCache();
logger.debug("缓存刷新完成");
} catch (error) {
logger.error("缓存刷新失败", { error });
}
}
);定时任务系统提供了:
通过合理使用定时任务系统,可以实现数据清理、健康检查、资源管理等自动化功能,提高系统的可靠性和可维护性。
可以自己参照示例,添加新的定时任务