roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
link_messaging.cpp
Go to the documentation of this file.
2
3namespace roo_transport {
4
6 size_t max_recv_packet_size,
7 uint16_t recv_thread_stack_size,
8 const char* recv_thread_name)
9 : transport_(link_transport),
10 link_(),
11 closed_(false),
12 max_recv_packet_size_(max_recv_packet_size),
13 recv_thread_stack_size_(recv_thread_stack_size),
14 recv_thread_name_(recv_thread_name) {}
15
17 roo::thread::attributes attrs;
18 attrs.set_name(recv_thread_name_);
19 attrs.set_stack_size(recv_thread_stack_size_);
20 reader_thread_ = roo::thread(attrs, [this]() { receiveLoop(); });
21}
22
24 if (closed_) return;
25 closed_ = true;
26 {
27 roo::lock_guard<roo::mutex> guard(mutex_);
28 link_.disconnect();
29 reconnected_.notify_all();
30 }
31 if (reader_thread_.joinable()) {
32 reader_thread_.join();
33 }
34}
35
36bool LinkMessaging::send(const roo::byte* header, size_t header_size,
37 const roo::byte* payload, size_t payload_size,
38 Messaging::ConnectionId* connection_id) {
39 roo::unique_lock<roo::mutex> guard(mutex_);
40 while (true) {
41 LinkStatus status = link_.status();
42 if (status == LinkStatus::kConnected || status == LinkStatus::kConnecting) {
43 if (connection_id != nullptr) {
44 *connection_id = (Messaging::ConnectionId)link_.streamId();
45 }
46 break;
47 }
48 reconnected_.wait(guard);
49 }
50 return sendInternal(header, header_size, payload, payload_size);
51}
52
54 const roo::byte* header,
55 size_t header_size,
56 const roo::byte* payload,
57 size_t payload_size) {
58 roo::unique_lock<roo::mutex> guard(mutex_);
59 if ((ConnectionId)link_.streamId() != connection_id) {
60 // Connection ID does not match the current link stream ID; the connection
61 // must have been reset.
62 return false;
63 }
64 return sendInternal(header, header_size, payload, payload_size);
65}
66
67bool LinkMessaging::sendInternal(const roo::byte* header, size_t header_size,
68 const roo::byte* payload,
69 size_t payload_size) {
70 roo_io::OutputStream& out = link_.out();
71 roo::byte serialized_size[4];
72 roo_io::StoreBeU32(header_size + payload_size, serialized_size);
73 out.writeFully(serialized_size, 4);
74 out.writeFully(header, header_size);
75 out.writeFully(payload, payload_size);
76 out.flush();
77 return out.isOpen();
78}
79
80uint32_t LinkMessaging::connect() {
81 roo::lock_guard<roo::mutex> guard(mutex_);
82 link_ = transport_.connectAsync();
83 reconnected_.notify_all();
84 return link_.streamId();
85}
86
87roo_transport::LinkInputStream& LinkMessaging::in() {
88 roo::lock_guard<roo::mutex> guard(mutex_);
89 return link_.in();
90}
91
92roo_transport::LinkOutputStream& LinkMessaging::out() {
93 roo::lock_guard<roo::mutex> guard(mutex_);
94 return link_.out();
95}
96
97void LinkMessaging::receiveLoop() {
98 std::unique_ptr<roo::byte[]> incoming_payload(
99 new roo::byte[max_recv_packet_size_]);
100 while (!closed_) {
101 ConnectionId connection_id = (ConnectionId)connect();
102 roo_io::InputStream& in = this->in();
103 while (true) {
104 roo::byte serialized_size[4];
105 size_t count = in.readFully(serialized_size, 4);
106 if (count < 4) {
107 if (in.status() == roo_io::kConnectionError &&
108 link_.status() == LinkStatus::kBroken) {
109 LOG(WARNING) << "Connection reset by peer.";
110 } else {
111 LOG(ERROR) << "Error: " << in.status();
112 }
113 reset(connection_id);
114 break;
115 }
116 uint32_t incoming_size = roo_io::LoadBeU32(serialized_size);
117 if (incoming_size > max_recv_packet_size_) {
118 LOG(ERROR) << "Error: incoming size " << incoming_size
119 << " exceeds max " << max_recv_packet_size_;
120 reset(connection_id);
121 break;
122 }
123 size_t read = in.readFully(incoming_payload.get(), incoming_size);
124 if (read < incoming_size) {
125 if (in.status() == roo_io::kConnectionError &&
126 link_.status() == LinkStatus::kBroken) {
127 LOG(WARNING) << "Connection reset by peer.";
128 } else {
129 LOG(ERROR) << "Error: " << in.status();
130 }
131 reset(connection_id);
132 break;
133 }
134 received(connection_id, incoming_payload.get(), incoming_size);
135 }
136 }
137}
138
139} // namespace roo_transport
LinkMessaging(roo_transport::LinkTransport &link_transport, size_t max_recv_packet_size, uint16_t recv_thread_stack_size=4096, const char *recv_thread_name="linkMsgRcv")
bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id) override
Sends message with optional header and payload.
bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size) override
Sends continuation payload on an existing sender-side connection.
Link connectAsync(std::function< void()> disconnect_fn=nullptr)
void received(ConnectionId connection_id, const roo::byte *data, size_t len)
Dispatches received message to registered receiver.
Definition messaging.cpp:5
void reset(ConnectionId connection_id)
Dispatches reset notification to registered receiver.
Definition messaging.cpp:12