返回文章列表

Redis 驱动的 Laravel Job Release 实现原理

2024年12月02日
Laravel

Redis 驱动的 Laravel Job Release 实现原理

在 Laravel 中处理 job 时,可以使用 release 方法将 job 重新放回队列,并且可以自定义延迟时间。那么,这里有两个问题:

  1. Job 是放回队列的队首还是队尾?
  2. 如何保证延迟时间后被重新消费?

假设队列的名字是 default

调用 Release 方法后的操作

当一个 job 被 dispatch 后,会被 push 到 queue:default 这个 Redis key 的队尾。当 job 被消费时,会从 queue:default 中取出,然后再被 push 到 queue:default:reserved 中,这样做有以下好处:

  1. 避免 job 丢失,防止 worker 意外被杀掉或因执行超时被杀掉后丢失正在被消费的 job。
  2. 方便在代码中对 job 进行处理,例如 release

调用 job 的 release 方法时,会执行 Lua 脚本,先从 queue:default:reserved key 中使用 zrem 命令删除 job,然后再使用 zadd 命令写入到 queue:default:delayed key 中。如果 release 指定了 delay,在调用 zadd 时会指定 score 参数。

Delayed 队列的使用

Worker 在获取下一个 job 时会执行一些 Lua 脚本。

处理 Delayed 的 Job

使用 Lua 脚本操作 3 个 Redis key:

  1. 执行 zrangebyscorequeue:default:delayed key 中取出所有过期的 job,过期的 job 表示在 delay 后到时间要被处理了。
  2. 对这些 job 进行分批处理,每批 100 个。
  3. 调用 rpush 方法添加到 queue:default 队尾。
  4. 调用 rpush 方法添加到 queue:default:notify 队尾。

处理 Reserved 的 Job

使用 Lua 脚本操作 3 个 Redis key:

  1. 执行 zrangebyscorequeue:default:reserved key 中取出所有过期的 job,过期的 job 表示因超时被杀掉或意外丢失。
  2. 对这些 job 进行分批处理,每批 100 个。
  3. 调用 rpush 方法添加到 queue:default 队尾。
  4. 调用 rpush 方法添加到 queue:default:notify 队尾。

获取下一个 Job

使用 Lua 脚本操作 3 个 Redis key:

  1. queue:default key 中 lpop 一个值。
  2. 更新该值的 attempt 次数,+1。
  3. 使用 zadd 添加到 queue:default:reserved key 中,并设置过期时间。
  4. queue:default:notify key 中 lpop 出来。

通过以上步骤可以看到,如果队列驱动是 Redis,并不能保证 job 在 delay 多少秒后会被消费,而只是被放回到队列的队尾。但是对于 SQS 驱动,是更新 job 的 VisibilityTimeout 值,所以可以在 delay 后被重新消费,因为并不是放回到队尾。