3#include "roo_io/memory/store.h"
14 end_of_stream_(false),
15 in_buffers_(new
InBuffer[1 << recvbuf_log2]),
16 current_in_buffer_(nullptr),
17 current_in_buffer_pos_(0),
18 in_ring_(recvbuf_log2, 0),
21 recv_himark_(in_ring_.begin() + (1 << recvbuf_log2)),
22 recv_himark_update_expiration_(roo_time::Uptime::Start()),
23 packets_received_(0) {}
26 CHECK(in_ring_.
empty());
27 in_ring_.
reset(peer_seq_num);
28 unack_seq_ = peer_seq_num.
raw();
30 control_bit_ = control_bit;
37 end_of_stream_ =
false;
41 if (in_ring_.
empty()) {
47 end_of_stream_ =
false;
51 return empty() && (end_of_stream_ || self_closed_ || peer_closed_);
55 bool& outgoing_data_ready) {
56 size_t total_read = 0;
57 outgoing_data_ready =
false;
60 if (current_in_buffer_ ==
nullptr) {
61 if (in_ring_.
empty()) {
67 current_in_buffer_ = &getInBuffer(in_ring_.
begin());
68 current_in_buffer_pos_ = 0;
74 CHECK_GE(current_in_buffer_->
size(), current_in_buffer_pos_);
75 size_t available = current_in_buffer_->
size() - current_in_buffer_pos_;
76 if (count < available) {
77 memcpy(buf, current_in_buffer_->
data() + current_in_buffer_pos_, count);
79 current_in_buffer_pos_ += count;
82 memcpy(buf, current_in_buffer_->
data() + current_in_buffer_pos_, available);
84 total_read += available;
87 current_in_buffer_->
clear();
88 current_in_buffer_ =
nullptr;
89 recv_himark_update_expiration_ = roo_time::Uptime::Start();
90 outgoing_data_ready =
true;
94 end_of_stream_ =
true;
103 if (in_ring_.
empty())
return;
104 recv_himark_update_expiration_ = roo_time::Uptime::Start();
105 outgoing_data_ready =
true;
108 }
while (!in_ring_.
empty());
112 if (current_in_buffer_ ==
nullptr) {
113 if (in_ring_.
empty())
return -1;
114 current_in_buffer_ = &getInBuffer(in_ring_.
begin());
115 current_in_buffer_pos_ = 0;
121 DCHECK_GT(current_in_buffer_->
size(), current_in_buffer_pos_);
122 return (
int)current_in_buffer_->
data()[current_in_buffer_pos_];
126 if (current_in_buffer_ ==
nullptr) {
127 if (in_ring_.
empty())
return 0;
128 current_in_buffer_ = &getInBuffer(in_ring_.
begin());
129 current_in_buffer_pos_ = 0;
131 switch (current_in_buffer_->
type()) {
141 DCHECK_GT(current_in_buffer_->
size(), current_in_buffer_pos_);
142 return current_in_buffer_->
size() - current_in_buffer_pos_;
150 peer_closed_ =
false;
151 self_closed_ =
false;
152 end_of_stream_ =
false;
153 while (!in_ring_.
empty()) {
157 current_in_buffer_ =
nullptr;
158 current_in_buffer_pos_ = 0;
160 recv_himark_update_expiration_ = roo_time::Uptime::Start();
164 while (!in_ring_.
empty()) {
168 peer_closed_ =
false;
169 self_closed_ =
false;
170 end_of_stream_ =
false;
175 static const long kRecvHimarkExpirationTimeoutUs = 100000;
178 roo_time::Uptime now = roo_time::Uptime::Now();
179 if (now < recv_himark_update_expiration_) {
181 std::min(next_send_micros,
182 (
long)(recv_himark_update_expiration_ - now).inMicros());
188 roo_io::StoreBeU16(payload, buf);
189 recv_himark_update_expiration_ =
190 now + roo_time::Micros(kRecvHimarkExpirationTimeoutUs);
191 next_send_micros = std::min(next_send_micros, kRecvHimarkExpirationTimeoutUs);
201 roo_io::StoreBeU16(payload, buf);
207 uint64_t ack_bitmask = 0;
210 SeqNum in_pos = unack_seq_ + 1;
212 while (idx >= 0 && in_ring_.
contains(in_pos)) {
214 ack_bitmask |= (((uint64_t)1) << idx);
220 if (ack_bitmask == 0) {
223 roo_io::StoreBeU64(ack_bitmask, buf + 2);
226 while (buf[len - 1] == roo::byte{0}) --len;
232 const roo::byte* payload,
size_t len,
233 bool is_final,
bool& has_new_data_to_read) {
234 has_new_data_to_read =
false;
235 bool has_ack_to_send =
false;
239 if (control_bit == control_bit_) {
240 LOG(WARNING) <<
"Cross-talk detected; check wiring and power.";
245 if (seq < in_ring_.
begin()) {
249 has_ack_to_send =
true;
250 return has_ack_to_send;
257 size_t advance = seq - in_ring_.
end() + 1;
260 return has_ack_to_send;
262 for (
size_t i = 0; i < advance; ++i) {
266 << seq <<
", " << in_ring_.
begin() <<
"--" << in_ring_.
end();
268 InBuffer& buffer = getInBuffer(seq);
281 has_ack_to_send =
true;
282 if (seq == unack_seq_) {
287 }
while (in_ring_.
contains(unack_seq_) &&
292 while (!in_ring_.
empty() && in_ring_.
begin() < unack_seq_) {
296 has_new_data_to_read =
true;
299 return has_ack_to_send;
const roo::byte * data() const
void set(Type type, const roo::byte *payload, uint8_t size)
size_t tryRead(roo::byte *buf, size_t count, bool &outgoing_data_ready)
void setConnected(SeqNum peer_seq_num, bool control_bit)
size_t updateRecvHimark(roo::byte *buf, long &next_send_micros)
void markInputClosed(bool &outgoing_data_ready)
bool handleDataPacket(bool control_bit, uint16_t seq_id, const roo::byte *payload, size_t len, bool is_final, bool &has_new_data_to_read)
void init(uint32_t my_stream_id)
size_t availableForRead() const
Receiver(unsigned int recvbuf_log2)
size_t ack(roo::byte *buf)
uint32_t my_stream_id() const
uint16_t slotsUsed() const
bool contains(SeqNum seq) const
uint16_t capacity() const
uint16_t slotsFree() const
SeqNum restorePosHighBits(uint16_t truncated_pos, int pos_bits)
uint16_t FormatPacketHeader(SeqNum seq, PacketType type, bool control_bit)