博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
wcf系列学习5天速成——第四天 wcf之分布式架构
阅读量:5339 次
发布时间:2019-06-15

本文共 19724 字,大约阅读时间需要 65 分钟。

今天是wcf系列的第四天,也该出手压轴戏了。嗯,现在的大型架构,都是神马的,

nginx鸡群,iis鸡群,wcf鸡群,DB鸡群,由一个人作战变成了群殴.......

 

今天我就分享下wcf鸡群,高性能架构中一种常用的手法就是在内存中维护一个叫做“索引”的内存数据库,

在实战中利用“索引”这个概念做出"海量数据“的秒杀。

好,先上图:

 

这个图明白人都能看得懂吧。因为我的系列偏重于wcf,所以我重点说下”心跳检测“的实战手法。

 

第一步:上一下项目的结构,才能做到心中有数。

 

第二步:“LoadDBService”这个是控制台程序,目的就是从数据库抽出关系模型加载在内存数据库中,因为这些东西会涉及一些算法的知识,

             在这里就不写算法了,就简单的模拟一下。

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.Runtime.Serialization;  6 using System.Web.Script.Serialization;  7 using System.IO;  8 using System.Xml.Serialization;  9 using System.Xml; 10 using Common; 11 12 namespace LoadDBData 13 {
14 class Program 15 {
16 static void Main(string[] args) 17 {
18 //模拟从数据库加载索引到内存中,形成内存中的数据库 19 //这里的 "Dictionary" 用来表达“一个用户注册过多少店铺“,即UserID与ShopID的一对多关系 20 SerializableDictionary
> dic = new SerializableDictionary
>(); 21 22 List
shopIDList = new List
(); 23 24 for (int shopID = 300000; shopID < 300050; shopID++) 25 shopIDList.Add(shopID); 26 27 int UserID = 15; 28 29 //假设这里已经维护好了UserID与ShopID的关系 30 dic.Add(UserID, shopIDList); 31 32 XmlSerializer xml = new XmlSerializer(dic.GetType()); 33 34 var memoryStream = new MemoryStream(); 35 36 xml.Serialize(memoryStream, dic); 37 38 memoryStream.Seek(0, SeekOrigin.Begin); 39 40 //将Dictionary持久化,相当于模拟保存在Mencache里面 41 File.AppendAllText("F://1.txt", Encoding.UTF8.GetString(memoryStream.ToArray())); 42 43 Console.WriteLine("数据加载成功!"); 44 45 Console.Read(); 46 } 47 } 48 }

因为Dictionary不支持序列化,所以我从网上拷贝了一份代码让其执行序列化

1 using System;   2 using System.Collections.Generic;   3 using System.Linq;   4 using System.Text;   5 using System.Xml.Serialization;   6 using System.Xml;   7 using System.Xml.Schema;   8 using System.Runtime.Serialization;   9  10 namespace Common  11 {
12 /// 13 /// 标题:支持 XML 序列化的 Dictionary 14 /// 15 ///
16 ///
17 [XmlRoot("SerializableDictionary")] 18 public class SerializableDictionary
: Dictionary
, IXmlSerializable 19 {
20 21 public SerializableDictionary() 22 : base() 23 {
24 } 25 public SerializableDictionary(IDictionary
dictionary) 26 : base(dictionary) 27 {
28 } 29 30 public SerializableDictionary(IEqualityComparer
comparer) 31 : base(comparer) 32 {
33 } 34 35 public SerializableDictionary(int capacity) 36 : base(capacity) 37 {
38 } 39 public SerializableDictionary(int capacity, IEqualityComparer
comparer) 40 : base(capacity, comparer) 41 {
42 } 43 protected SerializableDictionary(SerializationInfo info, StreamingContext context) 44 : base(info, context) 45 {
46 } 47 48 49 public System.Xml.Schema.XmlSchema GetSchema() 50 {
51 return null; 52 } 53 ///
54 /// 从对象的 XML 表示形式生成该对象 55 /// 56 ///
57 public void ReadXml(System.Xml.XmlReader reader) 58 {
59 XmlSerializer keySerializer = new XmlSerializer(typeof(TKey)); 60 XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue)); 61 bool wasEmpty = reader.IsEmptyElement; 62 reader.Read(); 63 if (wasEmpty) 64 return; 65 while (reader.NodeType != System.Xml.XmlNodeType.EndElement) 66 {
67 reader.ReadStartElement("item"); 68 reader.ReadStartElement("key"); 69 TKey key = (TKey)keySerializer.Deserialize(reader); 70 reader.ReadEndElement(); 71 reader.ReadStartElement("value"); 72 TValue value = (TValue)valueSerializer.Deserialize(reader); 73 reader.ReadEndElement(); 74 this.Add(key, value); 75 reader.ReadEndElement(); 76 reader.MoveToContent(); 77 } 78 reader.ReadEndElement(); 79 } 80 81 /**/ 82 ///
83 /// 将对象转换为其 XML 表示形式 84 /// 85 ///
86 public void WriteXml(System.Xml.XmlWriter writer) 87 {
88 XmlSerializer keySerializer = new XmlSerializer(typeof(TKey)); 89 XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue)); 90 foreach (TKey key in this.Keys) 91 {
92 writer.WriteStartElement("item"); 93 writer.WriteStartElement("key"); 94 keySerializer.Serialize(writer, key); 95 writer.WriteEndElement(); 96 writer.WriteStartElement("value"); 97 TValue value = this[key]; 98 valueSerializer.Serialize(writer, value); 99 writer.WriteEndElement(); 100 writer.WriteEndElement(); 101 } 102 } 103 104 } 105 }

第三步: "HeartBeatService"也做成了一个控制台程序,为了图方便,把Contract和Host都放在一个控制台程序中,

            代码中加入了注释,看一下就会懂的。

            

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Runtime.Serialization;  5 using System.ServiceModel;  6 using System.Text;  7  8 namespace HeartBeatService  9 {
10 //CallbackContract:这个就是Client实现此接口,方便服务器端通知客户端 11 [ServiceContract(CallbackContract = typeof(ILiveAddressCallback))] 12 public interface IAddress 13 {
14 /// 15 /// 此方法用于Search启动后,将Search地址插入到此处 16 /// 17 /// 18 [OperationContract(IsOneWay = true)] 19 void AddSearch(string address); 20 21 /// 22 /// 此方法用于IIS端获取search地址 23 /// 24 /// 25 [OperationContract(IsOneWay = true)] 26 void GetService(string address); 27 } 28 }

 

1 using System;   2 using System.Collections.Generic;   3 using System.Linq;   4 using System.Runtime.Serialization;   5 using System.ServiceModel;   6 using System.Text;   7 using System.Timers;   8 using System.IO;   9 using System.Collections.Concurrent;  10 using SearhService;  11 using ClientService;  12  13 namespace HeartBeatService  14 {
15 //InstanceContextMode:主要是管理上下文的实例,此处是single,也就是单体 16 //ConcurrencyMode: 主要是用来控制实例中的线程数,此处是Multiple,也就是多线程 17 [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)] 18 public class Address : IAddress 19 {
20 static List
search = new List
(); 21 22 static object obj = new object(); 23 24 ///
25 /// 此静态构造函数用来检测存活的Search个数 26 /// 27 static Address() 28 {
29 Timer timer = new Timer(); 30 timer.Interval = 6000; 31 timer.Elapsed += (sender, e) => 32 {
33 34 Console.WriteLine("\n***************************************************************************"); 35 Console.WriteLine("当前存活的Search为:"); 36 37 lock (obj) 38 {
39 //遍历当前存活的Search 40 foreach (var single in search) 41 {
42 ChannelFactory
factory = null; 43 44 try 45 {
46 //当Search存在的话,心跳服务就要定时检测Search是否死掉,也就是定时的连接Search来检测。 47 factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(single)); 48 factory.CreateChannel().TestSearch(); 49 factory.Close(); 50 51 Console.WriteLine(single); 52 53 } 54 catch (Exception ex) 55 {
56 Console.WriteLine(ex.Message); 57 58 //如果抛出异常,则说明此search已经挂掉 59 search.Remove(single); 60 factory.Abort(); 61 Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个"); 62 } 63 } 64 } 65 66 //最后统计下存活的search有多少个 67 Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个"); 68 }; 69 timer.Start(); 70 } 71 72 public void AddSearch(string address) 73 {
74 75 lock (obj) 76 {
77 //是否包含相同的Search地址 78 if (!search.Contains(address)) 79 {
80 search.Add(address); 81 82 //search添加成功后就要告诉来源处,此search已经被成功载入。 83 var client = OperationContext.Current.GetCallbackChannel
(); 84 client.LiveAddress(address); 85 } 86 } 87 } 88 89 public void GetService(string address) 90 {
91 Timer timer = new Timer(); 92 timer.Interval = 1000; 93 timer.Elapsed += (obj, sender) => 94 {
95 try 96 {
97 //这个是定时的检测IIS是否挂掉 98 var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), 99 new EndpointAddress(address)); 100 101 factory.CreateChannel().AddSearchList(search); 102 103 factory.Close(); 104 105 timer.Interval = 10000; 106 } 107 catch (Exception ex) 108 { 109 Console.WriteLine(ex.Message); 110 } 111 }; 112 timer.Start(); 113 } 114 } 115 }

 

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6  7 namespace HeartBeatService  8 {
9 /// 10 /// 等客户端实现后,让客户端约束一下,只能是这个LiveAddress方法 11 /// 12 public interface ILiveAddressCallback 13 {
14 [OperationContract(IsOneWay = true)] 15 void LiveAddress(string address); 16 } 17 }

第四步: 我们开一下心跳,预览下效果:

         是的,心跳现在正在检测是否有活着的Search。

 

第五步:"SearhService" 这个Console程序就是WCF的search,主要用于从MemerCache里面读取索引。

          记得要添加一下对“心跳服务”的服务引用。

          

 

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Runtime.Serialization;  5 using System.ServiceModel;  6 using System.Text;  7  8 namespace SearhService  9 {
10 // 注意: 使用“重构”菜单上的“重命名”命令,可以同时更改代码和配置文件中的接口名“IService1”。 11 [ServiceContract] 12 public interface IProduct 13 {
14 [OperationContract] 15 List
GetShopListByUserID(int userID); 16 17 [OperationContract] 18 void TestSearch(); 19 } 20 }

 

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Runtime.Serialization;  5 using System.ServiceModel;  6 using System.Text;  7 using Common;  8 using System.Xml;  9 using System.IO; 10 using System.Xml.Serialization; 11 12 namespace SearhService 13 {
14 public class Product : IProduct 15 {
16 public List
GetShopListByUserID(int userID) 17 {
18 //模拟从MemCache中读取索引 19 SerializableDictionary
> dic = new SerializableDictionary
>(); 20 21 byte[] bytes = Encoding.UTF8.GetBytes(File.ReadAllText("F://1.txt", Encoding.UTF8)); 22 23 var memoryStream = new MemoryStream(); 24 25 memoryStream.Write(bytes, 0, bytes.Count()); 26 27 memoryStream.Seek(0, SeekOrigin.Begin); 28 29 XmlSerializer xml = new XmlSerializer(dic.GetType()); 30 31 var obj = xml.Deserialize(memoryStream) as Dictionary
>; 32 33 return obj[userID]; 34 } 35 36 public void TestSearch() { } 37 } 38 }

 

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6 using System.Configuration;  7 using System.Timers;  8 using SearhService.HeartBeatService;  9 10 namespace SearhService 11 {
12 public class SearchHost : IAddressCallback 13 {
14 static DateTime startTime; 15 16 public static void Main() 17 {
18 ServiceHost host = new ServiceHost(typeof(Product)); 19 20 host.Open(); 21 22 AddSearch(); 23 24 Console.Read(); 25 26 } 27 28 static void AddSearch() 29 {
30 startTime = DateTime.Now; 31 32 Console.WriteLine("Search服务发送中.....\n\n*************************************************\n"); 33 34 try 35 {
36 var heartClient = new AddressClient(new InstanceContext(new SearchHost())); 37 38 string search = ConfigurationManager.AppSettings["search"]; 39 40 heartClient.AddSearch(search); 41 } 42 catch (Exception ex) 43 {
44 Console.WriteLine("Search服务发送失败:" + ex.Message); 45 } 46 } 47 48 public void LiveAddress(string address) 49 {
50 Console.WriteLine("恭喜你," + address + "已被心跳成功接收!\n"); 51 Console.WriteLine("发送时间:" + startTime + "\n接收时间:" + DateTime.Now); 52 } 53 } 54 }

 

第六步:此时Search服务已经建好,我们可以测试当Search开启获取关闭对心跳有什么影响:

              Search开启时:

                      

          

           Search关闭时:

              

           对的,当Search关闭时,心跳检测该Search已经死掉,然后只能从集群中剔除。

           当然,我们可以将Search拷贝N份,部署在N台机器中,只要修改一下endpoint地址就OK了,这一点明白人都会。

 

第七步:"ClientService" 这里也就指的是IIS,此时我们也要添加一下对心跳的服务引用。

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6  7 namespace ClientService  8 {
9 [ServiceContract] 10 public interface IServiceList 11 {
12 [OperationContract] 13 void AddSearchList(List
search); 14 } 15 }

 

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6 using System.Configuration;  7 using System.Timers;  8 using System.Threading;  9 10 namespace ClientService 11 {
12 public class ServiceList : IServiceList 13 {
14 public static List
searchList = new List
(); 15 16 static object obj = new object(); 17 18 public static string Search 19 {
20 get 21 {
22 lock (obj) 23 {
24 //如果心跳没及时返回地址,客户端就在等候 25 if (searchList.Count == 0) 26 Thread.Sleep(1000); 27 return searchList[new Random().Next(0, searchList.Count)]; 28 } 29 } 30 set 31 {
32 33 } 34 } 35 36 public void AddSearchList(List
search) 37 {
38 lock (obj) 39 {
40 searchList = search; 41 42 Console.WriteLine("************************************"); 43 Console.WriteLine("当前存活的Search为:"); 44 45 foreach (var single in searchList) 46 {
47 Console.WriteLine(single); 48 } 49 } 50 } 51 } 52 }

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6 using System.Configuration;  7 using System.Threading;  8 using ClientService.HeartBeatService;  9 using SearhService; 10 using BaseClass; 11 using System.Data; 12 using System.Diagnostics; 13 14 namespace ClientService 15 {
16 class Program : IAddressCallback 17 {
18 static void Main(string[] args) 19 {
20 21 ServiceHost host = new ServiceHost(typeof(ServiceList)); 22 23 host.Open(); 24 25 var client = new AddressClient(new InstanceContext(new Program())); 26 27 //配置文件中获取iis的地址 28 var iis = ConfigurationManager.AppSettings["iis"]; 29 30 //将iis的地址告诉心跳 31 client.GetService(iis); 32 33 //从集群中获取search地址来对Search服务进行调用 34 var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search)); 35 36 //根据userid获取了shopID的集合 37 var shopIDList = factory.CreateChannel().GetShopListByUserID(15); 38 39 //.......................... 后续就是我们将shopIDList做连接数据库查询(做到秒杀) 40 41 Console.Read(); 42 } 43 44 public void LiveAddress(string address) 45 {
46 47 } 48 } 49 }

 

然后我们开启Client,看看效果咋样:

当然,search集群后,client得到search的地址是随机的,也就分担了search的负担,实现有福同享,有难同当的效果了。

 

最后: 我们做下性能检测,看下“秒杀”和“毫秒杀”的效果。

          首先在数据库的User表和Shop插入了180万和20万的数据用于关联。

          ClientService改造后的代码:

          

1 using System;  2 using System.Collections.Generic;  3 using System.Linq;  4 using System.Text;  5 using System.ServiceModel;  6 using System.Timers;  7 using System.Diagnostics;  8 using BaseClass;  9 using ClientService; 10 using ClientService.HeartBeatService; 11 using System.Configuration; 12 using SearhService; 13 14 namespace ClientService 15 {
16 class Program : IAddressCallback 17 {
18 static void Main(string[] args) 19 {
20 21 ServiceHost host = new ServiceHost(typeof(ServiceList)); 22 23 host.Open(); 24 25 var client = new AddressClient(new InstanceContext(new Program())); 26 27 //配置文件中获取iis的地址 28 var iis = ConfigurationManager.AppSettings["iis"]; 29 30 //将iis的地址告诉心跳 31 client.GetService(iis); 32 33 //从集群中获取search地址来对Search服务进行调用 34 var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search)); 35 36 //根据userid获取了shopID的集合 37 //比如说这里的ShopIDList是通过索引交并集获取的分页的一些shopID 38 var shopIDList = factory.CreateChannel().GetShopListByUserID(15); 39 40 var strSql = string.Join(",", shopIDList); 41 42 Stopwatch watch = new Stopwatch(); 43 44 watch.Start(); 45 SqlHelper.Query("select s.ShopID,u.UserName,s.ShopName from [User] as u ,Shop as s where s.ShopID in(" + strSql + ")"); 46 watch.Stop(); 47 48 Console.WriteLine("通过wcf索引获取的ID >>>花费时间:" + watch.ElapsedMilliseconds); 49 50 //普通的sql查询花费的时间 51 StringBuilder builder = new StringBuilder(); 52 53 builder.Append("select * from "); 54 builder.Append("(select ROW_NUMBER() over(order by s.ShopID) as NumberID, "); 55 builder.Append(" s.ShopID, u.UserName, s.ShopName "); 56 builder.Append("from Shop s left join [User] as u on u.UserID=s.UserID "); 57 builder.Append("where s.UserID=15) as array "); 58 builder.Append("where NumberID>300000 and NumberID<300050"); 59 60 watch.Start(); 61 SqlHelper.Query(builder.ToString()); 62 watch.Stop(); 63 64 Console.WriteLine("普通的sql分页 >>>花费时间:" + watch.ElapsedMilliseconds); 65 66 Console.Read(); 67 } 68 69 public void LiveAddress(string address) 70 {
71 72 } 73 } 74 }

 

性能图:

对的,一个秒杀,一个是毫秒杀,所以越复杂越能展示出“内存索引”的强大之处。

转载于:https://www.cnblogs.com/hyk110988/p/3461777.html

你可能感兴趣的文章
20172315 2017-2018-2 《程序设计与数据结构》第十一周学习总结
查看>>
MySQL添加、修改、撤销用户数据库操作权限的一些记录
查看>>
C#中List和数组之间转换的方法
查看>>
屏幕分辨率过高导致软件界面显示过小影响使用
查看>>
ViewBag & ViewData
查看>>
关于谷歌浏览器Chrome正在处理请求的问题解决
查看>>
Git核心技术:在Ubuntu下部署Gitolite服务端
查看>>
平面波展开法总结
查看>>
建造者模式
查看>>
ArraySort--冒泡排序、选择排序、插入排序工具类demo
查看>>
composer 安装laravel
查看>>
8-EasyNetQ之Send & Receive
查看>>
Android反编译教程
查看>>
java重写LinkedList
查看>>
zTree节点重叠或者遮挡
查看>>
List<string> 去重复 并且出现次数最多的排前面
查看>>
js日志管理-log4javascript学习小结
查看>>
Android之布局androidmanifest.xml 资源清单 概述
查看>>
How to Find Research Problems
查看>>
Linux用户管理
查看>>