zzzhc's Blog

stay curious

Qpid Ruby Client

Qpid对ruby的官方支持:https://svn.apache.org/repos/asf/qpid/trunk/qpid/ruby/

感觉是对python client的port, 代码风格看起来很不ruby, 用了挺多锁,信号,线程,想要停止一个connection都比较困难。对AMQP协议的实现细节被序列化到一个spec_cache下,不可读。基本没有文档,总体来说不像是一个可以正式用的东西。

大概的用法,用rake gem生成gem包,安装gem install pkg/qpid-0.10.2.gem

用qpid java broker的时候要注意下,创建connection的时候要指定用户名密码。 conn = Qpid::Connection.new(TCPSocket.new(“localhost”, 5672), :username => “guest”, :password => “guest”)

参考example写了两个简单例子 producer.rb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env ruby

require "rubygems"
require "qpid"
require "socket"

conn = Qpid::Connection.new(TCPSocket.new("localhost", 5672),
                                         :username => "guest",
                                         :password => "guest")
conn.start(10)

ssn = conn.session("test_producer")

# create a queue
ssn.queue_declare("test-queue")
ssn.exchange_declare("test-exchange", :type => "direct")

dp = ssn.delivery_properties(:routing_key => "test-queue")
mp = ssn.message_properties(:content_type => "text/plain")

ssn.message_transfer(:message => Qpid::Message.new(dp, mp, "Hello QPID!"))
ssn.message_transfer(:message => Qpid::Message.new(dp, mp, "Hello RUBY!"))
while line = gets.strip
  break if line =~ /^(exit|done)$/i
  ssn.message_transfer(:message => Qpid::Message.new(dp, mp, line.strip))
end
ssn.message_transfer(:message => Qpid::Message.new(dp, mp, "done"))
ssn.sync

ssn.close()
conn.close()

consumer.rb

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#!/usr/bin/env ruby

require "rubygems"
require "qpid"
require "socket"

conn = Qpid::Connection.new(TCPSocket.new("localhost", 5672),
                                         :username => "guest",
                                         :password => "guest")
conn.start(10)

ssn = conn.session("test_consumer")

incoming = ssn.incoming("messages")
ssn.message_subscribe(
  :destination => "messages",
  :queue => "test-queue",
  :accept_mode => ssn.message_accept_mode.none
)

# start incoming message flow
incoming.start()

while true
 body = incoming.get().body
 puts body
 break if body == "done"
end

ssn.close()
conn.close()

另一个ruby AMQP协议的实现是ruby-amqp ,基于event-machine, 目前只支持amqp 0-8.

主要是为与rabbitmq通讯打造,简单试了下simple example也可用在qpid上,其它自动的example基本跑不过。现在开发比较活跃,更看好它。

Comments