ActiveMQ

内容は2012年当時のものです

Date: 2012.4.6

Update: 2020.8.29 (codehausへのリンクを削除)

JMS の実装、ActiveMQ を perl から使うことについて。特にトランザクションまわり。

まずはJavaで。

Queue からメッセージを読みこみ、処理をして、失敗したら rollback したい、という場合にはJMS使ってJavaから使うと、こんな感じ?

ActiveMQConnectionFactory factory =
  new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
QueueConnection connection = factory.createQueueConnection();
// 第一引数が true で transacted な session になる。第二引数は無視される
QueueSession session = connection.createQueueSession(true,0);
Queue queue = session.createQueue("TestQueue");
QueueReceiver receiver = session.createReceiver(queue);

connection.start();

while (true) {
  TextMessage msg = (TextMessage) receiver.receive();

  // なにか処理する
  if ( processMessage(msg) ) {
    // 成功したら
    session.commit();
  }
  else {
    // 失敗したら
    session.rollback();
  }
  if ( some condition ) { // なんらかの条件で
    break;                // ループを抜ける
  }
}
receiver.close();
session.close();
connection.close();

こうすることで、メッセージ一つずつ受信しては処理し、成功や失敗によってトランザクションを commit、rollback することができる。

StompConnect と perl で

perl には Net::Stomp を使い、STOMP の受け口には ActiveMQ native なものではなく、StompConnect を使う(StompConnectの開発サイトは2015年に閉鎖されました)

ActiveMQ の設定

conf/activemq.xml で

<transportConnector name="stomp"   uri="stomp://localhost:61613"/>

<!-- transportConnector name="stomp"   uri="stomp://localhost:61613"/ -->

としてコメントアウト。

StompConnect の起動

ActiveMQ (5.0-SNAPSHOT) と、StompConnect (1.0) が

./apache-activemq-5.0-SNAPSHOT/
./stompconnect-1.0/

として展開されていたとすると、

java -Djava.naming.factory.initial=org.apache.activemq.jndi.ActiveMQInitialContextFactory \
 -classpath .:stompconnect-1.0/stompconnect-1.0.jar:stompconnect-1.0/lib/commons-logging-1.1.jar:stompconnect-1.0/lib/geronimo-jms_1.1_spec-1.0.jar:apache-activemq-5.0-SNAPSHOT/apache-activemq-5.0-SNAPSHOT.jar:stompconnect-1.0/lib/optional/log4j-1.2.12.jar \
 org.codehaus.stomp.jms.Main tcp://localhost:61613

とすると、StompConnect が起動できる。

Net::Stompを使う

Net::Stomp には、トランザクション関連のメソッドが定義されてないので、適当に作ってみる

## 適当な unique ID を生成する関数
our $TxCount=0;
sub unique_tx_id {
   return
       sprintf( "txid-%x-%x-%x", $$, scalar(time), $TxCount++ );
}
## BEGIN
sub tx_begin {
 my $stomp = shift;
 my $tx = unique_tx_id();
 my $frame = Net::Stomp::Frame->new(
   { command => 'BEGIN', headers => { transaction => $tx } } );
 $stomp->send_frame($frame);
 return $tx;
}
## COMMIT
sub tx_commit {
 my ( $stomp, $conf ) = @_;
 my $tx    = $conf->{transaction};
 my $frame = Net::Stomp::Frame->new(
   { command => 'COMMIT', headers => { transaction => $tx } } );
 $stomp->send_frame($frame);
}
## ABORT
sub tx_abort {
 my ( $stomp, $conf ) = @_;
 my $tx    = $conf->{transaction};
 my $frame = Net::Stomp::Frame->new(
   { command => 'ABORT', headers => { transaction => $tx } } );
 $stomp->send_frame($frame);
}

これで、

の3つが定義された ($stomp は Net::Stomp のインスタンス)。

my $dest = '/queue/TestQueue';
my $st = Net::Stomp->new( { hostname => 'localhost', port => 61613 } );
$st->connect( { login => 'dummy', passcode => 'dummy' } );

my $tx = tx_begin($st);

$st->subscribe(
       {   destination             => $dest,
           transaction             => $tx,
       }
 );
 while ($Run) {
   ## Receive
   my $msg_frame = $st->receive_frame;

   ## Commit or Rollback
   if ( do_some_work($msg_frame) ) {
     tx_commit( $st, { transaction => $tx } );
   }
   else {
     tx_abort( $st, { transaction => $tx } );
   }
 }
 $st->unsubscribe( { destination => $dest } );
 $st->disconnect;

これで、処理が失敗した場合に、それまでのメッセージ(複数)がまとめてrollback される。メッセージを一つずつ受信するにはどうすればいいのだろう。

なお、subscribe メソッドに transaction キーを指定する必要があるのは、StompConnect のソースを読まないとわからない。STOMP のプロトコル文書にも書いてない。

ActiveMQ の STOMP connector を使う

StompConnect を起動せず、conf/activemq.xml の stomp connector の定義を有効化して、ActiveMQ を起動する。

ただ、トランザクションはうまく動かない?

ActiveMQ の STOMP connector のソースを見ても、SUBSCRIBE の際にはclient ack か auto ack になってしまうような気がする。