001package org.opengion.plugin.daemon; 002 003import java.util.Date; 004 005import javax.jms.QueueSession; 006 007import org.hsqldb.lib.StringUtil; 008import org.opengion.fukurou.queue.QueueInfo; 009import org.opengion.fukurou.queue.QueueSend; 010import org.opengion.fukurou.queue.QueueSendFactory; 011import org.opengion.fukurou.util.HybsTimerTask; 012import org.opengion.hayabusa.common.HybsSystem; 013import org.opengion.hayabusa.queue.DBAccessQueue; 014 015/** 016 * メッセージキュー送信 017 * メッセージキュー送信テーブルを監視して、 018 * 送信処理を行います。 019 * 020 * @og.group メッセージ連携 021 * 022 * @og.rev 5.10.15.0 (2019/08/30) 新規作成 023 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動 024 * 025 * @version 5.0 026 * @author oota 027 * @since JDK7 028 * 029 */ 030public class Daemon_QueueSend extends HybsTimerTask { 031 private int loopCnt = 0; 032 private static final int LOOP_COUNTER = 24; 033 private QueueSend queueSend; 034 035 private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID"); 036 private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" ); 037 private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" ); 038 private final String USER_ID = "CYYYYY"; 039 private final String PG_ID = "DMN_QueSnd"; 040 private final String DMN_NAME = "QueueReceiveDMN"; 041 private final DBAccessQueue dbAccessQueue; 042 043 /** 044 * コンストラクター 045 * 初期処理を行います。 046 */ 047 public Daemon_QueueSend(){ 048 dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME); 049 } 050 /** 051 * 開始処理 052 * タイマータスクのデーモン処理の開始ポイントです。 053 */ 054 @Override 055 protected void startDaemon() { 056 if (loopCnt % LOOP_COUNTER == 0) { 057 loopCnt = 1; 058 System.out.println(); 059 System.out.println(toString() + " " + new Date() + ""); 060 } else { 061 // メッセージキュー送信管理テーブルから、送信対象のレコードを取得 062 String[][] vals = dbAccessQueue.selectGE65(); 063 064 // 取得データ分の繰り返し処理を実行する 065 for(int i = 0; i < vals.length; i++) { 066 String[] record = vals[i]; 067 068 // GE65から取得した値を変数に格納 069 String ykno = record[0]; 070 String queueId = record[1]; 071 String message = record[2]; 072 String dedupliId = record[3]; 073 String queSyu = record[4]; 074 String jmsUrl = record[5]; 075 076 String queueType = queSyu.toUpperCase(); 077 queueSend = QueueSendFactory.newQueueSend(queueType); 078 079 // 接続処理 080 queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY); 081 082 // メッセージ送信管理テーブルから取得したデータを送信実装予定 083 QueueInfo queueInfo = new QueueInfo(); 084 085 // 応答確認種別 086 if("MQ".equals(queueType)){ 087 // MQメッセージサーバ指定時 088 queueInfo.setMqTransacted(false); 089 queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE); 090 // キュー名 091 queueInfo.setMqQueueName(queueId); 092 }else if("SQS".equals(queueType)){ 093 // SQSメッセージサーバ指定時 094 // グループID 095 queueInfo.setSqsFifoGroupId(queueId); 096 if(!StringUtil.isEmpty(dedupliId)) { 097 // 重複排除ID 098 // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる) 099 queueInfo.setSqsFifoDedupliId(dedupliId); 100 } 101 } 102 103 // メッセージ 104 queueInfo.setMessage(message); 105 106 // 完了フラグを処理中:2に更新 107 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS); 108 109 // メッセージ送信処理 110 try{ 111 queueSend.sendMessage(queueInfo); 112 113 // 完了フラグを完了:3に更新 114 dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END); 115 116 }catch(Exception e) { 117 // 完了フラグをエラー:4に更新して、エラー情報を登録 118 dbAccessQueue.updateGE66Error(ykno, e.getMessage()); 119 } 120 } 121 122 // クローズ処理 123 queueSend.close(); 124 125 loopCnt++; 126 } 127 } 128}