Redis 驱动的 Laravel Job Release 实现原理
在 Laravel 中处理 job 时,可以使用 release
方法将 job 重新放回队列,并且可以自定义延迟时间。那么,这里有两个问题:
- Job 是放回队列的队首还是队尾?
- 如何保证延迟时间后被重新消费?
假设队列的名字是 default
。
调用 Release 方法后的操作
当一个 job 被 dispatch 后,会被 push 到 queue:default
这个 Redis key 的队尾。当 job 被消费时,会从 queue:default
中取出,然后再被 push 到 queue:default:reserved
中,这样做有以下好处:
- 避免 job 丢失,防止 worker 意外被杀掉或因执行超时被杀掉后丢失正在被消费的 job。
- 方便在代码中对 job 进行处理,例如
release
。
调用 job 的 release
方法时,会执行 Lua 脚本,先从 queue:default:reserved
key 中使用 zrem
命令删除 job,然后再使用 zadd
命令写入到 queue:default:delayed
key 中。如果 release
指定了 delay
,在调用 zadd
时会指定 score
参数。
Delayed 队列的使用
Worker 在获取下一个 job 时会执行一些 Lua 脚本。
处理 Delayed 的 Job
使用 Lua 脚本操作 3 个 Redis key:
- 执行
zrangebyscore
从queue:default:delayed
key 中取出所有过期的 job,过期的 job 表示在 delay 后到时间要被处理了。 - 对这些 job 进行分批处理,每批 100 个。
- 调用
rpush
方法添加到queue:default
队尾。 - 调用
rpush
方法添加到queue:default:notify
队尾。
处理 Reserved 的 Job
使用 Lua 脚本操作 3 个 Redis key:
- 执行
zrangebyscore
从queue:default:reserved
key 中取出所有过期的 job,过期的 job 表示因超时被杀掉或意外丢失。 - 对这些 job 进行分批处理,每批 100 个。
- 调用
rpush
方法添加到queue:default
队尾。 - 调用
rpush
方法添加到queue:default:notify
队尾。
获取下一个 Job
使用 Lua 脚本操作 3 个 Redis key:
- 从
queue:default
key 中lpop
一个值。 - 更新该值的
attempt
次数,+1。 - 使用
zadd
添加到queue:default:reserved
key 中,并设置过期时间。 - 从
queue:default:notify
key 中lpop
出来。
通过以上步骤可以看到,如果队列驱动是 Redis,并不能保证 job 在 delay 多少秒后会被消费,而只是被放回到队列的队尾。但是对于 SQS 驱动,是更新 job 的 VisibilityTimeout
值,所以可以在 delay 后被重新消费,因为并不是放回到队尾。