elasticsearch入門系列
1073
2022-05-29
文章目錄
消息中間件
消息中間件適用於進行網絡通訊的系統,建立網絡通訊的通道,進行數據和文件的傳送
產品:ActiveMQ、ZeroMQ、RabbitMQ、IBM webSphere MQ…
交易中間件
交易中間件管理分布於不同操作系統的數據,實現數據一致性,保證系統的負載均衡
產品:IBM CICS,Bea tuxedo…
對象中間件
保證不同廠家的軟件之間的交互訪問
產品:IBM componentbroker, iona orbix,borland visibroker…
應用服務器
用來構造internet/intranet應用和其它分布式構件應用
產品:IBM Websphere,Bea weblogic…
安全中間件
以公鑰基礎設施(pki)為核心的、建立在一系列相關國際安全標準之上的一個開放式應用開發平臺
產品:entrust entrust…
應用集成服務器
把工作流和應用開發技術如消息及分布式構件結合在一起,使處理能方便自動地和構件、script 應用、工作流行為結合在一起,同時集成文檔和電子郵件
產品:lss flowman、ibm flowmark、vitria businessagiliti
##ESB##
ESB,即企業服務總線
松散耦合一直是企業軟件開發中的一個很重要的內容,而面向服務的SOA編程隨著ESB的應用得到了進一步的發展,ESB就像服務提供者和服務使用者之間的中間層
##JMS##
JMS,即Java Message Service
ESB僅僅是作為一個中間層,所以應用程序之間的消息通訊必須借助JMS,即通過JMS從服務使用者接收消息,并將其轉發(fā)到相應的服務提供者。
而且,JMS 還定義了可發(fā)送的若干不同類型的消息。例如,Text 消息包含消息的字符串表示形式;Object 消息包含序列化的 Java 對象;Map 消息包含鍵/值對的映射,等等。
附錄:
MQ DEMO:
package com.wms.batchMsg; import java.io.File; import java.io.IOException; import java.sql.Timestamp; import java.text.ParseException; import java.util.Date; import org.apache.log4j.Logger; import com.ibm.mq.MQEnvironment; import com.ibm.mq.MQException; import com.ibm.mq.MQGetMessageOptions; import com.ibm.mq.MQMessage; import com.ibm.mq.MQPutMessageOptions; import com.ibm.mq.MQQueue; import com.ibm.mq.MQQueueManager; import com.ibm.mq.constants.MQConstants; public class MQUtil { private static String qmName; private static MQQueueManager qMgr; private static Logger logger = Logger.getLogger(MQUtil.class); static{ try{ MQEnvironment.hostname=ConfigManager.getValue("MQ_MQHost"); MQEnvironment.channel=ConfigManager.getValue("MQ_Server_Channel"); MQEnvironment.CCSID=Integer.parseInt(ConfigManager.getValue("MQ_CCSID")); MQEnvironment.port=Integer.parseInt(ConfigManager.getValue("MQ_port")); //MQEnvironment.userID = ConfigManager.getValue("MQ_UserId"); //MQEnvironment.password = ConfigManager.getValue("MQ_pass"); qmName = ConfigManager.getValue("MQ_QMname"); MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY,MQConstants.TRANSPORT_MQSERIES_CLIENT); qMgr = new MQQueueManager(qmName); }catch(MQException e){ e.printStackTrace(); logger.info("qManager failed: Completion code " + e.completionCode + " Reason Code is " + e.reasonCode); } } public static MQQueue getSendQueue(String queueName) { MQQueue sQueue; int openSendOptions = MQConstants.MQOO_OUTPUT | MQConstants.MQOO_FAIL_IF_QUIESCING | MQConstants.MQOO_SET_IDENTITY_CONTEXT; try { sQueue = qMgr.accessQueue(queueName, openSendOptions); } catch (MQException e) { e.printStackTrace(); return null; } return sQueue; } public static MQQueue getReceiveQueue(String revQueueName){ MQQueue rQueue ; int openRcvOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING; try{ rQueue = qMgr.accessQueue(revQueueName, openRcvOptions); }catch(MQException e){ e.printStackTrace(); return null; } return rQueue; } 
public static void sendMsg(MQMsgEntity entity,String queueName) { MQQueue sendQ = null; try { MQMessage qMsg = new MQMessage(); byte[] qByte = entity.getMsgStr().getBytes("UTF-8"); // String message = entity.getMsgStr(); qMsg.messageId = MQConstants.MQMI_NONE; //TODO send and receive if(entity.getCorrelId()!=null){ qMsg.correlationId = entity.getCorrelId(); } qMsg.format = MQConstants.MQFMT_STRING; qMsg.write(qByte); MQPutMessageOptions pmo = new MQPutMessageOptions(); pmo.options = pmo.options + MQConstants.MQPMO_NEW_MSG_ID; pmo.options = pmo.options + MQConstants.MQPMO_NO_SYNCPOINT; pmo.options = pmo.options + MQConstants.MQPMO_SET_IDENTITY_CONTEXT; sendQ = getSendQueue(queueName); sendQ.put(qMsg, pmo); qMgr.commit(); //logger.info("The send message is: " +new String(qByte,"UTF-8")); } catch (MQException e) { logger.info("A WebSphere MQ error occurred : Completion code " + e.completionCode + " Reason Code is " + e.reasonCode); } catch (java.io.IOException e) { logger.info("An error occurred whilst to the message buffer " + e); }finally{ try{ if(sendQ!=null){ sendQ.close(); } }catch(MQException e){ // TODO Auto-generated catch block e.printStackTrace(); logger.info("Error for MQ connection:"+e.getMessage()); } } } // public static void messageHandlerByQueueName(MQMsgEntity entity,String queueName) { // try { // if(queueName.equalsIgnoreCase("sap_OrdersQueue")){ // ECOrder order = new ECOrder(); // order.CallOrderCURFC(entity, "ZECI001"); // }else if(queueName.equalsIgnoreCase("sap_OrderPendReqQueue")){ // ECOrderPending orderPending = new ECOrderPending(); // orderPending.CallOrderPendRFC(entity, "ZECI005"); // }else if(queueName.equalsIgnoreCase("sap_OrderPendCancelQueue")){ // ECOrderPending orderPending = new ECOrderPending(); // orderPending.CallCancelOrderPendRFC(entity, "ZECI006"); // }else if(queueName.equalsIgnoreCase("sap_ECReturnsQueue")){ // ECOrder order = new ECOrder(); // order.callOrderCancelRFC(entity, "ZECI001"); // }else 
if(queueName.equalsIgnoreCase("sap_downpaymentQueue")){ // ECDownPayment downPayment = new ECDownPayment(); // downPayment.callDownPaymentRFC(entity, "ZECI007"); // }else if(queueName.equalsIgnoreCase("sap_360LBPQueue")){ // EC360LBP lbp = new EC360LBP(); // lbp.generateHtmlFromQueue(entity.getMsgStr()); // } // } catch (Exception e) { // e.printStackTrace(); // logger.error(e.getMessage()); // } // // } public MQQueueManager generateNewMQQM(){ MQQueueManager qMgr = null; try{ MQEnvironment.hostname=ConfigManager.getValue("MQ_MQHost"); MQEnvironment.channel=ConfigManager.getValue("MQ_Server_Channel"); MQEnvironment.CCSID=Integer.parseInt(ConfigManager.getValue("MQ_CCSID")); MQEnvironment.port=Integer.parseInt(ConfigManager.getValue("MQ_port")); String qmName = ConfigManager.getValue("MQ_QMname"); MQEnvironment.properties.put(MQConstants.TRANSPORT_PROPERTY,MQConstants.TRANSPORT_MQSERIES_CLIENT); qMgr = new MQQueueManager(qmName); }catch(MQException e){ e.printStackTrace(); logger.info("qManager failed: Completion code " + e.completionCode + " Reason Code is " + e.reasonCode); } return qMgr; } public void MultiThreadGetMqMessage(MQQueueManager qMgr,String queueName){ MQQueue revQ = null; String mqString = null; MQMsgEntity entity = new MQMsgEntity(); int openRcvOptions = MQConstants.MQOO_INPUT_AS_Q_DEF | MQConstants.MQOO_FAIL_IF_QUIESCING; try { MQMessage retrievedMessage = new MQMessage(); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options += MQConstants.MQPMO_NO_SYNCPOINT;// gmo.options = gmo.options + MQConstants.MQGMO_WAIT;// gmo.options = gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;// gmo.waitInterval = MQConstants.MQWI_UNLIMITED;// gmo.matchOptions = MQConstants.MQMO_MATCH_MSG_ID; retrievedMessage.format=MQConstants.MQFMT_STRING; // MQC.MQWI_UNLIMITED; revQ = qMgr.accessQueue(queueName, openRcvOptions); revQ.get(retrievedMessage, gmo); qMgr.commit(); int length = retrievedMessage.getDataLength(); if(length >0){ long startTime = 
System.currentTimeMillis(); byte[] msg = new byte[length]; retrievedMessage.readFully(msg); mqString = new String(msg, "UTF-8"); if(queueName.equalsIgnoreCase("sap_360LBPQueue")){ mqString = mqString.replace("'", "\""); } long timeuse = System.currentTimeMillis() - startTime; Date currentDate = new Date(); Timestamp receiveTimestamp = new Timestamp(currentDate.getTime()); logger.info("=========mqString from "+queueName+" :"+mqString); DBUtil.insertIntoMQLog("Receive",queueName, mqString, timeuse, "success", "", null, receiveTimestamp); entity.setMsgStr(mqString); //messageHandlerByQueueName(entity,queueName); }else{ logger.info("Error MQ string Sent!"); } } catch (MQException e) { e.printStackTrace(); if (e.reasonCode != 2033) { logger.info(e.getMessage()); logger.info("Completion code " + e.completionCode + " Reason Code is " + e.reasonCode); } } catch (IOException e) { logger.info("IO error:" + e.getMessage()); } finally{ try{ if(revQ!=null){ revQ.close(); } }catch(MQException mqEx){ int rc = mqEx.reasonCode; if (rc != MQException.MQRC_NO_MSG_AVAILABLE) { logger.info(" PUT Message failed with rc = " + rc); } } } } public static String getMQMessage(String queueName) throws ParseException { MQQueue revQ = null; String mqString = null; MQMsgEntity entity = new MQMsgEntity(); try { MQMessage retrievedMessage = new MQMessage(); MQGetMessageOptions gmo = new MQGetMessageOptions(); gmo.options += MQConstants.MQPMO_NO_SYNCPOINT;// gmo.options = gmo.options + MQConstants.MQGMO_WAIT;// gmo.options = gmo.options + MQConstants.MQGMO_FAIL_IF_QUIESCING;// gmo.waitInterval = MQConstants.MQWI_UNLIMITED;// gmo.matchOptions = MQConstants.MQMO_MATCH_MSG_ID; retrievedMessage.format=MQConstants.MQFMT_STRING; // MQC.MQWI_UNLIMITED; revQ = getReceiveQueue(queueName); revQ.get(retrievedMessage, gmo); qMgr.commit(); int length = retrievedMessage.getDataLength(); if(length >0){ byte[] msg = new byte[length]; retrievedMessage.readFully(msg); mqString = new String(msg, "UTF-8"); 
logger.info("=========getMQMessage===mqString from "+queueName+" :"+mqString); entity.setMsgStr(mqString); //messageHandlerByQueueName(entity,queueName); }else{ logger.info("Error MQ string Sent!"); } } catch (MQException e) { e.printStackTrace(); if (e.reasonCode != 2033) { e.printStackTrace(); logger.info("Completion code " + e.completionCode + " Reason Code is " + e.reasonCode); } } catch (java.io.IOException e) { System.out.println("error" + e.getMessage()); }finally{ try{ if(revQ!=null){ revQ.close(); } }catch(MQException mqEx){ int rc = mqEx.reasonCode; if (rc != MQException.MQRC_NO_MSG_AVAILABLE) { System.out.println(" PUT Message failed with rc = " + rc); } } } return mqString; } public void revAndSend(MQMsgEntity entity,String queueName){ // sendMsg(entity,queueName); } public void subscribeMessage() throws ParseException{ while(true){ logger.info("waiting to get message....."); getMQMessage("sap_OrdersQueue"); } } public void subscribeOrderPendMessage() throws ParseException{ while(true){ logger.info("waiting to get message....."); getMQMessage("sap_ECReturnsQueue"); } } public static void main(String[] args) throws IOException, ParseException { MQMsgEntity entity = new MQMsgEntity(); String sendMsg = XMLBeanUtil.readFileToString(new File("D://batchXML0108.txt")); int intPktCtlNbr = 1; String StrPkt = null; String newPktCtlNbr =null; for (int i = 0; i < 20000; i++) { newPktCtlNbr = String.format("%09d", intPktCtlNbr+i); StrPkt="
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
網絡
版權聲明:本文內容由網絡用戶投稿,版權歸原作者所有,本站不擁有其著作權,亦不承擔相應法律責任。如果您發現本站中有涉嫌抄襲或描述失實的內容,請聯系我們jiasou666@gmail.com 處理,核實后本網站將在24小時內刪除侵權內容。