RPCClientTransport.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. using System.Collections.Concurrent;
  2. using InABox.Core;
  3. namespace InABox.IPC
  4. {
  5. public abstract class RPCClientTransport : IRPCClientTransport
  6. {
  7. private RPCClientSession _session = new RPCClientSession();
  8. private ConcurrentDictionary<Guid, ManualResetEventSlim> Events = new();
  9. private ConcurrentDictionary<Guid, RPCMessage> Responses = new();
  10. private const int DefaultRequestTimeout = 5 * 60 * 1000; // 5 minutes
  11. public event RPCTransportOpenEvent? OnOpen;
  12. protected void DoOpen() => OnOpen?.Invoke(this, new RPCTransportOpenArgs(_session));
  13. public event RPCTransportCloseEvent? OnClose;
  14. protected void DoClose(RPCTransportCloseEventType type)
  15. {
  16. foreach (var ev in Events)
  17. {
  18. Responses.TryAdd(ev.Key, new RPCMessage(Guid.Empty,"","",RPCError.DISCONNECTED));
  19. ev.Value.Set();
  20. }
  21. OnClose?.Invoke(this, new RPCTransportCloseArgs(_session, type));
  22. }
  23. public event RPCTransportExceptionEvent? OnException;
  24. protected void DoException(Exception e) => OnException?.Invoke(this, new RPCTransportExceptionArgs(_session, e));
  25. public event RPCTransportMessageEvent? OnMessage;
  26. protected void DoMessage(RPCMessage message) => OnMessage?.Invoke(this, new RPCTransportMessageArgs(_session, message));
  27. public abstract void Connect();
  28. public abstract void Send(RPCMessage message);
  29. public void Accept(RPCMessage? message)
  30. {
  31. if (message == null)
  32. return;
  33. if (Events.TryGetValue(message.ID, out var ev))
  34. {
  35. Responses[message.ID] = message;
  36. ev.Set();
  37. }
  38. else
  39. Responses[message.ID] = message;
  40. }
  41. public abstract void Disconnect();
  42. public TResult Send<TCommand, TParameters, TResult>(TParameters properties) where TCommand : IRPCCommand<TParameters,TResult>
  43. {
  44. var request = new RPCMessage(
  45. Guid.NewGuid(),
  46. typeof(TCommand).Name,
  47. Serialization.Serialize(properties)
  48. );
  49. var ev = Queue(request.ID);
  50. Send(request);
  51. var response = GetResult(request.ID, ev, DefaultRequestTimeout);
  52. if (response == null)
  53. throw new Exception($"{typeof(TCommand).Name}({request.ID}) returned NULL");
  54. if (response.Error != RPCError.NONE)
  55. throw new Exception($"Exception in {typeof(TCommand).Name}({request.ID}): {response.Error}");
  56. var result = Serialization.Deserialize<TResult>(response.Payload);
  57. if (result == null)
  58. throw new Exception($"Cannot Deserialize {typeof(TCommand).Name}({request.ID})");
  59. return result;
  60. }
  61. public ManualResetEventSlim Queue(Guid id)
  62. {
  63. var ev = new ManualResetEventSlim();
  64. Events[id] = ev;
  65. return ev;
  66. }
  67. public RPCMessage? GetResult(Guid id, ManualResetEventSlim ev, int timeout)
  68. {
  69. if (Responses.TryGetValue(id, out var result))
  70. {
  71. Responses.Remove(id, out result);
  72. Events.Remove(id, out ev);
  73. return result;
  74. }
  75. try
  76. {
  77. if (!ev.Wait(timeout))
  78. {
  79. return new RPCMessage(id,"","",RPCError.TIMEOUT);
  80. }
  81. }
  82. catch (Exception e)
  83. {
  84. Logger.Send(LogType.Error, "", e.Message);
  85. throw;
  86. }
  87. Responses.Remove(id, out result);
  88. Events.Remove(id, out ev);
  89. return result ?? new RPCMessage(id,"","",RPCError.UNKNOWN);
  90. }
  91. }
  92. }