RPCProxyServer.cs 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. using System.Collections.Concurrent;
  2. using System.Text;
  3. using InABox.Core;
  4. using InABox.Server;
  5. namespace InABox.Rpc;
  6. public abstract class RpcProxyServer<TConnection> : IRpcProxyServer where TConnection : notnull
  7. {
  8. private IRpcClientTransport ServerTransport { get; set; }
  9. public event LogEvent? OnLog;
  10. public RpcProxyServer(IRpcClientTransport serverTransport)
  11. {
  12. ServerTransport = serverTransport;
  13. ServerTransport.OnMessage += ServerTransport_OnMessage;
  14. }
  15. private void ServerTransport_OnMessage(IRpcTransport transport, RpcTransportMessageArgs e)
  16. {
  17. if (e.Message != null && e.Message.Command == "Push")
  18. {
  19. var message = Serialization.ReadBinary<InternalServerMessage>(e.Message.Payload, BinarySerializationSettings.Latest);
  20. e.Message.Payload = message.Payload;
  21. if(message.Session == Guid.Empty)
  22. {
  23. foreach (var connection in _sessions.Keys)
  24. {
  25. Send(connection, e.Message);
  26. }
  27. }
  28. else
  29. {
  30. var sessionConnection = _sessions.FirstOrDefault(x => x.Value == message.Session).Key;
  31. if(sessionConnection is not null)
  32. {
  33. Send(sessionConnection, e.Message);
  34. }
  35. }
  36. }
  37. }
  38. public abstract bool IsSecure();
  39. private ConcurrentDictionary<TConnection, Guid> _sessions = new ConcurrentDictionary<TConnection, Guid>();
  40. protected void CreateSession(TConnection connection, Guid guid)
  41. {
  42. _sessions[connection] = guid;
  43. }
  44. public Guid GetSession(TConnection? connection)
  45. {
  46. if (connection == null)
  47. return Guid.Empty;
  48. _sessions.TryGetValue(connection, out Guid result);
  49. return result;
  50. }
  51. protected Guid DeleteSession(TConnection connection)
  52. {
  53. _sessions.Remove(connection, out Guid session);
  54. return session;
  55. }
  56. public abstract void Start();
  57. public abstract void Stop();
  58. private RpcMessage SendToServer(Guid session, RpcMessage message)
  59. {
  60. var internalMessage = new InternalServerMessage
  61. {
  62. Session = session,
  63. Payload = message.Payload
  64. };
  65. return ServerTransport.Send(message.Command, internalMessage, checkErrors: false);
  66. }
  67. private TResult SendToServer<TCommand, TParameters, TResult>(Guid session, TParameters parameters)
  68. where TCommand : IRpcCommand<TParameters,TResult>
  69. where TParameters : IRpcCommandParameters, ISerializeBinary
  70. where TResult : IRpcCommandResult, ISerializeBinary, new()
  71. {
  72. var internalMessage = new InternalServerMessage
  73. {
  74. Session = session,
  75. Payload = parameters.WriteBinary(BinarySerializationSettings.Latest)
  76. };
  77. var response = ServerTransport.Send(typeof(TCommand).Name, internalMessage, checkErrors: false);
  78. switch (response.Error)
  79. {
  80. case RpcError.NONE:
  81. var result = Serialization.ReadBinary<TResult>(response.Payload, BinarySerializationSettings.Latest)
  82. ?? throw new Exception($"Cannot Deserialize {typeof(TCommand).Name}");
  83. return result;
  84. case RpcError.SERVERERROR:
  85. var errorMessage = Encoding.UTF8.GetString(response.Payload);
  86. throw new RpcException(errorMessage, response.Error);
  87. default:
  88. throw new RpcException($"Server error in {typeof(TCommand).Name}: {response.Error}", response.Error);
  89. }
  90. }
  91. protected void DoOpen(TConnection connection)
  92. {
  93. try
  94. {
  95. var result = SendToServer<OpenSessionCommand, OpenSessionParameters, OpenSessionResult>(Guid.Empty, new OpenSessionParameters());
  96. CreateSession(connection, result.SessionID);
  97. if (result.SessionID != Guid.Empty)
  98. OnLog?.Invoke(LogType.Information, "", $"Client Connected [{result.SessionID}]");
  99. }
  100. catch(Exception e)
  101. {
  102. DoException(connection, e);
  103. }
  104. }
  105. protected void DoClose(TConnection connection, RpcTransportCloseEventType type)
  106. {
  107. try
  108. {
  109. var session = DeleteSession(connection);
  110. if(session != Guid.Empty)
  111. {
  112. SendToServer<CloseSessionCommand, CloseSessionParameters, CloseSessionResult>(session, new CloseSessionParameters
  113. {
  114. SessionID = session
  115. });
  116. }
  117. if (session != Guid.Empty)
  118. OnLog?.Invoke(LogType.Information, "", $"Client Disconnected [{session}]");
  119. }
  120. catch(Exception e)
  121. {
  122. DoException(connection, e);
  123. }
  124. }
  125. protected void DoException(TConnection? connection, Exception e)
  126. {
  127. var session = GetSession(connection);
  128. if (session != Guid.Empty)
  129. {
  130. OnLog?.Invoke(LogType.Error, $"", $"Exception Occurred in {session}: {e.Message}");
  131. }
  132. else
  133. {
  134. OnLog?.Invoke(LogType.Error, $"", $"Exception Occurred: {CoreUtils.FormatException(e)}");
  135. }
  136. }
  137. /// <summary>
  138. /// Handle a message from a client.
  139. /// </summary>
  140. /// <param name="connection">The client connection.</param>
  141. /// <param name="message">The message to be handled.</param>
  142. /// <returns>The response to be sent back to the client.</returns>
  143. public RpcMessage? DoMessage(TConnection? connection, RpcMessage? message)
  144. {
  145. if(message is null)
  146. {
  147. DoException(connection, new Exception("NULL Message Received"));
  148. return null;
  149. }
  150. var response = new RpcMessage() { Id = message.Id, Command = message.Command };
  151. try
  152. {
  153. var session = GetSession(connection);
  154. if (session != Guid.Empty)
  155. {
  156. var serverResponse = SendToServer(session, message);
  157. response.Payload = serverResponse.Payload;
  158. response.Error = serverResponse.Error;
  159. }
  160. else
  161. {
  162. DoException(connection, new Exception("Session not Found"));
  163. response.Error = RpcError.SESSIONNOTFOUND;
  164. }
  165. }
  166. catch(Exception e)
  167. {
  168. DoException(connection, e);
  169. response.Payload = Encoding.UTF8.GetBytes(e.Message);
  170. response.Error = RpcError.SERVERERROR;
  171. }
  172. return response;
  173. }
  174. /// <summary>
  175. /// Send a message to a particular client connection.
  176. /// </summary>
  177. /// <param name="connection">The connection to send to.</param>
  178. /// <param name="message">The message to send.</param>
  179. public abstract void Send(TConnection connection, RpcMessage message);
  180. }