RPCClientSocketTransport.cs 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. using System;
  2. using System.Collections.Generic;
  3. using System.IO;
  4. using System.Linq;
  5. using System.Net.WebSockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using InABox.Clients;
  9. using InABox.Core;
  10. namespace InABox.Rpc
  11. {
  /// <summary>
  /// RPC client transport that exchanges <see cref="RpcMessage"/>s with a server over a
  /// <see cref="ClientWebSocket"/>.
  /// </summary>
  /// <remarks>
  /// <see cref="Connect"/> races a secure (wss) and a plain (ws) connection attempt for every
  /// configured URL and keeps whichever opens first. Incoming messages are read on a background
  /// task and passed to <c>Accept</c>; failures are reported through <c>DoException</c> and
  /// <c>DoClose</c>.
  /// </remarks>
  public class RpcClientSocketTransport : RpcClientTransport, IDisposable
  {
      // Serialises Send calls so only one SendAsync is outstanding on the socket at a time.
      private readonly object _sendLock = new object();

      // Passed to every socket operation; cancelled by Dispose. It is cancelled but not
      // disposed, because the background receive loop may still read its Token.
      private readonly CancellationTokenSource _tokenSource = new CancellationTokenSource();

      // Endpoints without a scheme; Connect prefixes each one with "wss://" and "ws://".
      private readonly string[] _urls;

      // The socket that won the most recent Connect race, or null if none is available.
      private ClientWebSocket? _socket;

      // The entry of _urls that the current socket was opened against.
      private string? _host;

      /// <summary>
      /// Creates a transport that will try the given endpoints when <see cref="Connect"/> is called.
      /// </summary>
      /// <param name="urls">Endpoints without a scheme prefix.</param>
      public RpcClientSocketTransport(string[] urls)
      {
          _urls = urls;
      }

      /// <summary>
      /// Reads one complete WebSocket message from the current socket and dispatches it.
      /// </summary>
      /// <returns>
      /// <c>true</c> if the receive loop should keep going; <c>false</c> after a close frame
      /// or an exception.
      /// </returns>
      private bool DoReceive()
      {
          // Snapshot the field: Connect may replace or clear it from another thread.
          var socket = _socket;
          if (socket is null)
          {
              return true;
          }

          try
          {
              var chunk = new byte[1024];
              var segment = new ArraySegment<byte>(chunk);
              using var ms = new MemoryStream();

              // A message may arrive in several frames; accumulate until EndOfMessage.
              WebSocketReceiveResult result;
              do
              {
                  var task = socket.ReceiveAsync(segment, _tokenSource.Token);
                  task.Wait();
                  result = task.Result;
                  ms.Write(chunk, 0, result.Count);
              } while (!result.EndOfMessage);
              ms.Seek(0, SeekOrigin.Begin);

              if (result.MessageType == WebSocketMessageType.Close)
              {
                  var closeType = RpcTransportCloseEventType.Closed;
                  if (result.CloseStatus != WebSocketCloseStatus.NormalClosure)
                  {
                      DoException(new Exception(result.CloseStatusDescription));
                      closeType = RpcTransportCloseEventType.Error;
                  }

                  // The close notification is raised on a separate task rather than on the
                  // receive loop itself.
                  Task.Run(() =>
                  {
                      DoClose(closeType);
                  });
                  return false;
              }

              RpcMessage? rpcMessage = null;
              if (result.MessageType == WebSocketMessageType.Binary)
              {
                  rpcMessage = Serialization.ReadBinary<RpcMessage>(ms, BinarySerializationSettings.Latest);
              }
              else if (result.MessageType == WebSocketMessageType.Text)
              {
                  rpcMessage = Serialization.Deserialize<RpcMessage>(ms);
              }
              Accept(rpcMessage);
          }
          catch (Exception e)
          {
              DoException(e);
              if (!IsConnected())
              {
                  Task.Run(() =>
                  {
                      DoClose(RpcTransportCloseEventType.Error);
                  });
              }
              return false;
          }

          return true;
      }

      /// <summary>
      /// Attempts to open a WebSocket connection to <paramref name="url"/>.
      /// </summary>
      /// <param name="url">Endpoint without a scheme prefix.</param>
      /// <param name="secure"><c>true</c> to use <c>wss</c>, <c>false</c> to use <c>ws</c>.</param>
      /// <returns>The connected socket, or <c>null</c> if the attempt failed.</returns>
      /// <remarks>
      /// This method has no side effects on the transport: several attempts run concurrently,
      /// so only <see cref="Connect"/> decides which one becomes the active socket.
      /// </remarks>
      private ClientWebSocket? CreateSocket(string url, bool secure)
      {
          var client = new ClientWebSocket();
          var address = $"{(secure ? "wss" : "ws")}://{url}";
          try
          {
              client.ConnectAsync(new Uri(address), _tokenSource.Token).Wait();
              return client;
          }
          catch (Exception)
          {
              // A failed attempt is an expected outcome of the race; just release the socket.
              client.Dispose();
              return null;
          }
      }

      /// <summary>
      /// Opens a connection to the first endpoint that answers, trying both wss and ws for
      /// every configured URL, then starts the background receive loop.
      /// </summary>
      /// <remarks>
      /// Blocks until one attempt succeeds or all have failed. If every attempt fails the
      /// method returns without a socket, so <see cref="IsConnected"/> reports <c>false</c>.
      /// </remarks>
      public override void Connect()
      {
          _socket?.Dispose();
          _socket = null;

          // Maps each in-flight attempt to the endpoint it is trying.
          var attempts = new Dictionary<Task<ClientWebSocket?>, string>();
          foreach (var url in _urls)
          {
              attempts.Add(Task.Run(() => CreateSocket(url, true)), url);
              attempts.Add(Task.Run(() => CreateSocket(url, false)), url);
          }

          while (attempts.Count > 0)
          {
              var finished = Task.WhenAny(attempts.Keys).Result;
              var host = attempts[finished];
              attempts.Remove(finished);

              var winner = finished.Result;
              if (winner is null)
              {
                  continue;
              }

              // Any other attempt that also manages to connect would otherwise be leaked:
              // dispose it as soon as it completes.
              foreach (var loser in attempts.Keys)
              {
                  loser.ContinueWith(
                      t =>
                      {
                          t.Result?.Dispose();
                      },
                      CancellationToken.None,
                      TaskContinuationOptions.OnlyOnRanToCompletion,
                      TaskScheduler.Default);
              }

              _socket = winner;
              _host = host;
              DoOpen();

              Task.Run(() =>
              {
                  while (IsConnected() && DoReceive())
                  {
                  }
              });
              return;
          }
      }

      /// <summary>Returns <c>true</c> while the current socket is in the Open state.</summary>
      public override bool IsConnected() => _socket?.State == WebSocketState.Open;

      // NOTE(review): this reports false even when the active connection was opened over wss.
      // Confirm whether that is intended before relying on it.
      public override bool IsSecure() => false;

      /// <summary>Returns the endpoint of the most recent successful connection, if any.</summary>
      public override String? ServerName() => _host;

      /// <summary>
      /// Performs a normal-closure close handshake on the current socket and waits for it to finish.
      /// </summary>
      public override void Disconnect()
      {
          _socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait();
      }

      /// <summary>
      /// Serialises <paramref name="message"/> to binary and sends it as a single WebSocket message.
      /// </summary>
      /// <param name="message">The message to send.</param>
      /// <remarks>Does nothing if there is no current socket.</remarks>
      public override void Send(RpcMessage message)
      {
          lock (_sendLock)
          {
              var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
              _socket?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait();
          }
      }

      /// <summary>Creates a new, unconnected transport for the same endpoints.</summary>
      protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);

      /// <summary>
      /// Closes the connection if it is open, cancels outstanding socket operations and
      /// releases the socket.
      /// </summary>
      public void Dispose()
      {
          try
          {
              if (IsConnected())
              {
                  Disconnect();
              }
          }
          finally
          {
              // Runs even if the close handshake throws, so the socket is always released.
              _tokenSource.Cancel();
              _socket?.Dispose();
          }
      }
  }
  184. }