RPCClientSocketTransport.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using InABox.Clients;
  7. using InABox.Core;
  8. using WebSocketSharp;
  9. using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
  10. namespace InABox.Rpc
  11. {
  12. public class RpcClientSocketTransport : RpcClientTransport, IDisposable
  13. {
  14. private WebSocket? _socket;
  15. private string[] _urls;
  16. public RpcClientSocketTransport(string[] urls)
  17. {
  18. _urls = urls;
  19. }
  20. private void Socket_OnOpen(object? sender, EventArgs e)
  21. {
  22. DoOpen();
  23. }
  24. private void Socket_OnMessage(object? sender, MessageEventArgs e)
  25. {
  26. RpcMessage? message = null;
  27. if (e.IsBinary && (e.RawData != null))
  28. message = Serialization.ReadBinary<RpcMessage>(e.RawData, BinarySerializationSettings.Latest);
  29. else if (e.IsText && !string.IsNullOrWhiteSpace(e.Data))
  30. message = Serialization.Deserialize<RpcMessage>(e.Data);
  31. Accept(message);
  32. }
  33. private void Socket_OnClose(object? sender, CloseEventArgs e)
  34. {
  35. DoClose(RpcTransportCloseEventType.Closed);
  36. }
  37. private void Socket_OnError(object? sender, ErrorEventArgs e)
  38. {
  39. DoException(e.Exception);
  40. }
  41. private WebSocket? CreateSocket(string url, bool secure)
  42. {
  43. WebSocket socket = null;
  44. var address = $"{(secure ? "wss" : "ws")}://{url}";
  45. try
  46. {
  47. socket = new WebSocket(address);
  48. }
  49. catch (Exception e)
  50. {
  51. return null;
  52. }
  53. // Time to wait before disconnect - the default meant that the client disconnected during debugging, since the ping would fail
  54. socket.WaitTime = TimeSpan.FromSeconds(20);
  55. socket.OnOpen -= Socket_OnOpen;
  56. socket.OnError -= Socket_OnError;
  57. socket.OnClose -= Socket_OnClose;
  58. socket.OnMessage -= Socket_OnMessage;
  59. socket.Connect();
  60. if (socket.ReadyState == WebSocketState.Open)
  61. {
  62. DoOpen();
  63. socket.OnOpen += Socket_OnOpen;
  64. socket.OnError += Socket_OnError;
  65. socket.OnClose += Socket_OnClose;
  66. socket.OnMessage += Socket_OnMessage;
  67. return socket;
  68. }
  69. return null;
  70. }
  71. public override void Connect()
  72. {
  73. List<Task<WebSocket>> tasks = new List<Task<WebSocket>>();
  74. foreach (var url in _urls)
  75. {
  76. tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, true)));
  77. tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, false)));
  78. }
  79. while (tasks.Count > 0)
  80. {
  81. var result = Task.WhenAny(tasks).Result;
  82. if (result.Result == null)
  83. tasks.Remove(result);
  84. else
  85. {
  86. _socket = result.Result;
  87. return;
  88. }
  89. }
  90. }
  91. public override bool IsConnected() => _socket?.ReadyState == WebSocketState.Open;
  92. public override bool IsSecure() => _socket?.IsSecure == true;
  93. public override void Disconnect()
  94. {
  95. _socket?.Close(CloseStatusCode.Normal);
  96. }
  97. public override void Send(RpcMessage message)
  98. {
  99. var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
  100. _socket?.Send(buffer);
  101. }
  102. protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);
  103. public void Dispose()
  104. {
  105. if (IsConnected())
  106. Disconnect();
  107. }
  108. }
  109. }