using System.Collections.Concurrent;
using InABox.Core;

namespace InABox.IPC
{
    /// <summary>
    /// Base class for client-side RPC transports. It pairs outgoing requests with
    /// incoming responses by message ID: each pending request registers a wait
    /// handle in <c>Events</c>, and <see cref="Accept"/> stores the response in
    /// <c>Responses</c> and signals that handle.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this class (the two dictionaries,
    /// <c>Send&lt;TCommand, TParameters, TResult&gt;</c>, <c>IRPCCommand&lt;,&gt;</c> and
    /// <c>Deserialize&lt;TResult&gt;</c>) were reconstructed from how the members are
    /// used - confirm them against the original declarations.
    /// </remarks>
    public abstract class RPCClientTransport : IRPCClientTransport
    {
        private readonly RPCClientSession _session = new RPCClientSession();

        // Wait handles for requests that are still waiting for a response, keyed by request ID.
        private readonly ConcurrentDictionary<Guid, ManualResetEventSlim> Events = new();

        // Responses that have arrived (or were synthesised on disconnect), keyed by message ID.
        private readonly ConcurrentDictionary<Guid, RPCMessage> Responses = new();

        private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes

        public event RPCTransportOpenEvent? OnOpen;

        /// <summary>Raises <see cref="OnOpen"/> for the current session.</summary>
        protected void DoOpen() => OnOpen?.Invoke(this, new RPCTransportOpenArgs(_session));

        public event RPCTransportCloseEvent? OnClose;

        /// <summary>
        /// Releases every pending request with a DISCONNECTED response, then raises
        /// <see cref="OnClose"/>.
        /// </summary>
        /// <param name="type">The close type passed on to the <see cref="OnClose"/> handlers.</param>
        protected void DoClose(RPCTransportCloseEventType type)
        {
            foreach (var pending in Events)
            {
                // TryAdd keeps a real response that has already arrived for this ID.
                Responses.TryAdd(pending.Key, new RPCMessage(Guid.Empty, "", "", RPCError.DISCONNECTED));
                pending.Value.Set();
            }

            OnClose?.Invoke(this, new RPCTransportCloseArgs(_session, type));
        }

        public event RPCTransportExceptionEvent? OnException;

        /// <summary>Raises <see cref="OnException"/> for the current session.</summary>
        protected void DoException(Exception e) => OnException?.Invoke(this, new RPCTransportExceptionArgs(_session, e));

        public event RPCTransportMessageEvent? OnMessage;

        /// <summary>Raises <see cref="OnMessage"/> for the current session.</summary>
        protected void DoMessage(RPCMessage message) => OnMessage?.Invoke(this, new RPCTransportMessageArgs(_session, message));

        public abstract void Connect();

        public abstract void Send(RPCMessage message);

        /// <summary>
        /// Records an incoming message as the response for its ID and wakes the
        /// request waiting on that ID, if there is one.
        /// </summary>
        /// <param name="message">The received message; <c>null</c> is ignored.</param>
        public void Accept(RPCMessage? message)
        {
            if (message == null)
            {
                return;
            }

            // The message is stored whether or not a request is waiting for it.
            // NOTE(review): a response that arrives after its request timed out is
            // therefore kept until something asks for that ID - confirm whether
            // such messages should be dropped instead.
            Responses[message.ID] = message;

            if (Events.TryGetValue(message.ID, out var waiter))
            {
                waiter.Set();
            }
        }

        public abstract void Disconnect();

        /// <summary>
        /// Sends a command and blocks until its response arrives, the transport is
        /// closed, or the default request timeout (5 minutes) elapses.
        /// </summary>
        /// <param name="properties">The command parameters; serialized into the request payload.</param>
        /// <returns>The deserialized response payload.</returns>
        /// <exception cref="Exception">
        /// The response is null, carries an error other than <c>RPCError.NONE</c>
        /// (including timeout and disconnect), or its payload deserializes to null.
        /// </exception>
        public TResult Send<TCommand, TParameters, TResult>(TParameters properties)
            where TCommand : IRPCCommand<TParameters, TResult>
        {
            var commandName = typeof(TCommand).Name;
            var request = new RPCMessage(
                Guid.NewGuid(),
                commandName,
                Serialization.Serialize(properties)
            );

            // Register the wait handle before sending so the response cannot be missed.
            var ev = Queue(request.ID);
            Send(request);

            var response = GetResult(request.ID, ev, DefaultRequestTimeout);
            if (response == null)
            {
                throw new Exception($"{typeof(TCommand).Name}({request.ID}) returned NULL");
            }

            if (response.Error != RPCError.NONE)
            {
                throw new Exception($"Exception in {typeof(TCommand).Name}({request.ID}): {response.Error}");
            }

            var result = Serialization.Deserialize<TResult>(response.Payload);
            if (result == null)
            {
                throw new Exception($"Cannot Deserialize {typeof(TCommand).Name}({request.ID})");
            }

            return result;
        }

        /// <summary>
        /// Registers a new, unsignalled wait handle for the given request ID,
        /// replacing any handle already registered under that ID.
        /// </summary>
        /// <param name="id">The request ID to wait on.</param>
        /// <returns>The handle that <see cref="Accept"/> signals when a message with this ID arrives.</returns>
        public ManualResetEventSlim Queue(Guid id)
        {
            var ev = new ManualResetEventSlim();
            Events[id] = ev;
            return ev;
        }

        /// <summary>
        /// Returns the response for a request, waiting up to <paramref name="timeout"/>
        /// milliseconds for it to arrive. The bookkeeping entries for
        /// <paramref name="id"/> are removed on every exit path.
        /// </summary>
        /// <param name="id">The request ID.</param>
        /// <param name="ev">The wait handle to block on.</param>
        /// <param name="timeout">Maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// The stored response; a message with <c>RPCError.TIMEOUT</c> if the wait
        /// timed out; or a message with <c>RPCError.UNKNOWN</c> if the handle was
        /// signalled but no response was stored.
        /// </returns>
        public RPCMessage? GetResult(Guid id, ManualResetEventSlim ev, int timeout)
        {
            // Fast path: the response is already here, so there is nothing to wait for.
            if (Responses.TryRemove(id, out var ready))
            {
                Events.TryRemove(id, out _);
                return ready;
            }

            try
            {
                if (!ev.Wait(timeout))
                {
                    return new RPCMessage(id, "", "", RPCError.TIMEOUT);
                }

                return Responses.TryRemove(id, out var response)
                    ? response
                    : new RPCMessage(id, "", "", RPCError.UNKNOWN);
            }
            catch (Exception e)
            {
                Logger.Send(LogType.Error, "", e.Message);
                throw;
            }
            finally
            {
                // Clean up on timeout and on exceptions as well as on success, so an
                // abandoned request does not leave its wait handle (or a response
                // that raced in) behind in the dictionaries.
                Events.TryRemove(id, out _);
                Responses.TryRemove(id, out _);
            }
        }
    }
}