3#include "roo_logging.h"
6#if !defined(MLOG_roo_transceivers_remote_client)
7#define MLOG_roo_transceivers_remote_client 0
13 : channel_(channel), synced_(false) {
16 if (!handleServerMessage(msg)) {
31 const roo::lock_guard<roo::mutex> lock(state_guard_);
32 return devices_.size();
37 MLOG(roo_transceivers_remote_client) <<
"Enumerating devices ...";
47 const roo::lock_guard<roo::mutex> lock(state_guard_);
48 if (i >= devices_.size())
break;
49 loc = devices_[i].locator;
57 MLOG(roo_transceivers_remote_client) <<
"Enumerating devices done.";
63 auto device_itr = device_idx_by_locator_.find(locator);
64 if (device_itr == device_idx_by_locator_.end()) {
68 descriptor_key = devices_[device_itr->second].descriptor_key;
69 auto descriptor_itr = descriptors_.find(descriptor_key);
70 if (descriptor_itr == descriptors_.end()) {
72 LOG(WARNING) <<
"No descriptor for device " << locator;
75 return &descriptor_itr->second;
81 const roo::lock_guard<roo::mutex> lock(state_guard_);
83 const auto* result = lookupDeviceDescriptor(locator, descriptor_key);
84 if (result ==
nullptr)
return false;
90 const roo::lock_guard<roo::mutex> lock(state_guard_);
92 const auto* descriptor =
95 auto itr = readings_.find(locator);
96 if (itr == readings_.end()) {
104 const roo::lock_guard<roo::mutex> lock(state_guard_);
107 const auto* descriptor =
109 if (descriptor ==
nullptr) {
114 if (!actuators_.contains(locator)) {
115 LOG(WARNING) <<
"Attempt to write to an unknown actuator " << locator;
128 const roo::lock_guard<roo::mutex> lock(listener_guard_);
129 listeners_.insert(listener);
133 const roo::lock_guard<roo::mutex> lock(listener_guard_);
134 listeners_.erase(listener);
137void UniverseClient::notifyDevicesChanged() {
138 const roo::lock_guard<roo::mutex> lock(listener_guard_);
139 for (
auto* listener : listeners_) {
140 listener->devicesChanged();
144void UniverseClient::notifyReadingsAvailable() {
145 const roo::lock_guard<roo::mutex> lock(listener_guard_);
146 for (
auto* listener : listeners_) {
147 listener->newReadingsAvailable();
151bool UniverseClient::handleServerMessage(
178 if (payload.has_count) {
179 count = payload.
count;
181 return handleDevicePreserved(payload.prev_index, count);
188 return handleUpdateEnd();
191 return handleReadingsBegin();
195 DeviceLocator device(payload.device_locator_schema,
196 payload.device_locator_id);
197 return handleReadings(device, payload.sensor_values,
198 payload.sensor_values_count);
201 return handleReadingsEnd();
205 LOG(WARNING) <<
"Unexpected server message " << msg.
which_contents;
211bool UniverseClient::handleInit() {
214 roo::lock_guard<roo::mutex> lock(state_guard_);
215 updated_devices_.clear();
222bool UniverseClient::handleUpdateBegin(
bool delta) {
223 roo::lock_guard<roo::mutex> lock(state_guard_);
227 MLOG(roo_transceivers_remote_client)
228 <<
"Received full update begin message";
230 MLOG(roo_transceivers_remote_client)
231 <<
"Received delta update begin message";
234 if (!updated_devices_.empty())
return false;
238bool UniverseClient::handleUpdateEnd() {
240 roo::lock_guard<roo::mutex> lock(state_guard_);
241 if (!synced_)
return true;
242 devices_.swap(updated_devices_);
243 updated_devices_.clear();
244 device_idx_by_locator_.clear();
245 for (
size_t i = 0; i < devices_.size(); ++i) {
246 device_idx_by_locator_[devices_[i].locator] = i;
249 if (MLOG_IS_ON(roo_transceivers_remote_client)) {
250 MLOG(roo_transceivers_remote_client) <<
"Post-receive state: ";
251 for (
const auto& device : devices_) {
252 MLOG(roo_transceivers_remote_client)
253 <<
" " << device.locator <<
": " << device.descriptor_key;
256 notifyDevicesChanged();
260bool UniverseClient::handleDescriptorAdded(
262 MLOG(roo_transceivers_remote_client) <<
"Received added descriptor";
263 const roo::lock_guard<roo::mutex> lock(state_guard_);
264 if (!synced_)
return true;
265 descriptors_[key] = descriptor;
269bool UniverseClient::handleDescriptorRemoved(
int key) {
270 MLOG(roo_transceivers_remote_client) <<
"Received removed descriptor " << key;
271 const roo::lock_guard<roo::mutex> lock(state_guard_);
272 if (!synced_)
return true;
275 descriptors_.erase(key);
279bool UniverseClient::handleDeviceAdded(
const DeviceLocator& locator,
280 int descriptor_key) {
281 MLOG(roo_transceivers_remote_client) <<
"Received added device " << locator;
282 const roo::lock_guard<roo::mutex> lock(state_guard_);
283 if (!synced_)
return true;
284 auto itr = descriptors_.find(descriptor_key);
285 if (itr == descriptors_.end()) {
287 <<
"Bogus server message (DeviceAdded): unknown descriptor key "
291 updated_devices_.push_back(DeviceEntry{locator, descriptor_key});
295 SensorLocator sensor_locator(locator, descriptor.
sensors[i].
id);
296 readings_[sensor_locator] =
297 Measurement(descriptor.
sensors[i].
quantity, roo_time::Uptime::Start());
301 ActuatorLocator actuator_locator(locator, descriptor.
actuators[i].
id);
302 actuators_.insert(actuator_locator);
307bool UniverseClient::handleDeviceRemoved(
int prev_index) {
308 const roo::lock_guard<roo::mutex> lock(state_guard_);
309 if (!synced_)
return true;
311 if (prev_index < 0 ||
312 static_cast<size_t>(prev_index) >= devices_.size()) {
313 LOG(WARNING) <<
"Bogus server message (DeviceRemoved): prev_index of "
314 << prev_index <<
" is out of bounds; device count is "
318 const DeviceLocator& locator = devices_[prev_index].locator;
319 MLOG(roo_transceivers_remote_client) <<
"Received removed device " << locator;
322 lookupDeviceDescriptor(locator, descriptor_key);
323 if (descriptor ==
nullptr) {
324 LOG(WARNING) <<
"Bogus server message (DeviceRemoved): missing device "
330 SensorLocator sensor_locator(locator, descriptor->
sensors[i].
id);
331 readings_.erase(sensor_locator);
334 ActuatorLocator actuator_locator(locator, descriptor->
actuators[i].
id);
335 actuators_.erase(actuator_locator);
342bool UniverseClient::handleDevicePreserved(
int prev_index_first,
size_t count) {
343 const roo::lock_guard<roo::mutex> lock(state_guard_);
344 if (!synced_)
return true;
345 MLOG(roo_transceivers_remote_client)
346 <<
"Received preserved devices (" << count <<
" at " << prev_index_first
348 if (prev_index_first < 0 || prev_index_first + count > devices_.size()) {
349 LOG(WARNING) <<
"Bogus server message (DevicePreserved): the range ("
350 << prev_index_first <<
", " << prev_index_first + count
351 <<
") is out of bounds; device count is " << devices_.size();
354 for (
size_t i = 0; i < count; ++i) {
355 updated_devices_.push_back(devices_[prev_index_first + i]);
360bool UniverseClient::handleDeviceModified(
int prev_index,
int descriptor_key) {
361 MLOG(roo_transceivers_remote_client)
362 <<
"Received modified device at " << prev_index;
363 const roo::lock_guard<roo::mutex> lock(state_guard_);
364 if (!synced_)
return true;
365 if (prev_index < 0 ||
366 static_cast<size_t>(prev_index) >= devices_.size()) {
367 LOG(WARNING) <<
"Bogus server message (DeviceModified): prev_index of "
368 << prev_index <<
" is out of bounds; device count is "
372 updated_devices_.push_back(
373 DeviceEntry{devices_[prev_index].locator, descriptor_key});
377void UniverseClient::clearAll() {
378 descriptors_.clear();
380 updated_devices_.clear();
381 device_idx_by_locator_.clear();
386bool UniverseClient::handleReadings(
387 const DeviceLocator& device,
389 size_t readings_count) {
390 const roo::lock_guard<roo::mutex> lock(state_guard_);
391 if (!synced_)
return true;
394 lookupDeviceDescriptor(device, descriptor_key);
395 if (descriptor ==
nullptr) {
397 <<
"Bogus server message (Readings): missing device descriptor for "
398 << device <<
". Current count of known devices: " << devices_.size();
402 roo_time::Uptime now = roo_time::Uptime::Now();
403 for (
size_t i = 0; i < readings_count; ++i) {
404 SensorLocator sensor_locator(device, readings[i].device_locator_sensor_id);
405 auto itr = readings_.find(sensor_locator);
406 if (itr == readings_.end()) {
408 <<
"Bogus server message (Readings): sensor locator not found: "
414 MLOG(roo_transceivers_remote_client)
415 <<
"Received reading of " << sensor_locator <<
": "
416 << readings[i].
value;
417 itr->second = Measurement(itr->second.quantity(),
418 now - roo_time::Millis(readings[i].age_ms),
424bool UniverseClient::handleReadingsBegin() {
return true; }
426bool UniverseClient::handleReadingsEnd() {
427 notifyReadingsAvailable();
Identifies actuator within a transceiver device.
const DeviceLocator & device_locator() const
Returns the device locator.
Identifies a transceiver device by schema and device id.
Listener for universe-level change notifications.
Measurement of a quantity at a specific time.
Identifies sensor within a transceiver device.
const DeviceLocator & device_locator() const
Returns the device locator.
Communication channel for UniverseClient.
virtual void registerServerMessageCallback(ServerMessageCb cb)=0
virtual void sendClientMessage(const roo_transceivers_ClientMessage &msg)=0
void removeEventListener(EventListener *listener) override
Removes a previously registered event listener.
size_t deviceCount() const override
Returns the total number of transceiver devices in this universe.
Measurement read(const SensorLocator &locator) const override
Returns the latest known reading of the sensor identified by locator.
bool forEachDevice(std::function< bool(const DeviceLocator &)> callback) const override
Iterates over all transceiver devices in this universe, calling callback for each device.
void requestUpdate() override
Requests sensor reading update from underlying devices.
UniverseClient(UniverseClientChannel &channel)
void addEventListener(EventListener *listener) override
Registers a listener for device-set and reading update events.
bool write(const ActuatorLocator &locator, float value) override
Writes to the actuator identified by locator.
bool getDeviceDescriptor(const DeviceLocator &locator, roo_transceivers_Descriptor &descriptor) const override
Retrieves the descriptor for the transceiver identified by locator.
roo_transceivers_ClientMessage ClientRequestState()
Builds a client state request.
roo_transceivers_ClientMessage ClientWrite(const ActuatorLocator &actuator, float value)
Builds a client write request.
roo_transceivers_ClientMessage ClientRequestUpdate()
Builds a client update request.
#define roo_transceivers_ServerMessage_transceiver_update_end_tag
#define roo_transceivers_ServerMessage_device_modified_tag
#define roo_transceivers_ServerMessage_reading_tag
#define roo_transceivers_ServerMessage_device_added_tag
#define roo_transceivers_ServerMessage_transceiver_update_begin_tag
#define roo_transceivers_ServerMessage_device_preserved_tag
#define roo_transceivers_ServerMessage_readings_begin_tag
#define roo_transceivers_ServerMessage_init_tag
#define roo_transceivers_ServerMessage_readings_end_tag
#define roo_transceivers_ServerMessage_descriptor_removed_tag
#define roo_transceivers_ServerMessage_descriptor_added_tag
#define roo_transceivers_ServerMessage_device_removed_tag
roo_transceivers_Quantity quantity
roo_transceivers_Descriptor_Sensor sensors[16]
pb_size_t actuators_count
roo_transceivers_Descriptor_Actuator actuators[16]
roo_transceivers_Descriptor descriptor
roo_transceivers_ServerMessage_DeviceAdded device_added
roo_transceivers_ServerMessage_Reading reading
roo_transceivers_ServerMessage_UpdateBegin transceiver_update_begin
roo_transceivers_ServerMessage_DeviceRemoved device_removed
roo_transceivers_ServerMessage_DescriptorRemoved descriptor_removed
roo_transceivers_ServerMessage_DescriptorAdded descriptor_added
roo_transceivers_ServerMessage_DeviceModified device_modified
union _roo_transceivers_ServerMessage::@1 contents
roo_transceivers_ServerMessage_DevicePreserved device_preserved