RPCClientTransport.cs 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. using System.Collections.Concurrent;
  2. using H.Pipes;
  3. using InABox.Core;
  4. namespace InABox.IPC
  5. {
  6. public class RPCClientTransport : IDisposable
  7. {
  8. private PipeClient<RPCMessage> Client;
  9. private ConcurrentDictionary<Guid, ManualResetEventSlim> Events = new();
  10. private ConcurrentDictionary<Guid, RPCMessage> Responses = new();
  11. private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes
  12. public delegate void ConnectEvent();
  13. public delegate void DisconnectEvent();
  14. public bool Disconnected { get; private set; }
  15. public event ConnectEvent? OnConnect;
  16. public event DisconnectEvent? OnDisconnect;
  17. public RPCClientTransport(string name)
  18. {
  19. Client = new PipeClient<RPCMessage>(name);
  20. Client.Connected += Client_Connected;
  21. Client.Disconnected += Client_Disconnected;
  22. Client.MessageReceived += Client_MessageReceived;
  23. Client.ExceptionOccurred += Client_ExceptionOccurred;
  24. Client.ConnectAsync();
  25. }
  26. private void Client_ExceptionOccurred(object? sender, H.Pipes.Args.ExceptionEventArgs e)
  27. {
  28. Logger.Send(LogType.Error, "", $"Exception occured: {e.Exception.Message}");
  29. }
  30. public TResult Send<TCommand, TParameters, TResult>(TParameters properties) where TCommand : IRPCCommand<TParameters,TResult>
  31. {
  32. var request = new RPCMessage(
  33. new Guid(),
  34. typeof(TCommand).Name,
  35. Serialization.Serialize(properties)
  36. );
  37. var response = Send(request);
  38. if (response.Error != RPCError.NONE)
  39. throw new Exception($"Exception in {typeof(TCommand).Name}({request.RequestID}): {response.Error}");
  40. var result = Serialization.Deserialize<TResult>(response.Payload);
  41. if (result == null)
  42. throw new Exception($"{typeof(TCommand).Name}({request.RequestID}) returned NULL");
  43. return result;
  44. }
  45. public RPCMessage Send(RPCMessage request, int timeout = DefaultRequestTimeout)
  46. {
  47. var start = DateTime.Now;
  48. var ev = Queue(request.RequestID);
  49. Client.WriteAsync(request);
  50. var result = GetResult(request.RequestID, ev, timeout);
  51. return result;
  52. }
  53. public ManualResetEventSlim Queue(Guid id)
  54. {
  55. var ev = new ManualResetEventSlim();
  56. Events[id] = ev;
  57. return ev;
  58. }
  59. public RPCMessage GetResult(Guid id, ManualResetEventSlim ev, int timeout)
  60. {
  61. if (Responses.TryGetValue(id, out var result))
  62. {
  63. Responses.Remove(id, out result);
  64. Events.Remove(id, out ev);
  65. return result;
  66. }
  67. try
  68. {
  69. if (!ev.Wait(timeout))
  70. {
  71. return new RPCMessage(id,"","",RPCError.TIMEOUT);
  72. }
  73. }
  74. catch (Exception e)
  75. {
  76. Logger.Send(LogType.Error, "", e.Message);
  77. throw;
  78. }
  79. Responses.Remove(id, out result);
  80. Events.Remove(id, out ev);
  81. return result ?? new RPCMessage(id,"","",RPCError.UNKNOWN);
  82. }
  83. private void Client_MessageReceived(object? sender, H.Pipes.Args.ConnectionMessageEventArgs<RPCMessage?> e)
  84. {
  85. if (Events.TryGetValue(e.Message.RequestID, out var ev))
  86. {
  87. Responses[e.Message.RequestID] = e.Message;
  88. ev.Set();
  89. }
  90. else
  91. {
  92. Responses[e.Message.RequestID] = e.Message;
  93. }
  94. }
  95. private void Client_Connected(object? sender, H.Pipes.Args.ConnectionEventArgs<RPCMessage> e)
  96. {
  97. Logger.Send(LogType.Information, "", $"Connected to Pipe: {e.Connection.PipeName}");
  98. Disconnected = false;
  99. OnConnect?.Invoke();
  100. }
  101. private void Client_Disconnected(object? sender, H.Pipes.Args.ConnectionEventArgs<RPCMessage> e)
  102. {
  103. Logger.Send(LogType.Information, "", $"Disconnected from Pipe: {e.Connection.PipeName}");
  104. foreach (var ev in Events)
  105. {
  106. Responses.TryAdd(ev.Key, new RPCMessage(Guid.Empty,"","",RPCError.DISCONNECTED));
  107. ev.Value.Set();
  108. }
  109. Disconnected = true;
  110. OnDisconnect?.Invoke();
  111. }
  112. public void Dispose()
  113. {
  114. Client.DisposeAsync().AsTask().Wait();
  115. }
  116. ~RPCClientTransport()
  117. {
  118. Dispose();
  119. }
  120. }
  121. }