001package org.opengion.plugin.daemon;
002
003import java.util.Date;
004
005import javax.jms.QueueSession;
006
007import org.hsqldb.lib.StringUtil;
008import org.opengion.fukurou.queue.QueueInfo;
009import org.opengion.fukurou.queue.QueueSend;
010import org.opengion.fukurou.queue.QueueSendFactory;
011import org.opengion.fukurou.util.HybsTimerTask;
012import org.opengion.hayabusa.common.HybsSystem;
013import org.opengion.hayabusa.queue.DBAccessQueue;
014
015/**
016 * メッセージキュー送信
017 * メッセージキュー送信テーブルを監視して、
018 * 送信処理を行います。
019 * 
020 * @og.group メッセージ連携
021 *
022 * @og.rev 5.10.15.0 (2019/08/30) 新規作成
023 * @og.rev 5.10.15.2 (2019/09/20) DB登録の実装をhayabusa.queueに移動
024 * 
025 * @version 5.0
026 * @author oota
027 * @since JDK7
028 *
029 */
030public class Daemon_QueueSend extends HybsTimerTask {
031        private int loopCnt = 0;
032        private static final int LOOP_COUNTER = 24;
033        private QueueSend queueSend;
034
035        private String SYSTEM_ID = HybsSystem.sys("SYSTEM_ID");
036        private static final String CLOUD_SQS_ACCESS_KEY = HybsSystem.sys( "CLOUD_SQS_ACCESS_KEY" );
037        private static final String CLOUD_SQS_SECRET_KEY = HybsSystem.sys( "CLOUD_SQS_SECRET_KEY" );
038        private final String USER_ID = "CYYYYY";
039        private final String PG_ID = "DMN_QueSnd";
040        private final String DMN_NAME = "QueueReceiveDMN";
041        private final DBAccessQueue dbAccessQueue;
042        
043        /**
044         * コンストラクター
045         * 初期処理を行います。
046         */
047        public Daemon_QueueSend(){
048                dbAccessQueue = new DBAccessQueue(SYSTEM_ID, USER_ID, PG_ID, DMN_NAME);
049        }
050        /**
051         * 開始処理
052         * タイマータスクのデーモン処理の開始ポイントです。
053         */
054        @Override
055        protected void startDaemon() {
056                if (loopCnt % LOOP_COUNTER == 0) {
057                        loopCnt = 1;
058                        System.out.println();
059                        System.out.println(toString() + " " + new Date() + "");
060                } else {
061                        // メッセージキュー送信管理テーブルから、送信対象のレコードを取得
062                        String[][] vals = dbAccessQueue.selectGE65();
063                        
064                        // 取得データ分の繰り返し処理を実行する
065                        for(int i = 0; i  < vals.length; i++) {
066                                String[] record = vals[i];
067                                
068                                // GE65から取得した値を変数に格納
069                                String ykno =  record[0];
070                                String queueId = record[1];
071                                String message = record[2];
072                                String dedupliId = record[3];
073                                String queSyu = record[4];
074                                String jmsUrl = record[5];
075                                
076                                String queueType = queSyu.toUpperCase();
077                                queueSend = QueueSendFactory.newQueueSend(queueType);
078        
079                                // 接続処理
080                                queueSend.connect(jmsUrl, CLOUD_SQS_ACCESS_KEY, CLOUD_SQS_SECRET_KEY);
081                                
082                                // メッセージ送信管理テーブルから取得したデータを送信実装予定
083                                QueueInfo queueInfo = new QueueInfo();
084                                
085                                // 応答確認種別
086                                if("MQ".equals(queueType)){
087                                        // MQメッセージサーバ指定時
088                                        queueInfo.setMqTransacted(false);
089                                        queueInfo.setMqAcknowledgeMode(QueueSession.AUTO_ACKNOWLEDGE);
090                                        // キュー名
091                                        queueInfo.setMqQueueName(queueId);
092                                }else if("SQS".equals(queueType)){
093                                        // SQSメッセージサーバ指定時
094                                        // グループID
095                                        queueInfo.setSqsFifoGroupId(queueId);
096                                        if(!StringUtil.isEmpty(dedupliId)) {
097                                                // 重複排除ID
098                                                // コンテンツに基づく重複排除が有効時は、未設定でも可(メッセージによる重複判定が行われる)
099                                                queueInfo.setSqsFifoDedupliId(dedupliId);
100                                        }
101                                }
102                        
103                                // メッセージ
104                                queueInfo.setMessage(message);
105                        
106                                // 完了フラグを処理中:2に更新
107                                dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_PROCESS);
108                                
109                                // メッセージ送信処理
110                                try{
111                                        queueSend.sendMessage(queueInfo);
112                                        
113                                        // 完了フラグを完了:3に更新
114                                        dbAccessQueue.updateGE66(ykno, DBAccessQueue.FGKAN_END);
115                                        
116                                }catch(Exception e) {
117                                        // 完了フラグをエラー:4に更新して、エラー情報を登録
118                                        dbAccessQueue.updateGE66Error(ykno, e.getMessage());
119                                } 
120                        }
121                        
122                        // クローズ処理
123                        queueSend.close();
124                        
125                        loopCnt++;
126                }
127        }
128}