【】每秒轮询将到期的任务迁移到就绪

Pump Thread:每秒轮询 Redis 将过期任务迁移到就绪队列 Metric Collector,定期收集队列相关统计信息到实例,然后通过 prometheus exporter 暴露给监控系统 Token Manager 来管理命名空间和令牌。Module,namespace是用于业务隔离的Producer/Consumer单元,用于处理用户任务和消费请求

除了将业务数据存储在 Default Pool 中,namespace/token 等元数据也会默认存储在 Default Redis 池中。

基本概念

数据存储

lmstfy的redis store由四部分组成:

timer(sorted set) – 用于对延迟的任务进行排序,然后后台线程定期将过期的任务写入Ready Queue 就绪队列(list) – 无延迟或过期任务的队列死信(list) – 失败的任务消费(重试次数达到上限)可以手动放回队列作业 pool(string) – 存储消息内容的池

启用延迟的任务队列本质上是两种数据结构的组合:FIFO 和有序集。sorted set 用于实现延迟部分,按过期时间戳升序存储任务,然后周期性地将过期任务迁移到 FIFO(就绪队列)中。任务的具体内容只会存储在作业池中。其他的比如ready queue、timer、deadletter等只存储job id,这样可以节省一些内存空间。

这是整体设计:

任务写

写任务时,会先生成一个job id。目前job id(16bytes)包含写时间戳、随机数和延迟秒,然后key为j:{namespace}/{queue}/{ID}。任务进入任务池(pool)里面。然后根据延迟时间,判断job id应该去ready queue还是timer:

timer的实现是利用zset按照绝对时间戳排序c语言延时程序2s,然后由bypass线程周期性轮询,通过redis lua脚本将过期任务原子性地转移到就绪队列中。

任务消耗

前面说过,任务消费失败后预计会重试,那么需要知道什么时候考虑任务消费失败?服务在消费服务时需要携带ttr(运行时间)参数,用于表示服务预期任务的最长执行时间。如果服务在 ttr 时间内没有收到服务的 ACK 消息,则认为任务失败(类似于 tcp 的重传定时器)。)。

图片[1]-【】每秒轮询将到期的任务迁移到就绪-老王博客

消费时,(B)RPOP从就绪队列中取出任务的job id,然后根据job id将任务内容从pool中发送给consumer。同时,将尝试次数减1,根据消耗的ttr(运行时间)参数将任务放入定时器。如果 try 为零,则在 ttr 时间到期后,作业 id 将被放入死信队列(表示作业执行失败)。

同步任务模型

除了实现异步和延迟任务模型外,lmstfy 还可以用来实现同步任务模型(生产者等待任务执行成功并返回),因为命名空间下的队列是动态创建的,并且作业 id 是全局唯一的。大致如下:

生产者写完任务后得到job id,然后监听(消费)job id命名的队列。消费者消费成功后,将回复消息写入以job id命名的队列。如果生产者在规定时间内能够读取到回复消息,则认为消费成功,等待超时则认为任务失败。

如何水平缩放

lmstfy 本身就是一个无状态服务,可以轻松实现横向扩展。这里的横向扩展主要是存储的横向扩展(目前只支持Redis)。设计也比较简单,主要是通过命名空间对应的token路由。比如我们目前配置了两组Redis资源:default和meipai:

[Pool][Pool.default]Addr = "1.1.1.1:6379"[Pool.meipai]Addr = "2.2.2.2:6389"

在创建命名空间时可以指定资源池,token会携带资源池名称作为前缀。如果指定美拍资源池,则令牌类似于:美拍:01DT8EZ1N6XT。以后处理请求时,可以根据token中携带的资源池名称来路由数据。但是,这种设计实现了队列级扩展。如果单个队列中存储的消息量超过 Redis 内存的上限,则需要其他手段来解决(稍后会支持磁盘式存储)。

如何使用

# 创建 namespace 和 token, 注意这里使用管理端口$ ./scripts/token-cli -c -n test_ns -p default -D "test ns apply by @hulk" 127.0.0.1:7778
{ "token": "01DT9323JACNBQ9JESV80G0000"}
# 写入内容为 value 的任务$ curl -XPUT -d "value" -i "http://127.0.0.1:7777/api/test_ns/q1?tries=3&delay=1&token=01DT931XGSPKNB7E2XFKPY3ZPB"
{"job_id":"01DT9323JACNBQ9JESV80G0000","msg":"published"}
# 消费任务$ curl -i "http://127.0.0.1:7777/api/test_ns/q1?ttr=30&timeout=3&&token=01DT931XGSPKNB7E2XFKPY3ZPB"
{"data":"value","elapsed_ms":272612,"job_id":"01DT9323JACNBQ9JESV80G0000","msg":"new job","namespace":"test_ns","queue":"q1","ttl":86127}
# ACK 任务 id,表示消费成功不再重新下发改任务curl -i -XDELETE "http://127.0.0.1:7777/api/test_ns/q1/job/01DT9323JACNBQ9JESV80G0000?token=01DT931XGSPKNB7E2XFKPY3ZPB"

有关更详细的 API 描述,请参阅项目 README。目前,我们提供两种语言 SDK,PHP/Golang。其他语言可以直接基于HTTP库打包。

监测指标

lmstfy 任务队列的另一个设计目标是提供足够的监控指标。除了监控告警外,还可以为k8s等调度器提供反馈指标,引导系统根据当前队列堆积情况进行动态扩容。

业务指标:

性能相关指标:

未来计划

在我们目前的使用场景中,一个2G的redis实例可以支持大约千万级的延迟任务。但是在对象存储的生命周期管理(对象存储的TTL)等时间量大、延时长的场景下,使用Redis存储的成本相对较高。未来我们会考虑基于本地文件将数据存储到磁盘或者使用kvrocks(自研SSD Redis KV)作为存储。kvrocks 目前也是开源的。美图内网已经部署了近100个实例,外网有白山云等公司正在使用,后续会发布相关设计和实现文章。欢迎大家关注使用c语言延时程序2s,更欢迎issues和PR。

© 版权声明
THE END
喜欢就支持一下吧
点赞0
分享
评论 抢沙发

请登录后发表评论