3#include "roo_collections/flat_small_hash_set.h"
4#include "roo_collections/hash.h"
5#include "roo_logging.h"
8#if !defined(MLOG_roo_transceivers_remote_server)
9#define MLOG_roo_transceivers_remote_server 0
22 hash = roo_collections::murmur3_32(descriptor.
sensors[i].
id,
29 hash = roo_collections::murmur3_32(
41 : universe_(universe),
43 transmit_executor_(transmit_executor),
45 full_snapshot_transmitted_(false),
46 transmission_in_progress_(false),
47 state_snapshot_pending_(false),
48 device_update_pending_(false),
49 readings_pending_(false),
50 devices_changed_(false) {
53 handleClientMessage(msg);
68 roo::lock_guard<roo::mutex> lock(state_guard_);
69 if (transmission_in_progress_) {
70 device_update_pending_ =
true;
75 snapshotSensorState(
true);
76 transmission_in_progress_ =
true;
77 send_full_state = !full_snapshot_transmitted_;
79 triggerTransmission(send_full_state);
85 roo::lock_guard<roo::mutex> lock(state_guard_);
86 if (transmission_in_progress_) {
87 readings_pending_ =
true;
92 snapshotSensorState(
false);
93 transmission_in_progress_ =
true;
94 send_full_state = !full_snapshot_transmitted_;
96 triggerTransmission(send_full_state);
// Handles a single message received from the client.
//
// The visible lines show three kinds of handling: an update request is
// logged, a state request is delegated to handleRequestState(), and a write
// request is turned into an ActuatorLocator and forwarded to
// universe_.write(). Anything else is reported via LOG(ERROR) together with
// msg.which_contents.
//
// NOTE(review): the parameter list and the dispatch construct (presumably a
// switch on msg.which_contents) are on lines missing from this listing --
// confirm against the full file.
99void UniverseServer::handleClientMessage(
103 MLOG(roo_transceivers_remote_server) <<
"Received request update";
108 handleRequestState();
113 ActuatorLocator loc(req.device_locator_schema, req.device_locator_id,
114 req.device_locator_actuator_id);
115 MLOG(roo_transceivers_remote_server)
116 <<
"Received write request for " << loc <<
" with val " << req.value;
117 universe_.
write(loc, req.value);
121 LOG(ERROR) <<
"Unexpected client message type " << msg.
which_contents;
// Services a client request for the full state.
//
// While holding state_guard_: if a transmission is already in progress, only
// state_snapshot_pending_ is set. Otherwise the sensor state is snapshotted
// with new_only == false, transmission_in_progress_ is set, and a
// full-snapshot transmission is triggered via triggerTransmission(true).
//
// NOTE(review): the early return after setting state_snapshot_pending_ and
// the exact extent of the lock scope are on lines not visible here.
126void UniverseServer::handleRequestState() {
127 MLOG(roo_transceivers_remote_server) <<
"Received request state";
129 roo::lock_guard<roo::mutex> lock(state_guard_);
130 if (transmission_in_progress_) {
131 state_snapshot_pending_ =
true;
136 snapshotSensorState(
false);
137 transmission_in_progress_ =
true;
139 triggerTransmission(
true);
// Schedules transmissionLoop(send_full_snapshot) as a deferred task.
//
// The lambda captures `this` and send_full_snapshot by value.
// NOTE(review): the call that receives the lambda is on a line missing from
// this listing (presumably transmit_executor_.execute) -- confirm. Because
// `this` is captured, the server must outlive the scheduled task.
142void UniverseServer::triggerTransmission(
bool send_full_snapshot) {
144 [
this, send_full_snapshot]() { transmissionLoop(send_full_snapshot); });
// Clears the state, including both descriptor maps (by descriptor and by
// key). NOTE(review): other statements of this function are on lines not
// visible in this listing.
147void UniverseServer::State::clearAll() {
150 descriptors_.clear();
151 descriptors_by_key_.clear();
// Discards all accumulated deltas: device deltas, descriptor deltas, and
// sensor reading deltas together with their per-device groups.
155void UniverseServer::State::clearDelta() {
156 device_deltas_.clear();
157 descriptor_deltas_.clear();
158 reading_delta_groups_.clear();
159 reading_deltas_.clear();
// Records a changed sensor reading in the delta.
//
// Readings are grouped by device: a new SensorReadingDeltaDeviceGroup with a
// count of 1 is started when there is no group yet or when the device of
// `loc` differs from the device of the last group; the reading itself
// (sensor id, value, time) is always appended to reading_deltas_.
//
// NOTE(review): the declaration of the `value` parameter and the `else` that
// presumably guards the reading_count increment are on lines missing from
// this listing.
162void UniverseServer::State::newSensorReadingDelta(
const SensorLocator& loc,
164 roo_time::Uptime time) {
165 if (reading_delta_groups_.empty() ||
166 loc.device_locator() != reading_delta_groups_.back().device) {
167 reading_delta_groups_.push_back(
168 SensorReadingDeltaDeviceGroup{loc.device_locator(), 1});
170 ++reading_delta_groups_.back().reading_count;
172 reading_deltas_.push_back(SensorReadingDelta{loc.sensor_id(), value, time});
// Reconciles the cached device state with the devices currently reported by
// universe_.
//
// Outline of the visible logic:
//  - devices_changed_ is reset, and `removed` is seeded with every device
//    locator currently present in state_.
//  - For each device enumerated by universe_.forEachDevice(): a device not
//    found in state_ is added and marks devices_changed_; a device whose
//    descriptor equals the previous one is re-entered under its old
//    descriptor key; a device whose descriptor differs has its readings and
//    old descriptor reference dropped, a new descriptor reference added, and
//    marks devices_changed_.
//  - Every locator still in `removed` afterwards is removed from state_ and
//    marks devices_changed_.
//  - device_update_pending_ is cleared at the end.
//
// NOTE(review): descriptor retrieval, ordinal bookkeeping, and the point at
// which a still-present device is taken out of `removed` are on lines not
// visible in this listing.
175void UniverseServer::snapshotDevices() {
176 devices_changed_ =
false;
178 roo_collections::FlatSmallHashSet<DeviceLocator> removed(
179 state_.device_count());
180 for (
const auto& itr : state_.devices()) {
181 removed.insert(itr.first);
184 universe_.
forEachDevice([&](
const DeviceLocator& loc) ->
bool {
186 LOG(WARNING) <<
"Found device without a descriptor, ignoring.";
189 auto existing = state_.devices().find(loc);
190 if (existing == state_.devices().end()) {
192 state_.addDevice(loc, descriptor, ordinal);
193 devices_changed_ =
true;
197 int old_descriptor_key = existing->second.descriptor_key;
199 state_.descriptors_by_key()[old_descriptor_key];
201 if (old_descriptor == descriptor) {
203 existing->second.ordinal);
204 state_.addDeviceEntry(loc, ordinal, old_descriptor_key);
208 devices_changed_ =
true;
210 state_.removeReadings(loc, old_descriptor);
211 state_.removeDescriptorReference(old_descriptor);
212 int key = state_.addDescriptorReference(descriptor);
213 state_.addDeviceEntry(loc, ordinal, key);
220 for (
const auto& loc : removed) {
221 devices_changed_ =
true;
222 state_.removeDevice(loc);
224 device_update_pending_ =
false;
// Reads sensors from universe_ and records changed readings as deltas.
//
// For each entry in state_.device_deltas(), the device's descriptor is looked
// up and its sensors are read through universe_.read(); a reading delta is
// recorded only when state_.updateSensorReading() returns true.
// readings_pending_ is cleared at the end.
//
// NOTE(review): how `new_only` filters the devices, and the loop over sensor
// index `i`, are on lines missing from this listing.
227void UniverseServer::snapshotSensorState(
bool new_only) {
231 for (
const auto& dev : state_.device_deltas()) {
235 state_.getDescriptor(dev.locator);
237 SensorLocator sensor_locator(dev.locator, descriptor.
sensors[i].
id);
238 Measurement measurement = universe_.
read(sensor_locator);
239 if (state_.updateSensorReading(sensor_locator, measurement)) {
240 state_.newSensorReadingDelta(sensor_locator, measurement.value(),
245 readings_pending_ =
false;
// Adds one reference to `descriptor`.
//
// If the descriptor is already known, its refcount is incremented and its
// existing key is reused. Otherwise a fresh key is taken from
// next_descriptor_key_, the descriptor is stored in both maps with a
// refcount of 1, and an ADDED descriptor delta is recorded.
//
// NOTE(review): the parameter list, the else branch boundary and the return
// statement (presumably returning `key`) are on lines not visible here.
248int UniverseServer::State::addDescriptorReference(
252 auto itr = descriptors_.find(descriptor);
253 if (itr != descriptors_.end()) {
254 ++itr->second.refcount;
255 key = itr->second.key;
258 key = next_descriptor_key_++;
259 descriptors_[descriptor] = DescriptorEntry{.key = key, .refcount = 1};
260 descriptors_by_key_[key] = descriptor;
264 newDescriptorDelta(key, State::DescriptorDelta::ADDED);
// Drops one reference to `descriptor`.
//
// An unknown descriptor is reported via LOG(ERROR). Otherwise the refcount is
// decremented; when the `> 0` check fails, the descriptor is erased from
// descriptors_, a REMOVED descriptor delta is recorded, and the key is erased
// from descriptors_by_key_.
//
// NOTE(review): the early returns (after the error log and when references
// remain) are on lines missing from this listing. The by-descriptor erase
// happens before the by-key erase; if `descriptor` aliases the entry stored
// in descriptors_by_key_, that order matters -- confirm before reordering.
269void UniverseServer::State::removeDescriptorReference(
271 auto itr = descriptors_.find(descriptor);
272 if (itr == descriptors_.end()) {
273 LOG(ERROR) <<
"Descriptor not found when trying to remove reference";
276 if (--itr->second.refcount > 0) {
279 int key = itr->second.key;
280 descriptors_.erase(descriptor);
281 newDescriptorDelta(key, State::DescriptorDelta::REMOVED);
282 descriptors_by_key_.erase(key);
// Logs the transmission of the init message.
// NOTE(review): the line that actually sends the message is not visible in
// this listing.
285void UniverseServer::transmitInit() {
286 MLOG(roo_transceivers_remote_server) <<
"Transmitting Init";
// Logs the start of an update sequence, as either a delta update or a full
// state update.
// NOTE(review): the branch on `delta` and the lines that send the message are
// not visible in this listing.
290void UniverseServer::transmitUpdateBegin(
bool delta) {
292 MLOG(roo_transceivers_remote_server) <<
"Transmitting Delta update begin";
295 MLOG(roo_transceivers_remote_server)
296 <<
"Transmitting Full state update begin";
// Logs the end of an update sequence.
// NOTE(review): the line that sends the message is not visible in this
// listing.
301void UniverseServer::transmitUpdateEnd() {
302 MLOG(roo_transceivers_remote_server) <<
"Transmitting Update end";
// Logs the addition of a descriptor and looks the descriptor up by `key` in
// state_.descriptors_by_key().
// NOTE(review): the line that sends the message is not visible in this
// listing.
306void UniverseServer::transmitDescriptorAdded(
int key) {
307 MLOG(roo_transceivers_remote_server) <<
"Transmitting Descriptor added";
309 state_.descriptors_by_key()[key];
// Logs the removal of the descriptor identified by `key`.
// NOTE(review): the line that sends the message is not visible in this
// listing.
313void UniverseServer::transmitDescriptorRemoved(
int key) {
314 MLOG(roo_transceivers_remote_server) <<
"Transmitting Descriptor removed";
// Logs the addition of the device at `locator` with the given descriptor key.
// NOTE(review): the line that sends the message is not visible in this
// listing.
318void UniverseServer::transmitDeviceAdded(
const DeviceLocator& locator,
319 int descriptor_key) {
320 MLOG(roo_transceivers_remote_server) <<
"Transmitting Device added";
// Logs that a run of `count` devices, starting at first_preserved_ordinal, is
// preserved unchanged.
// NOTE(review): the declaration of `count` and the line that sends the
// message are not visible in this listing.
324void UniverseServer::transmitDevicesPreserved(
int first_preserved_ordinal,
326 MLOG(roo_transceivers_remote_server)
327 <<
"Transmitting Devices preserved (" << count <<
")";
// Logs that the device at ordinal `prev_ordinal` was modified and now uses
// descriptor `descriptor_key`.
// NOTE(review): the line that sends the message is not visible in this
// listing.
332void UniverseServer::transmitDeviceModified(
int prev_ordinal,
333 int descriptor_key) {
334 MLOG(roo_transceivers_remote_server)
335 <<
"Transmitting Device modified at " << prev_ordinal;
// Logs the removal of the device at ordinal `prev_ordinal`.
// NOTE(review): the line that sends the message is not visible in this
// listing.
340void UniverseServer::transmitDeviceRemoved(
int prev_ordinal) {
341 MLOG(roo_transceivers_remote_server)
342 <<
"Transmitting Device removed at " << prev_ordinal;
// Logs the start of a readings block.
// NOTE(review): the line that sends the message is not visible in this
// listing.
346void UniverseServer::transmitReadingsBegin() {
347 MLOG(roo_transceivers_remote_server) <<
"Transmitting readings begin";
// Logs the end of a readings block.
// NOTE(review): the line that sends the message is not visible in this
// listing.
351void UniverseServer::transmitReadingsEnd() {
352 MLOG(roo_transceivers_remote_server) <<
"Transmitting readings end";
// Body of the deferred transmission task scheduled by triggerTransmission().
//
// Visible flow:
//  1. Under state_guard_: checks that transmission_in_progress_ is set,
//     derives is_delta from send_full_snapshot, then clears
//     send_full_snapshot. When module logging is on, the pre-transmit device
//     table (locator, descriptor key, ordinal) is logged.
//  2. "Begin transmission" is logged.
//  3. Under state_guard_ again: full_snapshot_transmitted_ is set, then
//     pending work is picked up in priority order -- a pending state snapshot
//     (which forces the next round to be a full snapshot), else a pending
//     device update, else pending readings. transmission_in_progress_ is
//     cleared, and is_delta is recomputed from send_full_snapshot.
//  4. "End transmission" is logged.
//
// NOTE(review): the loop construct, the call to transmit(), the
// snapshotDevices() calls, and the branch under which
// transmission_in_progress_ is cleared (presumably "nothing pending") are on
// lines missing from this listing -- confirm against the full file.
356void UniverseServer::transmissionLoop(
bool send_full_snapshot) {
359 roo::lock_guard<roo::mutex> lock(state_guard_);
360 CHECK(transmission_in_progress_);
361 is_delta = !send_full_snapshot;
362 send_full_snapshot =
false;
363 if (MLOG_IS_ON(roo_transceivers_remote_server)) {
364 MLOG(roo_transceivers_remote_server) <<
"Pre-transmit state: ";
365 for (
const auto& device : state_.devices()) {
366 MLOG(roo_transceivers_remote_server)
367 <<
" " << device.first <<
": (" << device.second.descriptor_key
368 <<
", " << device.second.ordinal <<
")";
373 MLOG(roo_transceivers_remote_server) <<
"Begin transmission";
376 roo::lock_guard<roo::mutex> lock(state_guard_);
377 CHECK(transmission_in_progress_);
379 full_snapshot_transmitted_ =
true;
381 if (state_snapshot_pending_) {
384 snapshotSensorState(
false);
385 send_full_snapshot =
true;
386 state_snapshot_pending_ =
false;
387 }
else if (device_update_pending_) {
390 snapshotSensorState(
true);
391 }
else if (readings_pending_) {
393 snapshotSensorState(
false);
395 transmission_in_progress_ =
false;
398 is_delta = !send_full_snapshot;
401 MLOG(roo_transceivers_remote_server) <<
"End transmission";
// Sends the accumulated state changes to the client.
//
// Visible flow:
//  - When devices_changed_ is set, an update sequence is opened with
//    transmitUpdateBegin(is_delta). Descriptor deltas are walked once to emit
//    additions, device deltas are emitted by status (added, preserved,
//    modified, removed), and descriptor deltas are walked a second time to
//    emit removals.
//  - Consecutive preserved devices with contiguous old ordinals are coalesced
//    into a single transmitDevicesPreserved(first_ordinal, count) call.
//  - When there are reading delta groups, readings are emitted between
//    transmitReadingsBegin() and transmitReadingsEnd(); each reading's age is
//    computed as (now - reading.time) in milliseconds, with `now` sampled
//    once for the whole batch.
//
// NOTE(review): the case labels, the status filters inside the descriptor
// loops, the update-end call and the message-sending lines are on lines
// missing from this listing.
404void UniverseServer::transmit(
bool is_delta) {
407 if (devices_changed_) {
408 transmitUpdateBegin(is_delta);
410 for (
const auto& itr : state_.descriptor_deltas()) {
412 transmitDescriptorAdded(itr.key);
416 size_t delta_count = state_.device_deltas().size();
417 while (i < delta_count) {
418 const auto& delta = state_.device_deltas()[i];
419 switch (delta.status) {
421 const auto& device = state_.devices()[delta.locator];
422 transmitDeviceAdded(delta.locator, device.descriptor_key);
427 int preserved_first_ordinal = delta.old_ordinal;
428 size_t preserved_count = 1;
429 while (i + 1 < delta_count) {
430 const auto& next_delta = state_.device_deltas()[i + 1];
433 next_delta.old_ordinal !=
434 delta.old_ordinal +
static_cast<int>(preserved_count)) {
440 transmitDevicesPreserved(preserved_first_ordinal, preserved_count);
445 const auto& device = state_.devices()[delta.locator];
// NOTE(review): transmitDeviceModified() names its first parameter
// prev_ordinal, and the removal path below passes delta.old_ordinal, but
// this call passes device.ordinal (the entry currently stored in state_).
// Confirm whether delta.old_ordinal is intended here.
446 transmitDeviceModified(device.ordinal, device.descriptor_key);
451 transmitDeviceRemoved(delta.old_ordinal);
461 for (
const auto& itr : state_.descriptor_deltas()) {
463 transmitDescriptorRemoved(itr.key);
469 if (!state_.reading_delta_groups().empty()) {
470 transmitReadingsBegin();
471 size_t reading_offset = 0;
472 roo_time::Uptime now = roo_time::Uptime::Now();
473 for (
const auto& group : state_.reading_delta_groups()) {
475 for (
size_t i = 0; i < group.reading_count; ++i) {
476 auto& reading = state_.reading_deltas()[reading_offset];
478 (now - reading.time).inMillis());
481 MLOG(roo_transceivers_remote_server)
482 <<
"Transmitting reading for " << group.device;
485 transmitReadingsEnd();
virtual void execute(std::function< void()> task)=0
virtual void registerClientMessageCallback(ClientMessageCb cb)=0
virtual void sendServerMessage(const roo_transceivers_ServerMessage &msg)=0
void devicesChanged() override
Called when the set of devices changes.
void newReadingsAvailable() override
Called when new readings are available.
UniverseServer(Universe &universe, UniverseServerChannel &channel, Executor &transmit_executor)
An abstract collection of transceiver devices.
virtual bool write(const ActuatorLocator &locator, float value)=0
Writes to the actuator identified by locator.
virtual Measurement read(const SensorLocator &locator) const =0
Returns the latest known reading of the sensor identified by locator.
virtual void removeEventListener(EventListener *listener)
Removes a previously registered event listener.
virtual bool getDeviceDescriptor(const DeviceLocator &locator, roo_transceivers_Descriptor &descriptor) const =0
Retrieves the descriptor for the transceiver identified by locator.
virtual void requestUpdate()=0
Requests sensor reading update from underlying devices.
virtual bool forEachDevice(std::function< bool(const DeviceLocator &)> callback) const =0
Iterates over all transceiver devices in this universe, calling callback for each device.
virtual void addEventListener(EventListener *listener)
Registers a listener for device-set and reading update events.
roo_transceivers_ServerMessage SrvDeviceRemoved(int prev_ordinal)
Removes a device.
roo_transceivers_ServerMessage SrvDeltaUpdateBegin()
Begins a delta update sequence.
roo_transceivers_ServerMessage SrvDescriptorAdded(int key, const roo_transceivers_Descriptor &descriptor)
Adds a descriptor in the server stream.
roo_transceivers_ServerMessage SrvDevicesModified(int prev_ordinal, int descriptor_key)
Marks a device as modified.
roo_transceivers_ServerMessage SrvUpdateEnd()
Ends an update sequence.
roo_transceivers_ServerMessage SrvReading(const DeviceLocator &device)
Begins readings for a device.
void AddReading(roo_transceivers_ServerMessage &reading, const SensorId &sensor_id, float value, uint64_t age_ms)
Appends a single sensor reading to a readings message.
roo_transceivers_ServerMessage SrvDeviceAdded(const DeviceLocator &locator, int descriptor_key)
Adds a device with a descriptor key.
roo_transceivers_ServerMessage SrvFullUpdateBegin()
Begins a full update sequence.
roo_transceivers_ServerMessage SrvInit()
Builds an init server message.
roo_transceivers_ServerMessage SrvDevicesPreserved(int first_preserved_ordinal, size_t count)
Marks a range of devices as preserved.
roo_transceivers_ServerMessage SrvReadingsEnd()
Ends a readings block.
roo_transceivers_ServerMessage SrvReadingsBegin()
Begins a readings block.
roo_transceivers_ServerMessage SrvDescriptorRemoved(int key)
Removes a descriptor from the server stream.
#define roo_transceivers_ClientMessage_request_update_tag
#define roo_transceivers_ClientMessage_request_state_tag
#define roo_transceivers_ClientMessage_write_tag
roo_transceivers_ClientMessage_Write write
union _roo_transceivers_ClientMessage::@0 contents
roo_transceivers_Quantity quantity
roo_transceivers_Quantity quantity
roo_transceivers_Descriptor_Sensor sensors[16]
pb_size_t actuators_count
roo_transceivers_Descriptor_Actuator actuators[16]
size_t operator()(const roo_transceivers_Descriptor &descriptor) const