内容は2012年当時のものです
Date: 2012.4.6
Update: 2020.8.29 (codehausへのリンクを削除)
JMS の実装、ActiveMQ を perl から使うことについて。特にトランザクションまわり。
まずはJavaで。
Queue からメッセージを読みこみ、処理をして、失敗したら rollback したい、という場合にはJMS使ってJavaから使うと、こんな感じ?
- 参考にしたページ TECHSCORE JMS / 5.メッセージ配信の確認
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL); QueueConnection connection = factory.createQueueConnection(); // 第一引数が true で transacted な session になる。第二引数は無視される QueueSession session = connection.createQueueSession(true,0); Queue queue = session.createQueue("TestQueue"); QueueReceiver receiver = session.createReceiver(queue); connection.start(); while (true) { TextMessage msg = (TextMessage) receiver.receive(); // なにか処理する if ( processMessage(msg) ) { // 成功したら session.commit(); } else { // 失敗したら session.rollback(); } if ( some condition ) { // なんらかの条件で break; // ループを抜ける } } receiver.close(); session.close(); connection.close();
こうすることで、メッセージ一つずつ受信しては処理し、成功や失敗によってトランザクションを commit、rollback することができる。
StompConnect と perl で
perl には Net::Stomp を使い、STOMP の受け口には ActiveMQ native なものではなく、StompConnect を使う(StompConnectの開発サイトは2015年に閉鎖されました)。
ActiveMQ の設定
conf/activemq.xml で
<transportConnector name="stomp" uri="stomp://localhost:61613"/>
を
<!-- transportConnector name="stomp" uri="stomp://localhost:61613"/ -->
としてコメントアウト。
StompConnect の起動
ActiveMQ (5.0-SNAPSHOT) と、StompConnect (1.0) が
./apache-activemq-5.0-SNAPSHOT/ ./stompconnect-1.0/
として展開されていたとすると、
java -Djava.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory \ -classpath .:stompconnect-1.0/stompconnect-1.0.jar:stompconnect-1.0/lib/commons-logging-1.1.jar:stompconnect-1.0/lib/geronimo-jms_1.1_spec-1.0.jar:apache-activemq-5.0-SNAPSHOT/apache-activemq-5.0-SNAPSHOT.jar:stompconnect-1.0/lib/optional/log4j-1.2.12.jar \ org.codehaus.stomp.jms.Main tcp://localhost:61613
とすると、StompConnect が起動できる。
Net::Stompを使う
Net::Stomp には、トランザクション関連のメソッドが定義されてないので、適当に作ってみる
## 適当な unique ID を生成する関数 our $TxCount=0; sub unique_tx_id { return sprintf( "txid-%x-%x-%x", $$, scalar(time), $TxCount++ ); } ## BEGIN sub tx_begin { my $stomp = shift; my $tx = unique_tx_id(); my $frame = Net::Stomp::Frame->new( { command => 'BEGIN', headers => { transaction => $tx } } ); $stomp->send_frame($frame); return $tx; } ## COMMIT sub tx_commit { my ( $stomp, $conf ) = @_; my $tx = $conf->{transaction}; my $frame = Net::Stomp::Frame->new( { command => 'COMMIT', headers => { transaction => $tx } } ); $stomp->send_frame($frame); } ## ABORT sub tx_abort { my ( $stomp, $conf ) = @_; my $tx = $conf->{transaction}; my $frame = Net::Stomp::Frame->new( { command => 'ABORT', headers => { transaction => $tx } } ); $stomp->send_frame($frame); }
これで、
- tx_begin($stopm)
- tx_commit($stomp,{transaction=>$txid})
- tx_aborrt($stomp,{transaction=>$txid})
の3つが定義された ($stomp は Net::Stomp のインスタンス)。
my $dest = '/queue/TestQueue'; my $st = Net::Stomp->new( { hostname => 'localhost', port => 61613 } ); $st->connect( { login => 'dummy', passcode => 'dummy' } ); my $tx = tx_begin($st); $st->subscribe( { destination => $dest, transaction => $tx, } ); while ($Run) { ## Receive my $msg_frame = $st->receive_frame; ## Commit or Rollback if ( do_some_work($msg_frame) ) { tx_commit( $st, { transaction => $tx } ); } else { tx_abort( $st, { transaction => $tx } ); } } $st->unsubscribe( { destination => $dest } ); $st->disconnect;
これで、処理が失敗した場合に、それまでのメッセージ(複数)がまとめてrollback される。メッセージを一つずつ受信するにはどうすればいいのだろう。
なお、subscribe メソッドに transaction キーを指定する必要があるのは、StompConnect のソースを読まないとわからない。STOMP のプロトコル文書にも書いてない。
ActiveMQ の STOMP connector を使う
StompConnect を起動せず、conf/activemq.xml の stomp connector の定義を有効化して、ActiveMQ を起動する。
ただ、トランザクションはうまく動かない?
ActiveMQ の STOMP connector のソースを見ても、SUBSCRIBE の際にはclient ack か auto ack になってしまうような気がする。