using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Expressive;
using InABox.Clients;
using InABox.Core;
using WebSocket4Net;

namespace InABox.Rpc
{
    /// <summary>
    /// RPC client transport that exchanges <see cref="RpcMessage"/>s with a server over a
    /// WebSocket4Net <see cref="WebSocket"/>.
    /// </summary>
    /// <remarks>
    /// The transport is given a list of candidate hosts. On the first <see cref="Connect"/>,
    /// a <c>wss://</c> and a <c>ws://</c> attempt is started for every host in parallel; the
    /// first attempt that opens is kept and all the others are cancelled and disposed.
    /// Subsequent calls to <see cref="Connect"/> re-open the socket that was kept.
    /// </remarks>
    public class RpcClientSocketTransport : RpcClientTransport, IDisposable
    {
        // The socket that won the connection race; null until the first successful Connect().
        private WebSocket? _socket;

        // Signalled by Client_Opened / Client_Closed so that a re-connect can wait for the outcome.
        private readonly ManualResetEventSlim openEvent = new ManualResetEventSlim();

        // The entry of _urls that _socket is connected to.
        private string? _host;

        // True while the adopted socket is considered open by this transport.
        private bool _connected = false;

        // True when the adopted socket was opened with the wss:// scheme.
        private bool _secure = false;

        // Candidate hosts (without scheme) to try when connecting.
        private readonly string[] _urls;

        /// <summary>The host this transport is connected to, or null if it never connected.</summary>
        public string? Host => _host;

        /// <summary>Creates a transport that will try each of <paramref name="urls"/> when connecting.</summary>
        /// <param name="urls">Host addresses without a scheme; "wss://" and "ws://" are prepended when connecting.</param>
        public RpcClientSocketTransport(string[] urls)
        {
            _urls = urls;
        }

        /// <summary>
        /// Tries to open one candidate socket and blocks until it is open, it has closed,
        /// or <paramref name="ct"/> is cancelled.
        /// </summary>
        /// <param name="url">Host address without scheme.</param>
        /// <param name="secure">True to use <c>wss://</c>, false to use <c>ws://</c>.</param>
        /// <param name="ct">Cancels the wait for the socket to open.</param>
        /// <returns>
        /// The open socket, with the error and message handlers attached, or null if the
        /// attempt failed or was cancelled. The caller owns the returned socket and must either
        /// adopt it or pass it to <see cref="DiscardSocket"/>.
        /// </returns>
        private WebSocket? CreateSocket(string url, bool secure, CancellationToken ct)
        {
            var address = $"{(secure ? "wss" : "ws")}://{url}";

            WebSocket? client = null;
            try
            {
                client = new WebSocket(address);

                // Local to this attempt: set once the socket has either opened or closed.
                var attemptFinished = new ManualResetEventSlim();
                var opened = false;

                void onOpen(object s, EventArgs e)
                {
                    opened = true;
                    attemptFinished.Set();
                }

                void onClose(object s, EventArgs e)
                {
                    attemptFinished.Set();
                }

                client.Opened += onOpen;
                client.Closed += onClose;
                // The message handlers are attached before Open() so that nothing the server
                // sends straight after the handshake is missed.
                client.Error += Client_Error;
                client.MessageReceived += Client_MessageReceived;
                client.DataReceived += Client_DataReceived;
                client.Open();

                try
                {
                    attemptFinished.Wait(ct);
                }
                catch (OperationCanceledException)
                {
                    DiscardSocket(client);
                    return null;
                }

                if (!opened)
                {
                    DiscardSocket(client);
                    return null;
                }

                client.Opened -= onOpen;
                client.Closed -= onClose;
                return client;
            }
            catch (Exception e)
            {
                Logger.Send(LogType.Error, ClientFactory.UserID, $"Error in CreateSocket(): {e.Message}");
                // Do not leak the half-constructed socket when the error is propagated.
                if (client != null)
                {
                    DiscardSocket(client);
                }
                throw;
            }
        }

        /// <summary>
        /// Detaches every handler of this transport from <paramref name="socket"/> and disposes it,
        /// so that a socket that is not (or no longer) wanted cannot raise events on this transport.
        /// </summary>
        private void DiscardSocket(WebSocket socket)
        {
            socket.Opened -= Client_Opened;
            socket.Closed -= Client_Closed;
            socket.Error -= Client_Error;
            socket.MessageReceived -= Client_MessageReceived;
            socket.DataReceived -= Client_DataReceived;
            socket.Dispose();
        }

        /// <summary>
        /// Handles the adopted socket closing: clears the connected flag, releases any waiting
        /// re-connect, and reports the close if the transport was connected.
        /// </summary>
        private void Client_Closed(object sender, EventArgs e)
        {
            var wasConnected = _connected;
            _connected = false;
            openEvent.Set();
            if (wasConnected)
            {
                DoClose(RpcTransportCloseEventType.Closed);
            }
        }

        /// <summary>Deserializes a binary frame into an <see cref="RpcMessage"/> and passes it to Accept.</summary>
        private void Client_DataReceived(object sender, DataReceivedEventArgs e)
        {
            var rpcMessage = Serialization.ReadBinary<RpcMessage>(e.Data, BinarySerializationSettings.Latest);
            Accept(rpcMessage);
        }

        /// <summary>Deserializes a text frame into an <see cref="RpcMessage"/> and passes it to Accept.</summary>
        private void Client_MessageReceived(object sender, MessageReceivedEventArgs e)
        {
            var rpcMessage = Serialization.Deserialize<RpcMessage>(e.Message);
            Accept(rpcMessage);
        }

        /// <summary>Forwards a socket error to DoException.</summary>
        private void Client_Error(object sender, SuperSocket.ClientEngine.ErrorEventArgs e)
        {
            DoException(e.Exception);
        }

        /// <summary>
        /// Handles the adopted socket (re-)opening: sets the connected flag, reports the open,
        /// and releases any waiting re-connect.
        /// </summary>
        private void Client_Opened(object sender, EventArgs e)
        {
            _connected = true;
            DoOpen();
            openEvent.Set();
        }

        /// <summary>
        /// Connects the transport. If a socket has already been adopted it is re-opened;
        /// otherwise every configured host is tried over both <c>wss://</c> and <c>ws://</c>
        /// in parallel and the first socket to open is adopted.
        /// </summary>
        /// <param name="ct">Cancels waiting for the connection.</param>
        /// <returns>True if the transport is connected when the method returns.</returns>
        /// <exception cref="OperationCanceledException">
        /// <paramref name="ct"/> was cancelled while waiting for an existing socket to re-open.
        /// </exception>
        /// <exception cref="AggregateException">A connection attempt threw before any socket was adopted.</exception>
        public override bool Connect(CancellationToken ct = default)
        {
            if (_socket != null)
            {
                // Re-open the adopted socket; Client_Opened or Client_Closed signals the outcome.
                openEvent.Reset();
                _socket.Open();
                openEvent.Wait(ct);
                return _connected;
            }

            var childCts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            var pending = new List<Task<(WebSocket? Socket, string Url, bool Secure)>>();
            foreach (var url in _urls)
            {
                foreach (var secure in new[] { true, false })
                {
                    pending.Add(Task.Run(() =>
                        (Socket: CreateSocket(url, secure, childCts.Token), Url: url, Secure: secure)));
                }
            }

            try
            {
                while (pending.Count > 0)
                {
                    var finished = Task.WhenAny(pending).Result;
                    pending.Remove(finished);

                    // Throws AggregateException if the attempt itself threw.
                    var candidate = finished.Result;
                    if (candidate.Socket == null)
                    {
                        continue;
                    }

                    if (_socket != null)
                    {
                        DiscardSocket(candidate.Socket);
                        throw new Exception("Socket already exists!");
                    }

                    // Adopt the winner. Only the adopted socket gets the open/close handlers, and
                    // the open is reported only once the transport state describes that socket.
                    candidate.Socket.Opened += Client_Opened;
                    candidate.Socket.Closed += Client_Closed;
                    _socket = candidate.Socket;
                    _host = candidate.Url;
                    _secure = candidate.Secure;
                    _connected = true;
                    DoOpen();
                    return _connected;
                }

                return _connected;
            }
            finally
            {
                // Stop the attempts that are still waiting, and dispose any socket that manages
                // to open anyway, so that no connection other than the adopted one stays alive.
                childCts.Cancel();
                foreach (var leftover in pending)
                {
                    leftover.ContinueWith(t =>
                    {
                        if (t.Status == TaskStatus.RanToCompletion)
                        {
                            var late = t.Result.Socket;
                            if (late != null)
                            {
                                DiscardSocket(late);
                            }
                        }
                        else
                        {
                            // Observe the failure; CreateSocket has already logged it.
                            _ = t.Exception;
                        }
                    });
                }

                // The token source may only be disposed once no attempt can still use its token.
                Task.WhenAll(pending).ContinueWith(all =>
                {
                    _ = all.Exception;
                    childCts.Dispose();
                });
            }
        }

        /// <summary>True if a socket has been adopted and its state is open.</summary>
        public override bool IsConnected() => _socket?.State == WebSocketState.Open;

        /// <summary>
        /// True if the adopted socket was opened over <c>wss://</c>, or has client certificates configured.
        /// </summary>
        public override bool IsSecure() =>
            _socket != null && (_secure || _socket.Security.Certificates.Count > 0);

        /// <summary>The host this transport is connected to, or null if it never connected.</summary>
        public override String? ServerName() => _host;

        /// <summary>Closes the adopted socket, if there is one.</summary>
        public override void Disconnect()
        {
            _socket?.Close();
        }

        /// <summary>
        /// Serializes <paramref name="message"/> to binary and sends it over the adopted socket.
        /// Does nothing if no socket has been adopted.
        /// </summary>
        public override void Send(RpcMessage message)
        {
            var buffer = message.WriteBinary(BinarySerializationSettings.Latest);
            _socket?.Send(buffer, 0, buffer.Length);
        }

        /// <summary>Creates a new, unconnected transport for the same list of hosts.</summary>
        protected override RpcClientTransport Clone() => new RpcClientSocketTransport(_urls);

        /// <summary>Closes and disposes the adopted socket, if there is one.</summary>
        public void Dispose()
        {
            _socket?.Close();
            _socket?.Dispose();
        }
    }
}