ActiveMQ
JMS の実装、ActiveMQ を perl から使うことについて。特にトランザクションまわり。
※2024年11月27日追記。内容は今となっては古いものです
まずはJavaで。
Queue からメッセージを読みこみ、処理をして、失敗したら rollback したい、という場合にはJMS使ってJavaから使うと、こんな感じ?
参考にしたページ: TECHSCORE JMS / 5.メッセージ配信の確認
ActiveMQConnectionFactory factory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
QueueConnection connection = factory.createQueueConnection();
// 第一引数が true で transacted な session になる。第二引数は無視される
QueueSession session = connection.createQueueSession(true,0);
Queue queue = session.createQueue("TestQueue");
QueueReceiver receiver = session.createReceiver(queue);
connection.start();
while (true) {
TextMessage msg = (TextMessage) receiver.receive();
// なにか処理する
if ( processMessage(msg) ) {
// 成功したら
session.commit();
}
else {
// 失敗したら
session.rollback();
}
if ( some condition ) { // なんらかの条件で
break; // ループを抜ける
}
}
receiver.close();
session.close();
connection.close();
こうすることで、メッセージ一つずつ受信しては処理し、成功や失敗によってトランザクションを commit、rollback することができる。
StompConnect と perl で
perl には Net::Stomp を使い、STOMP の受け口には ActiveMQ native なものではなく、StompConnect を使う(StompConnectの開発サイトは2015年に閉鎖されました)。
ActiveMQ の設定
conf/activemq.xml
で
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
を
<!-- transportConnector name="stomp" uri="stomp://localhost:61613"/ -->
としてコメントアウト。
StompConnect の起動
ActiveMQ (5.0-SNAPSHOT) と、StompConnect (1.0) が
./apache-activemq-5.0-SNAPSHOT/
./stompconnect-1.0/
として展開されていたとすると、
java -Djava.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory \
-classpath .:stompconnect-1.0/stompconnect-1.0.jar:stompconnect-1.0/lib/commons-logging-1.1.jar:stompconnect-1.0/lib/geronimo-jms_1.1_spec-1.0.jar:apache-activemq-5.0-SNAPSHOT/apache-activemq-5.0-SNAPSHOT.jar:stompconnect-1.0/lib/optional/log4j-1.2.12.jar \
org.codehaus.stomp.jms.Main tcp://localhost:61613
とすると、StompConnect が起動できる。
Net::Stompを使う
Net::Stomp には、トランザクション関連のメソッドが定義されてないので、適当に作ってみる
## 適当な unique ID を生成する関数
our $TxCount=0;
sub unique_tx_id {
return
sprintf( "txid-%x-%x-%x", $$, scalar(time), $TxCount++ );
}
## BEGIN
sub tx_begin {
my $stomp = shift;
my $tx = unique_tx_id();
my $frame = Net::Stomp::Frame-<new(
{ command => 'BEGIN', headers => { transaction =< $tx } } );
$stomp->send_frame($frame);
return $tx;
}
## COMMIT
sub tx_commit {
my ( $stomp, $conf ) = @_;
my $tx = $conf->{transaction};
my $frame = Net::Stomp::Frame->new(
{ command => 'COMMIT', headers => { transaction => $tx } } );
$stomp->send_frame($frame);
}
## ABORT
sub tx_abort {
my ( $stomp, $conf ) = @_;
my $tx = $conf->{transaction};
my $frame = Net::Stomp::Frame->new(
{ command => 'ABORT', headers => { transaction => $tx } } );
$stomp->send_frame($frame);
}
これで、
tx_begin($stomp)
tx_commit($stomp,{transaction=>$txid})
tx_aborrt($stomp,{transaction=>$txid})
の3つが定義された ($stomp
は Net::Stomp
のインスタンス)。
my $dest = '/queue/TestQueue';
my $st = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
$st->connect( { login => 'dummy', passcode => 'dummy' } );
my $tx = tx_begin($st);
$st->subscribe(
{ destination => $dest,
transaction => $tx,
}
);
while ($Run) {
## Receive
my $msg_frame = $st->receive_frame;
## Commit or Rollback
if ( do_some_work($msg_frame) ) {
tx_commit( $st, { transaction => $tx } );
}
else {
tx_abort( $st, { transaction => $tx } );
}
}
$st->unsubscribe( { destination => $dest } );
$st->disconnect;
これで、処理が失敗した場合に、それまでのメッセージ(複数)がまとめてrollback される。メッセージを一つずつ受信するにはどうすればいいのだろう。
なお、subscribe メソッドに transaction キーを指定する必要があるのは、StompConnect のソースを読まないとわからない。STOMP のプロトコル文書にも書いてない。
ActiveMQ の STOMP connector を使う
StompConnect を起動せず、conf/activemq.xml の stomp connector の定義を有効化して、ActiveMQ を起動する。
ただ、トランザクションはうまく動かない?
ActiveMQ の STOMP connector のソースを見ても、SUBSCRIBE の際にはclient ack か auto ack になってしまうような気がする。