9#if (defined ESP32 || defined ROO_TESTING)
10#include "esp_random.h"
11#define RANDOM_INTEGER esp_random
12#elif (defined ARDUINO_ARCH_RP2040)
13#define RANDOM_INTEGER rp2040.hwrand32
15#define RANDOM_INTEGER rand
18#if !defined(MLOG_roo_transport_reliable_channel_connection)
19#define MLOG_roo_transport_reliable_channel_connection 0
26roo_time::Duration Backoff(
int retry_count) {
27 float min_delay_us = 1000.0f;
28 float max_delay_us = 1000000.0f;
29 float delay = pow(1.33, retry_count) * min_delay_us;
30 if (delay > max_delay_us) {
34 delay += (float)delay * ((
float)
RANDOM_INTEGER() / RAND_MAX - 0.5f) * 0.4f;
35 return roo_time::Micros((uint64_t)delay);
42 : packet_sender_(sender),
43 outgoing_data_ready_(),
44 transmitter_((unsigned int)sendbuf),
45 receiver_((unsigned int)recvbuf),
47 my_stream_id_acked_by_peer_(false),
49 needs_handshake_ack_(false),
50 successive_handshake_retries_(0),
51 next_scheduled_handshake_update_(roo_time::Uptime::Start()),
52 disconnect_fn_(nullptr),
55 log_prefix_(name.empty() ? std::string(
"")
56 : (std::string(
"(").append(name).append(
") "))),
57 send_thread_name_(name.empty() ?
"send_loop"
58 : std::string(name) +
"-send") {
59 CHECK_LE(
static_cast<unsigned int>(sendbuf), 12u);
60 CHECK_LE(
static_cast<unsigned int>(sendbuf),
61 static_cast<unsigned int>(recvbuf));
64Channel::~Channel() { end(); }
66size_t Channel::write(
const roo::byte* buf,
size_t count, uint32_t my_stream_id,
67 roo_io::Status& stream_status) {
68 bool outgoing_data_ready =
false;
69 return transmitter_.write(buf, count, my_stream_id, stream_status,
71 if (outgoing_data_ready) {
72 outgoing_data_ready_.notify();
76size_t Channel::tryWrite(
const roo::byte* buf,
size_t count,
77 uint32_t my_stream_id, roo_io::Status& stream_status) {
78 bool outgoing_data_ready =
false;
79 return transmitter_.tryWrite(buf, count, my_stream_id, stream_status,
81 if (outgoing_data_ready) {
82 outgoing_data_ready_.notify();
86size_t Channel::read(roo::byte* buf,
size_t count, uint32_t my_stream_id,
87 roo_io::Status& stream_status) {
88 bool outgoing_data_ready =
false;
89 return receiver_.read(buf, count, my_stream_id, stream_status,
91 if (outgoing_data_ready) {
92 outgoing_data_ready_.notify();
96size_t Channel::tryRead(roo::byte* buf,
size_t count, uint32_t my_stream_id,
97 roo_io::Status& stream_status) {
98 bool outgoing_data_ready =
false;
99 return receiver_.tryRead(buf, count, my_stream_id, stream_status,
100 outgoing_data_ready);
101 if (outgoing_data_ready) {
102 outgoing_data_ready_.notify();
106int Channel::peek(uint32_t my_stream_id, roo_io::Status& stream_status) {
107 return receiver_.peek(my_stream_id, stream_status);
110size_t Channel::availableForRead(uint32_t my_stream_id,
111 roo_io::Status& stream_status)
const {
112 return receiver_.availableForRead(my_stream_id, stream_status);
115void Channel::flush(uint32_t my_stream_id, roo_io::Status& stream_status) {
116 bool outgoing_data_ready =
false;
117 transmitter_.flush(my_stream_id, stream_status, outgoing_data_ready);
118 if (outgoing_data_ready) {
119 outgoing_data_ready_.notify();
123void Channel::close(uint32_t my_stream_id, roo_io::Status& stream_status) {
124 bool outgoing_data_ready =
false;
125 transmitter_.close(my_stream_id, stream_status, outgoing_data_ready);
126 if (outgoing_data_ready) {
127 outgoing_data_ready_.notify();
132void Channel::closeInput(uint32_t my_stream_id, roo_io::Status& stream_status) {
133 bool outgoing_data_ready =
false;
134 receiver_.markInputClosed(my_stream_id, stream_status, outgoing_data_ready);
135 if (outgoing_data_ready) {
136 outgoing_data_ready_.notify();
140size_t Channel::availableForWrite(uint32_t my_stream_id,
141 roo_io::Status& stream_status)
const {
142 return transmitter_.availableForWrite(my_stream_id, stream_status);
145uint32_t Channel::my_stream_id()
const {
146 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
147 return my_stream_id_;
150uint32_t Channel::connect(std::function<
void()> disconnect_fn) {
151 uint32_t my_stream_id;
152 std::function<void()> old_disconnect_fn;
154 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
155 old_disconnect_fn = std::move(disconnect_fn_);
156 disconnect_fn_ = disconnect_fn;
158 my_stream_id_acked_by_peer_ =
false;
162 MLOG(roo_transport_reliable_channel_connection)
163 << getLogPrefix() <<
"Transmitter and receiver are now connecting.";
165 receiver_.init(my_stream_id_);
166 needs_handshake_ack_ =
false;
167 successive_handshake_retries_ = 0;
168 next_scheduled_handshake_update_ = roo_time::Uptime::Start();
169 connected_cv_.notify_all();
170 my_stream_id = my_stream_id_;
173 outgoing_data_ready_.notify();
174 if (old_disconnect_fn !=
nullptr) {
180void Channel::disconnect(uint32_t my_stream_id) {
181 std::function<void()> disconnect_fn;
183 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
184 if (my_stream_id_ != my_stream_id)
return;
186 my_stream_id_acked_by_peer_ =
false;
188 MLOG(roo_transport_reliable_channel_connection)
189 << getLogPrefix() <<
"Transmitter and receiver are now disconnected.";
190 transmitter_.reset();
192 connected_cv_.notify_all();
193 disconnect_fn = std::move(disconnect_fn_);
194 disconnect_fn_ =
nullptr;
196 outgoing_data_ready_.notify();
197 if (disconnect_fn !=
nullptr) {
202LinkStatus Channel::getLinkStatus(uint32_t stream_id) {
203 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
204 return getLinkStatusInternal(stream_id);
207LinkStatus Channel::getLinkStatusInternal(uint32_t stream_id) {
208 if (stream_id == 0)
return LinkStatus::kIdle;
209 if (my_stream_id_ != stream_id)
return LinkStatus::kBroken;
210 return peer_stream_id_ == 0 || !my_stream_id_acked_by_peer_
211 ? LinkStatus::kConnecting
212 : LinkStatus::kConnected;
215void Channel::awaitConnected(uint32_t stream_id) {
216 roo::unique_lock<roo::mutex> guard(handshake_mutex_);
217 while (getLinkStatusInternal(stream_id) == LinkStatus::kConnecting) {
218 connected_cv_.wait(guard);
222bool Channel::awaitConnected(uint32_t stream_id, roo_time::Duration timeout) {
223 roo::unique_lock<roo::mutex> guard(handshake_mutex_);
224 roo_time::Uptime when = roo_time::Uptime::Now() + timeout;
225 while (getLinkStatusInternal(stream_id) == LinkStatus::kConnecting) {
226 if (connected_cv_.wait_until(guard, when) == roo::cv_status::timeout) {
233long Channel::trySend() {
235 long next_send_micros = std::numeric_limits<long>::max();
237 len = conn(buf, next_send_micros);
239 packet_sender_.send(buf, len);
245 if (transmitter_.state() == internal::Transmitter::kConnecting) {
246 return next_send_micros;
248 len = receiver_.ack(buf);
250 packet_sender_.send(buf, len);
252 len = receiver_.updateRecvHimark(buf, next_send_micros);
254 packet_sender_.send(buf, len);
256 len = transmitter_.send(buf, next_send_micros);
258 packet_sender_.send(buf, len);
260 return next_send_micros;
264struct HandshakePacket {
265 uint16_t self_seq_num;
266 uint32_t self_stream_id;
267 uint32_t ack_stream_id;
270 friend roo_logging::Stream& operator<<(roo_logging::Stream& s,
271 const HandshakePacket& p) {
272 if (p.ack_stream_id == 0) {
273 s <<
"CONN " << p.self_stream_id <<
":" << p.self_seq_num;
275 s << (p.want_ack ?
"CONN/ACK " :
"ACK ") << p.self_stream_id <<
":"
276 << p.self_seq_num <<
"/" << p.ack_stream_id;
284size_t Channel::conn(roo::byte* buf,
long& next_send_micros) {
285 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
286 auto transmitter_state = transmitter_.state();
287 if (transmitter_state == internal::Transmitter::kIdle ||
288 transmitter_state == internal::Transmitter::kBroken) {
292 if (transmitter_state == internal::Transmitter::kConnected &&
293 !needs_handshake_ack_) {
299 long delay = std::numeric_limits<long>::max();
300 if (transmitter_state == internal::Transmitter::kConnecting) {
301 roo_time::Uptime now = roo_time::Uptime::Now();
302 if (now < next_scheduled_handshake_update_) {
304 long delay = (next_scheduled_handshake_update_ - now).inMicros();
305 next_send_micros = std::min(next_send_micros, delay);
308 delay = Backoff(successive_handshake_retries_++).inMicros();
309 next_scheduled_handshake_update_ = now + roo_time::Micros(delay);
312 needs_handshake_ack_ =
false;
313 bool we_need_ack = (transmitter_state != internal::Transmitter::kConnected);
315 internal::kHandshakePacket,
false);
316 roo_io::StoreBeU16(header, buf);
317 roo_io::StoreBeU32(my_stream_id_, buf + 2);
318 roo_io::StoreBeU32(peer_stream_id_, buf + 6);
319 uint8_t last_byte = we_need_ack ? 0x80 : 0x00;
320 last_byte |= receiver_.buffer_size_log2();
321 roo_io::StoreU8(last_byte, buf + 10);
322 next_send_micros = std::min(next_send_micros, delay);
323 MLOG(roo_transport_reliable_channel_connection)
324 << getLogPrefix() <<
"Handshake packet sent: "
326 .self_seq_num = (uint16_t)(header & 0x0FFF),
327 .self_stream_id = my_stream_id_,
328 .ack_stream_id = peer_stream_id_,
329 .want_ack = we_need_ack,
334void Channel::handleHandshakePacket(uint16_t peer_seq_num,
335 uint32_t peer_stream_id,
336 uint32_t ack_stream_id,
bool want_ack,
337 uint16_t peer_receive_buffer_size,
338 bool& outgoing_data_ready) {
339 std::function<void()> disconnect_fn;
340 roo::lock_guard<roo::mutex> guard(handshake_mutex_);
341 MLOG(roo_transport_reliable_channel_connection)
342 << getLogPrefix() <<
"Handshake packet received: "
344 .self_seq_num = peer_seq_num,
345 .self_stream_id = peer_stream_id,
346 .ack_stream_id = ack_stream_id,
347 .want_ack = want_ack,
349 if (peer_stream_id == my_stream_id_) {
352 if (peer_seq_num == transmitter_.front().raw()) {
353 MLOG(roo_transport_reliable_channel_connection)
355 <<
"Ignoring the handshake, since it's an echo of the last one we "
357 LOG(WARNING) <<
"Cross-talk detected. Please check the wiring.";
361 switch (receiver_.state()) {
362 case internal::Receiver::kConnecting: {
363 if (ack_stream_id != 0 && ack_stream_id != my_stream_id_) {
366 MLOG(roo_transport_reliable_channel_connection)
368 <<
"Ignoring the handshake, since the ack_stream_id doesn't match.";
371 if (peer_stream_id == my_stream_id_) {
375 MLOG(roo_transport_reliable_channel_connection)
377 <<
"Aborting: peer is using the same stream ID as ours: "
379 disconnect_fn = std::move(disconnect_fn_);
380 disconnect_fn_ =
nullptr;
381 receiver_.setBroken();
382 transmitter_.setBroken();
384 connected_cv_.notify_all();
387 peer_stream_id_ = peer_stream_id;
388 CHECK(receiver_.empty());
389 MLOG(roo_transport_reliable_channel_connection)
390 << getLogPrefix() <<
"Receiver is now connected.";
391 receiver_.setConnected(peer_seq_num, my_control_bit());
392 outgoing_data_ready =
true;
394 if (ack_stream_id == my_stream_id_) {
395 MLOG(roo_transport_reliable_channel_connection)
396 << getLogPrefix() <<
"Transmitter is now connected.";
397 my_stream_id_acked_by_peer_ =
true;
398 transmitter_.setConnected(peer_receive_buffer_size, my_control_bit());
400 needs_handshake_ack_ = want_ack;
401 connected_cv_.notify_all();
404 case internal::Receiver::kConnected: {
407 if (want_ack && (peer_stream_id_ != peer_stream_id) &&
408 ack_stream_id == 0) {
410 disconnect_fn = std::move(disconnect_fn_);
411 disconnect_fn_ =
nullptr;
412 if (!receiver_.done()) {
413 MLOG(roo_transport_reliable_channel_connection)
414 << getLogPrefix() <<
"Disconnection detected: " << peer_stream_id_
415 <<
", " << peer_stream_id;
417 if (transmitter_.state() == internal::Transmitter::kConnected) {
418 MLOG(roo_transport_reliable_channel_connection)
419 << getLogPrefix() <<
"Transmitter is now broken.";
420 transmitter_.setBroken();
422 MLOG(roo_transport_reliable_channel_connection)
423 << getLogPrefix() <<
"Transmitter is now idle.";
424 transmitter_.reset();
427 connected_cv_.notify_all();
428 MLOG(roo_transport_reliable_channel_connection)
429 << getLogPrefix() <<
"Receiver is now broken.";
430 receiver_.setBroken();
433 MLOG(roo_transport_reliable_channel_connection)
434 << getLogPrefix() <<
"Transmitter and receiver are now idle.";
435 transmitter_.reset();
438 connected_cv_.notify_all();
441 if (ack_stream_id == my_stream_id_ && !my_stream_id_acked_by_peer_) {
442 CHECK(my_stream_id_ != 0);
443 MLOG(roo_transport_reliable_channel_connection)
444 << getLogPrefix() <<
"Transmitter is now connected.";
445 my_stream_id_acked_by_peer_ =
true;
446 transmitter_.setConnected(peer_receive_buffer_size, my_control_bit());
447 outgoing_data_ready =
true;
448 connected_cv_.notify_all();
450 needs_handshake_ack_ = want_ack;
453 case internal::Receiver::kIdle:
454 case internal::Receiver::kBroken: {
456 MLOG(roo_transport_reliable_channel_connection)
458 <<
"Ignoring the handshake, since the receiver is not connecting.";
465 if (disconnect_fn !=
nullptr) {
470void Channel::packetReceived(
const roo::byte* buf,
size_t len) {
471 bool outgoing_data_ready =
false;
472 uint16_t header = roo_io::LoadBeU16(buf);
473 bool control_bit = internal::GetPacketControlBit(header);
474 auto type = internal::GetPacketType(header);
476 case internal::kDataAckPacket: {
477 transmitter_.ack(control_bit, header & 0x0FFF, buf + 2, len - 2,
478 outgoing_data_ready);
481 case internal::kFlowControlPacket: {
483 transmitter_.updateRecvHimark(control_bit, header & 0x0FFF);
486 case internal::kHandshakePacket: {
491 uint16_t peer_seq_num = header & 0x0FFF;
492 uint32_t peer_stream_id = roo_io::LoadBeU32(buf + 2);
493 uint32_t ack_stream_id = roo_io::LoadBeU32(buf + 6);
494 uint8_t last_byte = roo_io::LoadU8(buf + 10);
495 bool want_ack = ((last_byte & 0x80) != 0);
496 uint8_t peer_receive_buffer_size_log2 = last_byte & 0x0F;
497 if (peer_receive_buffer_size_log2 > 12) {
498 peer_receive_buffer_size_log2 = 12;
500 handleHandshakePacket(peer_seq_num, peer_stream_id, ack_stream_id,
501 want_ack, (1 << peer_receive_buffer_size_log2),
502 outgoing_data_ready);
505 case internal::kDataPacket:
506 case internal::kFinPacket: {
507 if (receiver_.handleDataPacket(control_bit, header & 0x0FFF, buf + 2,
508 len - 2, type == internal::kFinPacket)) {
509 outgoing_data_ready =
true;
517 if (outgoing_data_ready) {
518 outgoing_data_ready_.notify();
522void Channel::sendLoop() {
524 long delay_micros = trySend();
525 roo::this_thread::yield();
526 if (delay_micros > 0) {
529 outgoing_data_ready_.await(delay_micros);
534void Channel::begin() {
535 roo::thread::attributes attrs;
536 attrs.set_stack_size(4096);
537#if (defined __FREERTOS || defined ESP_PLATFORM)
540 attrs.set_priority(configMAX_PRIORITIES - 2);
543 attrs.set_name(send_thread_name_.c_str());
544 sender_thread_ = roo::thread(attrs, [
this]() { sendLoop(); });
549 outgoing_data_ready_.notify();
550 if (sender_thread_.joinable()) {
551 sender_thread_.join();
uint16_t FormatPacketHeader(SeqNum seq, PacketType type, bool control_bit)