using System.Collections.Concurrent;
using System.Text;
using InABox.Core;
using InABox.Server;

namespace InABox.Rpc;

/// <summary>
/// Base class for an RPC proxy: it accepts messages from client connections, wraps them in an
/// <c>InternalServerMessage</c> tagged with the connection's session id, forwards them to an upstream
/// server through an <see cref="IRpcClientTransport"/>, and relays "Push" messages coming back from
/// that server to the matching client connection(s).
/// </summary>
/// <typeparam name="TConnection">
/// Transport-specific handle identifying a single client connection; used as the key of the session table.
/// </typeparam>
public abstract class RpcProxyServer<TConnection> : IRpcProxyServer
    where TConnection : notnull
{
    /// <summary>Maps each known client connection to the session id issued by the upstream server.</summary>
    private readonly ConcurrentDictionary<TConnection, Guid> _sessions = new();

    /// <summary>Transport used to talk to the upstream server.</summary>
    private IRpcClientTransport ServerTransport { get; }

    /// <summary>Raised for connect/disconnect notices and for every exception routed through <see cref="DoException"/>.</summary>
    public event LogEvent? OnLog;

    /// <summary>
    /// Creates the proxy and subscribes to messages arriving from the upstream server.
    /// </summary>
    /// <param name="serverTransport">The transport connected to the upstream server.</param>
    public RpcProxyServer(IRpcClientTransport serverTransport)
    {
        ServerTransport = serverTransport;
        ServerTransport.OnMessage += ServerTransport_OnMessage;
    }

    /// <summary>
    /// Relays a "Push" message from the upstream server to clients. An empty session id in the
    /// envelope means "broadcast to every known connection"; otherwise only the first connection
    /// registered for that session receives it. Messages with any other command are ignored.
    /// </summary>
    private void ServerTransport_OnMessage(IRpcTransport transport, RpcTransportMessageArgs e)
    {
        var pushMessage = e.Message;
        if (pushMessage is null || pushMessage.Command != "Push")
        {
            return;
        }

        // NOTE(review): the type argument was reconstructed from the members used below
        // (Session, Payload) - confirm it matches the original declaration.
        // SendToServer treats the result of ReadBinary as possibly null, so guard here as well
        // instead of failing with a NullReferenceException inside the transport's event handler.
        if (Serialization.ReadBinary<InternalServerMessage>(pushMessage.Payload, BinarySerializationSettings.Latest)
            is not { } envelope)
        {
            OnLog?.Invoke(LogType.Error, "", "Exception Occurred: Cannot Deserialize Push message");
            return;
        }

        // Unwrap: clients receive the inner payload, not the server-side envelope.
        pushMessage.Payload = envelope.Payload;

        if (envelope.Session == Guid.Empty)
        {
            foreach (var connection in _sessions.Keys)
            {
                TrySend(connection, pushMessage);
            }

            return;
        }

        // Explicit search rather than FirstOrDefault(...).Key: when TConnection is a value type a
        // failed lookup would yield default(TConnection), which is never null and would be sent to.
        foreach (var entry in _sessions)
        {
            if (entry.Value == envelope.Session)
            {
                TrySend(entry.Key, pushMessage);
                break;
            }
        }
    }

    /// <summary>
    /// Sends a pushed message to one connection, reporting (not propagating) any failure so that a
    /// single faulty connection cannot abort a broadcast to the remaining ones.
    /// </summary>
    private void TrySend(TConnection connection, RpcMessage message)
    {
        try
        {
            Send(connection, message);
        }
        catch (Exception e)
        {
            DoException(connection, e);
        }
    }

    public abstract bool IsSecure();

    /// <summary>Associates <paramref name="connection"/> with session <paramref name="guid"/>, replacing any previous mapping.</summary>
    protected void CreateSession(TConnection connection, Guid guid)
    {
        _sessions[connection] = guid;
    }

    /// <summary>
    /// Looks up the session registered for a connection.
    /// </summary>
    /// <param name="connection">The client connection; may be <c>null</c>.</param>
    /// <returns>The session id, or <see cref="Guid.Empty"/> when the connection is <c>null</c> or unknown.</returns>
    public Guid GetSession(TConnection? connection)
    {
        if (connection is null)
        {
            return Guid.Empty;
        }

        return _sessions.TryGetValue(connection, out var session) ? session : Guid.Empty;
    }

    /// <summary>
    /// Removes the mapping for <paramref name="connection"/>.
    /// </summary>
    /// <returns>The session id that was removed, or <see cref="Guid.Empty"/> if there was none.</returns>
    protected Guid DeleteSession(TConnection connection)
    {
        // TryRemove is a single atomic operation. The Remove(key, out value) extension method is a
        // TryGetValue followed by Remove, so two concurrent closes of the same connection could both
        // observe the session and both notify the server.
        return _sessions.TryRemove(connection, out var session) ? session : Guid.Empty;
    }

    public abstract void Start();

    public abstract void Stop();

    /// <summary>
    /// Forwards a raw client message to the upstream server under the given session and returns the
    /// server's reply unchanged (error checking is left to the caller).
    /// </summary>
    private RpcMessage SendToServer(Guid session, RpcMessage message)
    {
        var internalMessage = new InternalServerMessage
        {
            Session = session,
            Payload = message.Payload
        };
        return ServerTransport.Send(message.Command, internalMessage, checkErrors: false);
    }

    /// <summary>
    /// Sends a typed command to the upstream server and deserializes the typed result.
    /// </summary>
    /// <typeparam name="TCommand">Command type; its type name is used as the command string.</typeparam>
    /// <typeparam name="TParameters">Binary-serializable parameters of the command.</typeparam>
    /// <typeparam name="TResult">Binary-serializable result of the command.</typeparam>
    /// <exception cref="InvalidOperationException">The reply carried no error but could not be deserialized.</exception>
    /// <exception cref="RpcException">The reply carried an error code.</exception>
    private TResult SendToServer<TCommand, TParameters, TResult>(Guid session, TParameters parameters)
        where TCommand : IRpcCommand
        where TParameters : IRpcCommandParameters, ISerializeBinary
        where TResult : IRpcCommandResult, ISerializeBinary, new()
    {
        var commandName = typeof(TCommand).Name;
        var internalMessage = new InternalServerMessage
        {
            Session = session,
            Payload = parameters.WriteBinary(BinarySerializationSettings.Latest)
        };

        var response = ServerTransport.Send(commandName, internalMessage, checkErrors: false);
        switch (response.Error)
        {
            case RpcError.NONE:
                return Serialization.ReadBinary<TResult>(response.Payload, BinarySerializationSettings.Latest)
                    ?? throw new InvalidOperationException($"Cannot Deserialize {commandName}");

            case RpcError.SERVERERROR:
                // For SERVERERROR the payload is read as the UTF-8 error text
                // (DoMessage in this class produces exactly that shape).
                throw new RpcException(Encoding.UTF8.GetString(response.Payload), response.Error);

            default:
                throw new RpcException($"Server error in {commandName}: {response.Error}", response.Error);
        }
    }

    /// <summary>
    /// Handles a newly opened client connection: asks the upstream server for a session and
    /// registers it against the connection. Failures are reported through <see cref="DoException"/>.
    /// </summary>
    /// <param name="connection">The connection that was opened.</param>
    protected void DoOpen(TConnection connection)
    {
        try
        {
            // NOTE(review): command/result type names reconstructed from the parameters type - confirm.
            var result = SendToServer<OpenSessionCommand, OpenSessionParameters, OpenSessionResult>(
                Guid.Empty, new OpenSessionParameters());

            // The mapping is stored even when the server returned an empty session id; such a
            // connection still receives broadcast pushes, while DoMessage answers SESSIONNOTFOUND.
            CreateSession(connection, result.SessionID);
            if (result.SessionID != Guid.Empty)
            {
                OnLog?.Invoke(LogType.Information, "", $"Client Connected [{result.SessionID}]");
            }
        }
        catch (Exception e)
        {
            DoException(connection, e);
        }
    }

    /// <summary>
    /// Handles a closed client connection: forgets its session and, if it had one, tells the
    /// upstream server to close it. Failures are reported through <see cref="DoException"/>.
    /// </summary>
    /// <param name="connection">The connection that was closed.</param>
    /// <param name="type">How the connection was closed (not used by this implementation).</param>
    protected void DoClose(TConnection connection, RpcTransportCloseEventType type)
    {
        try
        {
            var session = DeleteSession(connection);
            if (session == Guid.Empty)
            {
                return;
            }

            // NOTE(review): command/result type names reconstructed from the parameters type - confirm.
            SendToServer<CloseSessionCommand, CloseSessionParameters, CloseSessionResult>(
                session, new CloseSessionParameters { SessionID = session });
            OnLog?.Invoke(LogType.Information, "", $"Client Disconnected [{session}]");
        }
        catch (Exception e)
        {
            DoException(connection, e);
        }
    }

    /// <summary>
    /// Reports an exception through <see cref="OnLog"/>. When the connection has a session only the
    /// exception message is logged, prefixed with the session id; otherwise the exception is
    /// formatted with <c>CoreUtils.FormatException</c>.
    /// </summary>
    /// <param name="connection">The connection the exception relates to, if any.</param>
    /// <param name="e">The exception to report.</param>
    protected void DoException(TConnection? connection, Exception e)
    {
        var session = GetSession(connection);
        if (session != Guid.Empty)
        {
            OnLog?.Invoke(LogType.Error, "", $"Exception Occurred in {session}: {e.Message}");
        }
        else
        {
            OnLog?.Invoke(LogType.Error, "", $"Exception Occurred: {CoreUtils.FormatException(e)}");
        }
    }

    /// <summary>
    /// Handle a message from a client.
    /// </summary>
    /// <param name="connection">The client connection.</param>
    /// <param name="message">The message to be handled.</param>
    /// <returns>
    /// The response to be sent back to the client, or <c>null</c> when <paramref name="message"/> is <c>null</c>.
    /// The response carries <c>SESSIONNOTFOUND</c> when the connection has no session and
    /// <c>SERVERERROR</c> (with the exception message as UTF-8 payload) when forwarding throws.
    /// </returns>
    public RpcMessage? DoMessage(TConnection? connection, RpcMessage? message)
    {
        if (message is null)
        {
            DoException(connection, new Exception("NULL Message Received"));
            return null;
        }

        // The reply echoes the request's id and command.
        var response = new RpcMessage { Id = message.Id, Command = message.Command };
        try
        {
            var session = GetSession(connection);
            if (session != Guid.Empty)
            {
                var serverResponse = SendToServer(session, message);
                response.Payload = serverResponse.Payload;
                response.Error = serverResponse.Error;
            }
            else
            {
                DoException(connection, new Exception("Session not Found"));
                response.Error = RpcError.SESSIONNOTFOUND;
            }
        }
        catch (Exception e)
        {
            DoException(connection, e);
            response.Payload = Encoding.UTF8.GetBytes(e.Message);
            response.Error = RpcError.SERVERERROR;
        }

        return response;
    }

    /// <summary>
    /// Send a message to a particular client connection.
    /// </summary>
    /// <param name="connection">The connection to send to.</param>
    /// <param name="message">The message to send.</param>
    public abstract void Send(TConnection connection, RpcMessage message);
}