roo_transceivers
API Documentation for roo_transceivers
Loading...
Searching...
No Matches
server.h
Go to the documentation of this file.
1#pragma once
2
3#include <functional>
4#include <vector>
5
6#include "roo_threads/mutex.h"
7#include "roo_transceivers.h"
8
9namespace roo_transceivers {
10
11class Executor {
12 public:
13 virtual ~Executor() = default;
14
15 virtual void execute(std::function<void()> task) = 0;
16};
17
19 size_t operator()(const roo_transceivers_Descriptor& descriptor) const;
20};
21
23 int key;
25};
26
28 roo_collections::FlatSmallHashMap<roo_transceivers_Descriptor,
30
32 public:
34 std::function<void(const roo_transceivers_ClientMessage&)>;
35
36 virtual ~UniverseServerChannel() = default;
37
39
41};
42
43/// The server keeps cached universe state and the most recent delta.
44///
45/// It also tracks whether a transmission is in progress and whether the
46/// universe has changed since the last snapshot. At any given time, the server
47/// either is not transmitting (no deltas) or is transmitting the most recent
48/// delta.
49///
50/// Starting a transmission is always preceded by taking a new snapshot,
51/// calculating the delta relative to the previous snapshot, and clearing the
52/// state to "unchanged". The new snapshot and delta then remain unchanged
53/// during transmission.
54///
55/// Upon a change notification, if a transmission is in progress, the state is
56/// updated to indicate the universe has changed, but no other action is taken.
57/// If transmission is not in progress, the snapshot-delta-transmit process is
58/// triggered.
59///
60/// Upon finishing the transfer, if the "change" state is "changed", the
61/// snapshot-delta-transmit process is immediately triggered again (a new delta
62/// is calculated and a new transfer starts). Otherwise, the "transmit" state
63/// switches to "not transmitting".
64///
65/// All state changes are guarded by a mutex to synchronize transmits with
66/// change events.
68 public:
70 Executor& transmit_executor);
71
73
74 void begin();
75
76 void devicesChanged() override;
77 void newReadingsAvailable() override;
78
79 private:
80 class State {
81 public:
82 State() : next_descriptor_key_(0) {}
83
84 size_t device_count() const { return devices_.size(); }
85
86 void addDevice(const DeviceLocator& loc,
87 const roo_transceivers_Descriptor& descriptor, int ordinal) {
88 newDeviceDelta(loc, DeviceDelta::ADDED, -1);
89 int key = addDescriptorReference(descriptor);
90 addDeviceEntry(loc, ordinal, key);
91 }
92
93 void removeReadings(const DeviceLocator& loc,
94 const roo_transceivers_Descriptor& descriptor) {
95 for (size_t i = 0; i < descriptor.sensors_count; ++i) {
96 eraseSensorReading(SensorLocator(loc, descriptor.sensors[i].id));
97 }
98 }
99
100 void removeDevice(const DeviceLocator& loc) {
101 const auto& device = devices_[loc];
102 int old_descriptor_key = device.descriptor_key;
103 int old_ordinal = device.ordinal;
104 newDeviceDelta(loc, State::DeviceDelta::REMOVED, old_ordinal);
105 const roo_transceivers_Descriptor& old_descriptor =
106 descriptors_by_key_[old_descriptor_key];
107 removeReadings(loc, old_descriptor);
108 removeDescriptorReference(old_descriptor);
109 devices_.erase(loc);
110 }
111
112 int addDescriptorReference(const roo_transceivers_Descriptor& descriptor);
113
114 void addDeviceEntry(const DeviceLocator& loc, int ordinal,
115 int descriptor_key) {
116 devices_[loc] = DeviceEntry{ordinal, descriptor_key};
117 }
118
119 void removeDescriptorReference(
120 const roo_transceivers_Descriptor& descriptor);
121
122 void eraseSensorReading(const SensorLocator& loc) { readings_.erase(loc); }
123
124 bool updateSensorReading(const SensorLocator& loc, const Measurement& m) {
125 auto itr = readings_.find(loc);
126 if (itr == readings_.end()) {
127 // Did not have prior measurement.
128 if (m.isInitial()) return false;
129 readings_[loc] = SensorReading{m.value(), m.time()};
130 } else {
131 // Did have prior measurement.
132 auto& v = itr->second;
133 if (((m.value() == v.value) ||
134 (isnanf(m.value()) && isnanf(v.value))) &&
135 (m.time() == v.time)) {
136 // The value did not change.
137 return false;
138 }
139 // The value did change. Update, and write to the delta.
140 if (m.isInitial()) {
141 eraseSensorReading(loc);
142 } else {
143 v.value = m.value();
144 v.time = m.time();
145 }
146 }
147 return true;
148 }
149
150 const roo_transceivers_Descriptor& getDescriptor(
151 const DeviceLocator& loc) const {
152 return descriptors_by_key_[devices_[loc].descriptor_key];
153 }
154
155 void clearAll();
156 void clearDelta();
157
158 struct DeviceEntry {
159 int ordinal; // the index in the universe.
161 };
162
169
175
178 float value;
179 roo_time::Uptime time;
180 };
181
186
187 void newDeviceDelta(const DeviceLocator& loc, DeviceDelta::Status status,
188 int old_ordinal) {
189 device_deltas_.emplace_back(DeviceDelta{loc, status, old_ordinal});
190 }
191
192 void newDescriptorDelta(int key, DescriptorDelta::Status kind) {
193 descriptor_deltas_.emplace_back(DescriptorDelta{key, kind});
194 }
195
196 void newSensorReadingDelta(const SensorLocator& loc, float value,
197 roo_time::Uptime time);
198
199 const std::vector<DeviceDelta>& device_deltas() const {
200 return device_deltas_;
201 }
202
203 const std::vector<DescriptorDelta>& descriptor_deltas() const {
204 return descriptor_deltas_;
205 }
206
207 const roo_collections::FlatSmallHashMap<DeviceLocator, DeviceEntry>&
208 devices() const {
209 return devices_;
210 }
211
212 const roo_collections::FlatSmallHashMap<int, roo_transceivers_Descriptor>&
213 descriptors_by_key() const {
214 return descriptors_by_key_;
215 }
216
217 const std::vector<SensorReadingDeltaDeviceGroup>& reading_delta_groups()
218 const {
219 return reading_delta_groups_;
220 }
221
222 const std::vector<SensorReadingDelta>& reading_deltas() const {
223 return reading_deltas_;
224 }
225
226 private:
227 struct SensorReading {
228 float value;
229 roo_time::Uptime time;
230 };
231
232 roo_collections::FlatSmallHashMap<DeviceLocator, DeviceEntry> devices_;
233
234 DescriptorMap descriptors_;
235
236 roo_collections::FlatSmallHashMap<int, roo_transceivers_Descriptor>
237 descriptors_by_key_;
238
239 // Used as a hashtable key to identify device descriptors.
240 int next_descriptor_key_;
241
242 roo_collections::FlatSmallHashMap<SensorLocator, SensorReading> readings_;
243
244 std::vector<DeviceDelta> device_deltas_;
245 std::vector<DescriptorDelta> descriptor_deltas_;
246
247 // Devices appear in the order of enumeration by the universe.
248 std::vector<SensorReadingDeltaDeviceGroup> reading_delta_groups_;
249 std::vector<SensorReadingDelta> reading_deltas_;
250 };
251
252 void handleClientMessage(const roo_transceivers_ClientMessage& msg);
253
254 void handleRequestState();
255
256 void triggerTransmission(bool send_full_snapshot);
257
258 void snapshotDevices();
259 void snapshotSensorState(bool new_only);
260
261 // Send the handhake message.
262 void transmitInit();
263
264 void transmissionLoop(bool send_full_snapshot);
265
266 // Sends a single delta or snapshot over the channel.
267 void transmit(bool is_delta);
268
269 void transmitUpdateBegin(bool delta);
270 void transmitUpdateEnd();
271
272 void transmitDescriptorAdded(int key);
273 void transmitDescriptorRemoved(int key);
274
275 void transmitDeviceAdded(const DeviceLocator& locator, int descriptor_key);
276 void transmitDevicesPreserved(int first_preserved_ordinal, size_t count);
277 void transmitDeviceModified(int prev_ordinal, int descriptor_key);
278 void transmitDeviceRemoved(int prev_ordinal);
279
280 void transmitReadingsBegin();
281 void transmitReadingsEnd();
282
283 Universe& universe_;
284 UniverseServerChannel& channel_;
285 Executor& transmit_executor_;
286
287 State state_;
288
289 // Indicates that the full snapshot has been sent at least once, and thus, we
290 // can assume that the client can interpret deltas. Guarded by state_guard_.
291 bool full_snapshot_transmitted_;
292
293 // Indicates that the transmit loop is currently active. Guarded by
294 // state_guard_.
295 bool transmission_in_progress_;
296
297 // Set when RequestState is received by the client while the transmit loop is
298 // in progress. Guarded by state_guard_.
299 bool state_snapshot_pending_;
300
301 // Set when devicesChanged() gets called while the transmit loop is
302 // in progress. Guarded by state_guard_.
303 bool device_update_pending_;
304
305 // Set when newReadingsAvaialble() gets called while the transmit loop is in
306 // progress. Guarded by state_guard_.
307 bool readings_pending_;
308
309 // Set by snapshotDevices(); true if the snapshot noticed any changes from the
310 // previous state. Guarded by state_guard_.
311 bool devices_changed_;
312
313 mutable roo::mutex state_guard_;
314};
315
316} // namespace roo_transceivers
Identifies a transceiver device by schema and device id.
Definition id.h:21
Listener for universe-level change notifications.
Definition notification.h:9
virtual void execute(std::function< void()> task)=0
virtual ~Executor()=default
Measurement of a quantity at a specific time.
Definition measurement.h:11
float value() const
Returns the measurement value.
Definition measurement.h:43
bool isInitial() const
Returns true if this is the initial/empty measurement.
Definition measurement.h:19
roo_time::Uptime time() const
Returns the measurement timestamp.
Definition measurement.h:38
Identifies sensor within a transceiver device.
Definition id.h:57
virtual void registerClientMessageCallback(ClientMessageCb cb)=0
virtual void sendServerMessage(const roo_transceivers_ServerMessage &msg)=0
std::function< void(const roo_transceivers_ClientMessage &)> ClientMessageCb
Definition server.h:34
The server keeps cached universe state and the most recent delta.
Definition server.h:67
void devicesChanged() override
Called when the set of devices changes.
Definition server.cpp:65
void newReadingsAvailable() override
Called when new readings are available.
Definition server.cpp:82
An abstract collection of transceiver devices.
Definition universe.h:19
roo_collections::SmallString< 24 > SensorId
Sensor identifier (short string).
Definition id.h:16
roo_collections::FlatSmallHashMap< roo_transceivers_Descriptor, DescriptorEntry, DescriptorHashFn > DescriptorMap
Definition server.h:29
struct _roo_transceivers_Descriptor roo_transceivers_Descriptor
roo_transceivers_Descriptor_Sensor sensors[16]
size_t operator()(const roo_transceivers_Descriptor &descriptor) const
Definition server.cpp:14