using H.Pipes; using InABox.Core; using InABox.IPC; using System.Collections.Concurrent; using H.Formatters; namespace InABox.Client.IPC { public class IPCClientTransport : IDisposable { private PipeClient Client; private ConcurrentDictionary Events = new(); private ConcurrentDictionary Responses = new(); private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes public delegate void ConnectEvent(); public delegate void DisconnectEvent(); /// /// A handler for any requests pushed from the server, i.e., not initialised by the client. /// public delegate void PushEvent(IPCMessage request); public bool Disconnected { get; private set; } public event ConnectEvent? OnConnect; public event DisconnectEvent? OnDisconnect; public event PushEvent? OnPush; public IPCClientTransport(string pipeName) { Client = new PipeClient(pipeName, formatter:new BinaryFormatter()); Client.Connected += Client_Connected; Client.Disconnected += Client_Disconnected; Client.MessageReceived += Client_MessageReceived; Client.ExceptionOccurred += Client_ExceptionOccurred; Client.ConnectAsync(); } private void Client_ExceptionOccurred(object? sender, H.Pipes.Args.ExceptionEventArgs e) { Logger.Send(LogType.Error, "", $"Exception occured: {e.Exception.Message}"); } public IPCMessage Send(IPCMessage request, int timeout = DefaultRequestTimeout) { var ev = Queue(request.RequestID); Client.WriteAsync(request); return GetResult(request.RequestID, ev, timeout); } public ManualResetEventSlim Queue(Guid id) { var ev = new ManualResetEventSlim(); Events[id] = ev; return ev; } public IPCMessage GetResult(Guid id, ManualResetEventSlim ev, int timeout) { if (Responses.Remove(id, out var result)) { Events.Remove(id, out ev); return result; } try { if (!ev.Wait(timeout)) { return IPCMessage.Error(RequestError.TIMEOUT); } } catch (Exception e) { Console.WriteLine(e); throw; } Responses.Remove(id, out result); Events.Remove(id, out ev); return result ?? IPCMessage.Error(RequestError.UNKNOWN); } private void Client_MessageReceived(object? sender, H.Pipes.Args.ConnectionMessageEventArgs e) { if (Events.TryGetValue(e.Message.RequestID, out var ev)) { Responses[e.Message.RequestID] = e.Message; ev.Set(); } else { Task.Run(() => { OnPush?.Invoke(e.Message); }).ContinueWith(task => { if (task.Exception != null) { Logger.Send(LogType.Error, "", $"Error in IPC Client Push: {CoreUtils.FormatException(task.Exception)}"); } }); } } private void Client_Connected(object? sender, H.Pipes.Args.ConnectionEventArgs e) { Logger.Send(LogType.Information, "", $"Connected to Pipe: {e.Connection.PipeName}"); Disconnected = false; OnConnect?.Invoke(); } private void Client_Disconnected(object? sender, H.Pipes.Args.ConnectionEventArgs e) { Logger.Send(LogType.Information, "", $"Disconnected from Pipe: {e.Connection.PipeName}"); foreach (var ev in Events) { Responses.TryAdd(ev.Key, IPCMessage.Error(RequestError.DISCONNECTED)); ev.Value.Set(); } Disconnected = true; OnDisconnect?.Invoke(); } public void Dispose() { Client.DisposeAsync().AsTask().Wait(); } ~IPCClientTransport() { Dispose(); } } }