using System.Collections.Concurrent;
using H.Pipes;
using InABox.Core;

namespace InABox.IPC
{
    /// <summary>
    /// Client side of a named-pipe RPC channel. Requests are written to the pipe and the
    /// calling thread blocks until a message carrying the same request id is received,
    /// the pipe disconnects, or the timeout elapses.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the generic type arguments in this file (on the pipe client, the two
    /// dictionaries, the H.Pipes event args and the typed Send method) were reconstructed
    /// from how the values are used inside this class - confirm them against the project.
    /// </remarks>
    public class RPCClientTransport : IDisposable
    {
        private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes

        private readonly PipeClient<RPCMessage> Client;

        // Wait handles for requests that are in flight, keyed by request id.
        private readonly ConcurrentDictionary<Guid, ManualResetEventSlim> Events = new();

        // Responses that have been received (or synthesised on disconnect) but not yet collected.
        private readonly ConcurrentDictionary<Guid, RPCMessage> Responses = new();

        // 0 = live, 1 = disposed; changed with Interlocked so Dispose runs its body only once.
        private int _disposed;

        public delegate void ConnectEvent();

        public delegate void DisconnectEvent();

        /// <summary>
        /// True once the pipe has reported a disconnect; reset to false when it reports a connection.
        /// </summary>
        public bool Disconnected { get; private set; }

        /// <summary>Raised after the pipe reports that it has connected.</summary>
        public event ConnectEvent? OnConnect;

        /// <summary>Raised after the pipe reports that it has disconnected.</summary>
        public event DisconnectEvent? OnDisconnect;

        /// <summary>
        /// Creates the pipe client, subscribes to its events and starts connecting in the background.
        /// </summary>
        /// <param name="name">Name of the pipe to connect to.</param>
        public RPCClientTransport(string name)
        {
            Client = new PipeClient<RPCMessage>(name);
            Client.Connected += Client_Connected;
            Client.Disconnected += Client_Disconnected;
            Client.MessageReceived += Client_MessageReceived;
            Client.ExceptionOccurred += Client_ExceptionOccurred;

            // Deliberately not awaited: the outcome is reported through the events above.
            _ = Client.ConnectAsync();
        }

        private void Client_ExceptionOccurred(object? sender, H.Pipes.Args.ExceptionEventArgs e)
        {
            Logger.Send(LogType.Error, "", $"Exception occured: {e.Exception.Message}");
        }

        /// <summary>
        /// Sends a typed command and blocks until its result is available.
        /// </summary>
        /// <param name="properties">Command parameters; serialized into the request payload.</param>
        /// <returns>The deserialized response payload.</returns>
        /// <exception cref="Exception">
        /// The response carried an error code other than <c>RPCError.NONE</c>, or its payload
        /// deserialized to null.
        /// </exception>
        // NOTE(review): IRPCCommand may itself take type arguments that are not visible here - confirm.
        public TResult Send<TCommand, TParameters, TResult>(TParameters properties) where TCommand : IRPCCommand
        {
            // Guid.NewGuid() gives each request its own correlation id. The parameterless
            // Guid constructor always yields Guid.Empty, which made every request share one
            // id and let concurrent requests overwrite each other's wait handle and response.
            var request = new RPCMessage(
                Guid.NewGuid(),
                typeof(TCommand).Name,
                Serialization.Serialize(properties)
            );

            var response = Send(request);
            if (response.Error != RPCError.NONE)
            {
                throw new Exception($"Exception in {typeof(TCommand).Name}({request.RequestID}): {response.Error}");
            }

            var result = Serialization.Deserialize<TResult>(response.Payload);
            if (result == null)
            {
                throw new Exception($"{typeof(TCommand).Name}({request.RequestID}) returned NULL");
            }

            return result;
        }

        /// <summary>
        /// Writes <paramref name="request"/> to the pipe and blocks until the matching response
        /// arrives, the pipe disconnects, or <paramref name="timeout"/> elapses.
        /// </summary>
        /// <param name="request">Message to send; its RequestID is used to match the response.</param>
        /// <param name="timeout">Maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// The response, or a message whose Error is TIMEOUT, DISCONNECTED or UNKNOWN.
        /// </returns>
        public RPCMessage Send(RPCMessage request, int timeout = DefaultRequestTimeout)
        {
            // Register the wait handle before writing so a fast reply cannot be missed.
            var ev = Queue(request.RequestID);

            // Deliberately not awaited: completion is observed through the response event.
            _ = Client.WriteAsync(request);

            return GetResult(request.RequestID, ev, timeout);
        }

        /// <summary>
        /// Registers a fresh, unsignalled wait handle for request <paramref name="id"/>,
        /// replacing any handle already registered under that id.
        /// </summary>
        /// <param name="id">Request id to wait on.</param>
        /// <returns>The handle that is signalled when a response for the id is stored.</returns>
        public ManualResetEventSlim Queue(Guid id)
        {
            var ev = new ManualResetEventSlim();
            Events[id] = ev;
            return ev;
        }

        /// <summary>
        /// Collects the response for request <paramref name="id"/>, waiting on
        /// <paramref name="ev"/> for up to <paramref name="timeout"/> milliseconds if it has
        /// not arrived yet. The bookkeeping entries for the id are removed on every exit path.
        /// </summary>
        /// <param name="id">Request id to collect.</param>
        /// <param name="ev">Wait handle previously registered for the id.</param>
        /// <param name="timeout">Maximum time to wait, in milliseconds.</param>
        /// <returns>
        /// The stored response; otherwise a TIMEOUT message if the wait elapsed, or an UNKNOWN
        /// message if the handle was signalled without a response being stored.
        /// </returns>
        public RPCMessage GetResult(Guid id, ManualResetEventSlim ev, int timeout)
        {
            // Fast path: the response was stored before we started waiting.
            if (Responses.TryRemove(id, out var early))
            {
                Events.TryRemove(id, out _);
                return early;
            }

            bool signalled;
            try
            {
                signalled = ev.Wait(timeout);
            }
            catch (Exception e)
            {
                Logger.Send(LogType.Error, "", e.Message);
                throw;
            }
            finally
            {
                // Always unregister the handle - previously a timed-out request left its
                // entry in Events forever.
                Events.TryRemove(id, out _);
            }

            // Checked even after a timeout so that a response which arrived in the race
            // window is returned instead of being stranded in Responses.
            if (Responses.TryRemove(id, out var result))
            {
                return result;
            }

            return new RPCMessage(id, "", "", signalled ? RPCError.UNKNOWN : RPCError.TIMEOUT);
        }

        private void Client_MessageReceived(object? sender, H.Pipes.Args.ConnectionMessageEventArgs<RPCMessage?> e)
        {
            var message = e.Message;
            if (message is null)
            {
                // Nothing to correlate; dropping it avoids a NullReferenceException below.
                Logger.Send(LogType.Error, "", "Received an empty message from the pipe");
                return;
            }

            // Store first, then signal, so a woken waiter always finds its response.
            // Messages nobody is waiting for are still stored, as before.
            Responses[message.RequestID] = message;
            if (Events.TryGetValue(message.RequestID, out var ev))
            {
                ev.Set();
            }
        }

        private void Client_Connected(object? sender, H.Pipes.Args.ConnectionEventArgs<RPCMessage> e)
        {
            Logger.Send(LogType.Information, "", $"Connected to Pipe: {e.Connection.PipeName}");
            Disconnected = false;
            OnConnect?.Invoke();
        }

        private void Client_Disconnected(object? sender, H.Pipes.Args.ConnectionEventArgs<RPCMessage> e)
        {
            Logger.Send(LogType.Information, "", $"Disconnected from Pipe: {e.Connection.PipeName}");

            // Release every waiting caller with a DISCONNECTED response. The response carries
            // the id of the request it answers, consistent with the TIMEOUT/UNKNOWN responses.
            foreach (var pending in Events)
            {
                Responses.TryAdd(pending.Key, new RPCMessage(pending.Key, "", "", RPCError.DISCONNECTED));
                pending.Value.Set();
            }

            Disconnected = true;
            OnDisconnect?.Invoke();
        }

        /// <summary>
        /// Disposes the underlying pipe client, blocking until it has shut down.
        /// Calls after the first one do nothing.
        /// </summary>
        public void Dispose()
        {
            if (Interlocked.Exchange(ref _disposed, 1) != 0)
            {
                return;
            }

            // The finalizer has nothing left to do once we have disposed explicitly.
            GC.SuppressFinalize(this);
            Client.DisposeAsync().AsTask().Wait();
        }

        ~RPCClientTransport()
        {
            try
            {
                Dispose();
            }
            catch
            {
                // Best-effort cleanup only: an exception escaping a finalizer terminates the process.
            }
        }
    }
}