3#include "roo_backport.h"
4#include "roo_backport/byte.h"
11 end_of_stream_(false),
12 out_buffers_(new
OutBuffer[1 << sendbuf_log2]),
13 current_out_buffer_(nullptr),
14 out_ring_(sendbuf_log2, 0),
15 next_to_send_(out_ring_.begin()),
16 recv_himark_(out_ring_.begin() + (1 << sendbuf_log2)),
17 has_pending_eof_(false),
19 packets_delivered_(0),
20 peer_receive_buffer_size_(0),
21 control_bit_(false) {}
24 bool& outgoing_data_ready) {
25 outgoing_data_ready =
false;
26 if (count == 0)
return 0;
27 if (end_of_stream_)
return 0;
29 size_t total_written = 0;
31 CHECK_GE(recv_himark_, out_ring_.
end());
32 if (current_out_buffer_ ==
nullptr) {
33 if (recv_himark_ == out_ring_.
end()) {
41 current_out_buffer_ = &getOutBuffer(pos);
42 current_out_buffer_->
init(pos, control_bit_);
44 size_t written = current_out_buffer_->
write(buf, count);
45 total_written += written;
48 if (current_out_buffer_->
finished()) {
49 current_out_buffer_ =
nullptr;
50 outgoing_data_ready =
true;
58 if (current_out_buffer_ !=
nullptr) {
59 current_out_buffer_->
flush();
67void Transmitter::addEosPacket() {
69 auto* buf = &getOutBuffer(pos);
70 buf->init(pos, control_bit_);
79 if (end_of_stream_ || state_ ==
kIdle) {
84 has_pending_eof_ =
true;
88 end_of_stream_ =
true;
92 while (!out_ring_.
empty()) {
99 if (end_of_stream_ || state_ ==
kIdle || state_ ==
kBroken) {
109 if (buf_to_send ==
nullptr)
return 0;
110 const roo::byte* data = buf_to_send->
data();
111 size_t size = buf_to_send->
size();
112 memcpy(buf, data, size);
117 long& next_send_micros) {
119 if (out_ring_.
contains(next_to_send_)) {
121 OutBuffer& buf = getOutBuffer(next_to_send_);
128 buf.
markSent(roo_time::Uptime::Now());
129 next_send_micros = 0;
135 roo_time::Uptime now = roo_time::Uptime::Now();
137 roo_time::Uptime min_send_time = roo_time::Uptime::Max();
139 pos < out_ring_.
end() && pos < recv_himark_; ++pos) {
151 min_send_time = roo_time::Uptime::Start();
164 if (out_ring_.
slotsUsed() != 1 || out_ring_.
begin() >= recv_himark_) {
172 DCHECK(!buf.
acked());
173 DCHECK_GT(buf.
size(), 0);
175 to_send = out_ring_.
begin();
176 min_send_time = roo_time::Uptime::Start();
178 if (min_send_time > now) {
181 std::min(next_send_micros, (
long)(min_send_time - now).inMicros());
190 next_to_send_ = to_send + 1;
192 next_send_micros = 0;
197 while (!out_ring_.
empty()) {
200 end_of_stream_ =
false;
203 current_out_buffer_ =
nullptr;
204 has_pending_eof_ =
false;
210 end_of_stream_ =
false;
211 while (!out_ring_.
empty()) {
214 out_ring_.
reset(new_start);
216 recv_himark_ = out_ring_.
begin();
217 next_to_send_ = out_ring_.
begin();
218 current_out_buffer_ =
nullptr;
219 has_pending_eof_ =
false;
223 const roo::byte* ack_bitmap,
size_t ack_bitmap_len) {
229 if (control_bit == control_bit_) {
230 LOG(WARNING) <<
"Cross-talk detected. Check wiring and power.";
235 if (seq > out_ring_.
end()) {
237 LOG(WARNING) <<
"Bogus ack received, ignoring: " << seq_id <<
" -> " << seq
238 <<
"; current: " << out_ring_.
end();
241 while (out_ring_.
begin() < seq && !out_ring_.
empty()) {
243 ++packets_delivered_;
244 if (has_pending_eof_) {
247 has_pending_eof_ =
false;
250 if (out_ring_.
empty()) {
251 if (end_of_stream_) {
260 while (offset < ack_bitmap_len) {
261 uint8_t val = (uint8_t)ack_bitmap[offset];
262 for (
int i = 7; i >= 0; --i) {
263 if (out_ring_.
contains(out_pos) && (val & (1 << i)) != 0) {
264 getOutBuffer(out_pos).
ack();
265 last_acked = out_pos;
277 if (out_ring_.
contains(last_acked)) {
278 bool next_to_send_updated =
false;
281 for (
SeqNum pos = out_ring_.
begin(); pos != last_acked; ++pos) {
282 auto& buf = getOutBuffer(pos);
283 if (!buf.acked() && buf.send_counter() == 1) {
287 if (!next_to_send_updated) {
288 next_to_send_updated =
true;
300 if (control_bit_ == control_bit) {
301 LOG(WARNING) <<
"Cross-talk detected. Check wiring and power.";
305 if (new_recv_himark < recv_himark_ ||
306 new_recv_himark > out_ring_.
end() + peer_receive_buffer_size_) {
309 LOG(WARNING) <<
"Bogus recv_himark received, ignoring: " << recv_himark
310 <<
" -> " << new_recv_himark <<
"; current: " << recv_himark_
311 <<
"; current outring_.end(): " << out_ring_.
end();
const roo::byte * data() const
void init(SeqNum seq_id, bool control_bit)
size_t write(const roo::byte *buf, size_t count)
const uint8_t size() const
roo_time::Uptime expiration() const
uint8_t send_counter() const
void markSent(roo_time::Uptime now)
uint16_t slotsUsed() const
bool contains(SeqNum seq) const
uint16_t slotsFree() const
SeqNum restorePosHighBits(uint16_t truncated_pos, int pos_bits)
void init(uint32_t my_stream_id, SeqNum new_start)
size_t tryWrite(const roo::byte *buf, size_t count, bool &made_space)
size_t availableForWrite() const
const OutBuffer * getBufferToSend(long &next_send_micros)
Transmitter(unsigned int sendbuf_log2)
size_t send(roo::byte *buf, long &next_send_micros)
bool updateRecvHimark(bool control_bit, uint16_t recv_himark)
uint32_t my_stream_id() const
bool hasPendingData() const
bool ack(bool control_bit, uint16_t seq_id, const roo::byte *ack_bitmap, size_t ack_bitmap_len)