6 size_t max_recv_packet_size,
7 uint16_t recv_thread_stack_size,
8 const char* recv_thread_name)
9 : transport_(link_transport),
12 max_recv_packet_size_(max_recv_packet_size),
13 recv_thread_stack_size_(recv_thread_stack_size),
14 recv_thread_name_(recv_thread_name) {}
17 roo::thread::attributes attrs;
18 attrs.set_name(recv_thread_name_);
19 attrs.set_stack_size(recv_thread_stack_size_);
20 reader_thread_ = roo::thread(attrs, [
this]() { receiveLoop(); });
27 roo::lock_guard<roo::mutex> guard(mutex_);
29 reconnected_.notify_all();
31 if (reader_thread_.joinable()) {
32 reader_thread_.join();
37 const roo::byte* payload,
size_t payload_size,
39 roo::unique_lock<roo::mutex> guard(mutex_);
43 if (connection_id !=
nullptr) {
48 reconnected_.wait(guard);
50 return sendInternal(header, header_size, payload, payload_size);
54 const roo::byte* header,
56 const roo::byte* payload,
57 size_t payload_size) {
58 roo::unique_lock<roo::mutex> guard(mutex_);
64 return sendInternal(header, header_size, payload, payload_size);
// Writes one framed message to the link's output stream: a 4-byte size
// prefix holding header_size + payload_size, followed by the header bytes and
// then the payload bytes.
// NOTE(review): the prefix is presumably big-endian (StoreBeU32) and is only
// 32 bits wide, so a combined size above UINT32_MAX would not be representable
// -- confirm that callers bound their sizes.
67bool LinkMessaging::sendInternal(
const roo::byte* header,
size_t header_size,
68 const roo::byte* payload,
69 size_t payload_size) {
70 roo_io::OutputStream& out = link_.
out();
71 roo::byte serialized_size[4];
// The prefix covers header and payload together, not each part separately.
72 roo_io::StoreBeU32(header_size + payload_size, serialized_size);
73 out.writeFully(serialized_size, 4);
74 out.writeFully(header, header_size);
75 out.writeFully(payload, payload_size);
// Runs under mutex_ and wakes every thread currently blocked on the
// reconnected_ condition variable.
80uint32_t LinkMessaging::connect() {
81 roo::lock_guard<roo::mutex> guard(mutex_);
83 reconnected_.notify_all();
// Accessor returning a LinkInputStream reference; mutex_ is held while the
// reference is obtained.
87roo_transport::LinkInputStream& LinkMessaging::in() {
88 roo::lock_guard<roo::mutex> guard(mutex_);
// Accessor returning a LinkOutputStream reference; mutex_ is held while the
// reference is obtained.
92roo_transport::LinkOutputStream& LinkMessaging::out() {
93 roo::lock_guard<roo::mutex> guard(mutex_);
// Body of the reader thread: reads size-prefixed messages from the input
// stream and passes each complete one to received(). Read failures and
// oversized messages are logged and reported through reset().
97void LinkMessaging::receiveLoop() {
// Receive buffer sized for the largest message that will be accepted.
98 std::unique_ptr<roo::byte[]> incoming_payload(
99 new roo::byte[max_recv_packet_size_]);
102 roo_io::InputStream& in = this->in();
// Every message starts with a 4-byte size prefix.
104 roo::byte serialized_size[4];
105 size_t count = in.readFully(serialized_size, 4);
// Failure path for the prefix read: log the stream status, then notify the
// receiver via reset().
107 if (in.status() == roo_io::kConnectionError &&
109 LOG(WARNING) <<
"Connection reset by peer.";
111 LOG(ERROR) <<
"Error: " << in.status();
113 reset(connection_id);
116 uint32_t incoming_size = roo_io::LoadBeU32(serialized_size);
// A declared size larger than the buffer cannot be read into
// incoming_payload, so it is treated as an error.
117 if (incoming_size > max_recv_packet_size_) {
118 LOG(ERROR) <<
"Error: incoming size " << incoming_size
119 <<
" exceeds max " << max_recv_packet_size_;
120 reset(connection_id);
// Read the message body; a short read is logged and reported via reset(),
// like a failed prefix read.
123 size_t read = in.readFully(incoming_payload.get(), incoming_size);
124 if (read < incoming_size) {
125 if (in.status() == roo_io::kConnectionError &&
127 LOG(WARNING) <<
"Connection reset by peer.";
129 LOG(ERROR) <<
"Error: " << in.status();
131 reset(connection_id);
// Complete message received: dispatch it.
134 received(connection_id, incoming_payload.get(), incoming_size);
LinkMessaging(roo_transport::LinkTransport &link_transport, size_t max_recv_packet_size, uint16_t recv_thread_stack_size=4096, const char *recv_thread_name="linkMsgRcv")
bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id) override
Sends a message with an optional header and payload.
bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size) override
Sends continuation payload on an existing sender-side connection.
Link connectAsync(std::function< void()> disconnect_fn=nullptr)
LinkStatus status() const
uint32_t streamId() const
void received(ConnectionId connection_id, const roo::byte *data, size_t len)
Dispatches a received message to the registered receiver.
void reset(ConnectionId connection_id)
Dispatches a reset notification to the registered receiver.