首页 技术 正文
技术 2022年11月6日
0 收藏 631 点赞 1,519 浏览 34042 个字

回到目录

关于持久化到Redis的消息格式,主要是说在Broker上把消息持久化的过程中,需要存储哪些类型的消息,因为我们的消息是分topic的,而每个topic又有若干个queue组成,而我们的topic和queue由于redis存储结构的原因,我们需要将它们分区对应存储一下,而不能像关系型数据库那样灵活,所以要额外设计几个数据结构来存储它们。

一 Topic字典

二 Topic对应的Queue字典

三 Queue里的消息

四 某个客户端对应某个Queue的消费进度

以上四个结构是我们要说的,它们会在推消息,拉消息,删消息时用到,下面一一介绍一下,讲的不好不对的地方,欢迎大家为大叔留言。

一 Topic字典

主要存储每个topic,它是一个set集合,redis的我集合类型之一,每个key是唯一的LindMq_Topic,值value就是我们客户端传来的具体topic的名字,这主要是在删除过期的消息时用的,主是作用是遍历所有的topic消息类型,这样我们在删除消息时,就可以把所有注册的topic都找到了,最后把过期的删除,默认消息存活周期是一天。

aaarticlea/png;base64,iVBORw0KGgoAAAANSUhEUgAAAUIAAAB0CAIAAAB/m6zPAAAEqklEQVR4nO3dTXabSBQGUJbWG9Ceek1eRYYZeC2Zqgd2E1x/lASIeuje804OwkA9OXxSBSnSdAeCm85uANhKjCE8MYbwxBjCE2MIT4whPDGG8MQYwhNjCE+MITwxhvDEGMJ7UYz/BXZyZoz/ATb7/P3r5Bi/Ziy4MDGG8MQYwhNjCG89xtOa4jY9Yy9j3N6x84DLQyXHTI7f2eQTd6p4kOd2hE4rMc5P/WIYpmn6+PiYpul2uz0X4/uup3uxw+XKzhHnffdtD/b1QIxra5Y/ut1u/WOfEuP87vQfDcbUO6nObxYTvozx6ly0J8b5QdoNNOYLO8a4OEqt1cbuHiDYRdclruU5Vztl78fEuPijJEL5NsnK/M+eQRuj1wZa3b69JTzngSvVSXpXY7xqS4zz5Ygxhl08H+PiBlFivHwwWu1TjBncPleq55t7TaqfmJd2xri2S4NJNYPrinFbvmWypjZ28XXjxmHnNNYeSmrtLXfs7C3/DbRX5p0Xm390dOjx1u/i2jFLMsmJ3jrGs9pc4NHdD2oP2sQYwjs5xp+/fymlttdpMQaOI8YQnhhDeGIM4YkxhCfGEJ4YQ3hiDOGJMYQnxhCeGEN4Ygzhfcf41V/tCOznb4wP/0ZH4Bg/YnzOhKBpzK5gKGmMT/+/lMsasyulRisxVip8ibFS4UuMlQpfYqxU+LpyjOcPoz39jih1aF05xt/3UIzV1UuMlQpfrRjPM9JkappMVpeb5Rtvqbyr5dDFNvKh887z5cbuSo1fK8/G+ZmdnPrLheWfu1TtwaXWQL7cvtmzu1Lj13qMG2vGifF8s/240+h/6fS/FaUeqngxrg1UfGpt36ztolSs2ifGnwdk+PORGJtUq3eu9UtcxWlqbe76mhgXB8onxrWpcnG9GbWKW9d/wUmpy5cYKxW+xFip8CXGSoWvNMYHfuzXU8bsCobiI/UgvNE/Ug9YJcYQXjnG0zT0t0l8vU/j7C5gFIUYhwjJ+B3Cy4R8Nr5H6BBeJkaMk3dB3//vMJk45JvBOwgT43yhuDJZgHcQI8Zfkife4nJ+Ey5PjCG8MDFOJsyrkR7wLsBBqi84DRWDqWL503yzU1uG1/EuLghPjCE8MYbwxBjCE2MI70eM/wABiTGEV4jx/LrruZ1tsb3/5CXonu2PaAN6pDFennahT8Htze/yqwj9OyQKMe46ghgzsta/jYc6BZP57bycTFzzmXA+Ma7tm4+YL9eOlhznoQk5bFSN8VDnXyNR7c1qz6g9ASs+IhSP1h6oPQpsV47xaCdfLcarm00/FbfsGTRZ3/MI0j8QbFS+Un1uT7ktMV49YM+gq6OLMSdqXeIaxy4x3v5sbFLNmAoxHvPyTP7P1NrkNvlRcUa9egdrGzSOX2tvqF8jl+RdXBCeGKfPnJ48CUeMITwxhvDEGMLzsQEQnhhDeDE+p7pT3M5hizTGU/1LVaII2jY8rTWpDpqHoG3D06rf4TRUGJK3ZxTXLDc+qU04R4xn4yn7grV8Id8Y3kSMGH/pTOxobcPRxBjCC3Olun9ePVTb8AIxXjfuvMTVuO4FF+ZdXBCeGEN4YgzhiTGEJ8YQno8NgPD+A4zNhI48n/bUAAAAAElFTkSuQmCC” alt=”” />

删除过期的消息代码如下

 var topicList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQ_TOPICKEY);
foreach (var topic in topicList)
{
var queueList = RedisClient.RedisManager.Instance.GetDatabase().SetMembers(LINDMQKEY + topic);
foreach (var queue in queueList)
{
 var removeKey = LINDMQKEY + queue + "_" + DateTime.Now.AddDays(-).ToString("yyyyMMdd");
  RedisClient.RedisManager.Instance.GetDatabase().KeyDelete(removeKey);
}
}

二 Topic对应的Queue字典

我们知道,为了加大redis的并发量和吞吐量,我们会把大数据键值对设计成多个键,这就像是一个集群环境的sharing,就是将大数据进行分片,而我们的分片规则是采用按对象取模的方式,模数可以自己设置,比较我设置8,那说明我的队列(分片)最多可以被分为8个,这个大家可以去做测试,挺有意思的,比随机数来个直接!而这一次redis里的键就是某个topic,而值就是我们的topic加上队列索引,例如你的topic是zzl,那么队列里的键可能就是zzl0,zzl1,zzl2…

aaarticlea/png;base64,iVBORw0KGgoAAAANSUhEUgAAAU8AAABhCAIAAADZfrRXAAADk0lEQVR4nO3dS3LaQBQFUC0tG2BPWZNXkQGDDLwMjzN1Bk5RHfVHLdP68c6pWy4QQt2q+EIXEDzd7/dPOJ87o013beeUPj4+jp7Cq9F2iELbIQpthyi0HaLQdohC2yEKbYcotB2i0HaIQtshCm2HKLQdotB2iGLvtv8EBlnbvgPa/gN42vvvX2vbd0zb9xwRXpK2QxTaDlFoO0QxrO3TkuI+PeOlbW/fsfOA6aFmx5wdv3+SqwYde0zoNKbteUOKnZmm6e3tbZqm2+32vbZ/rqx0W3GG6cbhI250QOgxvu21LelNt9utf7xD2p6fzqhBhx8NOg1eyedXiw8EadsXF7c9bc8P0p5AY/XxjbZP/2ts7DkabGTkq3Tpb3b7d31424s3zbqX7zPbmP/sGbTz4P1Hg42Mf01+VvLFti96pu355S3a3tinfyNsbfO2F3e4StvTx6zFeWo7J7fra/KPq6NW8u1n4GfaXrtLzarFvLZziJFtb8v3nG2pjVd8v71x2Cl5Wi4+4tSml96xc2610VdthH34LN0yzeQ1aPsKtZUFXIK2QxTXaPv7718i8nzWts/30kEU2g5RaDtEoe0QhbZDFNoOUWg7RKHtEIW2QxTaDlFoO0Sh7RDFvO17/01aYC+Ftm/+p2iBI5TbvseqYqVzzgoupNr2w//vbppzzkrkWtF2kSjRdpEo0XaRKNF2kSgJ0fbHd0gffiIiByZE2/+dqrZL7Gi7SJR0tf2xDJ6th2cr5HS3fOdnks8qHbo4jXzofOb55cbdRa6e3uf2vACzhqQX0p9DUnsMqk0gv9y+2nN3katnRdsbW87T9sfV9sNTY/6pw/95RAbmwm2vDVR8om5frd1F5JUyuO3vG1T9fU3breRFalnxKl1xbVxbMO/T9uJA+Wq8tj4vbreMl1dNoHfgRIJH20WiRNtFokTbRaKk2vYNvwvvW845K7gQ30IJUVzmWyiBJ2k7RLHQ9mmadp/SAI9PyBRv2n8+cAatttcKcxX55K9+RvCM13xu/zKb/NfVS58RPONibZ99sr24Jd05v3y2M4LdXK/tixfyndu7QRAXa/uXdrHzjcXaQzQh2v75/4J/28nBWV2v7f2Lee/AQWr5HbhT1aPzVbraS3cnPCPYjc/SQRTaDlFoO0Sh7RCFtkMU5bb/AV6OtkMU2g5RaDtEoe0QhbZDFNoOUWg7RKHtEIW2QxTaDlFoO0TxF9RUbEnCWMRJAAAAAElFTkSuQmCC” alt=”” />

三 Queue里的消息

我们的生产者将消息发送到broker里,然后于broker将消息持久化到具体的存储介质里,当然这里我们用的是Redis,在存储在redis里时,我们的具体队列的键是有后缀的,这主要用于消息的回收,因为我们打算1天回收一次消息,所以我们的消息后缀是个日期变量,当然精确到天就可以了,它可以是这样键名LindMQ_order_Paid4_20161202,每个队列都有自己的后缀,我们在清除消息时也就有了方法了。我们的队列存储结构是比较特殊的sortedSet ,就是可排序的集合,它有权重的概念,我们刚好可以使用这个特性来记录客户端的消费进度,因为我们的权重值在一个redis键/值对里是唯一的。

aaarticlea/png;base64,” alt=”” />

下面代码选自Push入队列的代码片断,分享给大家

       //存储当前Topic
RedisClient.RedisManager.Instance.GetDatabase().SetAdd(LINDMQ_TOPICKEY, body.Topic); //要存储到哪个队列
body.QueueId = Math.Abs(body.Body.GetHashCode() % BrokerManager.CONFIG_QUEUECOUNT);
var dataKey = body.Topic + body.QueueId;
RedisClient.RedisManager.Instance.GetDatabase().SetAdd(GetRedisKey(body.Topic), dataKey); //记录偏移
var offset = RedisClient.RedisManager.Instance.GetDatabase().SortedSetLength(GetRedisDataKey(dataKey));
body.QueueOffset = offset + ; //存储消息
RedisClient.RedisManager.Instance.GetDatabase().SortedSetAdd(
GetRedisDataKey(dataKey),
Utils.SerializeMemoryHelper.SerializeToJson(body),
score: body.QueueOffset);

四 某个客户端对应某个Queue的消费进度

消费进度是一个很麻烦的问题,生产者的消息是可以被多个消费者消费的,所以不能使用.net那种简单的Queue机制,出队列后就消失了,这是不靠谱的,万一消失失败了,也会造成消息的丢失!下面我们主要看一下消费进度的存储,它是一个Hash集合,其中redis的键名是LindMQ_ConsumerOffset,而value是一个hash对象,hash里的key是当前队列名+消费者IP地址的hashcode值,hash里的value是这个消费者(客户端)的消费进度(Queue里的权重,Queue的存储结构是一个sortedSet)。

aaarticlea/png;base64,iVBORw0KGgoAAAANSUhEUgAAAWQAAABrCAIAAADPWG8WAAAHN0lEQVR4nO2dS5bkKAxFvbReWi+g9lFbyUEOalBryal70N0uJ0jiGWMw9r1Hp46DkJHA4pn4VOSyruvn5+cKABCyrIgFAAggFgAggVgAgARiAQASiAUASNxCLP4GgCp6rtO7iMVfAHCQ378+eq7TG4nF2BwApgOxAAAJxAIAJBALAJAYLxbL//z48WPZsT1lOu/JT4zZi0V8ltJbklvSZ9K/mOH6faR6Dv3xMjSTn2JE4FEtFmYxF8vA3ll4yynvPXmqbh0mO4uGtZsnk2QoRjw0pwPxRmS268OPY8Eo6sSiupgPiMWyLD9//ozlYCKx8JZT0MnNCUbk7TUahoP+nBeLYyeuJ8RizRQh3pJ4WSpikfdgbhYS56vFohjFSyk4y5wrZfjmiMw+lUB5LOV06MmZlyHJw7gS/qul1ReL1NUSiySSeaKXzYa+szBDJ08lVb4/yP+tCGq2JyPVUyrm6XkmseIRmUPIG/U8vT6hJ2fe4DQXsvnwz8F6ZGcRrHlPIJS8z4hFMLa8sY9YeA568rlYmJMvXmxvCMF1zFuK0aE/5z8NOVQkqlgUVWBSsdiSvFos4meLYqEk1kQsxHPj06EPM4mFrin5fWlPIBYV66FiERbr/migk42BWIhiFM9b8fS66NCZtp+GFMvJEIttYSdflzBbTLwvaChikfdmtud9Bs77UzxPZYpzTy/JJKV8OOfzjPv0olQ0BrMkTh1cRLVYeHUbN9o7i84M/wYnRQ8zMv4bnP0ZLhYb5n0e4J4gFgAg8VKx+P3rA8Owo9Zznd5CLADg/iAWACCBWACABGIBABKIBQBIIBYAIIFYAIAEYgEAEogFAEggFgAggVgAgARiAQAShlj0/qPxADADtlhc/rfiAWA2XLG4fE9znHtmBUOgGPoTicXw/6u/t3tmhQ0ximGIIRbYfEYxDDHEApvPKIYhhlhg8xnFMMQQC2w+oxiG2MRi8e9P9Q9PDOtvJ0t0+zsPwwcyl00sFr9/fXC932lNSpTiOTxjiAU2nSEWQ0wVi23bluzfkh3d3i13PmOBWOS7SjMrzxmbzrzXpOYl9q54XjD5cXD6C+3AziKfsmRO9wf7f5tYvLMwM1EcsBmtuM1UrrtezKb/C+2YWAQtA8Uiv+p7YmdsRtPFYnsY3+q8071aeqdNLxbFIkjaueQPsLhEvXo4ubPA2ouFd81O2smXId4QsBlNFwtehjS0Y29wmrdx795+tVjsQ+evOMx9I9f7GRZ8GmJe9KR6vRcXZjuvQf7Mz9QfnR4bKtf7KXbPEn28vUIsuDk8zO5Zoo+3V4gF9jCjGIYYYoHNZxTDEIvEotkPfTbinlnBECiG/vCDvQAgMdMP9gLAQBALAJAoi8WyLL2T8tk+BDWfOtRJ0lt+fHXCYs65j9mSRym6Jd9Bijusa+nAkKDvpCAWN7wMTfJJFkxy0DBQ0JUyt94y9jrPxxW4FSVMOTB761YzQ4K+lsl2FuvFYlG8OZ8Ml7Qo/Xvnmg/FxRNEL4bzAiEWj+cJYuHth/cHied5sVi+EzTm/Zj350Pj9R4WF4+XbXJW4rZaM+kNp2fBIBY9eYJYmO1muefO+6eOikXu46lAH7GIFSo/xcs8kIniqPM5vBRv7HAFTxaL5LitWAQO8fI+ejMsisX6fX+kdB70eWjeim4dMMcOV4BYGGKh11+FWGz9izdDRSzMp6p3Fg3denK3Qn0ec4tFUKB1YpE/pWQiLqd4LMUoXouX8JkJaeV2NfrFgvMUxEK/AfZhycgb1yxtz3PfbrrpmXiNplvirEQ51KI3dnC7lP4R30x5Z/FaqEKAPYhFmSVjrv4BmoBYAIAEYgEAEogFAEi4YvEFALADsQAAiYJYbO/Pj81yI8hHT3LrYd9bflyX2yFnxT/3MVvy3opuyUcwcYd1LVcTDAGaE4lFUgfjkvxGk0ySBZMcVAcST9En1lvGxd4UNy90PgPBgdlbt2q5T1m+AcTim1gUb87NcyueYm4QvIfiJQvEohjOC4RYPB71PYv7XBWzms398P4g8TwvFuYGOA9quiWhK8brPSyuWC+35Kx4UF6gQIMuwptbuAJJLG51JbxkgqUY3Pr2Tx0Vi9wnCCqmrTh4SQbr2ezBm5BY+/LOc/WJR9QQcbzQhLJY3O0a6GKRHLcVCzFosZ/zYvH1fX9UFzRQgWDeim49uVuhPo/ypyFj88vpIBbm2qsLGudcEULvrXpn0dDtaoYr1KtQ3+C8D+KCrBOL/Cklk+LmXHErRvFavITPTEgrt6tBLHpSEIs9o1P9+spS2m8EzJb9WebpiUyYpyuZeEG9ORRnVTnR7Eps7OB2NUOCvhO+welCCQLsQSzKBBuEKfoHaAJiAQASiAUASCAWACDBj98AgARiAQASiAUASCAWACCBWACABGIBABKIBQBIIBYAIIFYAIAEYgEAEogFAEggFgAggVgAgARiAQASiAUASPwDwQoLTDg5Vb0AAAAASUVORK5CYII=” alt=”” />

客户端消费的测试代码

            #region Client-LindMQ
var consumer = new ConsumerSetting
{
BrokenName = "test",
BrokenAddress = new System.Net.IPEndPoint(IPAddress.Parse("192.168.2.71"), ),
Callback = new Dictionary<string, Action<MessageBody>>() {
{"zzl",(o)=>{
Console.WriteLine(o.ToString());
Thread.Sleep();
}},
{"zhz",(o)=>{
Console.WriteLine(o.ToString());
Thread.Sleep();
}}
}
};
var consumerClient = new ConsumerManager(new List<ConsumerSetting> { consumer });
consumerClient.Start();
#endregion

客户端消费的测试结果

aaarticlea/png;base64,” alt=”” />

好了,到这里我们的LindMQ里数据存储结构的内容就讲完了,主要使用了redis里的set,sortedSet,hash等数据结构,在设计过程中,使用了分片(Sharing)的概念,当然也是借鉴了mongodb和redis集群的设计理念,同时借鉴了方雪华老兄的EQueue设计理念,在这里和他们说一声:谢谢!

感谢各位对Lind的支持!

回到目录

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,082
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,557
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,406
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,179
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:7,815
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:4,898