roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
mux_messaging.cpp
Go to the documentation of this file.
2
3#include "roo_io/memory/load.h"
4#include "roo_io/memory/store.h"
5#include "roo_logging.h"
6
7namespace roo_transport {
8
10 : messaging_(messaging), dispatcher_(*this) {
11 messaging_.setReceiver(dispatcher_);
12}
13
15
16void MuxMessaging::Dispatcher::received(Messaging::ConnectionId connection_id,
17 const roo::byte* data, size_t len) {
18 if (len < 1) {
19 LOG(WARNING) << "Messaging: received message too short (" << len
20 << " bytes)";
21 return;
22 }
23 ChannelId channel_id = (ChannelId)roo_io::LoadU8(&data[0]);
24
25 mux_.received(connection_id, channel_id, data + 1, len - 1);
26}
27
28void MuxMessaging::received(Messaging::ConnectionId connection_id,
29 ChannelId channel_id, const roo::byte* data,
30 size_t len) {
31 auto it = receivers_.find(channel_id);
32 if (it == receivers_.end()) {
33 LOG(WARNING) << "Messaging: received message for unknown channel "
34 << (int)channel_id;
35 } else {
36 // Dispatch the message to the appropriate channel receiver.
37 it->second->received(connection_id, data, len);
38 }
39}
40
41void MuxMessaging::reset(Messaging::ConnectionId connection_id) {
42 for (auto& entry : receivers_) {
43 entry.second->reset(connection_id);
44 }
45}
46
47void MuxMessaging::registerChannel(Channel& channel) {
48 CHECK(receivers_.insert({channel.id_, &channel}).second)
49 << "Channel ID " << (int)channel.id_ << " is already registered.";
50}
51
52void MuxMessaging::unregisterChannel(Channel& channel) {
53 CHECK(receivers_.erase(channel.id_))
54 << "Channel ID " << (int)channel.id_ << " is not registered.";
55}
56
57bool MuxMessaging::Channel::send(const roo::byte* header, size_t header_size,
58 const roo::byte* payload, size_t payload_size,
59 ConnectionId* connection_id) {
60 roo::byte new_header[header_size + 1];
61 roo_io::StoreU8((uint8_t)id_, &new_header[0]);
62 memcpy(&new_header[1], header, header_size);
63 return messaging_.messaging_.send(new_header, header_size + 1, payload,
64 payload_size, connection_id);
65}
66
68 Messaging::ConnectionId connection_id, const roo::byte* header,
69 size_t header_size, const roo::byte* payload, size_t payload_size) {
70 roo::byte new_header[header_size + 1];
71 roo_io::StoreU8((uint8_t)id_, &new_header[0]);
72 memcpy(&new_header[1], header, header_size);
73 return messaging_.messaging_.sendContinuation(
74 connection_id, new_header, header_size + 1, payload, payload_size);
75}
76
77} // namespace roo_transport
Abstract interface for message exchange over a reliable channel.
Definition messaging.h:17
virtual bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size)=0
Sends continuation payload on an existing sender-side connection.
virtual bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id)=0
Sends message with optional header and payload.
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
Definition messaging.h:27
bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size) override
Sends continuation payload on an existing sender-side connection.
bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id) override
Sends message with optional header and payload.
MuxMessaging(Messaging &messaging)