RPCClientSocketTransport.cs 8.9 KB


  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using InABox.Clients;
  8. using InABox.Core;
  9. using WebSocket4Net;
  10. namespace InABox.Rpc
  11. {
  12. public class RpcClientSocketTransport : RpcClientTransport, IDisposable
  13. {
  14. private WebSocket? _socket;
  15. private Task? readTask;
  16. private ManualResetEventSlim openEvent = new ManualResetEventSlim();
  17. private string? _host;
  18. private bool _connected = false;
  19. private string[] _urls;
  20. public RpcClientSocketTransport(string[] urls)
  21. {
  22. _urls = urls;
  23. }
  24. // Returns true if we are to continue the receive loop.
  25. /*private bool DoReceive()
  26. {
  27. if(_socket != null)
  28. {
  29. try
  30. {
  31. var buffer = new ArraySegment<byte>(new byte[1024]);
  32. using (var ms = new MemoryStream())
  33. {
  34. WebSocketReceiveResult result;
  35. do
  36. {
  37. var task = _socket.ReceiveAsync(buffer, _tokenSource.Token);
  38. task.Wait();
  39. result = task.Result;
  40. ms.Write(buffer.Array, buffer.Offset, result.Count);
  41. } while (!result.EndOfMessage);
  42. ms.Seek(0, SeekOrigin.Begin);
  43. if (result.MessageType == WebSocketMessageType.Close)
  44. {
  45. if (result.CloseStatus == WebSocketCloseStatus.NormalClosure)
  46. {
  47. Task.Run(() =>
  48. {
  49. DoClose(RpcTransportCloseEventType.Closed);
  50. });
  51. return false;
  52. }
  53. else
  54. {
  55. DoException(new Exception(result.CloseStatusDescription));
  56. Task.Run(() =>
  57. {
  58. DoClose(RpcTransportCloseEventType.Error);
  59. });
  60. return false;
  61. }
  62. }
  63. else
  64. {
  65. RpcMessage? rpcMessage = null;
  66. if (result.MessageType == WebSocketMessageType.Binary)
  67. {
  68. rpcMessage = Serialization.ReadBinary<RpcMessage>(ms, BinarySerializationSettings.Latest);
  69. }
  70. else if (result.MessageType == WebSocketMessageType.Text)
  71. {
  72. rpcMessage = Serialization.Deserialize<RpcMessage>(ms);
  73. }
  74. Accept(rpcMessage);
  75. }
  76. }
  77. }
  78. catch(Exception e)
  79. {
  80. DoException(e);
  81. if (!IsConnected())
  82. {
  83. Task.Run(() =>
  84. {
  85. DoClose(RpcTransportCloseEventType.Error);
  86. });
  87. }
  88. return false;
  89. }
  90. }
  91. return true;
  92. }*/
  93. private WebSocket? CreateSocket(string url, bool secure)
  94. {
  95. //WebsocketClient client;
  96. //WebSocket socket = null;
  97. var address = $"{(secure ? "wss" : "ws")}://{url}";
  98. var client = new WebSocket(address);
  99. var openEvent = new ManualResetEventSlim();
  100. var open = false;
  101. EventHandler onOpen = (s, e) =>
  102. {
  103. open = true;
  104. openEvent.Set();
  105. };
  106. EventHandler onClose = (s, e) =>
  107. {
  108. openEvent.Set();
  109. };
  110. client.Opened += onOpen;
  111. client.Closed += onClose;
  112. client.Error += Client_Error;
  113. client.MessageReceived += Client_MessageReceived;
  114. client.DataReceived += Client_DataReceived;
  115. client.Open();
  116. openEvent.Wait();
  117. if (!open)
  118. {
  119. return null;
  120. }
  121. client.Opened -= onOpen;
  122. client.Closed -= onClose;
  123. client.Opened += Client_Opened;
  124. client.Closed += Client_Closed;
  125. DoOpen();
  126. _host = url;
  127. // Time to wait before disconnect - the default meant that the client disconnected during debugging, since the ping would fail
  128. /*socket.WaitTime = TimeSpan.FromSeconds(20);
  129. socket.OnOpen -= Socket_OnOpen;
  130. socket.OnError -= Socket_OnError;
  131. socket.OnClose -= Socket_OnClose;
  132. socket.OnMessage -= Socket_OnMessage;
  133. socket.Connect();
  134. if (socket.ReadyState == WebSocketState.Open)
  135. {
  136. DoOpen();
  137. socket.OnOpen += Socket_OnOpen;
  138. socket.OnError += Socket_OnError;
  139. socket.OnClose += Socket_OnClose;
  140. socket.OnMessage += Socket_OnMessage;
  141. return socket;
  142. }*/
  143. return client;
  144. }
  145. private void Client_Closed(object sender, EventArgs e)
  146. {
  147. openEvent.Set();
  148. if (_connected)
  149. {
  150. _connected = false;
  151. DoClose(RpcTransportCloseEventType.Closed);
  152. }
  153. }
  154. private void Client_DataReceived(object sender, DataReceivedEventArgs e)
  155. {
  156. var rpcMessage = Serialization.ReadBinary<RpcMessage>(e.Data, BinarySerializationSettings.Latest);
  157. Accept(rpcMessage);
  158. }
  159. private void Client_MessageReceived(object sender, MessageReceivedEventArgs e)
  160. {
  161. var rpcMessage = Serialization.Deserialize<RpcMessage>(e.Message);
  162. Accept(rpcMessage);
  163. }
  164. private void Client_Error(object sender, SuperSocket.ClientEngine.ErrorEventArgs e)
  165. {
  166. DoException(e.Exception);
  167. }
  168. private void Client_Opened(object sender, EventArgs e)
  169. {
  170. _connected = true;
  171. DoOpen();
  172. openEvent.Set();
  173. }
  174. public override void Connect()
  175. {
  176. if(_socket != null)
  177. {
  178. openEvent.Reset();
  179. _socket.Open();
  180. openEvent.Wait();
  181. }
  182. else
  183. {
  184. var tasks = new List<Task<WebSocket?>>();
  185. foreach (var url in _urls)
  186. {
  187. tasks.Add(Task.Run(() => CreateSocket(url, true)));
  188. tasks.Add(Task.Run(() => CreateSocket(url, false)));
  189. }
  190. while (tasks.Count > 0)
  191. {
  192. var result = Task.WhenAny(tasks).Result;
  193. if (result.Result == null)
  194. tasks.Remove(result);
  195. else
  196. {
  197. _socket = result.Result;
  198. _connected = true;
  199. /*Task.Run(() =>
  200. {
  201. while (IsConnected())
  202. {
  203. if (!DoReceive())
  204. {
  205. break;
  206. }
  207. }
  208. });*/
  209. return;
  210. }
  211. }
  212. }
  213. }
  214. public override bool IsConnected() => _connected;//_socket?.State == WebSocketState.Open;
  215. public override bool IsSecure() => _socket?.Security.Certificates.Count > 0;
  216. public override String? ServerName() => _host;
  217. public override void Disconnect()
  218. {
  219. _socket?.Close();
  220. //_socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait();
  221. }
  222. public override void Send(RpcMessage message)
  223. {
  224. var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
  225. _socket?.Send(buffer, 0, buffer.Length);// ?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait();
  226. }
  227. protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);
  228. public void Dispose()
  229. {
  230. _socket?.Close();
  231. _socket?.Dispose();
  232. /*if (IsConnected())
  233. Disconnect();
  234. _tokenSource.Cancel();
  235. _socket?.Dispose();*/
  236. }
  237. }
  238. }