roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
channel.cpp
Go to the documentation of this file.
2#ifdef ROO_USE_THREADS
3
4#include <cmath>
5
8
9#if (defined ESP32 || defined ROO_TESTING)
10#include "esp_random.h"
11#define RANDOM_INTEGER esp_random
12#elif (defined ARDUINO_ARCH_RP2040)
13#define RANDOM_INTEGER rp2040.hwrand32
14#else
15#define RANDOM_INTEGER rand
16#endif
17
18#if !defined(MLOG_roo_transport_reliable_channel_connection)
19#define MLOG_roo_transport_reliable_channel_connection 0
20#endif
21
22namespace roo_transport {
23
24namespace {
25
26roo_time::Duration Backoff(int retry_count) {
27 float min_delay_us = 1000.0f; // 1ms
28 float max_delay_us = 1000000.0f; // 1s
29 float delay = pow(1.33, retry_count) * min_delay_us;
30 if (delay > max_delay_us) {
31 delay = max_delay_us;
32 }
33 // Randomize by +=20%, to make unrelated retries spread more evenly in time.
34 delay += (float)delay * ((float)RANDOM_INTEGER() / RAND_MAX - 0.5f) * 0.4f;
35 return roo_time::Micros((uint64_t)delay);
36}
37
38} // namespace
39
40Channel::Channel(PacketSender& sender, LinkBufferSize sendbuf,
41 LinkBufferSize recvbuf, roo::string_view name)
42 : packet_sender_(sender),
43 outgoing_data_ready_(),
44 transmitter_((unsigned int)sendbuf),
45 receiver_((unsigned int)recvbuf),
46 my_stream_id_(0),
47 my_stream_id_acked_by_peer_(false),
48 peer_stream_id_(0),
49 needs_handshake_ack_(false),
50 successive_handshake_retries_(0),
51 next_scheduled_handshake_update_(roo_time::Uptime::Start()),
52 disconnect_fn_(nullptr),
53 sender_thread_(),
54 active_(true),
55 log_prefix_(name.empty() ? std::string("")
56 : (std::string("(").append(name).append(") "))),
57 send_thread_name_(name.empty() ? "send_loop"
58 : std::string(name) + "-send") {
59 CHECK_LE(static_cast<unsigned int>(sendbuf), 12u);
60 CHECK_LE(static_cast<unsigned int>(sendbuf),
61 static_cast<unsigned int>(recvbuf));
62}
63
64Channel::~Channel() { end(); }
65
66size_t Channel::write(const roo::byte* buf, size_t count, uint32_t my_stream_id,
67 roo_io::Status& stream_status) {
68 bool outgoing_data_ready = false;
69 return transmitter_.write(buf, count, my_stream_id, stream_status,
70 outgoing_data_ready);
71 if (outgoing_data_ready) {
72 outgoing_data_ready_.notify();
73 }
74}
75
76size_t Channel::tryWrite(const roo::byte* buf, size_t count,
77 uint32_t my_stream_id, roo_io::Status& stream_status) {
78 bool outgoing_data_ready = false;
79 return transmitter_.tryWrite(buf, count, my_stream_id, stream_status,
80 outgoing_data_ready);
81 if (outgoing_data_ready) {
82 outgoing_data_ready_.notify();
83 }
84}
85
86size_t Channel::read(roo::byte* buf, size_t count, uint32_t my_stream_id,
87 roo_io::Status& stream_status) {
88 bool outgoing_data_ready = false;
89 return receiver_.read(buf, count, my_stream_id, stream_status,
90 outgoing_data_ready);
91 if (outgoing_data_ready) {
92 outgoing_data_ready_.notify();
93 }
94}
95
96size_t Channel::tryRead(roo::byte* buf, size_t count, uint32_t my_stream_id,
97 roo_io::Status& stream_status) {
98 bool outgoing_data_ready = false;
99 return receiver_.tryRead(buf, count, my_stream_id, stream_status,
100 outgoing_data_ready);
101 if (outgoing_data_ready) {
102 outgoing_data_ready_.notify();
103 }
104}
105
106int Channel::peek(uint32_t my_stream_id, roo_io::Status& stream_status) {
107 return receiver_.peek(my_stream_id, stream_status);
108}
109
110size_t Channel::availableForRead(uint32_t my_stream_id,
111 roo_io::Status& stream_status) const {
112 return receiver_.availableForRead(my_stream_id, stream_status);
113}
114
115void Channel::flush(uint32_t my_stream_id, roo_io::Status& stream_status) {
116 bool outgoing_data_ready = false;
117 transmitter_.flush(my_stream_id, stream_status, outgoing_data_ready);
118 if (outgoing_data_ready) {
119 outgoing_data_ready_.notify();
120 }
121}
122
123void Channel::close(uint32_t my_stream_id, roo_io::Status& stream_status) {
124 bool outgoing_data_ready = false;
125 transmitter_.close(my_stream_id, stream_status, outgoing_data_ready);
126 if (outgoing_data_ready) {
127 outgoing_data_ready_.notify();
128 }
129}
130
131// Called by ChannelInput::close().
132void Channel::closeInput(uint32_t my_stream_id, roo_io::Status& stream_status) {
133 bool outgoing_data_ready = false;
134 receiver_.markInputClosed(my_stream_id, stream_status, outgoing_data_ready);
135 if (outgoing_data_ready) {
136 outgoing_data_ready_.notify();
137 }
138}
139
140size_t Channel::availableForWrite(uint32_t my_stream_id,
141 roo_io::Status& stream_status) const {
142 return transmitter_.availableForWrite(my_stream_id, stream_status);
143}
144
145uint32_t Channel::my_stream_id() const {
146 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
147 return my_stream_id_;
148}
149
150uint32_t Channel::connect(std::function<void()> disconnect_fn) {
151 uint32_t my_stream_id;
152 std::function<void()> old_disconnect_fn;
153 {
154 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
155 old_disconnect_fn = std::move(disconnect_fn_);
156 disconnect_fn_ = disconnect_fn;
157 my_stream_id_ = 0;
158 my_stream_id_acked_by_peer_ = false;
159 peer_stream_id_ = 0;
160 // The stream ID is a random number, but it can't be zero.
161 while (my_stream_id_ == 0) my_stream_id_ = RANDOM_INTEGER();
162 MLOG(roo_transport_reliable_channel_connection)
163 << getLogPrefix() << "Transmitter and receiver are now connecting.";
164 transmitter_.init(my_stream_id_, RANDOM_INTEGER() % 0x0FFF);
165 receiver_.init(my_stream_id_);
166 needs_handshake_ack_ = false;
167 successive_handshake_retries_ = 0;
168 next_scheduled_handshake_update_ = roo_time::Uptime::Start();
169 connected_cv_.notify_all();
170 my_stream_id = my_stream_id_;
171 }
172 // We need to send that handshake message.
173 outgoing_data_ready_.notify();
174 if (old_disconnect_fn != nullptr) {
175 old_disconnect_fn();
176 }
177 return my_stream_id;
178}
179
180void Channel::disconnect(uint32_t my_stream_id) {
181 std::function<void()> disconnect_fn;
182 {
183 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
184 if (my_stream_id_ != my_stream_id) return;
185 my_stream_id_ = 0;
186 my_stream_id_acked_by_peer_ = false;
187 peer_stream_id_ = 0;
188 MLOG(roo_transport_reliable_channel_connection)
189 << getLogPrefix() << "Transmitter and receiver are now disconnected.";
190 transmitter_.reset();
191 receiver_.reset();
192 connected_cv_.notify_all();
193 disconnect_fn = std::move(disconnect_fn_);
194 disconnect_fn_ = nullptr;
195 }
196 outgoing_data_ready_.notify();
197 if (disconnect_fn != nullptr) {
198 disconnect_fn();
199 }
200}
201
202LinkStatus Channel::getLinkStatus(uint32_t stream_id) {
203 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
204 return getLinkStatusInternal(stream_id);
205}
206
207LinkStatus Channel::getLinkStatusInternal(uint32_t stream_id) {
208 if (stream_id == 0) return LinkStatus::kIdle;
209 if (my_stream_id_ != stream_id) return LinkStatus::kBroken;
210 return peer_stream_id_ == 0 || !my_stream_id_acked_by_peer_
211 ? LinkStatus::kConnecting
212 : LinkStatus::kConnected;
213}
214
215void Channel::awaitConnected(uint32_t stream_id) {
216 roo::unique_lock<roo::mutex> guard(handshake_mutex_);
217 while (getLinkStatusInternal(stream_id) == LinkStatus::kConnecting) {
218 connected_cv_.wait(guard);
219 }
220}
221
222bool Channel::awaitConnected(uint32_t stream_id, roo_time::Duration timeout) {
223 roo::unique_lock<roo::mutex> guard(handshake_mutex_);
224 roo_time::Uptime when = roo_time::Uptime::Now() + timeout;
225 while (getLinkStatusInternal(stream_id) == LinkStatus::kConnecting) {
226 if (connected_cv_.wait_until(guard, when) == roo::cv_status::timeout) {
227 return false;
228 }
229 }
230 return true;
231}
232
233long Channel::trySend() {
234 roo::byte buf[250];
235 long next_send_micros = std::numeric_limits<long>::max();
236 size_t len = 0;
237 len = conn(buf, next_send_micros);
238 if (len > 0) {
239 packet_sender_.send(buf, len);
240 }
241 // Don't send anything besides handshake while we're connecting. But, keep
242 // sending stuff (acks, etc.) if we're idle, which normally means that our
243 // output stream has been closed, but we still need to keep sending acks and
244 // flow control.
245 if (transmitter_.state() == internal::Transmitter::kConnecting) {
246 return next_send_micros;
247 }
248 len = receiver_.ack(buf);
249 if (len > 0) {
250 packet_sender_.send(buf, len);
251 }
252 len = receiver_.updateRecvHimark(buf, next_send_micros);
253 if (len > 0) {
254 packet_sender_.send(buf, len);
255 }
256 len = transmitter_.send(buf, next_send_micros);
257 if (len > 0) {
258 packet_sender_.send(buf, len);
259 }
260 return next_send_micros;
261}
262
263namespace {
264struct HandshakePacket {
265 uint16_t self_seq_num;
266 uint32_t self_stream_id;
267 uint32_t ack_stream_id;
268 bool want_ack;
269
270 friend roo_logging::Stream& operator<<(roo_logging::Stream& s,
271 const HandshakePacket& p) {
272 if (p.ack_stream_id == 0) {
273 s << "CONN " << p.self_stream_id << ":" << p.self_seq_num;
274 } else {
275 s << (p.want_ack ? "CONN/ACK " : "ACK ") << p.self_stream_id << ":"
276 << p.self_seq_num << "/" << p.ack_stream_id;
277 }
278 return s;
279 }
280};
281
282} // namespace
283
284size_t Channel::conn(roo::byte* buf, long& next_send_micros) {
285 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
286 auto transmitter_state = transmitter_.state();
287 if (transmitter_state == internal::Transmitter::kIdle ||
288 transmitter_state == internal::Transmitter::kBroken) {
289 // Don't send handshake requests until we're connecting.
290 return 0;
291 }
292 if (transmitter_state == internal::Transmitter::kConnected &&
293 !needs_handshake_ack_) {
294 // The handshake has already been concluded.
295 return 0;
296 }
297 // In case we're in backoff or expect an ack, the delay is updated to reflect
298 // the remaining timeout.
299 long delay = std::numeric_limits<long>::max();
300 if (transmitter_state == internal::Transmitter::kConnecting) {
301 roo_time::Uptime now = roo_time::Uptime::Now();
302 if (now < next_scheduled_handshake_update_) {
303 // We do need to send a handshake packet, but not yet.
304 long delay = (next_scheduled_handshake_update_ - now).inMicros();
305 next_send_micros = std::min(next_send_micros, delay);
306 return 0;
307 }
308 delay = Backoff(successive_handshake_retries_++).inMicros();
309 next_scheduled_handshake_update_ = now + roo_time::Micros(delay);
310 }
311 // Clearing the flag, as we're about to send the handshake packet.
312 needs_handshake_ack_ = false;
313 bool we_need_ack = (transmitter_state != internal::Transmitter::kConnected);
314 uint16_t header = FormatPacketHeader(transmitter_.front(),
315 internal::kHandshakePacket, false);
316 roo_io::StoreBeU16(header, buf);
317 roo_io::StoreBeU32(my_stream_id_, buf + 2);
318 roo_io::StoreBeU32(peer_stream_id_, buf + 6);
319 uint8_t last_byte = we_need_ack ? 0x80 : 0x00;
320 last_byte |= receiver_.buffer_size_log2();
321 roo_io::StoreU8(last_byte, buf + 10);
322 next_send_micros = std::min(next_send_micros, delay);
323 MLOG(roo_transport_reliable_channel_connection)
324 << getLogPrefix() << "Handshake packet sent: "
325 << HandshakePacket{
326 .self_seq_num = (uint16_t)(header & 0x0FFF),
327 .self_stream_id = my_stream_id_,
328 .ack_stream_id = peer_stream_id_,
329 .want_ack = we_need_ack,
330 };
331 return 11;
332}
333
334void Channel::handleHandshakePacket(uint16_t peer_seq_num,
335 uint32_t peer_stream_id,
336 uint32_t ack_stream_id, bool want_ack,
337 uint16_t peer_receive_buffer_size,
338 bool& outgoing_data_ready) {
339 std::function<void()> disconnect_fn;
340 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
341 MLOG(roo_transport_reliable_channel_connection)
342 << getLogPrefix() << "Handshake packet received: "
343 << HandshakePacket{
344 .self_seq_num = peer_seq_num,
345 .self_stream_id = peer_stream_id,
346 .ack_stream_id = ack_stream_id,
347 .want_ack = want_ack,
348 };
349 if (peer_stream_id == my_stream_id_) {
350 // The peer is echoing our own stream ID. This is probably a cross-talk from
351 // our own packets.
352 if (peer_seq_num == transmitter_.front().raw()) {
353 MLOG(roo_transport_reliable_channel_connection)
354 << getLogPrefix()
355 << "Ignoring the handshake, since it's an echo of the last one we "
356 "sent.";
357 LOG(WARNING) << "Cross-talk detected. Please check the wiring.";
358 return;
359 }
360 }
361 switch (receiver_.state()) {
362 case internal::Receiver::kConnecting: {
363 if (ack_stream_id != 0 && ack_stream_id != my_stream_id_) {
364 // The peer is acknowledging a different stream than the one we're
365 // connecting on. Ignoring.
366 MLOG(roo_transport_reliable_channel_connection)
367 << getLogPrefix()
368 << "Ignoring the handshake, since the ack_stream_id doesn't match.";
369 break;
370 }
371 if (peer_stream_id == my_stream_id_) {
372 // Since we ruled out the echo case above, we're going to assume that
373 // this is a freakish accident of two endpoints accidentally choosing
374 // the same stream ID. Bailing out, as if the peer has disconnected.
375 MLOG(roo_transport_reliable_channel_connection)
376 << getLogPrefix()
377 << "Aborting: peer is using the same stream ID as ours: "
378 << peer_stream_id;
379 disconnect_fn = std::move(disconnect_fn_);
380 disconnect_fn_ = nullptr;
381 receiver_.setBroken();
382 transmitter_.setBroken();
383 my_stream_id_ = 0;
384 connected_cv_.notify_all();
385 break;
386 }
387 peer_stream_id_ = peer_stream_id;
388 CHECK(receiver_.empty());
389 MLOG(roo_transport_reliable_channel_connection)
390 << getLogPrefix() << "Receiver is now connected.";
391 receiver_.setConnected(peer_seq_num, my_control_bit());
392 outgoing_data_ready = true;
393
394 if (ack_stream_id == my_stream_id_) {
395 MLOG(roo_transport_reliable_channel_connection)
396 << getLogPrefix() << "Transmitter is now connected.";
397 my_stream_id_acked_by_peer_ = true;
398 transmitter_.setConnected(peer_receive_buffer_size, my_control_bit());
399 }
400 needs_handshake_ack_ = want_ack;
401 connected_cv_.notify_all();
402 break;
403 }
404 case internal::Receiver::kConnected: {
405 // Note: we only consider initial connection requests as 'breaking' -
406 // others might be latend acks.
407 if (want_ack && (peer_stream_id_ != peer_stream_id) &&
408 ack_stream_id == 0) {
409 // The peer opened a new stream.
410 disconnect_fn = std::move(disconnect_fn_);
411 disconnect_fn_ = nullptr;
412 if (!receiver_.done()) {
413 MLOG(roo_transport_reliable_channel_connection)
414 << getLogPrefix() << "Disconnection detected: " << peer_stream_id_
415 << ", " << peer_stream_id;
416 // Ignore until all in-flight packets have been delivered.
417 if (transmitter_.state() == internal::Transmitter::kConnected) {
418 MLOG(roo_transport_reliable_channel_connection)
419 << getLogPrefix() << "Transmitter is now broken.";
420 transmitter_.setBroken();
421 } else {
422 MLOG(roo_transport_reliable_channel_connection)
423 << getLogPrefix() << "Transmitter is now idle.";
424 transmitter_.reset();
425 }
426 my_stream_id_ = 0;
427 connected_cv_.notify_all();
428 MLOG(roo_transport_reliable_channel_connection)
429 << getLogPrefix() << "Receiver is now broken.";
430 receiver_.setBroken();
431 break;
432 }
433 MLOG(roo_transport_reliable_channel_connection)
434 << getLogPrefix() << "Transmitter and receiver are now idle.";
435 transmitter_.reset();
436 my_stream_id_ = 0;
437 receiver_.reset();
438 connected_cv_.notify_all();
439 break;
440 }
441 if (ack_stream_id == my_stream_id_ && !my_stream_id_acked_by_peer_) {
442 CHECK(my_stream_id_ != 0);
443 MLOG(roo_transport_reliable_channel_connection)
444 << getLogPrefix() << "Transmitter is now connected.";
445 my_stream_id_acked_by_peer_ = true;
446 transmitter_.setConnected(peer_receive_buffer_size, my_control_bit());
447 outgoing_data_ready = true;
448 connected_cv_.notify_all();
449 }
450 needs_handshake_ack_ = want_ack;
451 break;
452 }
453 case internal::Receiver::kIdle:
454 case internal::Receiver::kBroken: {
455 // We're idle; ignoring handshake;
456 MLOG(roo_transport_reliable_channel_connection)
457 << getLogPrefix()
458 << "Ignoring the handshake, since the receiver is not connecting.";
459 break;
460 }
461 default: {
462 break;
463 }
464 }
465 if (disconnect_fn != nullptr) {
466 disconnect_fn();
467 }
468}
469
470void Channel::packetReceived(const roo::byte* buf, size_t len) {
471 bool outgoing_data_ready = false;
472 uint16_t header = roo_io::LoadBeU16(buf);
473 bool control_bit = internal::GetPacketControlBit(header);
474 auto type = internal::GetPacketType(header);
475 switch (type) {
476 case internal::kDataAckPacket: {
477 transmitter_.ack(control_bit, header & 0x0FFF, buf + 2, len - 2,
478 outgoing_data_ready);
479 break;
480 }
481 case internal::kFlowControlPacket: {
482 // Update to available slots received.
483 transmitter_.updateRecvHimark(control_bit, header & 0x0FFF);
484 break;
485 }
486 case internal::kHandshakePacket: {
487 if (len != 11) {
488 // Malformed packet.
489 break;
490 }
491 uint16_t peer_seq_num = header & 0x0FFF;
492 uint32_t peer_stream_id = roo_io::LoadBeU32(buf + 2);
493 uint32_t ack_stream_id = roo_io::LoadBeU32(buf + 6);
494 uint8_t last_byte = roo_io::LoadU8(buf + 10);
495 bool want_ack = ((last_byte & 0x80) != 0);
496 uint8_t peer_receive_buffer_size_log2 = last_byte & 0x0F;
497 if (peer_receive_buffer_size_log2 > 12) {
498 peer_receive_buffer_size_log2 = 12;
499 }
500 handleHandshakePacket(peer_seq_num, peer_stream_id, ack_stream_id,
501 want_ack, (1 << peer_receive_buffer_size_log2),
502 outgoing_data_ready);
503 break;
504 }
505 case internal::kDataPacket:
506 case internal::kFinPacket: {
507 if (receiver_.handleDataPacket(control_bit, header & 0x0FFF, buf + 2,
508 len - 2, type == internal::kFinPacket)) {
509 outgoing_data_ready = true;
510 }
511 break;
512 }
513 default: {
514 // Unrecognized packet type; ignoring.
515 }
516 }
517 if (outgoing_data_ready) {
518 outgoing_data_ready_.notify();
519 }
520}
521
522void Channel::sendLoop() {
523 while (active_) {
524 long delay_micros = trySend();
525 roo::this_thread::yield();
526 if (delay_micros > 0) {
527 // Wait for the delay, or the notification that we have data to send,
528 // whichever comes first.
529 outgoing_data_ready_.await(delay_micros);
530 }
531 }
532}
533
534void Channel::begin() {
535 roo::thread::attributes attrs;
536 attrs.set_stack_size(4096);
537#if (defined __FREERTOS || defined ESP_PLATFORM)
538 // Set the priority just the notch below the receiver thread, so that it
539 // doesn't starve the receiver thread (which receives acks) but is still high.
540 attrs.set_priority(configMAX_PRIORITIES - 2);
541
542#endif
543 attrs.set_name(send_thread_name_.c_str());
544 sender_thread_ = roo::thread(attrs, [this]() { sendLoop(); });
545}
546
547void Channel::end() {
548 active_ = false;
549 outgoing_data_ready_.notify();
550 if (sender_thread_.joinable()) {
551 sender_thread_.join();
552 }
553}
554
555} // namespace roo_transport
556
557#endif // ROO_USE_THREADS
uint16_t FormatPacketHeader(SeqNum seq, PacketType type, bool control_bit)
Definition protocol.h:121
#define RANDOM_INTEGER