3#include "roo_io/memory/load.h"
4#include "roo_io/memory/store.h"
5#include "roo_logging.h"
10 : messaging_(messaging), dispatcher_(*this) {
17 const roo::byte* data,
size_t len) {
19 LOG(WARNING) <<
"Messaging: received message too short (" << len
23 ChannelId channel_id = (ChannelId)roo_io::LoadU8(&data[0]);
25 mux_.received(connection_id, channel_id, data + 1, len - 1);
29 ChannelId channel_id,
const roo::byte* data,
31 auto it = receivers_.find(channel_id);
32 if (it == receivers_.end()) {
33 LOG(WARNING) <<
"Messaging: received message for unknown channel "
37 it->second->received(connection_id, data, len);
42 for (
auto& entry : receivers_) {
43 entry.second->reset(connection_id);
47void MuxMessaging::registerChannel(Channel& channel) {
48 CHECK(receivers_.insert({channel.id_, &channel}).second)
49 <<
"Channel ID " << (int)channel.id_ <<
" is already registered.";
52void MuxMessaging::unregisterChannel(Channel& channel) {
53 CHECK(receivers_.erase(channel.id_))
54 <<
"Channel ID " << (int)channel.id_ <<
" is not registered.";
58 const roo::byte* payload,
size_t payload_size,
60 roo::byte new_header[header_size + 1];
61 roo_io::StoreU8((uint8_t)id_, &new_header[0]);
62 memcpy(&new_header[1], header, header_size);
63 return messaging_.messaging_.
send(new_header, header_size + 1, payload,
64 payload_size, connection_id);
69 size_t header_size,
const roo::byte* payload,
size_t payload_size) {
70 roo::byte new_header[header_size + 1];
71 roo_io::StoreU8((uint8_t)id_, &new_header[0]);
72 memcpy(&new_header[1], header, header_size);
74 connection_id, new_header, header_size + 1, payload, payload_size);
Abstract interface for message exchange over a reliable channel.
virtual bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size)=0
Sends continuation payload on an existing sender-side connection.
virtual bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id)=0
Sends message with optional header and payload.
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size) override
Sends continuation payload on an existing sender-side connection.
bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id) override
Sends message with optional header and payload.
MuxMessaging(Messaging &messaging)