roo_transceivers
API Documentation for roo_transceivers
Loading...
Searching...
No Matches
client.cpp
Go to the documentation of this file.
2
3#include "roo_logging.h"
5
6#if !defined(MLOG_roo_transceivers_remote_client)
7#define MLOG_roo_transceivers_remote_client 0
8#endif
9
10namespace roo_transceivers {
11
13 : channel_(channel), synced_(false) {
15 [this](const roo_transceivers_ServerMessage& msg) {
16 if (!handleServerMessage(msg)) {
18 }
19 });
20}
21
25
29
31 const roo::lock_guard<roo::mutex> lock(state_guard_);
32 return devices_.size();
33}
34
36 std::function<bool(const DeviceLocator&)> callback) const {
37 MLOG(roo_transceivers_remote_client) << "Enumerating devices ...";
38 size_t i = 0;
39 DeviceLocator loc;
40
41 bool result = true;
42 while (true) {
43 {
44 // Can't hold mutex for the entire iteration, b/c of deadlocks, when the
45 // callback tries to call getDescriptor() or something else that grabs the
46 // mutex.
47 const roo::lock_guard<roo::mutex> lock(state_guard_);
48 if (i >= devices_.size()) break;
49 loc = devices_[i].locator;
50 }
51 if (!callback(loc)) {
52 result = false;
53 break;
54 }
55 ++i;
56 }
57 MLOG(roo_transceivers_remote_client) << "Enumerating devices done.";
58 return result;
59}
60
61const roo_transceivers_Descriptor* UniverseClient::lookupDeviceDescriptor(
62 const DeviceLocator& locator, int& descriptor_key) const {
63 auto device_itr = device_idx_by_locator_.find(locator);
64 if (device_itr == device_idx_by_locator_.end()) {
65 // Device with the specified locator has not been found.
66 return nullptr;
67 }
68 descriptor_key = devices_[device_itr->second].descriptor_key;
69 auto descriptor_itr = descriptors_.find(descriptor_key);
70 if (descriptor_itr == descriptors_.end()) {
71 // Descriptor for the specified device is (erroneously) missing.
72 LOG(WARNING) << "No descriptor for device " << locator;
73 return nullptr;
74 }
75 return &descriptor_itr->second;
76}
77
79 const DeviceLocator& locator,
80 roo_transceivers_Descriptor& descriptor) const {
81 const roo::lock_guard<roo::mutex> lock(state_guard_);
82 int descriptor_key;
83 const auto* result = lookupDeviceDescriptor(locator, descriptor_key);
84 if (result == nullptr) return false;
85 descriptor = *result;
86 return true;
87}
88
90 const roo::lock_guard<roo::mutex> lock(state_guard_);
91 int descriptor_key;
92 const auto* descriptor =
93 lookupDeviceDescriptor(locator.device_locator(), descriptor_key);
94 if (descriptor == nullptr) return Measurement();
95 auto itr = readings_.find(locator);
96 if (itr == readings_.end()) {
97 return Measurement();
98 }
99 return itr->second;
100}
101
102bool UniverseClient::write(const ActuatorLocator& locator, float value) {
103 {
104 const roo::lock_guard<roo::mutex> lock(state_guard_);
105
106 int descriptor_key;
107 const auto* descriptor =
108 lookupDeviceDescriptor(locator.device_locator(), descriptor_key);
109 if (descriptor == nullptr) {
110 // LOG(WARNING) << "Attempt to write to an unknown device " <<
111 // locator.device_locator();
112 return false;
113 }
114 if (!actuators_.contains(locator)) {
115 LOG(WARNING) << "Attempt to write to an unknown actuator " << locator;
116 return false;
117 }
118 }
119 channel_.sendClientMessage(proto::ClientWrite(locator, value));
120 return true;
121}
122
126
128 const roo::lock_guard<roo::mutex> lock(listener_guard_);
129 listeners_.insert(listener);
130}
131
133 const roo::lock_guard<roo::mutex> lock(listener_guard_);
134 listeners_.erase(listener);
135}
136
137void UniverseClient::notifyDevicesChanged() {
138 const roo::lock_guard<roo::mutex> lock(listener_guard_);
139 for (auto* listener : listeners_) {
140 listener->devicesChanged();
141 }
142}
143
144void UniverseClient::notifyReadingsAvailable() {
145 const roo::lock_guard<roo::mutex> lock(listener_guard_);
146 for (auto* listener : listeners_) {
147 listener->newReadingsAvailable();
148 }
149}
150
151bool UniverseClient::handleServerMessage(
153 switch (msg.which_contents) {
155 return handleInit();
156 }
158 return handleUpdateBegin(msg.contents.transceiver_update_begin.delta);
159 }
161 return handleDescriptorAdded(msg.contents.descriptor_added.key,
163 }
165 return handleDescriptorRemoved(msg.contents.descriptor_removed.key);
166 }
168 DeviceLocator loc(msg.contents.device_added.locator_schema,
170 return handleDeviceAdded(loc, msg.contents.device_added.descriptor_key);
171 }
173 return handleDeviceRemoved(msg.contents.device_removed.prev_index);
174 }
176 size_t count = 1;
177 const auto& payload = msg.contents.device_preserved;
178 if (payload.has_count) {
179 count = payload.count;
180 }
181 return handleDevicePreserved(payload.prev_index, count);
182 }
184 return handleDeviceModified(msg.contents.device_modified.prev_index,
186 }
188 return handleUpdateEnd();
189 }
191 return handleReadingsBegin();
192 }
194 auto& payload = msg.contents.reading;
195 DeviceLocator device(payload.device_locator_schema,
196 payload.device_locator_id);
197 return handleReadings(device, payload.sensor_values,
198 payload.sensor_values_count);
199 }
201 return handleReadingsEnd();
202 }
203
204 default: {
205 LOG(WARNING) << "Unexpected server message " << msg.which_contents;
206 return false;
207 }
208 }
209}
210
211bool UniverseClient::handleInit() {
212 {
213 // Cancel the update, if any pending.
214 roo::lock_guard<roo::mutex> lock(state_guard_);
215 updated_devices_.clear();
216 synced_ = false;
217 }
219 return true;
220}
221
222bool UniverseClient::handleUpdateBegin(bool delta) {
223 roo::lock_guard<roo::mutex> lock(state_guard_);
224 if (!delta) {
225 clearAll();
226 synced_ = true;
227 MLOG(roo_transceivers_remote_client)
228 << "Received full update begin message";
229 } else {
230 MLOG(roo_transceivers_remote_client)
231 << "Received delta update begin message";
232 }
233 // Protocol error if the update is already in progress.
234 if (!updated_devices_.empty()) return false;
235 return true;
236}
237
238bool UniverseClient::handleUpdateEnd() {
239 {
240 roo::lock_guard<roo::mutex> lock(state_guard_);
241 if (!synced_) return true;
242 devices_.swap(updated_devices_);
243 updated_devices_.clear();
244 device_idx_by_locator_.clear();
245 for (size_t i = 0; i < devices_.size(); ++i) {
246 device_idx_by_locator_[devices_[i].locator] = i;
247 }
248 }
249 if (MLOG_IS_ON(roo_transceivers_remote_client)) {
250 MLOG(roo_transceivers_remote_client) << "Post-receive state: ";
251 for (const auto& device : devices_) {
252 MLOG(roo_transceivers_remote_client)
253 << " " << device.locator << ": " << device.descriptor_key;
254 }
255 }
256 notifyDevicesChanged();
257 return true;
258}
259
260bool UniverseClient::handleDescriptorAdded(
261 int key, const roo_transceivers_Descriptor& descriptor) {
262 MLOG(roo_transceivers_remote_client) << "Received added descriptor";
263 const roo::lock_guard<roo::mutex> lock(state_guard_);
264 if (!synced_) return true;
265 descriptors_[key] = descriptor;
266 return true;
267}
268
269bool UniverseClient::handleDescriptorRemoved(int key) {
270 MLOG(roo_transceivers_remote_client) << "Received removed descriptor " << key;
271 const roo::lock_guard<roo::mutex> lock(state_guard_);
272 if (!synced_) return true;
273 // At this point we do not expect to have any devices pointing to this
274 // descriptor.
275 descriptors_.erase(key);
276 return true;
277}
278
279bool UniverseClient::handleDeviceAdded(const DeviceLocator& locator,
280 int descriptor_key) {
281 MLOG(roo_transceivers_remote_client) << "Received added device " << locator;
282 const roo::lock_guard<roo::mutex> lock(state_guard_);
283 if (!synced_) return true;
284 auto itr = descriptors_.find(descriptor_key);
285 if (itr == descriptors_.end()) {
286 LOG(WARNING)
287 << "Bogus server message (DeviceAdded): unknown descriptor key "
288 << descriptor_key;
289 return false;
290 }
291 updated_devices_.push_back(DeviceEntry{locator, descriptor_key});
292 const roo_transceivers_Descriptor& descriptor = itr->second;
293 // Pre-initialize all sensor readings to set quantities.
294 for (size_t i = 0; i < descriptor.sensors_count; ++i) {
295 SensorLocator sensor_locator(locator, descriptor.sensors[i].id);
296 readings_[sensor_locator] =
297 Measurement(descriptor.sensors[i].quantity, roo_time::Uptime::Start());
298 }
299 // Also, register the actuators for fast lookup during write().
300 for (size_t i = 0; i < descriptor.actuators_count; ++i) {
301 ActuatorLocator actuator_locator(locator, descriptor.actuators[i].id);
302 actuators_.insert(actuator_locator);
303 }
304 return true;
305}
306
307bool UniverseClient::handleDeviceRemoved(int prev_index) {
308 const roo::lock_guard<roo::mutex> lock(state_guard_);
309 if (!synced_) return true;
310 int descriptor_key;
311 if (prev_index < 0 ||
312 static_cast<size_t>(prev_index) >= devices_.size()) {
313 LOG(WARNING) << "Bogus server message (DeviceRemoved): prev_index of "
314 << prev_index << " is out of bounds; device count is "
315 << devices_.size();
316 return false;
317 }
318 const DeviceLocator& locator = devices_[prev_index].locator;
319 MLOG(roo_transceivers_remote_client) << "Received removed device " << locator;
320 // Erase all readings.
321 const roo_transceivers_Descriptor* descriptor =
322 lookupDeviceDescriptor(locator, descriptor_key);
323 if (descriptor == nullptr) {
324 LOG(WARNING) << "Bogus server message (DeviceRemoved): missing device "
325 "descriptor for "
326 << locator;
327 return false;
328 }
329 for (size_t i = 0; i < descriptor->sensors_count; ++i) {
330 SensorLocator sensor_locator(locator, descriptor->sensors[i].id);
331 readings_.erase(sensor_locator);
332 }
333 for (size_t i = 0; i < descriptor->actuators_count; ++i) {
334 ActuatorLocator actuator_locator(locator, descriptor->actuators[i].id);
335 actuators_.erase(actuator_locator);
336 }
337 // That's it - we're not adding anything to updated_devices_, and
338 // device_idx_by_locator will be refreshed on transceiver_update_end.
339 return true;
340}
341
342bool UniverseClient::handleDevicePreserved(int prev_index_first, size_t count) {
343 const roo::lock_guard<roo::mutex> lock(state_guard_);
344 if (!synced_) return true;
345 MLOG(roo_transceivers_remote_client)
346 << "Received preserved devices (" << count << " at " << prev_index_first
347 << ")";
348 if (prev_index_first < 0 || prev_index_first + count > devices_.size()) {
349 LOG(WARNING) << "Bogus server message (DevicePreserved): the range ("
350 << prev_index_first << ", " << prev_index_first + count
351 << ") is out of bounds; device count is " << devices_.size();
352 return false;
353 }
354 for (size_t i = 0; i < count; ++i) {
355 updated_devices_.push_back(devices_[prev_index_first + i]);
356 }
357 return true;
358}
359
360bool UniverseClient::handleDeviceModified(int prev_index, int descriptor_key) {
361 MLOG(roo_transceivers_remote_client)
362 << "Received modified device at " << prev_index;
363 const roo::lock_guard<roo::mutex> lock(state_guard_);
364 if (!synced_) return true;
365 if (prev_index < 0 ||
366 static_cast<size_t>(prev_index) >= devices_.size()) {
367 LOG(WARNING) << "Bogus server message (DeviceModified): prev_index of "
368 << prev_index << " is out of bounds; device count is "
369 << devices_.size();
370 return false;
371 }
372 updated_devices_.push_back(
373 DeviceEntry{devices_[prev_index].locator, descriptor_key});
374 return true;
375}
376
377void UniverseClient::clearAll() {
378 descriptors_.clear();
379 devices_.clear();
380 updated_devices_.clear();
381 device_idx_by_locator_.clear();
382 readings_.clear();
383 actuators_.clear();
384}
385
386bool UniverseClient::handleReadings(
387 const DeviceLocator& device,
389 size_t readings_count) {
390 const roo::lock_guard<roo::mutex> lock(state_guard_);
391 if (!synced_) return true;
392 int descriptor_key;
393 const roo_transceivers_Descriptor* descriptor =
394 lookupDeviceDescriptor(device, descriptor_key);
395 if (descriptor == nullptr) {
396 LOG(WARNING)
397 << "Bogus server message (Readings): missing device descriptor for "
398 << device << ". Current count of known devices: " << devices_.size();
399 synced_ = false;
400 return false;
401 }
402 roo_time::Uptime now = roo_time::Uptime::Now();
403 for (size_t i = 0; i < readings_count; ++i) {
404 SensorLocator sensor_locator(device, readings[i].device_locator_sensor_id);
405 auto itr = readings_.find(sensor_locator);
406 if (itr == readings_.end()) {
407 LOG(WARNING)
408 << "Bogus server message (Readings): sensor locator not found: "
409 << sensor_locator;
410 continue;
411 }
412 // Overwrite the measurement with new value and time (but keep the
413 // quantity).
414 MLOG(roo_transceivers_remote_client)
415 << "Received reading of " << sensor_locator << ": "
416 << readings[i].value;
417 itr->second = Measurement(itr->second.quantity(),
418 now - roo_time::Millis(readings[i].age_ms),
419 readings[i].value);
420 }
421 return true;
422}
423
424bool UniverseClient::handleReadingsBegin() { return true; }
425
426bool UniverseClient::handleReadingsEnd() {
427 notifyReadingsAvailable();
428 return true;
429}
430
431} // namespace roo_transceivers
Identifies actuator within a transceiver device.
Definition id.h:104
const DeviceLocator & device_locator() const
Returns the device locator.
Definition id.h:115
Identifies a transceiver device by schema and device id.
Definition id.h:21
Listener for universe-level change notifications.
Definition notification.h:9
Measurement of a quantity at a specific time.
Definition measurement.h:11
Identifies sensor within a transceiver device.
Definition id.h:57
const DeviceLocator & device_locator() const
Returns the device locator.
Definition id.h:67
Communication channel for UniverseClient.
Definition client.h:19
virtual void registerServerMessageCallback(ServerMessageCb cb)=0
virtual void sendClientMessage(const roo_transceivers_ClientMessage &msg)=0
void removeEventListener(EventListener *listener) override
Removes a previously registered event listener.
Definition client.cpp:132
size_t deviceCount() const override
Returns the total number of transceiver devices in this universe.
Definition client.cpp:30
Measurement read(const SensorLocator &locator) const override
Returns the latest known reading of the sensor identified by locator.
Definition client.cpp:89
bool forEachDevice(std::function< bool(const DeviceLocator &)> callback) const override
Iterates over all transceiver devices in this universe, calling callback for each device.
Definition client.cpp:35
void requestUpdate() override
Requests sensor reading update from underlying devices.
Definition client.cpp:123
UniverseClient(UniverseClientChannel &channel)
Definition client.cpp:12
void addEventListener(EventListener *listener) override
Registers a listener for device-set and reading update events.
Definition client.cpp:127
bool write(const ActuatorLocator &locator, float value) override
Writes to the actuator identified by locator.
Definition client.cpp:102
bool getDeviceDescriptor(const DeviceLocator &locator, roo_transceivers_Descriptor &descriptor) const override
Retrieves the descriptor for the transceiver identified by locator.
Definition client.cpp:78
roo_transceivers_ClientMessage ClientRequestState()
Builds a client state request.
Definition proto.cpp:139
roo_transceivers_ClientMessage ClientWrite(const ActuatorLocator &actuator, float value)
Builds a client write request.
Definition proto.cpp:145
roo_transceivers_ClientMessage ClientRequestUpdate()
Builds a client update request.
Definition proto.cpp:133
#define roo_transceivers_ServerMessage_transceiver_update_end_tag
#define roo_transceivers_ServerMessage_device_modified_tag
#define roo_transceivers_ServerMessage_reading_tag
#define roo_transceivers_ServerMessage_device_added_tag
#define roo_transceivers_ServerMessage_transceiver_update_begin_tag
#define roo_transceivers_ServerMessage_device_preserved_tag
#define roo_transceivers_ServerMessage_readings_begin_tag
#define roo_transceivers_ServerMessage_init_tag
#define roo_transceivers_ServerMessage_readings_end_tag
#define roo_transceivers_ServerMessage_descriptor_removed_tag
#define roo_transceivers_ServerMessage_descriptor_added_tag
#define roo_transceivers_ServerMessage_device_removed_tag
roo_transceivers_Descriptor_Sensor sensors[16]
roo_transceivers_Descriptor_Actuator actuators[16]
roo_transceivers_ServerMessage_DeviceAdded device_added
roo_transceivers_ServerMessage_Reading reading
roo_transceivers_ServerMessage_UpdateBegin transceiver_update_begin
roo_transceivers_ServerMessage_DeviceRemoved device_removed
roo_transceivers_ServerMessage_DescriptorRemoved descriptor_removed
roo_transceivers_ServerMessage_DescriptorAdded descriptor_added
roo_transceivers_ServerMessage_DeviceModified device_modified
union _roo_transceivers_ServerMessage::@1 contents
roo_transceivers_ServerMessage_DevicePreserved device_preserved