您现在的位置是:Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款 >
04 發布訂閱消息傳遞模式(Redis發布訂閱和Stream)
Instagram刷粉絲, Ins買粉絲自助下單平台, Ins買贊網站可微信支付寶付款2024-07-17 07:52:11【】3人已围观
简介3publicSubscribeClient(Stringi){024try{025mqttClient=newMqttClient(CONNECTION_STRING);026SimpleCallb
registerSimpleHandler(simpleCallbackHandler);//注冊接收消息方法028 mqttClient
買粉絲nnect(CLIENT_ID+i, CLEAN_START, KEEP_ALIVE);029 mqttClient
subscribe(TOPICS, QOS_VALUES);//訂閱接主題030031 /**032 * 完成訂閱后,可以增加心跳,保持網絡通暢,也可以發布自己的消息033 */034035 mqttClient
publish(PUBLISH_TOPICS, "keepalive"
getBytes(), QOS_VALUES[0], true);036037 } catch (MqttException e) { 038 // TODO Auto-generated catch block039 e
printStackTrace();040 }041 }042043 /**044 * 簡單回調函數,處理client接收到的主題消息045 * @author pig046 *047 */048 class SimpleCallbackHandler implements MqttSimpleCallback{ 049050 /**051 * 當客戶機和broker意外斷開時觸發052 * 可以再此處理重新訂閱053 */054 @Override055 public void 買粉絲nnectionLost() throws Exception { 056 // TODO Auto-generated method stub057 System
out
println("客戶機和broker已經斷開");058 }059060 /**061 * 客戶端訂閱消息后,該方法負責回調接收處理消息062 */063 @Override064 public void publishArrived(String topicName, byte[] payload, int Qos, booleanretained) throws Exception { 065 // TODO Auto-generated method stub066 System
out
println("訂閱主題: " + topicName);067 System
out
println("消息數據: " + new String(payload));068 System
out
println("消息級別(0,1,2): " + Qos);069 System
out
println("是否是實時發送的消息(false=實時,true=服務器上保留的最后消息): " + retained);070 }071072 }073074 /**075 * 高級回調076 * @author pig077 *078 */079 class AdvancedCallbackHandler implements MqttSimpleCallback{ 080081 @Override082 public void 買粉絲nnectionLost() throws Exception { 083 // TODO Auto-generated method stub084085 }086087 @Override088 public void publishArrived(String arg0, byte[] arg1, int arg2,089 boolean arg3) throws Exception { 090 // TODO Auto-generated method stub091092 }093094 }095096 /**097 * @param args098 */099 public static void main(String[] args) { 100 // TODO Auto-generated method stub101 new SubscribeClient("" + i);102103 }104105 }
Redis發布訂閱和Stream
發布訂單系統是日常開發中經常會用到的功能。簡單來說,就是發布者發布消息,訂閱者就會接受到消息并進行相應的處理,如下圖所示。
Redis為我們提供了發布/訂閱的功能模塊PubSub,可以用于消息傳遞。
其中發布者publisher、訂閱者subscriber都是redis客戶端,channel則是redis服務器。
發布者publisher向channel發送消息,訂閱該channel的subscriber就會接收到消息。
發布消息publish
訂閱test1、test2的客戶端會收到消息
按照上述這種方式,如果 訂閱者subscriber想要訂閱多個channel 則需要同時指定多個channel的名稱,redis為了解決這個問題提供 psubscribe模式匹配 這種訂閱方式,可以通過通配符的方式匹配頻道。
發布消息
之前訂閱ch*的客戶端就會收到cha頻道和買粉絲頻道的消息,這樣就一次性訂閱多個頻道
redis服務端存儲了訂閱頻道/模式的客戶端列表
相當于如果客戶端訂閱一個頻道 ,那么服務端的 pubsub_channels 就會存儲一條數據, pubsub_channels 其實是一個鏈表,key對應channel,value對應客戶端列表,根據key訂閱的頻道,就可以找到訂閱該頻道的所有客戶端。
同時如果客戶端訂閱一個模式 , pubsub_patterns 也會新增一條數據,記錄當前客戶端訂閱的模式, pubsub_patterns 也有自己的數據結構,其中就包含了客戶端以及模式。
當發布者向某個頻道發布消息時,就會遍歷 pubsub_channels 找到訂閱該頻道的客戶端列表,依次向這些客戶端發送消息。
然后遍歷 pubsub_patterns 找到符合當前頻道的模式,同時找到模式對應的客戶端,然后向客戶端發送消息。
雖然Redis提供了發布/訂閱的功能,但是并不完善,導致基本沒有合適的場景能夠使用。
PubSub缺點:
直到Redis5.0出現之后,出現了Stream這種數據結構,才終于完善了Redis的消息機制 。
Stream實際上就是一個消息列表,只是他幾乎實現了消息隊列所需要的所有功能,包括:
同時需要注意的是Stream只是一個數據結構,他不會主動把消息推送給消費者,需要消費者主動來消費數據 。
每個Stream都有唯一的名稱,它就是Redis的key,首次使用 xadd 指令追加消息時自動創建。
常見操作命令如下表:
如果客戶端希望知道自身消費到第幾條數據了,那么就需要記錄一下當前消費的消息ID,下次再次消費的時候就從上次消費的消息ID開始讀取數據即可。
消費組中多了一個游標 last_delivered_id ,表示當前消費到了哪一條數據。同時所有的數據都是待處理消息( PEL ),只有消費者處理完畢之后使用 ack 指令告知redis服務器,數據才會從 PEL 中移除,確認后的消息就無法再次消費。
如果接收到的消息比較多,為了避免Stream過長,可以選擇指定Stream的最大長度,一旦到達了最大長度,就會從最早的消息開始清除,保證Stream中最新的消息。
很赞哦!(923)
相关文章
- 03 快手海外業務占比(字節跳動市值多少億人民幣字節跳動資產估值)
- 03 快手網紅捐款名單(快手網紅大全排行榜(快手網紅大全?))
- 03 恩施周邊網紅打卡的地方(湖北非遺特色村鎮和街區有哪些)
- 03 怎么才能去海外工作(怎么能到國外工作)
- 01 海外各國華人數量(現在各國有多少華人華僑)
- 03 快手海外業務占比(快手發布二季度及中期業績:Q2收入191億元 同比勁增48.8%)
- 01 海外博士后申請需要學位證書嗎(博士后是學位嗎,學士是什么?)
- 03 意大利對外貿易協會英文(對外經濟貿易大學意大利語能進外交部嗎)
- 03 感謝粉絲來買衣服的文案(買了新衣服發朋友圈的文案收藏(40句))
- 01 海外號碼購買(在國內如何申請美國的手機號碼)
热门文章
站长推荐
03 快手第一個播放量破億的(快手是干什么用的(快手與抖音誰更勝一籌))
03 成功職場英語上海外語教育出版社(急求一篇關于招投標的英文文章,最好要有中文對照翻譯的!中文字數在3000就夠了。。。)
01 海外華人常自豪稱自己為炎黃子孫是因為(為什么中華民族常自豪稱自己為“炎黃子孫”)
01 海外華人數量排名(全球華人最多的國家,超過了一千萬,不是馬來西亞也不是新加坡)
01 海外商品發生退貨要承擔稅費嗎(購買進口商品退貨稅費誰承擔)
01 海外商品銷售占比怎么才能符合要求(淘寶全球購海內外商品占比100%,我店不達標怎么辦)
03 意大利對外貿易委員會成都辦公室(對外經濟貿易大學意大利語能進外交部嗎)
01 海外商城app搭建(如何在海外推廣APP)