roo_transceivers
API Documentation for roo_transceivers
Loading...
Searching...
No Matches
server.cpp
Go to the documentation of this file.
2
3#include "roo_collections/flat_small_hash_set.h"
4#include "roo_collections/hash.h"
5#include "roo_logging.h"
7
8#if !defined(MLOG_roo_transceivers_remote_server)
9#define MLOG_roo_transceivers_remote_server 0
10#endif
11
12namespace roo_transceivers {
13
// Hash functor for roo_transceivers_Descriptor (the symbol index of this page
// lists it as `size_t operator()(const roo_transceivers_Descriptor&) const`).
// NOTE(review): the line that opens the signature (original line 14) is absent
// from this listing.
//
// The hash is chained through murmur3_32, each call seeded with the previous
// result, over: sensors_count, actuators_count, then for every sensor its id
// and quantity, then for every actuator its id and quantity.
15 const roo_transceivers_Descriptor& descriptor) const {
16 size_t hash = 0;
17 hash = roo_collections::murmur3_32(&descriptor.sensors_count,
18 sizeof(descriptor.sensors_count), hash);
19 hash = roo_collections::murmur3_32(&descriptor.actuators_count,
20 sizeof(descriptor.actuators_count), hash);
21 for (size_t i = 0; i < descriptor.sensors_count; ++i) {
// Ids are hashed by strlen(), i.e. up to (not including) the terminating NUL.
22 hash = roo_collections::murmur3_32(descriptor.sensors[i].id,
23 strlen(descriptor.sensors[i].id), hash);
24 hash = roo_collections::murmur3_32(&descriptor.sensors[i].quantity,
25 sizeof(descriptor.sensors[i].quantity),
26 hash);
27 }
28 for (size_t i = 0; i < descriptor.actuators_count; ++i) {
29 hash = roo_collections::murmur3_32(
30 descriptor.actuators[i].id, strlen(descriptor.actuators[i].id), hash);
31 hash = roo_collections::murmur3_32(&descriptor.actuators[i].quantity,
32 sizeof(descriptor.actuators[i].quantity),
33 hash);
34 }
35 return hash;
36}
37
// Constructor of UniverseServer (universe, channel, transmit_executor).
// NOTE(review): the first signature line (original line 38) and the start of
// the statement at original line 51 are absent from this listing.
//
// Stores references to the three collaborators, starts with an empty state_
// and with every status flag cleared, then hooks up the two event sources:
// a client-message callback that forwards to handleClientMessage(), and
// `this` as an event listener of the universe.
39 UniverseServerChannel& channel,
40 Executor& transmit_executor)
41 : universe_(universe),
42 channel_(channel),
43 transmit_executor_(transmit_executor),
44 state_(),
45 full_snapshot_transmitted_(false),
46 transmission_in_progress_(false),
47 state_snapshot_pending_(false),
48 device_update_pending_(false),
49 readings_pending_(false),
50 devices_changed_(false) {
// NOTE(review): presumably the missing line 51 is
// `channel_.registerClientMessageCallback(` -- confirm against the real file.
// The lambda captures `this`, so the callback must not outlive this object.
52 [this](const roo_transceivers_ClientMessage& msg) {
53 handleClientMessage(msg);
54 });
55 universe_.addEventListener(this);
56}
57
62
63void UniverseServer::begin() { transmitInit(); }
64
// Body of UniverseServer::devicesChanged() (per the symbol index: "Called when
// the set of devices changes", defined at server.cpp:65).
// NOTE(review): the signature line (original line 65) is absent from this
// listing.
//
// If a transmission is already running, only marks device_update_pending_ and
// returns. Otherwise it builds a fresh delta under state_guard_ (device
// snapshot plus readings of newly added devices only) and schedules a
// transmission.
66 bool send_full_state;
67 {
68 roo::lock_guard<roo::mutex> lock(state_guard_);
69 if (transmission_in_progress_) {
70 device_update_pending_ = true;
71 return;
72 }
73 state_.clearDelta();
74 snapshotDevices();
75 snapshotSensorState(true);
76 transmission_in_progress_ = true;
// Until one full snapshot has gone out, the update is sent as a full state.
77 send_full_state = !full_snapshot_transmitted_;
78 }
// Scheduling happens after the lock is released.
79 triggerTransmission(send_full_state);
80}
81
// Body of UniverseServer::newReadingsAvailable() (per the symbol index:
// "Called when new readings are available", defined at server.cpp:82).
// NOTE(review): the signature line (original line 82) is absent from this
// listing.
//
// If a transmission is already running, only marks readings_pending_ and
// returns. Otherwise it builds a fresh delta under state_guard_ (device
// snapshot plus readings of all non-removed devices) and schedules a
// transmission.
83 bool send_full_state;
84 {
85 roo::lock_guard<roo::mutex> lock(state_guard_);
86 if (transmission_in_progress_) {
87 readings_pending_ = true;
88 return;
89 }
90 state_.clearDelta();
// snapshotDevices() must run before snapshotSensorState(): the latter walks
// the device deltas that the former produces.
91 snapshotDevices();
92 snapshotSensorState(false);
93 transmission_in_progress_ = true;
// Until one full snapshot has gone out, the update is sent as a full state.
94 send_full_state = !full_snapshot_transmitted_;
95 }
// Scheduling happens after the lock is released.
96 triggerTransmission(send_full_state);
97}
98
// Dispatches one message received from the client, based on
// msg.which_contents. Three message kinds are handled (update request, state
// request, actuator write); anything else is logged as an error.
// NOTE(review): the parameter line (original line 100) and the three `case`
// labels (original lines 102, 107, 111) are absent from this listing;
// presumably they are the roo_transceivers_ClientMessage_*_tag constants --
// confirm against the real file.
99void UniverseServer::handleClientMessage(
101 switch (msg.which_contents) {
// Update request: forwarded to the universe.
103 MLOG(roo_transceivers_remote_server) << "Received request update";
104 universe_.requestUpdate();
105 break;
106 }
// State request: handled by handleRequestState().
108 handleRequestState();
109 break;
110 }
// Write request: builds the actuator locator from the request fields and
// writes the value to the universe. The result of write() is ignored.
112 const auto& req = msg.contents.write;
113 ActuatorLocator loc(req.device_locator_schema, req.device_locator_id,
114 req.device_locator_actuator_id);
115 MLOG(roo_transceivers_remote_server)
116 << "Received write request for " << loc << " with val " << req.value;
117 universe_.write(loc, req.value);
118 break;
119 }
120 default: {
121 LOG(ERROR) << "Unexpected client message type " << msg.which_contents;
122 }
123 }
124}
125
126void UniverseServer::handleRequestState() {
127 MLOG(roo_transceivers_remote_server) << "Received request state";
128 {
129 roo::lock_guard<roo::mutex> lock(state_guard_);
130 if (transmission_in_progress_) {
131 state_snapshot_pending_ = true;
132 return;
133 }
134 state_.clearAll();
135 snapshotDevices();
136 snapshotSensorState(false);
137 transmission_in_progress_ = true;
138 }
139 triggerTransmission(true);
140}
141
142void UniverseServer::triggerTransmission(bool send_full_snapshot) {
143 transmit_executor_.execute(
144 [this, send_full_snapshot]() { transmissionLoop(send_full_snapshot); });
145}
146
147void UniverseServer::State::clearAll() {
148 clearDelta();
149 devices_.clear();
150 descriptors_.clear();
151 descriptors_by_key_.clear();
152 readings_.clear();
153}
154
155void UniverseServer::State::clearDelta() {
156 device_deltas_.clear();
157 descriptor_deltas_.clear();
158 reading_delta_groups_.clear();
159 reading_deltas_.clear();
160}
161
162void UniverseServer::State::newSensorReadingDelta(const SensorLocator& loc,
163 float value,
164 roo_time::Uptime time) {
165 if (reading_delta_groups_.empty() ||
166 loc.device_locator() != reading_delta_groups_.back().device) {
167 reading_delta_groups_.push_back(
168 SensorReadingDeltaDeviceGroup{loc.device_locator(), 1});
169 } else {
170 ++reading_delta_groups_.back().reading_count;
171 }
172 reading_deltas_.push_back(SensorReadingDelta{loc.sensor_id(), value, time});
173}
174
// Re-enumerates the devices of the universe and reconciles them with state_,
// recording a delta entry per device. Sets devices_changed_ if any device was
// added, removed, or had its descriptor changed, and clears
// device_update_pending_ at the end.
// NOTE(review): original line 177 is absent from this listing; presumably it
// declares the local `descriptor` (roo_transceivers_Descriptor) that the
// lambda below fills in -- confirm against the real file.
175void UniverseServer::snapshotDevices() {
176 devices_changed_ = false;
// Start by assuming every known device is gone; devices seen during the
// enumeration are taken out of this set again.
178 roo_collections::FlatSmallHashSet<DeviceLocator> removed(
179 state_.device_count());
180 for (const auto& itr : state_.devices()) {
181 removed.insert(itr.first);
182 }
// Position of the device within the new enumeration. Devices skipped for
// lack of a descriptor do not consume an ordinal.
183 int ordinal = 0;
184 universe_.forEachDevice([&](const DeviceLocator& loc) -> bool {
185 if (!universe_.getDeviceDescriptor(loc, descriptor)) {
186 LOG(WARNING) << "Found device without a descriptor, ignoring.";
187 return true;
188 }
189 auto existing = state_.devices().find(loc);
190 if (existing == state_.devices().end()) {
191 // New device.
192 state_.addDevice(loc, descriptor, ordinal);
193 devices_changed_ = true;
194 } else {
195 // Device exists.
196 removed.erase(loc);
197 int old_descriptor_key = existing->second.descriptor_key;
198 const roo_transceivers_Descriptor& old_descriptor =
199 state_.descriptors_by_key()[old_descriptor_key];
200 // Check if the descriptor changed.
201 if (old_descriptor == descriptor) {
// Unchanged: remember the previous ordinal in the delta and store the new one.
// NOTE(review): a pure reordering of unchanged devices does not set
// devices_changed_ -- confirm that this is intended.
202 state_.newDeviceDelta(loc, State::DeviceDelta::PRESERVED,
203 existing->second.ordinal);
204 state_.addDeviceEntry(loc, ordinal, old_descriptor_key);
205 } else {
206 // Changed, indeed. Need to deref the old descriptor, and reference
207 // the new one.
208 devices_changed_ = true;
// NOTE(review): the delta is recorded with old ordinal -1 although
// existing->second.ordinal is available -- confirm that this is intended.
209 state_.newDeviceDelta(loc, State::DeviceDelta::MODIFIED, -1);
210 state_.removeReadings(loc, old_descriptor);
211 state_.removeDescriptorReference(old_descriptor);
212 int key = state_.addDescriptorReference(descriptor);
213 state_.addDeviceEntry(loc, ordinal, key);
214 }
215 }
216 ++ordinal;
217 return true;
218 });
219 // Emit 'remove' entries for devices that we didn't see in the new snapshot.
220 for (const auto& loc : removed) {
221 devices_changed_ = true;
222 state_.removeDevice(loc);
223 }
224 device_update_pending_ = false;
225}
226
227void UniverseServer::snapshotSensorState(bool new_only) {
228 // We take devices from delta, because they are guaranteed to be the same as
229 // the last enumeration from the universe, and ordered the same way. We just
230 // need to skip the 'removed' ones.
231 for (const auto& dev : state_.device_deltas()) {
232 if (dev.status == State::DeviceDelta::REMOVED) continue;
233 if (new_only && dev.status != State::DeviceDelta::ADDED) continue;
234 const roo_transceivers_Descriptor& descriptor =
235 state_.getDescriptor(dev.locator);
236 for (size_t i = 0; i < descriptor.sensors_count; i++) {
237 SensorLocator sensor_locator(dev.locator, descriptor.sensors[i].id);
238 Measurement measurement = universe_.read(sensor_locator);
239 if (state_.updateSensorReading(sensor_locator, measurement)) {
240 state_.newSensorReadingDelta(sensor_locator, measurement.value(),
241 measurement.time());
242 }
243 }
244 }
245 readings_pending_ = false;
246}
247
248int UniverseServer::State::addDescriptorReference(
249 const roo_transceivers_Descriptor& descriptor) {
250 int key;
251 bool is_new = false;
252 auto itr = descriptors_.find(descriptor);
253 if (itr != descriptors_.end()) {
254 ++itr->second.refcount;
255 key = itr->second.key;
256 } else {
257 // Not found; create a new entry.
258 key = next_descriptor_key_++;
259 descriptors_[descriptor] = DescriptorEntry{.key = key, .refcount = 1};
260 descriptors_by_key_[key] = descriptor;
261 is_new = true;
262 }
263 if (is_new) {
264 newDescriptorDelta(key, State::DescriptorDelta::ADDED);
265 }
266 return key;
267}
268
269void UniverseServer::State::removeDescriptorReference(
270 const roo_transceivers_Descriptor& descriptor) {
271 auto itr = descriptors_.find(descriptor);
272 if (itr == descriptors_.end()) {
273 LOG(ERROR) << "Descriptor not found when trying to remove reference";
274 return;
275 }
276 if (--itr->second.refcount > 0) {
277 return;
278 }
279 int key = itr->second.key;
280 descriptors_.erase(descriptor);
281 newDescriptorDelta(key, State::DescriptorDelta::REMOVED);
282 descriptors_by_key_.erase(key);
283}
284
// Logs and transmits the init message.
// NOTE(review): the statement at original line 287 is absent from this
// listing; presumably it sends proto::SrvInit() through channel_ -- confirm
// against the real file.
285void UniverseServer::transmitInit() {
286 MLOG(roo_transceivers_remote_server) << "Transmitting Init";
288}
289
// Logs and transmits the marker that opens an update sequence; `delta`
// selects between the delta and the full-state variant.
// NOTE(review): the statements at original lines 293 and 297 are absent from
// this listing; presumably they send proto::SrvDeltaUpdateBegin() and
// proto::SrvFullUpdateBegin() respectively -- confirm against the real file.
290void UniverseServer::transmitUpdateBegin(bool delta) {
291 if (delta) {
292 MLOG(roo_transceivers_remote_server) << "Transmitting Delta update begin";
294 } else {
295 MLOG(roo_transceivers_remote_server)
296 << "Transmitting Full state update begin";
298 }
299}
300
// Logs and transmits the marker that closes an update sequence.
// NOTE(review): the statement at original line 303 is absent from this
// listing; presumably it sends proto::SrvUpdateEnd() through channel_ --
// confirm against the real file.
301void UniverseServer::transmitUpdateEnd() {
302 MLOG(roo_transceivers_remote_server) << "Transmitting Update end";
304}
305
306void UniverseServer::transmitDescriptorAdded(int key) {
307 MLOG(roo_transceivers_remote_server) << "Transmitting Descriptor added";
308 const roo_transceivers_Descriptor& descriptor =
309 state_.descriptors_by_key()[key];
310 channel_.sendServerMessage(proto::SrvDescriptorAdded(key, descriptor));
311}
312
// Logs and transmits the removal of the descriptor identified by `key`.
// NOTE(review): the statement at original line 315 is absent from this
// listing; presumably it sends proto::SrvDescriptorRemoved(key) through
// channel_ -- confirm against the real file.
313void UniverseServer::transmitDescriptorRemoved(int key) {
314 MLOG(roo_transceivers_remote_server) << "Transmitting Descriptor removed";
316}
317
318void UniverseServer::transmitDeviceAdded(const DeviceLocator& locator,
319 int descriptor_key) {
320 MLOG(roo_transceivers_remote_server) << "Transmitting Device added";
321 channel_.sendServerMessage(proto::SrvDeviceAdded(locator, descriptor_key));
322}
323
324void UniverseServer::transmitDevicesPreserved(int first_preserved_ordinal,
325 size_t count) {
326 MLOG(roo_transceivers_remote_server)
327 << "Transmitting Devices preserved (" << count << ")";
328 channel_.sendServerMessage(
329 proto::SrvDevicesPreserved(first_preserved_ordinal, count));
330}
331
332void UniverseServer::transmitDeviceModified(int prev_ordinal,
333 int descriptor_key) {
334 MLOG(roo_transceivers_remote_server)
335 << "Transmitting Device modified at " << prev_ordinal;
336 channel_.sendServerMessage(
337 proto::SrvDevicesModified(prev_ordinal, descriptor_key));
338}
339
340void UniverseServer::transmitDeviceRemoved(int prev_ordinal) {
341 MLOG(roo_transceivers_remote_server)
342 << "Transmitting Device removed at " << prev_ordinal;
343 channel_.sendServerMessage(proto::SrvDeviceRemoved(prev_ordinal));
344}
345
// Logs and transmits the marker that opens a block of readings.
// NOTE(review): the statement at original line 348 is absent from this
// listing; presumably it sends proto::SrvReadingsBegin() through channel_ --
// confirm against the real file.
346void UniverseServer::transmitReadingsBegin() {
347 MLOG(roo_transceivers_remote_server) << "Transmitting readings begin";
349}
350
// Logs and transmits the marker that closes a block of readings.
// NOTE(review): the statement at original line 353 is absent from this
// listing; presumably it sends proto::SrvReadingsEnd() through channel_ --
// confirm against the real file.
351void UniverseServer::transmitReadingsEnd() {
352 MLOG(roo_transceivers_remote_server) << "Transmitting readings end";
354}
355
356void UniverseServer::transmissionLoop(bool send_full_snapshot) {
357 bool is_delta;
358 {
359 roo::lock_guard<roo::mutex> lock(state_guard_);
360 CHECK(transmission_in_progress_);
361 is_delta = !send_full_snapshot;
362 send_full_snapshot = false;
363 if (MLOG_IS_ON(roo_transceivers_remote_server)) {
364 MLOG(roo_transceivers_remote_server) << "Pre-transmit state: ";
365 for (const auto& device : state_.devices()) {
366 MLOG(roo_transceivers_remote_server)
367 << " " << device.first << ": (" << device.second.descriptor_key
368 << ", " << device.second.ordinal << ")";
369 }
370 }
371 }
372 while (true) {
373 MLOG(roo_transceivers_remote_server) << "Begin transmission";
374 transmit(is_delta);
375 {
376 roo::lock_guard<roo::mutex> lock(state_guard_);
377 CHECK(transmission_in_progress_);
378 if (!is_delta) {
379 full_snapshot_transmitted_ = true;
380 }
381 if (state_snapshot_pending_) {
382 state_.clearAll();
383 snapshotDevices();
384 snapshotSensorState(false);
385 send_full_snapshot = true;
386 state_snapshot_pending_ = false;
387 } else if (device_update_pending_) {
388 state_.clearDelta();
389 snapshotDevices();
390 snapshotSensorState(true);
391 } else if (readings_pending_) {
392 state_.clearDelta();
393 snapshotSensorState(false);
394 } else {
395 transmission_in_progress_ = false;
396 return;
397 }
398 is_delta = !send_full_snapshot;
399 }
400 }
401 MLOG(roo_transceivers_remote_server) << "End transmission";
402}
403
// Transmits the current deltas held in state_: first the device/descriptor
// update (only if devices_changed_ is set), then the new readings (only if
// there are any).
// NOTE(review): the four `case` labels of the switch (original lines 420,
// 426, 444, 450) and the statement at original line 474 are absent from this
// listing. Presumably the labels are State::DeviceDelta::ADDED, PRESERVED,
// MODIFIED and REMOVED, in that order, and line 474 declares `msg` via
// proto::SrvReading(group.device) -- confirm against the real file.
404void UniverseServer::transmit(bool is_delta) {
405 // Assumes that the flags have been checked under state_guard_ to authorize
406 // the transmission.
407 if (devices_changed_) {
408 transmitUpdateBegin(is_delta);
409 // Transmit new descriptors.
410 for (const auto& itr : state_.descriptor_deltas()) {
411 if (itr.status != State::DescriptorDelta::ADDED) continue;
412 transmitDescriptorAdded(itr.key);
413 }
414 // Transmit all devices.
415 size_t i = 0;
416 size_t delta_count = state_.device_deltas().size();
417 while (i < delta_count) {
418 const auto& delta = state_.device_deltas()[i];
419 switch (delta.status) {
// Added device: sent with its locator and descriptor key.
421 const auto& device = state_.devices()[delta.locator];
422 transmitDeviceAdded(delta.locator, device.descriptor_key);
423 ++i;
424 break;
425 }
// Preserved device: a run of following PRESERVED deltas whose old ordinals
// are consecutive is coalesced into a single message.
427 int preserved_first_ordinal = delta.old_ordinal;
428 size_t preserved_count = 1;
429 while (i + 1 < delta_count) {
430 const auto& next_delta = state_.device_deltas()[i + 1];
431 // const auto& next_device = state_.devices()[next_delta.locator];
432 if (next_delta.status != State::DeviceDelta::PRESERVED ||
433 next_delta.old_ordinal !=
434 delta.old_ordinal + static_cast<int>(preserved_count)) {
435 break;
436 }
437 ++i;
438 ++preserved_count;
439 }
440 transmitDevicesPreserved(preserved_first_ordinal, preserved_count);
441 ++i;
442 break;
443 }
// Modified device.
// NOTE(review): this passes the device's current ordinal to a parameter named
// prev_ordinal -- confirm which ordinal the receiver expects.
445 const auto& device = state_.devices()[delta.locator];
446 transmitDeviceModified(device.ordinal, device.descriptor_key);
447 ++i;
448 break;
449 }
// Removed device: identified by its old ordinal.
451 transmitDeviceRemoved(delta.old_ordinal);
452 ++i;
453 break;
454 }
// NOTE(review): this branch does not advance `i`; if it were ever taken, the
// enclosing while loop would not terminate.
455 default: {
456 break;
457 }
458 }
459 }
460 // Transmit removed descriptors.
461 for (const auto& itr : state_.descriptor_deltas()) {
462 if (itr.status != State::DescriptorDelta::REMOVED) continue;
463 transmitDescriptorRemoved(itr.key);
464 }
465 transmitUpdateEnd();
466 }
467
468 // Transmit new readings.
469 if (!state_.reading_delta_groups().empty()) {
470 transmitReadingsBegin();
// Index into the flat reading_deltas() list; each group owns the next
// reading_count entries.
471 size_t reading_offset = 0;
// Taken once, so all reading ages in this transmission share one reference.
472 roo_time::Uptime now = roo_time::Uptime::Now();
473 for (const auto& group : state_.reading_delta_groups()) {
475 for (size_t i = 0; i < group.reading_count; ++i) {
476 auto& reading = state_.reading_deltas()[reading_offset];
// Readings are sent with their age in milliseconds relative to `now`.
477 proto::AddReading(msg, reading.sensor_id, reading.value,
478 (now - reading.time).inMillis());
479 ++reading_offset;
480 }
481 MLOG(roo_transceivers_remote_server)
482 << "Transmitting reading for " << group.device;
483 channel_.sendServerMessage(msg);
484 }
485 transmitReadingsEnd();
486 }
487}
488
489} // namespace roo_transceivers
virtual void execute(std::function< void()> task)=0
virtual void registerClientMessageCallback(ClientMessageCb cb)=0
virtual void sendServerMessage(const roo_transceivers_ServerMessage &msg)=0
void devicesChanged() override
Called when the set of devices changes.
Definition server.cpp:65
void newReadingsAvailable() override
Called when new readings are available.
Definition server.cpp:82
UniverseServer(Universe &universe, UniverseServerChannel &channel, Executor &transmit_executor)
Definition server.cpp:38
An abstract collection of transceiver devices.
Definition universe.h:19
virtual bool write(const ActuatorLocator &locator, float value)=0
Writes to the actuator identified by locator.
virtual Measurement read(const SensorLocator &locator) const =0
Returns the latest known reading of the sensor identified by locator.
virtual void removeEventListener(EventListener *listener)
Removes a previously registered event listener.
Definition universe.h:64
virtual bool getDeviceDescriptor(const DeviceLocator &locator, roo_transceivers_Descriptor &descriptor) const =0
Retrieves the descriptor for the transceiver identified by locator.
virtual void requestUpdate()=0
Requests sensor reading update from underlying devices.
virtual bool forEachDevice(std::function< bool(const DeviceLocator &)> callback) const =0
Iterates over all transceiver devices in this universe, calling callback for each device.
virtual void addEventListener(EventListener *listener)
Registers a listener for device-set and reading update events.
Definition universe.h:61
roo_transceivers_ServerMessage SrvDeviceRemoved(int prev_ordinal)
Removes a device.
Definition proto.cpp:89
roo_transceivers_ServerMessage SrvDeltaUpdateBegin()
Begins a delta update sequence.
Definition proto.cpp:22
roo_transceivers_ServerMessage SrvDescriptorAdded(int key, const roo_transceivers_Descriptor &descriptor)
Adds a descriptor in the server stream.
Definition proto.cpp:37
roo_transceivers_ServerMessage SrvDevicesModified(int prev_ordinal, int descriptor_key)
Marks a device as modified.
Definition proto.cpp:79
roo_transceivers_ServerMessage SrvUpdateEnd()
Ends an update sequence.
Definition proto.cpp:30
roo_transceivers_ServerMessage SrvReading(const DeviceLocator &device)
Begins readings for a device.
Definition proto.cpp:109
void AddReading(roo_transceivers_ServerMessage &reading, const SensorId &sensor_id, float value, uint64_t age_ms)
Appends a single sensor reading to a readings message.
Definition proto.cpp:120
roo_transceivers_ServerMessage SrvDeviceAdded(const DeviceLocator &locator, int descriptor_key)
Adds a device with a descriptor key.
Definition proto.cpp:54
roo_transceivers_ServerMessage SrvFullUpdateBegin()
Begins a full update sequence.
Definition proto.cpp:14
roo_transceivers_ServerMessage SrvInit()
Builds an init server message.
Definition proto.cpp:8
roo_transceivers_ServerMessage SrvDevicesPreserved(int first_preserved_ordinal, size_t count)
Marks a range of devices as preserved.
Definition proto.cpp:66
roo_transceivers_ServerMessage SrvReadingsEnd()
Ends a readings block.
Definition proto.cpp:103
roo_transceivers_ServerMessage SrvReadingsBegin()
Begins a readings block.
Definition proto.cpp:97
roo_transceivers_ServerMessage SrvDescriptorRemoved(int key)
Removes a descriptor from the server stream.
Definition proto.cpp:47
#define roo_transceivers_ClientMessage_request_update_tag
#define roo_transceivers_ClientMessage_request_state_tag
#define roo_transceivers_ClientMessage_write_tag
roo_transceivers_ClientMessage_Write write
union _roo_transceivers_ClientMessage::@0 contents
roo_transceivers_Descriptor_Sensor sensors[16]
roo_transceivers_Descriptor_Actuator actuators[16]
size_t operator()(const roo_transceivers_Descriptor &descriptor) const
Definition server.cpp:14