Browse Source

New RPC websocket library

Kenric Nugteren 1 year ago
parent
commit
1cad93d10a

+ 1 - 0
InABox.Client.RPC/InABox.Client.RPC.csproj

@@ -15,6 +15,7 @@
     <ItemGroup>
       <PackageReference Include="H.Pipes" Version="2.0.51" />
       <PackageReference Include="Websocket.Client" Version="4.6.1" />
+      <PackageReference Include="WebSocket4Net" Version="0.15.2" />
     </ItemGroup>
 
 </Project>

+ 110 - 45
InABox.Client.RPC/Transports/Socket/RPCClientSocketTransport.cs

@@ -2,22 +2,22 @@ using System;
 using System.Collections.Generic;
 using System.IO;
 using System.Linq;
-using System.Net.WebSockets;
 using System.Threading;
 using System.Threading.Tasks;
 using InABox.Clients;
 using InABox.Core;
+using WebSocket4Net;
 
 namespace InABox.Rpc
 {
     public class RpcClientSocketTransport : RpcClientTransport, IDisposable
     {
-        private ClientWebSocket? _socket;
+        private WebSocket? _socket;
         private Task? readTask;
+        private ManualResetEventSlim openEvent = new ManualResetEventSlim();
 
         private string? _host;
-        private object _sendLock = new object();
-        private CancellationTokenSource _tokenSource = new CancellationTokenSource();
+        private bool _connected = false;
         
         private string[] _urls;
 
@@ -27,7 +27,7 @@ namespace InABox.Rpc
         }
 
         // Returns true if we are to continue the receive loop.
-        private bool DoReceive()
+        /*private bool DoReceive()
         {
             if(_socket != null)
             {
@@ -97,27 +97,49 @@ namespace InABox.Rpc
                 }
             }
             return true;
-        }
+        }*/
         
-        private ClientWebSocket? CreateSocket(string url, bool secure)
+        private WebSocket? CreateSocket(string url, bool secure)
         {
-            var client = new ClientWebSocket();
-
             //WebsocketClient client;
 
             //WebSocket socket = null;
             var address = $"{(secure ? "wss" : "ws")}://{url}";
-            try
+            var client = new WebSocket(address);
+
+            var openEvent = new ManualResetEventSlim();
+            var open = false;
+            EventHandler onOpen = (s, e) =>
             {
-                client.ConnectAsync(new Uri(address), _tokenSource.Token).Wait();
-                //socket = new WebSocket(address);
-            }
-            catch (Exception e)
+                open = true;
+                openEvent.Set();
+            };
+            EventHandler onClose = (s, e) =>
+            {
+                openEvent.Set();
+            };
+
+            client.Opened += onOpen;
+            client.Closed += onClose;
+
+            client.Error += Client_Error;
+            client.MessageReceived += Client_MessageReceived;
+            client.DataReceived += Client_DataReceived;
+            client.Open();
+
+            openEvent.Wait();
+
+            if (!open)
             {
-                client.Dispose();
                 return null;
             }
 
+            client.Opened -= onOpen;
+            client.Closed -= onClose;
+
+            client.Opened += Client_Opened;
+            client.Closed += Client_Closed;
+
             DoOpen();
             _host = url;
 
@@ -142,68 +164,111 @@ namespace InABox.Rpc
             return client;
         }
 
-        public override void Connect()
+        private void Client_Closed(object sender, EventArgs e)
         {
-            _socket?.Dispose();
+            openEvent.Set();
+            if (_connected)
+            {
+                _connected = false;
+                DoClose(RpcTransportCloseEventType.Closed);
+            }
+        }
+
+        private void Client_DataReceived(object sender, DataReceivedEventArgs e)
+        {
+            var rpcMessage = Serialization.ReadBinary<RpcMessage>(e.Data, BinarySerializationSettings.Latest);
+            Accept(rpcMessage);
+        }
+
+        private void Client_MessageReceived(object sender, MessageReceivedEventArgs e)
+        {
+            var rpcMessage = Serialization.Deserialize<RpcMessage>(e.Message);
+            Accept(rpcMessage);
+        }
 
-            var tasks = new List<Task<ClientWebSocket?>>();
-            foreach (var url in _urls)
+        private void Client_Error(object sender, SuperSocket.ClientEngine.ErrorEventArgs e)
+        {
+            DoException(e.Exception);
+        }
+
+        private void Client_Opened(object sender, EventArgs e)
+        {
+            _connected = true;
+            DoOpen();
+            openEvent.Set();
+        }
+
+        public override void Connect()
+        {
+            if(_socket != null)
             {
-                tasks.Add(Task.Run(() => CreateSocket(url, true)));
-                tasks.Add(Task.Run(() => CreateSocket(url, false)));
+                openEvent.Reset();
+                _socket.Open();
+                openEvent.Wait();
             }
-            while (tasks.Count > 0)
+            else
             {
-                var result = Task.WhenAny(tasks).Result;
-                if (result.Result == null)
-                    tasks.Remove(result);
-                else
+                var tasks = new List<Task<WebSocket?>>();
+                foreach (var url in _urls)
                 {
-                    _socket = result.Result;
-
-                    Task.Run(() =>
+                    tasks.Add(Task.Run(() => CreateSocket(url, true)));
+                    tasks.Add(Task.Run(() => CreateSocket(url, false)));
+                }
+                while (tasks.Count > 0)
+                {
+                    var result = Task.WhenAny(tasks).Result;
+                    if (result.Result == null)
+                        tasks.Remove(result);
+                    else
                     {
-                        while (IsConnected())
+                        _socket = result.Result;
+                        _connected = true;
+
+                        /*Task.Run(() =>
                         {
-                            if (!DoReceive())
+                            while (IsConnected())
                             {
-                                break;
+                                if (!DoReceive())
+                                {
+                                    break;
+                                }
                             }
-                        }
-                    });
+                        });*/
 
-                    return;
+                        return;
+                    }
                 }
             }
+
         }
 
-        public override bool IsConnected() => _socket?.State == WebSocketState.Open;
-        public override bool IsSecure() => false;
+        public override bool IsConnected() => _connected;//_socket?.State == WebSocketState.Open;
+        public override bool IsSecure() => _socket?.Security.Certificates.Count > 0;
 
         public override String? ServerName() => _host;
         
         public override void Disconnect()
         {
-            _socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait();
+            _socket?.Close();
+            //_socket?.CloseAsync(WebSocketCloseStatus.NormalClosure, "", _tokenSource.Token).Wait();
         }
 
         public override void Send(RpcMessage message)
         {
-            lock (_sendLock)
-            {
-                var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
-                _socket?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait();
-            }
+            var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
+            _socket?.Send(buffer, 0, buffer.Length);//  ?.SendAsync(buffer, WebSocketMessageType.Binary, true, _tokenSource.Token)?.Wait();
         }
 
         protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);
 
         public void Dispose()
         {
-            if (IsConnected())
+            _socket?.Close();
+            _socket?.Dispose();
+            /*if (IsConnected())
                 Disconnect();
             _tokenSource.Cancel();
-            _socket?.Dispose();
+            _socket?.Dispose();*/
         }
     }
 }