8#include "roo_backport/string_view.h"
9#include "roo_io/core/input_stream.h"
10#include "roo_io/core/output_stream.h"
11#include "roo_io/memory/load.h"
12#include "roo_io/memory/store.h"
13#include "roo_logging.h"
14#include "roo_threads.h"
15#include "roo_threads/atomic.h"
16#include "roo_threads/mutex.h"
17#include "roo_threads/thread.h"
38 Channel(PacketSender& sender, LinkBufferSize sendbuf, LinkBufferSize recvbuf,
39 roo::string_view name =
"");
46 uint32_t my_stream_id()
const;
48 size_t write(
const roo::byte* buf,
size_t count, uint32_t my_stream_id,
49 roo_io::Status& stream_status);
51 size_t tryWrite(
const roo::byte* buf,
size_t count, uint32_t my_stream_id,
52 roo_io::Status& stream_status);
54 size_t read(roo::byte* buf,
size_t count, uint32_t my_stream_id,
55 roo_io::Status& stream_status);
57 size_t tryRead(roo::byte* buf,
size_t count, uint32_t my_stream_id,
58 roo_io::Status& stream_status);
61 int peek(uint32_t my_stream_id, roo_io::Status& stream_status);
63 size_t availableForRead(uint32_t my_stream_id,
64 roo_io::Status& stream_status)
const;
66 void flush(uint32_t my_stream_id, roo_io::Status& stream_status);
68 void close(uint32_t my_stream_id, roo_io::Status& stream_status);
70 void closeInput(uint32_t my_stream_id, roo_io::Status& stream_status);
76 void packetReceived(
const roo::byte* buf,
size_t len);
78 void disconnect(uint32_t my_stream_id);
82 size_t availableForWrite(uint32_t my_stream_id,
83 roo_io::Status& stream_status)
const;
85 uint32_t packets_sent()
const {
return transmitter_.packets_sent(); }
87 uint32_t packets_delivered()
const {
88 return transmitter_.packets_delivered();
91 uint32_t packets_received()
const {
return receiver_.packets_received(); }
94 uint32_t connect(std::function<
void()> disconnect_fn =
nullptr);
96 LinkStatus getLinkStatus(uint32_t my_stream_id);
98 void awaitConnected(uint32_t my_stream_id);
99 bool awaitConnected(uint32_t my_stream_id, roo_time::Duration timeout);
102 friend class SenderThread;
104#ifdef ROO_USE_THREADS
106 friend void SendLoop(Channel* retransmitter);
110 LinkStatus getLinkStatusInternal(uint32_t my_stream_id);
112 void handleHandshakePacket(uint16_t peer_seq_num, uint32_t peer_stream_id,
113 uint32_t ack_stream_id,
bool want_ack,
114 uint16_t peer_receive_buffer_size,
115 bool& outgoing_data_ready);
117 size_t conn(roo::byte* buf,
long& next_send_micros);
121 roo::string_view getLogPrefix()
const {
return log_prefix_; }
123 bool my_control_bit()
const {
return my_stream_id_ > peer_stream_id_; }
125 PacketSender& packet_sender_;
128 internal::OutgoingDataReadyNotification outgoing_data_ready_;
130 internal::ThreadSafeTransmitter transmitter_;
131 internal::ThreadSafeReceiver receiver_;
135 uint32_t my_stream_id_;
139 bool my_stream_id_acked_by_peer_;
143 uint32_t peer_stream_id_;
147 bool needs_handshake_ack_;
151 uint32_t successive_handshake_retries_;
154 roo_time::Uptime next_scheduled_handshake_update_;
159 std::function<void()> disconnect_fn_;
161#ifdef ROO_USE_THREADS
162 roo::thread sender_thread_;
163 roo::atomic<bool> active_;
165 mutable roo::mutex handshake_mutex_;
167 roo::condition_variable connected_cv_;
170 std::string log_prefix_;
171 std::string send_thread_name_;