roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
transmitter.cpp
Go to the documentation of this file.
2
3#include "roo_backport.h"
4#include "roo_backport/byte.h"
5
6namespace roo_transport {
7namespace internal {
8
// Constructs a transmitter whose send window holds (1 << sendbuf_log2)
// outgoing buffers. The transmitter starts in kIdle; init() must be called
// to begin a connection before any data can be written.
Transmitter::Transmitter(unsigned int sendbuf_log2)
    : state_(kIdle),
      end_of_stream_(false),
      // One OutBuffer per ring slot.
      out_buffers_(new OutBuffer[1 << sendbuf_log2]),
      current_out_buffer_(nullptr),
      out_ring_(sendbuf_log2, 0),
      next_to_send_(out_ring_.begin()),
      // Start with a full window's worth of send tokens; the real peer
      // window arrives later via updateRecvHimark(). NOTE(review): inferred
      // from updateRecvHimark() below — confirm against the header.
      recv_himark_(out_ring_.begin() + (1 << sendbuf_log2)),
      has_pending_eof_(false),
      packets_sent_(0),
      packets_delivered_(0),
      peer_receive_buffer_size_(0),
      control_bit_(false) {}
22
23size_t Transmitter::tryWrite(const roo::byte* buf, size_t count,
24 bool& outgoing_data_ready) {
25 outgoing_data_ready = false;
26 if (count == 0) return 0;
27 if (end_of_stream_) return 0;
28 if (state_ == kIdle || state_ == kBroken) return 0;
29 size_t total_written = 0;
30 do {
31 CHECK_GE(recv_himark_, out_ring_.end());
32 if (current_out_buffer_ == nullptr) {
33 if (recv_himark_ == out_ring_.end()) {
34 // No more tokens.
35 break;
36 }
37 if (out_ring_.slotsFree() == 0) {
38 break;
39 }
40 SeqNum pos = out_ring_.push();
41 current_out_buffer_ = &getOutBuffer(pos);
42 current_out_buffer_->init(pos, control_bit_);
43 }
44 size_t written = current_out_buffer_->write(buf, count);
45 total_written += written;
46 buf += written;
47 count -= written;
48 if (current_out_buffer_->finished()) {
49 current_out_buffer_ = nullptr;
50 outgoing_data_ready = true;
51 }
52
53 } while (count > 0);
54 return total_written;
55}
56
58 if (current_out_buffer_ != nullptr) {
59 current_out_buffer_->flush();
60 return true;
61 }
62 return false;
63}
64
65bool Transmitter::hasPendingData() const { return !out_ring_.empty(); }
66
67void Transmitter::addEosPacket() {
68 SeqNum pos = out_ring_.push();
69 auto* buf = &getOutBuffer(pos);
70 buf->init(pos, control_bit_);
71 buf->markFinal();
72 buf->finish();
73}
74
// NOTE(review): the signature line of this function was lost in extraction
// (Doxygen line 75). The body closes the write side of the stream — likely
// `void Transmitter::close()`; confirm against transmitter.h.
  // A broken connection counts as closed; drop back to idle.
  if (state_ == kBroken) {
    state_ = kIdle;
  }
  // Already closed, or never connected: nothing to do.
  if (end_of_stream_ || state_ == kIdle) {
    return;
  }
  // Push out any partially written data first.
  flush();
  if (out_ring_.slotsFree() == 0) {
    // No room for the EOS packet right now; defer it until an ack frees a
    // slot (see the pending-EOF handling in ack()).
    has_pending_eof_ = true;
  } else {
    addEosPacket();
  }
  end_of_stream_ = true;
}
90
// NOTE(review): the signature line of this function was lost in extraction
// (Doxygen line 91). The body transitions the transmitter into the kBroken
// state — name unknown from here (possibly `setBroken()`); confirm against
// transmitter.h.
  // Drop all queued outgoing buffers; they will never be delivered.
  while (!out_ring_.empty()) {
    out_ring_.pop();
  }
  state_ = kBroken;
}
97
// NOTE(review): the signature line of this function was lost in extraction
// (Doxygen line 98). The body reports remaining outgoing capacity in ring
// slots — presumably something like `size_t availableForWrite() const`;
// confirm against transmitter.h.
  if (end_of_stream_ || state_ == kIdle || state_ == kBroken) {
    return 0;
  }
  // In the extreme case, if flush is issued after every write, we might only
  // fit one byte per slot.
  return out_ring_.slotsFree();
}
106
107size_t Transmitter::send(roo::byte* buf, long& next_send_micros) {
108 const internal::OutBuffer* buf_to_send = getBufferToSend(next_send_micros);
109 if (buf_to_send == nullptr) return 0;
110 const roo::byte* data = buf_to_send->data();
111 size_t size = buf_to_send->size();
112 memcpy(buf, data, size);
113 return size;
114}
115
117 long& next_send_micros) {
118 if (state_ != kConnected) return nullptr;
119 if (out_ring_.contains(next_to_send_)) {
120 // Best-effort attempt to quickly send the next buffer in the sequence.
121 OutBuffer& buf = getOutBuffer(next_to_send_);
122 if (!buf.acked() && buf.flushed()) {
123 if (!buf.finished()) buf.finish();
124 if (buf.send_counter() == 0) {
125 // Never sent before.
126 ++next_to_send_;
127 ++packets_sent_;
128 buf.markSent(roo_time::Uptime::Now());
129 next_send_micros = 0;
130 return &buf;
131 }
132 }
133 }
134 // Fall back: find the earliest finished buffer to send.
135 roo_time::Uptime now = roo_time::Uptime::Now();
136 SeqNum to_send = out_ring_.end();
137 roo_time::Uptime min_send_time = roo_time::Uptime::Max();
138 for (SeqNum pos = out_ring_.begin();
139 pos < out_ring_.end() && pos < recv_himark_; ++pos) {
140 OutBuffer& buf = getOutBuffer(pos);
141 if (buf.acked()) {
142 continue;
143 }
144 if (!buf.flushed()) {
145 // No more ready to send buffers can follow.
146 break;
147 }
148 if (!buf.finished() || buf.expiration() == roo_time::Uptime::Start()) {
149 // This one can be sent immediately; no need to seek any further.
150 to_send = pos;
151 min_send_time = roo_time::Uptime::Start();
152 break;
153 }
154 // This is a viable candidate.
155 if (buf.expiration() < min_send_time) {
156 to_send = pos;
157 min_send_time = buf.expiration();
158 }
159 }
160 if (!out_ring_.contains(to_send)) {
161 // No more packets to send at all.
162 // Auto-flush: let's see if we can opportunistically close and send a
163 // packet?
164 if (out_ring_.slotsUsed() != 1 || out_ring_.begin() >= recv_himark_) {
165 return nullptr;
166 }
167 OutBuffer& buf = getOutBuffer(out_ring_.begin());
168 if (buf.finished()) {
169 DCHECK_GT(buf.send_counter(), 0);
170 return nullptr;
171 }
172 DCHECK(!buf.acked());
173 DCHECK_GT(buf.size(), 0);
174 buf.finish();
175 to_send = out_ring_.begin();
176 min_send_time = roo_time::Uptime::Start();
177 }
178 if (min_send_time > now) {
179 // The next packet to (re)send is not ready yet.
180 next_send_micros =
181 std::min(next_send_micros, (long)(min_send_time - now).inMicros());
182 return nullptr;
183 }
184
185 OutBuffer& buf = getOutBuffer(to_send);
186 if (!buf.finished()) {
187 buf.finish();
188 }
189 buf.markSent(now);
190 next_to_send_ = to_send + 1;
191 ++packets_sent_;
192 next_send_micros = 0;
193 return &buf;
194}
195
197 while (!out_ring_.empty()) {
198 out_ring_.pop();
199 }
200 end_of_stream_ = false;
201 my_stream_id_ = 0;
202 state_ = kIdle;
203 current_out_buffer_ = nullptr;
204 has_pending_eof_ = false;
205}
206
207void Transmitter::init(uint32_t my_stream_id, SeqNum new_start) {
208 my_stream_id_ = my_stream_id;
209 state_ = kConnecting;
210 end_of_stream_ = false;
211 while (!out_ring_.empty()) {
212 out_ring_.pop();
213 }
214 out_ring_.reset(new_start);
215 // To be updated by setConnected().
216 recv_himark_ = out_ring_.begin();
217 next_to_send_ = out_ring_.begin();
218 current_out_buffer_ = nullptr;
219 has_pending_eof_ = false;
220}
221
// Processes an acknowledgement frame from the peer.
//
// `seq_id` carries the low 12 bits of the peer's cumulative-ack position;
// every buffer before it is treated as delivered and retired from the ring.
// `ack_bitmap` (ack_bitmap_len bytes, scanned MSB-first within each byte)
// selectively acks packets after that position. Returns true iff any
// once-sent, nacked packets were rushed for retransmission.
bool Transmitter::ack(bool control_bit, uint16_t seq_id,
                      const roo::byte* ack_bitmap, size_t ack_bitmap_len) {
  if (state_ != kConnected) {
    // Send queue is empty anyway. The ack is either bogus or comes from a
    // previous connection.
    return false;
  }
  if (control_bit == control_bit_) {
    // A matching control bit means we are hearing our own transmissions.
    LOG(WARNING) << "Cross-talk detected. Check wiring and power.";
    return false;
  }
  // Remove all buffers up to the acked position.
  SeqNum seq = out_ring_.restorePosHighBits(seq_id, 12);
  if (seq > out_ring_.end()) {
    // Peer is acking a position we have not yet sent. Ignore.
    LOG(WARNING) << "Bogus ack received, ignoring: " << seq_id << " -> " << seq
                 << "; current: " << out_ring_.end();
    return false;
  }
  while (out_ring_.begin() < seq && !out_ring_.empty()) {
    out_ring_.pop();
    ++packets_delivered_;
    if (has_pending_eof_) {
      // Process that pending EOF, now that we have space.
      addEosPacket();
      has_pending_eof_ = false;
    }
  }
  if (out_ring_.empty()) {
    if (end_of_stream_) {
      // The stream was closed and everything queued has been delivered.
      reset();
    }
    return false;
  }
  // Process the skip-ack notifications.
  size_t offset = 0;
  // Bit i of the bitmap refers to position begin()+1+i: begin() itself is
  // necessarily unacked here, or it would have been popped above.
  SeqNum out_pos = out_ring_.begin() + 1;
  // Sentinel value that is never contained in the ring.
  SeqNum last_acked = out_ring_.begin() - 1;
  while (offset < ack_bitmap_len) {
    uint8_t val = (uint8_t)ack_bitmap[offset];
    for (int i = 7; i >= 0; --i) {
      if (out_ring_.contains(out_pos) && (val & (1 << i)) != 0) {
        getOutBuffer(out_pos).ack();
        last_acked = out_pos;
      }
      out_pos++;
    }
    offset++;
  }
  bool rushed = false;
  // Try to increase send throughput by quickly detecting dropped packets,
  // interpreting skip-ack as nack for packets that have only been sent once
  // (which means that, assuming in-order delivery of the underlying package
  // writer, if they were to be delivered, they would have been already
  // delivered).
  if (out_ring_.contains(last_acked)) {
    bool next_to_send_updated = false;
    // Rush re-delivery of any packets that have only been sent once and
    // nacked.
    for (SeqNum pos = out_ring_.begin(); pos != last_acked; ++pos) {
      auto& buf = getOutBuffer(pos);
      if (!buf.acked() && buf.send_counter() == 1) {
        buf.rush();
        rushed = true;
        // Also, send the first nacked packet ASAP, to unblock the reader.
        if (!next_to_send_updated) {
          next_to_send_updated = true;
          next_to_send_ = pos;
        }
      }
    }
  }
  return rushed;
}
296
297bool Transmitter::updateRecvHimark(bool control_bit, uint16_t recv_himark) {
298 if (state_ != kConnected) return false;
299 // Update to available slots received.
300 if (control_bit_ == control_bit) {
301 LOG(WARNING) << "Cross-talk detected. Check wiring and power.";
302 return false;
303 }
304 SeqNum new_recv_himark = out_ring_.restorePosHighBits(recv_himark, 12);
305 if (new_recv_himark < recv_himark_ ||
306 new_recv_himark > out_ring_.end() + peer_receive_buffer_size_) {
307 // Peer has sent a smaller himark than before, or too large to be able to
308 // consume. This is not expected.
309 LOG(WARNING) << "Bogus recv_himark received, ignoring: " << recv_himark
310 << " -> " << new_recv_himark << "; current: " << recv_himark_
311 << "; current outring_.end(): " << out_ring_.end();
312 return false;
313 }
314 recv_himark_ = out_ring_.restorePosHighBits(recv_himark, 12);
315 return true;
316}
317
318} // namespace internal
319} // namespace roo_transport
const roo::byte * data() const
Definition out_buffer.h:55
void init(SeqNum seq_id, bool control_bit)
size_t write(const roo::byte *buf, size_t count)
Definition out_buffer.h:28
const uint8_t size() const
Definition out_buffer.h:56
roo_time::Uptime expiration() const
Definition out_buffer.h:58
void markSent(roo_time::Uptime now)
bool contains(SeqNum seq) const
Definition ring_buffer.h:47
SeqNum restorePosHighBits(uint16_t truncated_pos, int pos_bits)
Definition ring_buffer.h:58
void init(uint32_t my_stream_id, SeqNum new_start)
size_t tryWrite(const roo::byte *buf, size_t count, bool &made_space)
const OutBuffer * getBufferToSend(long &next_send_micros)
Transmitter(unsigned int sendbuf_log2)
size_t send(roo::byte *buf, long &next_send_micros)
bool updateRecvHimark(bool control_bit, uint16_t recv_himark)
bool ack(bool control_bit, uint16_t seq_id, const roo::byte *ack_bitmap, size_t ack_bitmap_len)