celery出现worker异常offline情况

出现的问题

celery版本:3.1.18

线上环境celery运行一段时间后,worker会出现大面积offline的情况,导致异步任务不能继续执行。

从线上日志看到出现很多missed heartbeat from celery@xxx的记录,查看celery文档,发现worker会每分钟发送一次heartbeat,如果2分钟没有发送的话,就认为worker offline了。

问题的可能原因

从网上查到的类似问题Issue #2296。里面有提到出现这个问题的可能的原因

  1. redis死锁
  2. 内存占满

暂定的几个解决方案

  1. 从网上查到的这类问题大多出现在celery-3.1.18版本及以前,而celery-3.1.19已经修复了上述worker hanging的问题,现在celery最新版本是4.2.0,所以可以尝试升级看看是否能解决问题,同时也要关心一下高版本兼容问题。
  2. supervisor定时重启worker,简单粗暴但不解决根本问题,尽量避免影响线上用户体验。
  3. 不用redis做broker,换rabbitmq。但是网上有人用rabbitmq也出现这类问题,所以不建议尝试。
  4. 监控celery worker的状态,出现大面积offline时发送报警邮件,手动重启worker,同样没有解决根本问题。

Celery升级

查看celery更新日志,发现在3.1.19版本修复了如下问题Issue #1847

1
Worker: Fixed worker hanging when gossip/heartbeat disabled (Issue #1847)

若希望升级到最新版本4.2.0,要注意,celery在3.x使用的是消息协议版本1,4.x使用的消息版本2,两者存在结构上的差异。在4.1.0版本出现了Issue #4356,大致问题是,当同时使用3.1.25版本和4.1.0版本时,3.1.25的task执行失败,被4.1.0的worker获取并重新执行时,任务message升级为版本2,但是仍然保留版本1的消息格式,如果此任务再次失败,并被3.1.25的worker获取并重新执行时,还会把这个消息误当作版本1的消息,最终造成崩溃和消息丢失。另外还有一类出现问题的情况是,4.1.0的生产者和3.1.25的消费者,当消费者调用retry()时,返回的消息体转为版本1,但是保留了版本2的headers,所以就被认为是版本2,然后出现崩溃和消息丢失的情况。然后在4.2.0的升级日志中,发现已经修复了这个问题

1
Message Protocol Compatibility: Handle “hybrid” messages that have moved between Celery versions (#4358) (Issue #4356)

所以,在升级时,建议升级到4.2.0。

安装Celery 4.2.0需要依赖Kombu 4.2及以上的版本。

1
2
3
4
5
$ pip install https://github.com/celery/celery/zipball/master#egg=celery
$ pip install https://github.com/celery/billiard/zipball/master#egg=billiard
$ pip install https://github.com/celery/py-amqp/zipball/master#egg=amqp
$ pip install https://github.com/celery/kombu/zipball/master#egg=kombu
$ pip install https://github.com/celery/vine/zipball/master#egg=vine

查看worker状态的方法

inspect

inspect()可以查看所有worker的状态

1
2
3
4
5
6
7
8
9
# Inspect all nodes.
>>> i = app.control.inspect()

# Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
'worker2.example.com'])

# Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

查看注册在worker上的任务

1
2
3
>>> i.registered()
[{'worker1.example.com': ['tasks.add',
'tasks.sleeptask']}]

查看worker上的活跃任务

1
2
3
4
5
6
>>> i.active()
[{'worker1.example.com':
[{'name': 'tasks.sleeptask',
'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
'args': '(8,)',
'kwargs': '{}'}]}]

scheduled()查看等待被安排的任务,这些任务带有eta/countdown参数,非周期性任务。

reserved()查看已经收到,但是仍在等待执行的任务。

stats()方法可以返回worker的详细数据。

ping

这个命令给所有存活的worker发送ping请求,worker返回pong,默认超时时间1秒。

1
2
3
4
>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
{'worker2.example.com': 'pong'},
{'worker3.example.com': 'pong'}]

ping也可以指定特定的worker

1
2
3
>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
{'worker3.example.com': 'pong'}]
谢谢支持