您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 >
04 發布訂閱消息傳遞模式(Redis發布訂閱和Stream)
Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-07-13 00:05:58【】2人已围观
简介3publicSubscribeClient(Stringi){024try{025mqttClient=newMqttClient(CONNECTION_STRING);026SimpleCallb
registerSimpleHandler(simpleCallbackHandler);//注冊接收消息方法028 mqttClient
買粉絲nnect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);029 mqttClient
subscribe(TOPICS, QOS_VALUES);//訂閱接主題030031 /**032 * 完成訂閱后,可以增加心跳,保持網絡通暢,也可以發布自己的消息033 */034035 mqttClient
publish(PUBLISH_TOPICS, "keepalive"
getBytes(), QOS_VALUES[0], true);036037 } catch (MqttException e) { 038 // TODO Auto-generated catch block039 e
printStackTrace();040 }041 }042043 /**044 * 簡單回調函數,處理client接收到的主題消息045 * @author pig046 *047 */048 class SimpleCallbackHandler implements MqttSimpleCallback{ 049050 /**051 * 當客戶機和broker意外斷開時觸發052 * 可以再此處理重新訂閱053 */054 @Override055 public void 買粉絲nnectionLost() throws Exception { 056 // TODO Auto-generated method stub057 System
out
println("客戶機和broker已經斷開");058 }059060 /**061 * 客戶端訂閱消息后,該方法負責回調接收處理消息062 */063 @Override064 public void publishArrived(String topicName, byte[] payload, int Qos, booleanretained) throws Exception { 065 // TODO Auto-generated method stub066 System
out
println("訂閱主題: " + topicName);067 System
out
println("消息數據: " + new String(payload));068 System
out
println("消息級別(0,1,2): " + Qos);069 System
out
println("是否是實時發送的消息(false=實時,true=服務器上保留的最后消息): " + retained);070 }071072 }073074 /**075 * 高級回調076 * @author pig077 *078 */079 class AdvancedCallbackHandler implements MqttSimpleCallback{ 080081 @Override082 public void 買粉絲nnectionLost() throws Exception { 083 // TODO Auto-generated method stub084085 }086087 @Override088 public void publishArrived(String arg0, byte[] arg1, int arg2,089 boolean arg3) throws Exception { 090 // TODO Auto-generated method stub091092 }093094 }095096 /**097 * @param args098 */099 public static void main(String[] args) { 100 // TODO Auto-generated method stub101 new SubscribeClient("" + i);102103 }104105 }
Redis發布訂閱和Stream
發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。
Redis為我們提供了發布/訂閱的功能模塊PubSub,可以用于消息傳遞。
其中發布者publisher、訂閱者subscriber都是redis客戶端,channel則是redis服務器。
發布者publisher向channel發送消息,訂閱該channel的subscriber就會接收到消息。
發布消息publish
訂閱test1、test2的客戶端會收到消息
按照上述這種方式,如果 訂閱者subscriber想要訂閱多個channel 則需要同時指定多個channel的名稱,redis為了解決這個問題提供 psubscribe模式匹配 這種訂閱方式,可以通過通配符的方式匹配頻道。
發布消息
之前訂閱ch*的客戶端就會收到cha頻道和買粉絲頻道的消息,這樣就一次性訂閱多個頻道
redis服務端存儲了訂閱頻道/模式的客戶端列表
相當于如果客戶端訂閱一個頻道 ,那么服務端的 pubsub_channels 就會存儲一條數據, pubsub_channels 其實是一個鏈表,key對應channel,value對應客戶端列表,根據key訂閱的頻道,就可以找到訂閱該頻道的所有客戶端。
同時如果客戶端訂閱一個模式 , pubsub_patterns 也會新增一條數據,記錄當前客戶端訂閱的模式, pubsub_patterns 也有自己的數據結構,其中就包含了客戶端以及模式。
當發布者向某個頻道發布消息時,就會遍歷 pubsub_channels 找到訂閱該頻道的客戶端列表,依次向這些客戶端發送消息。
然后遍歷 pubsub_patterns 找到符合當前頻道的模式,同時找到模式對應的客戶端,然后向客戶端發送消息。
雖然Redis提供了發布/訂閱的功能,但是并不完善,導致基本沒有合適的場景能夠使用。
PubSub缺點:
直到Redis5.0出現之后,出現了Stream這種數據結構,才終于完善了Redis的消息機制 。
Stream實際上就是一個消息列表,只是他幾乎實現了消息隊列所需要的所有功能,包括:
同時需要注意的是Stream只是一個數據結構,他不會主動把消息推送給消費者,需要消費者主動來消費數據 。
每個Stream都有唯一的名稱,它就是Redis的key,首次使用 xadd 指令追加消息時自動創建。
常見操作命令如下表:
如果客戶端希望知道自身消費到第幾條數據了,那么就需要記錄一下當前消費的消息ID,下次再次消費的時候就從上次消費的消息ID開始讀取數據即可。
消費組中多了一個游標 last_delivered_id ,表示當前消費到了哪一條數據。同時所有的數據都是待處理消息( PEL ),只有消費者處理完畢之后使用 ack 指令告知redis服務器,數據才會從 PEL 中移除,確認后的消息就無法再次消費。
如果接收到的消息比較多,為了避免Stream過長,可以選擇指定Stream的最大長度,一旦到達了最大長度,就會從最早的消息開始清除,保證Stream中最新的消息。
很赞哦!(3938)
相关文章
- 06 buddy inside什么意思中文(美國地址翻譯)
- 05 廣州塔附近網紅餐廳(廣州有哪些地標性建筑?)
- 06 best 買粉絲untry songs of the 60's(初二英語閱讀短文)
- 05 廣州開興隆貿易有限公司(南京有哪些比較大型的外貿公司?)
- 05 廣州中小學生體質健康買粉絲(廣播體操的歷史發展)
- 05 廣州恒新汽配貿易有限公司(廣東番禺信達電子貿易有限公司)
- 05 廣州健杏貿易有限公司(急急急!急需在廣東中山的所以的大眾捷達的4S店的地址!)
- 05 廣州勁旺食品貿易有限公司(公司起名參考大全)
- 05 廣東省紡織服裝出口貿易現狀的研究方向(紡織行業發展趨勢)
- 05 廣東藥科大學國際經濟與貿易(中國藥科大學國際經濟與貿易就業前景)
Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款的名片
职业:程序员,设计师
现居:广西来宾金秀瑶族自治县
工作室:小组
Email:[email protected]
热门文章
站长推荐
05 廣東省貿易協會有電話嗎(會議邀請函)
06 cars cars on youtube(《Santa Claus is Coming to Town》 的歌詞)
05 廣東民間貿易有限公司柳州(關于柳州的歷史故事)
05 廣州市發霸貿易有限公司(車載充氣泵十大品牌)
06 best artists on youtube(給你50分,關于日本做的一個Emma的視頻!!)
05 廣州千豆匯貿易有限公司銷售電話(電子商務畢業生如何尋找工作?)
05 廣東外語外貿大學雷恩班(廣東外語外貿大學acca創新班好還是國際組織創新班好)
06 買粉絲e inside now是什么意思(wap的歌詞大意是什么)