一个client发送消息给orleans, 就只需要掉用Grain的函数就行了. 但是有时候Grain需要发送消息给client, 在orleans里面, 就只能通过Observer
来实现.
1 public interface IChat : IGrainObserver 2 { 3 void ReceiveMessage(string message); 4 } 5 6 public class Chat : IChat 7 { 8 public void ReceiveMessage(string message) 9 {10 Console.WriteLine(message);11 }12 }13 14 class HelloGrain : Grain, IHello15 {16 private ObserverSubscriptionManager_subsManager;17 public override async Task OnActivateAsync()18 {19 _subsManager = new ObserverSubscriptionManager ();20 await base.OnActivateAsync();21 }22 public async Task Subscribe(IChat observer)23 {24 _subsManager.Subscribe(observer);25 }26 public async Task UnSubscribe(IChat observer)27 {28 _SubsManager.Unsubscribe(observer);29 }30 }31 32 public Task SendUpdateMessage(string message)33 {34 _SubsManager.Notify(s => s.ReceiveMessage(message));35 return TaskDone.Done;36 }37 38 //下面就是Grain发送消息给Client的代码39 var friend = GrainClient.GrainFactory.GetGrain (0);40 Chat c = new Chat();41 42 var obj = await GrainClient.GrainFactory.CreateObjectReference (c);43 await friend.Subscribe(obj);
有了上面的代码, 我们就可以按照自己的需求造一个广播出来.
- 发送消息给Client上所有的人
- 发送消息给Client上某一部分人
- 发送消息给Client上某一个人
1 enum DestType 2 { 3 DestType_All = 1, 4 DestType_Server = 2, 5 DestType_Player = 3, 6 } 7 8 //这是我们的观察者 9 public interface IGatewayObserver : IGrainObserver10 {11 void SendMessage(int destType, long dest, int msgid, byte[] buffer);12 }13 14 public interface IAllGatewayGrain : IGrainWithIntegerKey15 {16 //注册网关17 Task RegisterGateway(string key);18 Task UnRegisterGateway(string key);19 20 //发送消息21 Task SendMessage(int destType, long dest, int msgid, byte[] buffer);22 23 //注册观察者24 Task RegisterObserver(string gateway, IGatewayObserver);25 }26 27 public interface IGatewayGrain : IGrainWithStringKey28 {29 Task SendMessage(int destType, long dest, int msgid, byte[] buffer);30 31 Task RegisterObserver(string gateway, IGatewayObserver);32 }
上面是接口的设计, 然后只需要在Client启动的时候, 把自己注册到两个Grain
里面去, 然后其他的Grain就可以通过两个Grain来发送针对所有人
, 服务器
, 和个人
的消息了.