roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
channel.h
Go to the documentation of this file.
1#pragma once
2
3#include <memory>
4
6#ifdef ROO_USE_THREADS
7
8#include "roo_backport/string_view.h"
9#include "roo_io/core/input_stream.h"
10#include "roo_io/core/output_stream.h"
11#include "roo_io/memory/load.h"
12#include "roo_io/memory/store.h"
13#include "roo_logging.h"
14#include "roo_threads.h"
15#include "roo_threads/atomic.h"
16#include "roo_threads/mutex.h"
17#include "roo_threads/thread.h"
31
32namespace roo_transport {
33
34// Helper class to implement reliable bidirectional streaming over lossy
35// packet-based transport. Used as a building block of SingletonSerial.
36class Channel {
37 public:
38 Channel(PacketSender& sender, LinkBufferSize sendbuf, LinkBufferSize recvbuf,
39 roo::string_view name = "");
40
41 ~Channel();
42
43 void begin();
44 void end();
45
46 uint32_t my_stream_id() const;
47
48 size_t write(const roo::byte* buf, size_t count, uint32_t my_stream_id,
49 roo_io::Status& stream_status);
50
51 size_t tryWrite(const roo::byte* buf, size_t count, uint32_t my_stream_id,
52 roo_io::Status& stream_status);
53
54 size_t read(roo::byte* buf, size_t count, uint32_t my_stream_id,
55 roo_io::Status& stream_status);
56
57 size_t tryRead(roo::byte* buf, size_t count, uint32_t my_stream_id,
58 roo_io::Status& stream_status);
59
60 // Returns -1 if no data available to read immediately.
61 int peek(uint32_t my_stream_id, roo_io::Status& stream_status);
62
63 size_t availableForRead(uint32_t my_stream_id,
64 roo_io::Status& stream_status) const;
65
66 void flush(uint32_t my_stream_id, roo_io::Status& stream_status);
67
68 void close(uint32_t my_stream_id, roo_io::Status& stream_status);
69
70 void closeInput(uint32_t my_stream_id, roo_io::Status& stream_status);
71
72 // Returns the delay, in microseconds, until we're expected to need to
73 // (re)send the next packet.
74 long trySend();
75
76 void packetReceived(const roo::byte* buf, size_t len);
77
78 void disconnect(uint32_t my_stream_id);
79
80 // The lower bound of bytes that are guaranteed to be writable without
81 // blocking.
82 size_t availableForWrite(uint32_t my_stream_id,
83 roo_io::Status& stream_status) const;
84
85 uint32_t packets_sent() const { return transmitter_.packets_sent(); }
86
87 uint32_t packets_delivered() const {
88 return transmitter_.packets_delivered();
89 }
90
91 uint32_t packets_received() const { return receiver_.packets_received(); }
92
93 // Returns a newly-generated my_stream_id.
94 uint32_t connect(std::function<void()> disconnect_fn = nullptr);
95
96 LinkStatus getLinkStatus(uint32_t my_stream_id);
97
98 void awaitConnected(uint32_t my_stream_id);
99 bool awaitConnected(uint32_t my_stream_id, roo_time::Duration timeout);
100
101 private:
102 friend class SenderThread;
103
104#ifdef ROO_USE_THREADS
105
106 friend void SendLoop(Channel* retransmitter);
107
108#endif
109
110 LinkStatus getLinkStatusInternal(uint32_t my_stream_id);
111
112 void handleHandshakePacket(uint16_t peer_seq_num, uint32_t peer_stream_id,
113 uint32_t ack_stream_id, bool want_ack,
114 uint16_t peer_receive_buffer_size,
115 bool& outgoing_data_ready);
116
117 size_t conn(roo::byte* buf, long& next_send_micros);
118
119 void sendLoop();
120
121 roo::string_view getLogPrefix() const { return log_prefix_; }
122
123 bool my_control_bit() const { return my_stream_id_ > peer_stream_id_; }
124
125 PacketSender& packet_sender_;
126
127 // Signals the sender thread that there are packets to send.
128 internal::OutgoingDataReadyNotification outgoing_data_ready_;
129
130 internal::ThreadSafeTransmitter transmitter_;
131 internal::ThreadSafeReceiver receiver_;
132
133 // Random-generated; used in connect packets.
134 // GUARDED_BY(handshake_mutex_).
135 uint32_t my_stream_id_;
136
137 // Effectively says that the transmitter is connected to the peer.
138 // GUARDED_BY(handshake_mutex_).
139 bool my_stream_id_acked_by_peer_;
140
141 // As received from the peer in their connect packets.
142 // GUARDED_BY(handshake_mutex_).
143 uint32_t peer_stream_id_;
144
145 // Indicates whether we're expected to send the handshake ack message.
146 // GUARDED_BY(handshake_mutex_).
147 bool needs_handshake_ack_;
148
149 // Used in the handshake backoff protocol.
150 // GUARDED_BY(handshake_mutex_).
151 uint32_t successive_handshake_retries_;
152
153 // GUARDED_BY(handshake_mutex_).
154 roo_time::Uptime next_scheduled_handshake_update_;
155
156 // If not null, will be called, exactly once (from the receive thread) as soon
157 // as disconnection is detected.
158 // GUARDED_BY(handshake_mutex_).
159 std::function<void()> disconnect_fn_;
160
161#ifdef ROO_USE_THREADS
162 roo::thread sender_thread_;
163 roo::atomic<bool> active_;
164
165 mutable roo::mutex handshake_mutex_;
166
167 roo::condition_variable connected_cv_;
168#endif
169
170 std::string log_prefix_;
171 std::string send_thread_name_;
172};
173
174} // namespace roo_transport
175
176#endif // ROO_USE_THREADS