id=42, template_id=some:template:id:1 data_str:{"name":"lisi"}
2023/02/03 07:18:19 Sending Email to User: user_id=42, template_id=some:template:id:2 data_str:{"name":"lisi"}
生产者 go run test.go
$ go run test.go
2023/02/03 07:18:09 enqueued task: id=5d998c6b-3978-4a25-a096-6e564e032359 queue=default
2023/02/03 07:18:12 enqueued task: id=74a5fea4-d4d4-465f-b310-31981e472f6a queue=default
2023/02/03 07:18:15 enqueued task: id=41c46b7b-ea78-4abc-878a-ea65e3859e28 queue=default
三、细节
1. 关于asynq的优雅退出
如果异步服务突然被暂停,正在执行的异步任务会push到队列中,下次启动的时候自动执行。
我们可以将一个异步任务中途sleep几秒,发送一个异步任务,任务没执行完中途停掉任务测试出结果:
再次启动异步任务服务,发现这个任务被重新执行。
2. client中 client.Enqueue 的使用
- 立即处理任务
client.Enqueue(t1, time.Now())
2)延时处理任务, 两小时后处理
client.Enqueue(t2, asynq.ProcessIn(time.Now().Add(2 * time.Hour)))
- 任务重试,最大重试次数为25次。
client.Enqueue(task, asynq.MaxRetry(5))
4)确保任务的唯一性
4-1:使用TaskID选项:自行生成唯一的任务 ID
_, err := client.Enqueue(task, asynq.TaskID("mytaskid"))
// Second task will fail, err is ErrTaskIDConflict (assuming that the first task didn't get processed yet)
_, err = client.Enqueue(task, asynq.TaskID("mytaskid"))
4-2:使用Unique选项:让 Asynq 为任务创建唯一性锁
err := c.Enqueue(t1, asynq.Unique(time.Hour))
四、监控和管理工具
另外,asynq
异步任务提供了命令行工具和 Asynqmon
用于监控和管理 Asynq
异步任务和队列。WebUI
可以通过传递两个标志来启用与 Prometheus
的集成。
Asynqmon
asynqmon
是 asynq
延迟队列、定时队列的 webui
#asynqmon asynq延迟队列、定时队列的webui
asynqmon:
image: hibiken/asynqmon:latest
container_name: asynqmon
ports:
- 8980:8080
command:
- '--redis-addr=192.168.0.120:6379'
- '--redis-password=123456'
- '--redis-db=2'
restart: always
# networks:
# - looklook_net
# depends_on:
# - redis
启动服务:
docker-compose up
访问:
http://192.168.0.120:8980/
参考文档:
Asynq 实现 Go 异步任务处理