OEMConnection.cs 15 KB


  1. using Comal.Classes;
  2. using InABox.Core;
  3. using InABox.DigitalMatter;
  4. using NPOI.HSSF.Record.CF;
  5. using PRS.Shared;
  6. using Syncfusion.Data;
  7. using System;
  8. using System.Collections.Generic;
  9. using System.Linq;
  10. using System.Net.Sockets;
  11. using System.Text;
  12. using System.Threading;
  13. using System.Threading.Tasks;
  14. using System.Windows.Media.Media3D;
  15. namespace PRSServer
  16. {
  /// <summary>
  /// Represents one conversation between OEM and our GPS Engine.
  /// </summary>
  /// <remarks>
  /// A conversation is driven by the messages read from the socket: a Hello request identifies the
  /// device, Data requests are accumulated, and a Confirm request turns the accumulated records into
  /// location updates, which are pushed to the update queue. Call <see cref="Run"/> to start reading
  /// and <see cref="Stop"/> to abort; <see cref="OnCompletion"/> is invoked when the conversation
  /// closes itself.
  /// </remarks>
  internal class OEMConnection
  {
      // Number of bytes requested from the socket per read.
      private const int ReadBufferSize = 2048;

      // Raw GPS latitude/longitude values are divided by this to produce the stored coordinates.
      private const double CoordinateScale = 10_000_000.0;

      // How long a single read may stay pending before the watchdog flags the conversation.
      private const int ReadTimeoutMilliseconds = 15_000;

      // How long Stop() waits for the in-flight read task before complaining.
      private const int StopTimeoutMilliseconds = 1000;

      private readonly TcpClient Client;
      private readonly NetworkStream Stream;

      // Scratch buffer for a single socket read.
      private readonly byte[] Buffer = new byte[ReadBufferSize];

      // Bytes received so far that have not yet been consumed as a complete message.
      private readonly List<byte> ConsolidatedBuffer = new();

      // Written by the watchdog and read by the read continuation, which run on different threads.
      private volatile bool Finished;

      private Task? ReadTask;
      private readonly CancellationTokenSource TokenSource = new();

      // References to necessary caches
      private readonly GPSDeviceCache Devices;
      private readonly GPSUpdateQueue Queue;

      // Fields of this conversation; these should be initialised by the 'Hello' message.
      // They start out empty (rather than null) so that a message arriving before 'Hello'
      // is reported as an unknown device instead of passing null to the device cache.
      private string Product = string.Empty;
      private string Serial = string.Empty;

      // A record is skipped when its timestamp is no more than this far ahead of the
      // timestamp already cached for the device.
      private static readonly TimeSpan AgeThreshold = TimeSpan.FromMinutes(2);

      // Data that forms the request.
      private readonly List<DMDataRequest> Data = new();

      public delegate void CompletedEvent();

      /// <summary>
      /// Invoked after the conversation has closed its stream and client (see <c>Finish</c>).
      /// It is not invoked by <see cref="Stop"/>.
      /// </summary>
      public CompletedEvent? OnCompletion;

      /// <summary>
      /// Creates a conversation over an accepted client connection.
      /// </summary>
      /// <param name="client">The connected client; its network stream is used for all reads and writes.</param>
      /// <param name="cache">Device cache used to recognise devices and to filter out recent updates.</param>
      /// <param name="queue">Queue that receives the location updates produced by this conversation.</param>
      public OEMConnection(TcpClient client, GPSDeviceCache cache, GPSUpdateQueue queue)
      {
          Client = client;
          Stream = Client.GetStream();
          Devices = cache;
          Queue = queue;
      }

      /// <summary>
      /// Records the product name and serial number announced by the device and acknowledges them.
      /// </summary>
      private DMHelloResponse HandleHello(DMHelloRequest hello)
      {
          Product = DMFactory.GetDeviceName(hello.ProductID);
          Serial = hello.SerialNumber.ToString();
          Logger.Send(LogType.Information, Serial, $"Hello {Product} ({Serial})");
          return new DMHelloResponse();
      }

      /// <summary>
      /// Logs the contents of a data request and stores it for processing when the confirm arrives.
      /// </summary>
      /// <returns>Always <c>null</c>: no response is written for a data request.</returns>
      private DMMessage? HandleData(DMDataRequest data)
      {
          Logger.Send(LogType.Information, Serial, $"{data.Records.Length} DataRecords Received");

          var recordNumber = 1;
          foreach (var record in data.Records)
          {
              // "HH" (24-hour): the previous 12-hour "hh" with no AM/PM marker made log times ambiguous.
              Logger.Send(LogType.Information, Serial,
                  string.Format("- Data Record #{0}: {1:dd MMM yy HH-mm-ss} ({2} Fields)", recordNumber,
                      record.TimeStampToDateTime(record.TimeStamp), record.Fields.Length));
              recordNumber++;

              foreach (var field in record.Fields)
              {
                  Logger.Send(LogType.Information, Serial,
                      string.Format(" [{0}] {1}: {2}", field.IsValid() ? "X" : " ",
                          DMFactory.GetFieldName(field.Type), field));
              }
          }

          Data.Add(data);
          return null;
      }

      /// <summary>
      /// Builds a location update for the tracker itself (the device identified by the Hello message).
      /// </summary>
      /// <returns>
      /// The update, or <c>null</c> when the GPS status reports no signal or the record is not
      /// sufficiently newer than the cached timestamp for the device.
      /// </returns>
      private GPSTrackerLocation? HandleTracker(DMGPSField gps, DMRecord record)
      {
          if (gps.StatusFlags().Any(x => x == GPSStatus.NoSignal))
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Invalid Signal ({Serial})");
              return null;
          }

          var device = Devices[Serial];
          var timestamp = record.TimeStampToDateTime(record.TimeStamp);
          var age = timestamp - device.TimeStamp;
          if (age <= AgeThreshold)
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Recent Update ({Serial}) {age:mm\\:ss}");
              return null;
          }

          var location = new GPSTrackerLocation();
          location.DeviceID = Serial;
          location.Tracker.ID = device.ID;
          location.Location.Timestamp = timestamp;
          location.Location.Latitude = (double)gps.Latitude / CoordinateScale;
          location.Location.Longitude = (double)gps.Longitude / CoordinateScale;

          // Prefer the reported battery strength; otherwise derive a level from the internal voltage.
          var analogueData = record.GetFields<DMAnalogueDataField16>().FirstOrDefault();
          if (analogueData is not null)
          {
              location.BatteryLevel = analogueData.BatteryStrength
                  ?? device.CalculateBatteryLevel(analogueData.InternalVoltage);
          }

          return location;
      }

      /// <summary>
      /// Builds a location update for a bluetooth tag seen by the tracker, positioned at the tracker's GPS fix.
      /// </summary>
      /// <returns>
      /// The update, or <c>null</c> when the tag is not in the device cache or the record is not
      /// sufficiently newer than the cached timestamp for the tag.
      /// </returns>
      private GPSTrackerLocation? HandleBluetoothTag(DMGPSField gps, DMRecord record, DMBluetoothTag tag)
      {
          var tagID = tag.ID();

          // An unknown ID shaped like six colon-separated groups in 17 characters is retried on its
          // first 15 characters, i.e. matching any cached key that differs only in the final group.
          if (!Devices.ContainsKey(tagID) && tagID.Length == 17 && tagID.Split(':').Length == 6)
          {
              var truncated = tagID[..15];
              var newtag = Devices.Keys.FirstOrDefault(x => x.StartsWith(truncated, StringComparison.Ordinal));
              Logger.Send(LogType.Information, Serial, $"- Truncating BT Tag: {tagID} -> {truncated} -> {newtag}");
              if (!string.IsNullOrWhiteSpace(newtag))
              {
                  tagID = newtag;
              }
          }

          if (!Devices.ContainsKey(tagID))
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Unknown Tag ({tagID})");
              return null;
          }

          var device = Devices[tagID];
          var timestamp = record.TimeStampToDateTime(record.TimeStamp);
          var age = timestamp - device.TimeStamp;
          if (age <= AgeThreshold)
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Recent Update ({tagID}) {age:mm\\:ss}");
              return null;
          }

          var btloc = new GPSTrackerLocation();
          btloc.DeviceID = tagID;
          btloc.Tracker.ID = device.ID;
          btloc.Location.Timestamp = timestamp;
          btloc.Location.Latitude = (double)gps.Latitude / CoordinateScale;
          btloc.Location.Longitude = (double)gps.Longitude / CoordinateScale;

          switch (tag)
          {
              case DMGuppyBluetoothTag guppy:
                  btloc.BatteryLevel = device.CalculateBatteryLevel(guppy.BatteryVoltage);
                  break;

              case DMSensorNodeBluetoothTag sensornode:
                  // Need to check with Kenrick about the calcs here..
                  // Guppies have 1 battery (ie 1.5V) while Sensornodes have 3 (4.5V)
                  btloc.BatteryLevel = device.CalculateBatteryLevel(sensornode.BatteryVoltage);
                  break;
          }

          return btloc;
      }

      /// <summary>
      /// Produces the location updates contained in a single record: at most one for the tracker,
      /// plus one for each accepted bluetooth tag listed in the record.
      /// </summary>
      /// <remarks>
      /// Nothing is produced (and the reason is logged) when the tracker is not in the device cache,
      /// the record has no valid GPS field, or the timestamp/latitude/longitude is zero.
      /// </remarks>
      private IEnumerable<GPSTrackerLocation> HandleRecord(DMRecord record)
      {
          if (!Devices.ContainsKey(Serial))
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Unknown Device ({Serial})");
              yield break;
          }

          if (record.Fields.FirstOrDefault(x => x is DMGPSField && x.IsValid()) is not DMGPSField gps)
          {
              Logger.Send(LogType.Information, Serial, $"- Skipping: Missing GPS Data ({Serial})");
              yield break;
          }

          if (record.TimeStamp == 0 || gps.Latitude == 0 || gps.Longitude == 0)
          {
              // Report the same values the guard above tests; the timestamp that matters here is
              // the record's, not the GPS field's.
              Logger.Send(LogType.Information, Serial,
                  string.Format("- Skipping: Invalid GPS Data ({0}) {1}{2}{3}", Serial,
                      record.TimeStamp == 0 ? "Bad TimeStamp " : "",
                      gps.Latitude == 0 ? "Bad Latitude " : "",
                      gps.Longitude == 0 ? "Bad Longitude " : "").Trim());
              yield break;
          }

          if (HandleTracker(gps, record) is GPSTrackerLocation trackerLocation)
          {
              yield return trackerLocation;
          }

          // NOTE(review): entries with LogReason 2 are excluded below; the meaning of that value
          // is not visible in this file - confirm against the DigitalMatter definitions.
          foreach (DMBluetoothTagList taglist in record.GetFields<DMBluetoothTagList>())
          {
              foreach (var item in taglist.Items.Where(x => x.LogReason != 2))
              {
                  if (HandleBluetoothTag(gps, record, item.Tag) is GPSTrackerLocation location)
                  {
                      yield return location;
                  }
              }
          }

          foreach (DMBluetoothTagData tag in record.GetFields<DMBluetoothTagData>())
          {
              if (tag.LogReason != 2 && tag.TimeStamp != 0 && tag.Latitude != 0 && tag.Longitude != 0)
              {
                  if (HandleBluetoothTag(gps, record, tag.Tag) is GPSTrackerLocation location)
                  {
                      yield return location;
                  }
              }
          }
      }

      /// <summary>
      /// Processes every record accumulated during the conversation, refreshes the device cache
      /// timestamps for the devices that produced updates, and queues the updates.
      /// </summary>
      /// <returns>A confirm response with <c>Status = 1</c>.</returns>
      private DMConfirmResponse HandleConfirm(DMConfirmRequest confirm)
      {
          Logger.Send(LogType.Information, Serial, $"Goodbye {Product} ({Serial})");

          var updates = Data
              .SelectMany(data => data.Records)
              .SelectMany(HandleRecord)
              .ToList();

          if (updates.Count > 0)
          {
              Logger.Send(LogType.Information, Serial,
                  string.Format("Sending updates ({0}): {1}", updates.Count,
                      string.Join(", ", updates.Select(x => x.DeviceID).Distinct())));

              foreach (var update in updates)
              {
                  // "HH" (24-hour): the previous 12-hour "hh" with no AM/PM marker made log times ambiguous.
                  Logger.Send(LogType.Information, Serial,
                      string.Format("- Updating Device Cache: ({0}): {1:yyyy-MM-dd HH:mm:ss}", update.DeviceID,
                          update.Location.Timestamp));

                  // Replace the cache entry, keeping the ID and battery formula but moving the timestamp on.
                  var oldDevice = Devices[update.DeviceID];
                  Devices[update.DeviceID] =
                      new Device(oldDevice.ID, update.Location.Timestamp, oldDevice.BatteryFormula);
              }

              foreach (var update in updates)
              {
                  Queue.QueueUpdate($"Updated by {Product} ({Serial})", update);
              }
          }

          return new DMConfirmResponse { Status = 1 };
      }

      /// <summary>
      /// Dispatches a parsed message to its handler and writes the handler's response, if any, to the stream.
      /// </summary>
      private void HandleMessage(DMMessage message)
      {
          DMMessage? response;
          switch (message)
          {
              case DMHelloRequest hello:
                  response = HandleHello(hello);
                  break;

              case DMDataRequest data:
                  response = HandleData(data);
                  break;

              case DMConfirmRequest confirm:
                  response = HandleConfirm(confirm);
                  break;

              default:
                  Logger.Send(LogType.Information, "", $"Unknown message type {message.Type}");
                  response = null;
                  break;
          }

          if (response is not null)
          {
              Stream.Write(response.Encode());
          }
      }

      /// <summary>
      /// Starts one asynchronous read and arranges for the result to be processed when it completes.
      /// </summary>
      /// <remarks>
      /// Read occurs asynchronously, so essentially this method spins off a read and then returns.
      /// If we call <see cref="Stop"/> before data comes in, the result of the read is not processed.
      /// </remarks>
      private void DoRead()
      {
          // Captured per read, so each watchdog below only reports on the read it was started with.
          var readData = false;

          // The token is attached to the continuation (not to the read itself): once Stop() cancels
          // it, a continuation that has not started yet will not run.
          ReadTask = Stream.ReadAsync(Buffer, 0, Buffer.Length).ContinueWith(t =>
          {
              readData = true;
              OnReadCompleted(t);
          }, TokenSource.Token);

          // Essentially to make sure that OEM isn't losing threads on us (or that we're missing messages).
          // The hunch is that this shouldn't happen.
          // Basically, delay for 15 seconds, and if we still haven't got data, make a note.
          // Note that this only sets the flag: the conversation is closed the next time a read completes.
          Task.Delay(ReadTimeoutMilliseconds).ContinueWith(t =>
          {
              if (!readData)
              {
                  Finished = true;
                  Logger.Send(LogType.Error, "", "DEBUG: No data read for 15 seconds.");
              }
          });
      }

      /// <summary>
      /// Handles a completed read: buffers and processes the received bytes, then either closes the
      /// conversation or starts the next read.
      /// </summary>
      private void OnReadCompleted(Task<int> read)
      {
          if (read.IsFaulted || read.IsCanceled)
          {
              if (TokenSource.IsCancellationRequested)
              {
                  // Stop() closed the stream underneath the read; it has already cleaned up.
                  return;
              }

              // Reading Result here would throw inside the continuation and leave the
              // conversation open forever, so treat a failed read as the end of the conversation.
              Logger.Send(LogType.Error, Environment.CurrentManagedThreadId.ToString(),
                  $"Read failed: {read.Exception?.GetBaseException().Message}");
              Finished = true;
          }
          else
          {
              var nBytes = read.Result;
              if (nBytes > 0)
              {
                  ConsolidatedBuffer.AddRange(new ArraySegment<byte>(Buffer, 0, nBytes));
                  try
                  {
                      ProcessBufferedMessages();
                  }
                  catch (Exception e)
                  {
                      Logger.Send(LogType.Error, Environment.CurrentManagedThreadId.ToString(),
                          e.Message + "\n" + e.StackTrace);
                  }
              }
              else
              {
                  // A zero-byte read means there is nothing more to read from the stream.
                  Finished = true;
              }
          }

          if (Finished)
          {
              Finish();
              return;
          }

          if (TokenSource.IsCancellationRequested)
          {
              // Stop() was called while we were processing; do not start a read on a closed stream.
              return;
          }

          // If we still have stuff to do, try reading again. Note that this isn't a recursive problem or
          // anything, because DoRead returns as soon as the read has been started.
          DoRead();
      }

      /// <summary>
      /// Parses and handles as many messages as the buffered bytes currently allow.
      /// </summary>
      private void ProcessBufferedMessages()
      {
          while (ConsolidatedBuffer.Count > 0)
          {
              var bytes = ConsolidatedBuffer.ToArray();

              DMMessage? message = null;
              try
              {
                  message = DMFactory.ParseMessage(bytes);
              }
              catch (Exception e)
              {
                  Logger.Send(
                      LogType.Error,
                      Environment.CurrentManagedThreadId.ToString(),
                      string.Format("Unable to Parse Record: {0} ({1})", e.Message, BitConverter.ToString(bytes))
                  );
              }

              if (message is null)
              {
                  // Nothing was consumed, so looping again would parse the same bytes forever.
                  // Keep what we have and wait for the next read to add to it.
                  break;
              }

              HandleMessage(message);

              // NOTE(review): CheckSum is used here as the byte count of the message minus 5
              // (presumably payload length plus a 5-byte header) - confirm against DMMessage.
              ConsolidatedBuffer.RemoveRange(0, message.CheckSum + 5);
          }
      }

      /// <summary>
      /// Starts the conversation by beginning to read from the client.
      /// </summary>
      public void Run()
      {
          ConsolidatedBuffer.Clear();
          DoRead();
      }

      /// <summary>
      /// Closes the stream and client, then notifies <see cref="OnCompletion"/>.
      /// </summary>
      private void Finish()
      {
          Stream.Close();
          Client.Close();
          OnCompletion?.Invoke();
      }

      /// <summary>
      /// Aborts the conversation: cancels pending read processing, closes the connection and waits
      /// briefly for the current read task to end.
      /// </summary>
      /// <exception cref="AggregateException">
      /// The read task failed with something other than a cancellation.
      /// </exception>
      public void Stop()
      {
          // Cancel any read tasks which are yet to start.
          TokenSource.Cancel();
          Stream.Close();
          Client.Close();

          try
          {
              // Let the latest task finish.
              if (ReadTask?.Wait(StopTimeoutMilliseconds) == false)
              {
                  Logger.Send(LogType.Error, "", "The thread didn't die in time.");
              }
          }
          catch (AggregateException ae) when (ae.InnerExceptions.All(e => e is TaskCanceledException))
          {
              // Expected: the read task was cancelled by the token above.
          }
      }
  }
  363. }