工作队列
在第一篇教程中,我们已经写了一个从已知队列中发送和获取消息的程序。在这篇教程中,我们将创建一个工作队列(Work Queue),它会发送一些耗时的任务给多个工作者(Worker)。
工作队列(又称:任务队列——Task Queues)是为了避免等待一些占用大量资源、时间的操作。当我们把任务(Task)当作消息发送到队列中,一个运行在后台的工作者(worker)进程就会取出任务然后处理。当你运行多个工作者(workers),任务就会在它们之间共享。
这个概念在网络应用中是非常有用的,它可以在短暂的HTTP请求中处理一些复杂的任务。
准备
之前的教程中,我们发送了一个包含“Hello World!”的字符串消息。现在,我们将发送一些字符串,把这些字符串当作复杂的任务。我们没有真实的例子,例如图片缩放、pdf文件转换。所以使用time.sleep()函数来模拟这种情况。我们在字符串中加上点号(.)来表示任务的复杂程度,一个点(.)将会耗时1秒钟。比如”Hello…”就会耗时3秒钟。
我们对之前教程的send.py做些简单的调整,以便可以发送随意的消息。这个程序会按照计划发送任务到我们的工作队列中,如下:
1 2 3 4 5 6 |
#!/usr/bin/env python import pika credentials = pika.PlainCredentials('livemq','DA8664849C573ECE3CA0DF') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) channel = connection.channel() channel.queue_declare(queue='hello') |
1 2 3 4 5 6 7 |
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,) connection.close() |
我们的旧脚本(receive.py)同样需要做一些改动:它需要为消息体中每一个点号(.)模拟1秒钟的操作。它会从队列中获取消息并执行,如下:
1 2 3 4 5 6 7 |
#!/usr/bin/env python import pika credentials = pika.PlainCredentials('livemq','DA8664849C573ECE3CA0DF') connection = pika.BlockingConnection(pika.ConnectionParameters('localhost',5672,'/',credentials)) channel = connection.channel() channel.queue_declare(queue='hello') print ' [*] Waiting for messages. To exit press CTRL+C' |
1 2 3 4 5 6 7 8 9 10 |
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" channel.basic_consume(callback, queue='hello', no_ack=True) channel.start_consuming() |
循环调度:
使用工作队列的一个好处就是它能够并行的处理队列。如果堆积了很多任务,我们只需要添加更多的工作者(workers)就可以了,扩展很简单。首先,我们先同时运行两个receive.py脚本,它们都会从队列中获取消息,到底是不是这样呢?我们看看。你需要打开三个终端,两个用来运行receive.py脚本,这两个终端就是我们的两个消费者(consumers)——C1和C2。
1 2 |
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C |
1 2 |
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C |
第三个终端,我们用来发布新任务。你可以发送一些消息给消费者(consumers):
1 2 3 4 5 |
shell3$ python new_task.py First message. shell3$ python new_task.py Second message.. shell3$ python new_task.py Third message... shell3$ python new_task.py Fourth message.... shell3$ python new_task.py Fifth message..... |
看看到底发送了什么给我们的工作者(workers):
1 2 3 4 5 |
shell1$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....' |
1 2 3 4 |
shell2$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....' |
默认来说,RabbitMQ会按顺序得把消息发送给每个消费者(consumer)。平均每个消费者都会收到同等数量得消息。这种发送消息得方式叫做——轮询(round-robin)。试着添加三个或更多得工作者(workers)。
公平调度
你应该已经发现,它仍旧没有按照我们期望的那样进行分发。比如有两个工作者(workers),处理奇数消息的比较繁忙,处理偶数消息的比较轻松。然而RabbitMQ并不知道这些,它仍然一如既往的派发消息。
这时因为RabbitMQ只管分发进入队列的消息,不会关心有多少消费者(consumer)没有作出响应。它盲目的把第n-th条消息发给第n-th个消费者。
我们可以使用basic.qos方法,并设置prefetch_count=1。这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个工作者(worker),直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的工作者(worker)。
1 |
channel.basic_qos(prefetch_count=1) |
关于队列大小
如果所有的工作者都处理繁忙状态,你的队列就会被填满。你需要留意这个问题,要么添加更多的工作者(workers),要么使用其他策略。