RPCClientSocketTransport.cs 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using InABox.Clients;
  7. using InABox.Core;
  8. using WebSocketSharp;
  9. using ErrorEventArgs = WebSocketSharp.ErrorEventArgs;
  10. namespace InABox.Rpc
  11. {
  12. public class RpcClientSocketTransport : RpcClientTransport, IDisposable
  13. {
  14. private WebSocket? _socket;
  15. private string[] _urls;
  16. public RpcClientSocketTransport(string[] urls)
  17. {
  18. _urls = urls;
  19. }
  20. private void Socket_OnOpen(object? sender, EventArgs e)
  21. {
  22. DoOpen();
  23. }
  24. private void Socket_OnMessage(object? sender, MessageEventArgs e)
  25. {
  26. RpcMessage? message = null;
  27. if (e.IsBinary && (e.RawData != null))
  28. message = Serialization.ReadBinary<RpcMessage>(e.RawData, BinarySerializationSettings.Latest);
  29. else if (e.IsText && !string.IsNullOrWhiteSpace(e.Data))
  30. message = Serialization.Deserialize<RpcMessage>(e.Data);
  31. Accept(message);
  32. }
  33. private void Socket_OnClose(object? sender, CloseEventArgs e)
  34. {
  35. DoClose(RpcTransportCloseEventType.Closed);
  36. }
  37. private void Socket_OnError(object? sender, ErrorEventArgs e)
  38. {
  39. DoException(e.Exception);
  40. }
  41. private WebSocket? CreateSocket(string url, bool secure)
  42. {
  43. var address = $"{(secure ? "wss" : "ws")}://{url}";
  44. var socket = new WebSocket(address);
  45. // Time to wait before disconnect - the default meant that the client disconnected during debugging, since the ping would fail
  46. socket.WaitTime = TimeSpan.FromSeconds(60);
  47. socket.OnOpen -= Socket_OnOpen;
  48. socket.OnError -= Socket_OnError;
  49. socket.OnClose -= Socket_OnClose;
  50. socket.OnMessage -= Socket_OnMessage;
  51. socket.Connect();
  52. if (socket.ReadyState == WebSocketState.Open)
  53. {
  54. DoOpen();
  55. socket.OnOpen += Socket_OnOpen;
  56. socket.OnError += Socket_OnError;
  57. socket.OnClose += Socket_OnClose;
  58. socket.OnMessage += Socket_OnMessage;
  59. return socket;
  60. }
  61. return null;
  62. }
  63. public override void Connect()
  64. {
  65. List<Task<WebSocket>> tasks = new List<Task<WebSocket>>();
  66. foreach (var url in _urls)
  67. {
  68. tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, true)));
  69. tasks.Add(Task<WebSocket>.Run(() => CreateSocket(url, false)));
  70. }
  71. while (tasks.Count > 0)
  72. {
  73. var result = Task.WhenAny(tasks).Result;
  74. if (result.Result == null)
  75. tasks.Remove(result);
  76. else
  77. {
  78. _socket = result.Result;
  79. return;
  80. }
  81. }
  82. }
  83. public override bool IsConnected() => _socket?.ReadyState == WebSocketState.Open;
  84. public override bool IsSecure() => _socket?.IsSecure == true;
  85. public override void Disconnect()
  86. {
  87. _socket?.Close(CloseStatusCode.Normal);
  88. }
  89. public override void Send(RpcMessage message)
  90. {
  91. var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
  92. _socket?.Send(buffer);
  93. }
  94. protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);
  95. public void Dispose()
  96. {
  97. if (IsConnected())
  98. Disconnect();
  99. }
  100. }
  101. }