RPCServerTransport.cs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. using System.Collections.Concurrent;
  2. using InABox.Core;
  3. namespace InABox.Rpc
  4. {
  /// <summary>
  /// Base class for RPC server transports. It keeps one <see cref="RpcServerSession"/> per
  /// connection, dispatches incoming messages to the registered command handlers, and pushes
  /// notifications to connected clients.
  /// </summary>
  /// <typeparam name="TConnection">
  /// The transport-specific connection handle; used as the key under which sessions are stored.
  /// </typeparam>
  public abstract class RpcServerTransport<TConnection> : IPusher, IRpcServerTransport where TConnection : notnull
  {
      /// <summary>
      /// Reports whether this transport is secure. What "secure" means is decided by the
      /// concrete transport.
      /// </summary>
      public abstract bool IsSecure();

      // One session per open connection.
      private readonly ConcurrentDictionary<TConnection, RpcServerSession> _sessions = new();

      // Command handlers keyed by the simple (unqualified) type name of the command.
      // Concurrent, like _sessions, so that AddHandler cannot corrupt a lookup that
      // DoMessage is performing at the same time on another thread.
      private readonly ConcurrentDictionary<string, IRpcCommandHandler> _handlers = new();

      /// <summary>
      /// Creates a new session and stores it for <paramref name="connection"/>, replacing any
      /// session already stored for that connection.
      /// </summary>
      /// <param name="connection">The connection the session belongs to.</param>
      /// <returns>The newly created session.</returns>
      protected RpcServerSession CreateSession(TConnection connection)
      {
          var result = new RpcServerSession();
          _sessions[connection] = result;
          return result;
      }

      /// <summary>
      /// Looks up the session stored for a connection.
      /// </summary>
      /// <param name="connection">The connection to look up; may be <c>null</c>.</param>
      /// <returns>
      /// The session, or <c>null</c> if <paramref name="connection"/> is <c>null</c> or has no session.
      /// </returns>
      public RpcServerSession? GetSession(TConnection? connection)
      {
          if (connection is null)
          {
              return null;
          }

          _sessions.TryGetValue(connection, out var result);
          return result;
      }

      /// <summary>
      /// Removes the session stored for a connection.
      /// </summary>
      /// <param name="connection">The connection whose session is to be removed.</param>
      /// <returns>The removed session, or <c>null</c> if the connection had no session.</returns>
      protected RpcServerSession? DeleteSession(TConnection connection)
      {
          // TryRemove removes and returns the value in a single atomic step; the IDictionary
          // Remove(key, out value) extension performs a separate lookup and removal.
          _sessions.TryRemove(connection, out var session);
          return session;
      }

      /// <summary>
      /// Registers the handler for <typeparamref name="TCommand"/>. Handlers are keyed by the
      /// command type's simple name, so a later registration for the same name replaces the
      /// earlier one.
      /// </summary>
      /// <param name="handler">The handler to invoke when a message names this command.</param>
      public void AddHandler<TSender, TCommand, TProperties, TResult>(RpcCommandHandler<TSender, TCommand, TProperties, TResult> handler)
          where TSender : class
          where TCommand : IRpcCommand<TProperties, TResult>
          where TProperties : IRpcCommandParameters, new()
          where TResult : IRpcCommandResult, new()
      {
          _handlers[typeof(TCommand).Name] = handler;
      }

      /// <summary>Starts the transport; behaviour is defined by the concrete transport.</summary>
      public abstract void Start();

      /// <summary>Stops the transport; behaviour is defined by the concrete transport.</summary>
      public abstract void Stop();

      /// <summary>Raised by <see cref="DoOpen"/> after a session has been created for a connection.</summary>
      public event RpcTransportOpenEvent? OnOpen;

      /// <summary>
      /// Creates a session for <paramref name="connection"/> and raises <see cref="OnOpen"/> with it.
      /// </summary>
      /// <param name="connection">The connection that was opened.</param>
      protected void DoOpen(TConnection connection)
      {
          var session = CreateSession(connection);
          OnOpen?.Invoke(this, new RpcTransportOpenArgs(session));
      }

      /// <summary>Raised by <see cref="DoClose"/> after the connection's session has been removed.</summary>
      public event RpcTransportCloseEvent? OnClose;

      /// <summary>
      /// Removes the session for <paramref name="connection"/> and raises <see cref="OnClose"/>.
      /// The event receives the removed session, which is <c>null</c> if none was stored.
      /// </summary>
      /// <param name="connection">The connection that was closed.</param>
      /// <param name="type">The close type passed through to the event arguments.</param>
      protected void DoClose(TConnection connection, RpcTransportCloseEventType type)
      {
          var session = DeleteSession(connection);
          OnClose?.Invoke(this, new RpcTransportCloseArgs(session, type));
      }

      /// <summary>Raised by <see cref="DoException"/> to report an error on the transport.</summary>
      public event RpcTransportExceptionEvent? OnException;

      /// <summary>
      /// Raises <see cref="OnException"/> with the session of <paramref name="connection"/>
      /// (<c>null</c> if there is none) and the given exception.
      /// </summary>
      /// <param name="connection">The connection the error relates to; may be <c>null</c>.</param>
      /// <param name="e">The exception to report.</param>
      protected void DoException(TConnection? connection, Exception e)
      {
          var session = GetSession(connection);
          OnException?.Invoke(this, new RpcTransportExceptionArgs(session, e));
      }

      /// <summary>Raised by <see cref="DoMessage"/> with the incoming message before it is dispatched.</summary>
      public event RpcTransportMessageEvent? BeforeMessage;

      /// <summary>Raises <see cref="BeforeMessage"/> with the given session and message.</summary>
      /// <param name="session">The session the message belongs to; may be <c>null</c>.</param>
      /// <param name="message">The message about to be handled.</param>
      protected void DoBeforeMessage(RpcServerSession? session, RpcMessage? message)
      {
          BeforeMessage?.Invoke(this, new RpcTransportMessageArgs(session, message));
      }

      /// <summary>Raised by <see cref="DoMessage"/> with the response after a message has been dispatched.</summary>
      public event RpcTransportMessageEvent? AfterMessage;

      /// <summary>Raises <see cref="AfterMessage"/> with the given session and message.</summary>
      /// <param name="session">The session the message belongs to; may be <c>null</c>.</param>
      /// <param name="message">The response produced for the handled message.</param>
      protected void DoAfterMessage(RpcServerSession? session, RpcMessage? message)
      {
          AfterMessage?.Invoke(this, new RpcTransportMessageArgs(session, message));
      }

      /// <summary>
      /// Handle a message from a client.
      /// </summary>
      /// <remarks>
      /// <see cref="BeforeMessage"/> is raised for every non-null message. <see cref="AfterMessage"/>
      /// is raised only when a session exists for the connection and dispatch finished without an
      /// exception other than <see cref="RpcException"/>. Failures are reported through
      /// <see cref="OnException"/> and reflected in the <c>Error</c> of the response.
      /// </remarks>
      /// <param name="connection">The client connection.</param>
      /// <param name="message">The message to be handled.</param>
      /// <returns>
      /// The response to be sent back to the client, or <c>null</c> if <paramref name="message"/> was <c>null</c>.
      /// </returns>
      public RpcMessage? DoMessage(TConnection? connection, RpcMessage? message)
      {
          if (message is null)
          {
              DoException(connection, new Exception("NULL Message Received"));
              return null;
          }

          // The response always echoes the request's Id and Command.
          var response = new RpcMessage() { Id = message.Id, Command = message.Command };
          try
          {
              var session = GetSession(connection);
              DoBeforeMessage(session, message);

              if (session is null)
              {
                  DoException(connection, new Exception("Session not Found"));
                  response.Error = RpcError.SESSIONNOTFOUND;
                  return response;
              }

              if (_handlers.TryGetValue(message.Command, out var command))
              {
                  try
                  {
                      response.Payload = command.Execute(session, message.Payload);
                  }
                  catch (RpcException err)
                  {
                      // An RpcException carries the error code to return to the client;
                      // it is not reported through OnException.
                      response.Error = err.Error;
                  }
              }
              else
              {
                  DoException(connection, new Exception("Command Not Found"));
                  response.Error = RpcError.COMMANDNOTFOUND;
              }

              DoAfterMessage(session, response);
          }
          catch (Exception e)
          {
              // Anything unexpected (including exceptions thrown by event subscribers above)
              // is reported and turned into a generic server error for the client.
              DoException(connection, e);
              response.Error = RpcError.SERVERERROR;
          }

          return response;
      }

      /// <summary>
      /// Send a message to a particular client connection.
      /// </summary>
      /// <param name="connection">The connection to send to.</param>
      /// <param name="message">The message to send.</param>
      public abstract void Send(TConnection connection, RpcMessage message);

      #region Pusher Stuff

      /// <summary>
      /// Sends a single "Push" message containing <paramref name="push"/> to every connection
      /// that currently has a session.
      /// </summary>
      /// <param name="push">The object to push.</param>
      public void PushToAll<TPush>(TPush push) where TPush : BaseObject
      {
          var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
          foreach (var connection in _sessions.Keys)
          {
              Send(connection, message);
          }
      }

      /// <summary>
      /// Sends a "Push" message containing <paramref name="push"/> to the connection whose
      /// session has the given ID. Nothing is sent if no such session exists.
      /// </summary>
      /// <param name="session">The ID of the target session.</param>
      /// <param name="push">The object to push.</param>
      public void PushToSession<TPush>(Guid session, TPush push) where TPush : BaseObject
      {
          var message = new RpcMessage(Guid.NewGuid(), "Push", RpcPush.Create(push).WriteBinary(BinarySerializationSettings.Latest));
          SendToSession(session, message);
      }

      /// <summary>
      /// Sends a "Push" message containing <paramref name="push"/>, tagged with the type
      /// <paramref name="TPush"/>, to the connection whose session has the given ID.
      /// Nothing is sent if no such session exists.
      /// </summary>
      /// <param name="session">The ID of the target session.</param>
      /// <param name="TPush">The type recorded in the push payload.</param>
      /// <param name="push">The object to push.</param>
      public void PushToSession(Guid session, Type TPush, BaseObject push)
      {
          var message = new RpcMessage(Guid.NewGuid(), "Push", new RpcPush
          {
              Object = push,
              Type = TPush
          }.WriteBinary(BinarySerializationSettings.Latest));
          SendToSession(session, message);
      }

      /// <summary>
      /// Sends <paramref name="message"/> to the first connection whose session ID equals
      /// <paramref name="session"/>; does nothing when no session matches.
      /// </summary>
      private void SendToSession(Guid session, RpcMessage message)
      {
          // Search explicitly instead of FirstOrDefault(...).Key: when TConnection is a value
          // type, the key of a defaulted KeyValuePair is default(TConnection), which is never
          // null, so "no match" could not be told apart from a real connection.
          foreach (var entry in _sessions)
          {
              if (entry.Value.ID == session)
              {
                  Send(entry.Key, message);
                  return;
              }
          }
      }

      /// <summary>Returns the IDs of all sessions whose <c>UserGuid</c> equals <paramref name="user"/>.</summary>
      /// <param name="user">The user to match.</param>
      public IEnumerable<Guid> GetUserSessions(Guid user) =>
          _sessions.Values.Where(x => x.UserGuid == user).Select(x => x.ID);

      /// <summary>Returns the IDs of all sessions whose <c>Platform</c> equals <paramref name="platform"/>.</summary>
      /// <param name="platform">The platform to match.</param>
      public IEnumerable<Guid> GetSessions(Platform platform) =>
          _sessions.Values.Where(x => x.Platform == platform).Select(x => x.ID);

      #endregion
  }
  161. }