Python3 通过 kombu 连接

【RabbitMQ 服务器】

# 在 vhosttest 里面有 exchangetest 和 queuetest 通过 rkeytest 绑定  Broker: 192.168.0.xx  virtual host: vhosttest  Exchange: exchangetest   Queue: queuetest   Routing key: rkeytest

【Python 环境】

OS: Windows 10  Python: 3.6.3 x64  kombu: 4.1.0

【查看队列状态】

# 通过浏览器查看队列状态  http://192.168.0.xx:15672/api/queues/vhosttest/queuetest      # 通过命令行查看队列状态curl -u user:password   http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq     # 通过命令行查看队列长度(messages = messages_ready + messages_unacknowledged)  curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |       jq '.messages'

【send.py】

#encoding: utf-8  #author: walker  #date: 2018-03-09  #summary: 发送方/生产者    import os, sys, time  from kombu import Connection    def Main():  	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:  		with conn.channel() as channel:  			#producer = Producer(channel)  			producer = channel.Producer()    			while True:  				message = time.strftime('%H:%M:%S', time.localtime())  				producer.publish(  						body=message,  						retry=True,  						exchange='exchangetest',  						routing_key='rkeytest'  					)  				print('send message: %s' %  message)         				while True:  				        # 检查队列,以重新得到消息计数  					queue = channel.queue_declare(queue='queuetest', passive=True)  					messageCount = queue.message_count  					print('messageCount: %d' % messageCount)  					if messageCount < 100:  						break  					time.sleep(1)      if __name__ == '__main__':  	Main()

【recv.py】

#encoding: utf-8  #author: walker  #date:  2018-03-09  #summary: 接收方/消费者    import os, sys, time  from kombu import Connection, Queue  from kombu.mixins import ConsumerMixin    class C(ConsumerMixin):    	def __init__(self, connection, queueNmae):  		self.connection = connection  		self.queues = [Queue(queueNmae, durable=False)]    	def get_consumers(self, Consumer, channel):  		return [  			Consumer(self.queues, callbacks=[self.on_message]),  		]    	# 接收处理消息的回调函数  	def on_message(self, body, message):  		print("Received %s" % body)  		message.ack()    def Main():  	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:  		C(conn, 'queuetest').run()    if __name__ == '__main__':  	Main()