Python3 通過 kombu 連接

【RabbitMQ 服務器】

# 在 vhosttest 裏面有 exchangetest 和 queuetest 通過 rkeytest 綁定  Broker: 192.168.0.xx  virtual host: vhosttest  Exchange: exchangetest   Queue: queuetest   Routing key: rkeytest

【Python 環境】

OS: Windows 10  Python: 3.6.3 x64  kombu: 4.1.0

【查看隊列狀態】

# 通過瀏覽器查看隊列狀態  http://192.168.0.xx:15672/api/queues/vhosttest/queuetest      # 通過命令行查看隊列狀態curl -u user:password   http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |  jq     # 通過命令行查看隊列長度(messages = messages_ready + messages_unacknowledged)  curl -s -u user:password http://192.168.0.xx:15672/api/queues/vhosttest/queuetest  |       jq '.messages'

【send.py】

#encoding: utf-8  #author: walker  #date: 2018-03-09  #summary: 發送方/生產者    import os, sys, time  from kombu import Connection    def Main():  	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:  		with conn.channel() as channel:  			#producer = Producer(channel)  			producer = channel.Producer()    			while True:  				message = time.strftime('%H:%M:%S', time.localtime())  				producer.publish(  						body=message,  						retry=True,  						exchange='exchangetest',  						routing_key='rkeytest'  					)  				print('send message: %s' %  message)         				while True:  				        # 檢查隊列,以重新得到消息計數  					queue = channel.queue_declare(queue='queuetest', passive=True)  					messageCount = queue.message_count  					print('messageCount: %d' % messageCount)  					if messageCount < 100:  						break  					time.sleep(1)      if __name__ == '__main__':  	Main()

【recv.py】

#encoding: utf-8  #author: walker  #date:  2018-03-09  #summary: 接收方/消費者    import os, sys, time  from kombu import Connection, Queue  from kombu.mixins import ConsumerMixin    class C(ConsumerMixin):    	def __init__(self, connection, queueNmae):  		self.connection = connection  		self.queues = [Queue(queueNmae, durable=False)]    	def get_consumers(self, Consumer, channel):  		return [  			Consumer(self.queues, callbacks=[self.on_message]),  		]    	# 接收處理消息的回調函數  	def on_message(self, body, message):  		print("Received %s" % body)  		message.ack()    def Main():  	with Connection('amqp://test:[email protected]:5672/vhosttest') as conn:  		C(conn, 'queuetest').run()    if __name__ == '__main__':  	Main()