roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
receiver.cpp
Go to the documentation of this file.
2
3#include "roo_io/memory/store.h"
5
6namespace roo_transport {
7namespace internal {
8
9Receiver::Receiver(unsigned int recvbuf_log2)
10 : my_stream_id_(0),
11 state_(kIdle),
12 self_closed_(false),
13 peer_closed_(false),
14 end_of_stream_(false),
15 in_buffers_(new InBuffer[1 << recvbuf_log2]),
16 current_in_buffer_(nullptr),
17 current_in_buffer_pos_(0),
18 in_ring_(recvbuf_log2, 0),
19 needs_ack_(false),
20 unack_seq_(0),
21 recv_himark_(in_ring_.begin() + (1 << recvbuf_log2)),
22 recv_himark_update_expiration_(roo_time::Uptime::Start()),
23 packets_received_(0) {}
24
25void Receiver::setConnected(SeqNum peer_seq_num, bool control_bit) {
26 CHECK(in_ring_.empty());
27 in_ring_.reset(peer_seq_num);
28 unack_seq_ = peer_seq_num.raw();
29 state_ = kConnected;
30 control_bit_ = control_bit;
31}
32
34 state_ = kIdle;
35 peer_closed_ = false;
36 self_closed_ = false;
37 end_of_stream_ = false;
38}
39
41 if (in_ring_.empty()) {
42 setIdle();
43 return;
44 }
45 peer_closed_ = true;
46 self_closed_ = false;
47 end_of_stream_ = false;
48}
49
50bool Receiver::done() const {
51 return empty() && (end_of_stream_ || self_closed_ || peer_closed_);
52}
53
54size_t Receiver::tryRead(roo::byte* buf, size_t count,
55 bool& outgoing_data_ready) {
56 size_t total_read = 0;
57 outgoing_data_ready = false;
58 if (state_ == kConnecting || state_ == kIdle) return 0;
59 do {
60 if (current_in_buffer_ == nullptr) {
61 if (in_ring_.empty()) {
62 if (state_ == kBroken) {
63 setIdle();
64 }
65 break;
66 }
67 current_in_buffer_ = &getInBuffer(in_ring_.begin());
68 current_in_buffer_pos_ = 0;
69 }
70 if (current_in_buffer_->type() == InBuffer::kUnset) {
71 // Not received yet.
72 break;
73 }
74 CHECK_GE(current_in_buffer_->size(), current_in_buffer_pos_);
75 size_t available = current_in_buffer_->size() - current_in_buffer_pos_;
76 if (count < available) {
77 memcpy(buf, current_in_buffer_->data() + current_in_buffer_pos_, count);
78 total_read += count;
79 current_in_buffer_pos_ += count;
80 break;
81 }
82 memcpy(buf, current_in_buffer_->data() + current_in_buffer_pos_, available);
83 buf += available;
84 total_read += available;
85 count -= available;
86 InBuffer::Type buffer_type = current_in_buffer_->type();
87 current_in_buffer_->clear();
88 current_in_buffer_ = nullptr;
89 recv_himark_update_expiration_ = roo_time::Uptime::Start();
90 outgoing_data_ready = true;
91 in_ring_.pop();
92 if (buffer_type == InBuffer::kFin) {
93 CHECK(in_ring_.empty()) << in_ring_.slotsUsed();
94 end_of_stream_ = true;
95 break;
96 }
97 } while (count > 0);
98 return total_read;
99}
100
101void Receiver::markInputClosed(bool& outgoing_data_ready) {
102 self_closed_ = true;
103 if (in_ring_.empty()) return;
104 recv_himark_update_expiration_ = roo_time::Uptime::Start();
105 outgoing_data_ready = true;
106 do {
107 in_ring_.pop();
108 } while (!in_ring_.empty());
109}
110
112 if (current_in_buffer_ == nullptr) {
113 if (in_ring_.empty()) return -1;
114 current_in_buffer_ = &getInBuffer(in_ring_.begin());
115 current_in_buffer_pos_ = 0;
116 }
117 if (current_in_buffer_->type() != InBuffer::kData) {
118 // Not received yet, or EOS.
119 return -1;
120 }
121 DCHECK_GT(current_in_buffer_->size(), current_in_buffer_pos_);
122 return (int)current_in_buffer_->data()[current_in_buffer_pos_];
123}
124
126 if (current_in_buffer_ == nullptr) {
127 if (in_ring_.empty()) return 0;
128 current_in_buffer_ = &getInBuffer(in_ring_.begin());
129 current_in_buffer_pos_ = 0;
130 }
131 switch (current_in_buffer_->type()) {
132 case InBuffer::kUnset: {
133 return 0;
134 }
135 case InBuffer::kFin: {
136 // Signal to the reader that EOF is here, available to read.
137 return 1;
138 }
139 case InBuffer::kData:
140 default: {
141 DCHECK_GT(current_in_buffer_->size(), current_in_buffer_pos_);
142 return current_in_buffer_->size() - current_in_buffer_pos_;
143 }
144 }
145}
146
148 my_stream_id_ = 0;
149 state_ = kIdle;
150 peer_closed_ = false;
151 self_closed_ = false;
152 end_of_stream_ = false;
153 while (!in_ring_.empty()) {
154 getInBuffer(in_ring_.begin()).clear();
155 in_ring_.pop();
156 }
157 current_in_buffer_ = nullptr;
158 current_in_buffer_pos_ = 0;
159 needs_ack_ = false;
160 recv_himark_update_expiration_ = roo_time::Uptime::Start();
161}
162
163void Receiver::init(uint32_t my_stream_id) {
164 while (!in_ring_.empty()) {
165 in_ring_.pop();
166 }
167 my_stream_id_ = my_stream_id;
168 peer_closed_ = false;
169 self_closed_ = false;
170 end_of_stream_ = false;
171 state_ = kConnecting;
172}
173
174size_t Receiver::updateRecvHimark(roo::byte* buf, long& next_send_micros) {
175 static const long kRecvHimarkExpirationTimeoutUs = 100000;
176 if (state_ == kConnecting || state_ == kIdle) return 0;
177 // Check if the deadline has expired to re-send the update.
178 roo_time::Uptime now = roo_time::Uptime::Now();
179 if (now < recv_himark_update_expiration_) {
180 next_send_micros =
181 std::min(next_send_micros,
182 (long)(recv_himark_update_expiration_ - now).inMicros());
183 return 0;
184 }
185 recv_himark_ = in_ring_.begin() + in_ring_.capacity();
186 uint16_t payload =
187 FormatPacketHeader(recv_himark_, kFlowControlPacket, control_bit_);
188 roo_io::StoreBeU16(payload, buf);
189 recv_himark_update_expiration_ =
190 now + roo_time::Micros(kRecvHimarkExpirationTimeoutUs);
191 next_send_micros = std::min(next_send_micros, kRecvHimarkExpirationTimeoutUs);
192 return 2;
193}
194
195size_t Receiver::ack(roo::byte* buf) {
196 if ((state_ != kConnected && state_ != kIdle) || !needs_ack_) {
197 return 0;
198 }
199 uint16_t payload =
200 FormatPacketHeader(unack_seq_, kDataAckPacket, control_bit_);
201 roo_io::StoreBeU16(payload, buf);
202
203 // For now, we only send ack about up to 64 packets. This should be more
204 // than enough in most cases. If needed, it can be extended, though; the
205 // receiver will understand the arbitrary number of bytes in the bitmask,
206 // not just 8.
207 uint64_t ack_bitmask = 0;
208 // Skipping the unack_seq_ itself, because it's status is obvious
209 // (unacked).
210 SeqNum in_pos = unack_seq_ + 1;
211 int idx = 63;
212 while (idx >= 0 && in_ring_.contains(in_pos)) {
213 if (getInBuffer(in_pos).type() != InBuffer::kUnset) {
214 ack_bitmask |= (((uint64_t)1) << idx);
215 }
216 ++in_pos;
217 --idx;
218 }
219 needs_ack_ = false;
220 if (ack_bitmask == 0) {
221 return 2;
222 } else {
223 roo_io::StoreBeU64(ack_bitmask, buf + 2);
224 size_t len = 10;
225 // No need to send bytes that are all zero.
226 while (buf[len - 1] == roo::byte{0}) --len;
227 return len;
228 }
229}
230
231bool Receiver::handleDataPacket(bool control_bit, uint16_t seq_id,
232 const roo::byte* payload, size_t len,
233 bool is_final, bool& has_new_data_to_read) {
234 has_new_data_to_read = false;
235 bool has_ack_to_send = false;
236 if (state_ == kConnecting || state_ == kIdle) {
237 return false;
238 }
239 if (control_bit == control_bit_) {
240 LOG(WARNING) << "Cross-talk detected; check wiring and power.";
241 return false;
242 }
243 SeqNum seq = in_ring_.restorePosHighBits(seq_id, 12);
244 if (!in_ring_.contains(seq)) {
245 if (seq < in_ring_.begin()) {
246 // Retransmit of a package that was already received and read. (Maybe the
247 // ack was lost.) Ignoring, but re-triggering the ack.
248 needs_ack_ = true;
249 has_ack_to_send = true;
250 return has_ack_to_send;
251 }
252 if (peer_closed_) {
253 // Not accepting new payload packages after having received the kFin.
254 return false;
255 }
256 // See if we can extend the buf to add the new packet.
257 size_t advance = seq - in_ring_.end() + 1;
258 if (advance > in_ring_.slotsFree()) {
259 // No more space; we didn't expect to receive this packet.
260 return has_ack_to_send;
261 }
262 for (size_t i = 0; i < advance; ++i) {
263 getInBuffer(in_ring_.push()).clear();
264 }
265 DCHECK(in_ring_.contains(seq))
266 << seq << ", " << in_ring_.begin() << "--" << in_ring_.end();
267 }
268 InBuffer& buffer = getInBuffer(seq);
269 if (buffer.type() == InBuffer::kUnset) {
270 buffer.set(is_final ? InBuffer::kFin : InBuffer::kData, payload, len);
271 } else {
272 // Ignore the retransmitted packet; stick to the previously received one.
273 }
274 if (is_final) {
275 peer_closed_ = true;
276 }
277 // Note: we send ack even if the packet we just received wasn't the oldest
278 // unacked (i.e. even if we don't update unack_seq_), because we are
279 // sending skip-acks as well.
280 needs_ack_ = true;
281 has_ack_to_send = true;
282 if (seq == unack_seq_) {
283 // Update the unack seq.
284 do {
285 ++unack_seq_;
286 ++packets_received_;
287 } while (in_ring_.contains(unack_seq_) &&
288 getInBuffer(unack_seq_).type() != InBuffer::kUnset);
289 if (self_closed_) {
290 // Remove all the received packets up to the updated unack_seq_, as if
291 // they were read.
292 while (!in_ring_.empty() && in_ring_.begin() < unack_seq_) {
293 in_ring_.pop();
294 }
295 } else {
296 has_new_data_to_read = true;
297 }
298 }
299 return has_ack_to_send;
300}
301
302} // namespace internal
303} // namespace roo_transport
const roo::byte * data() const
Definition in_buffer.h:28
void set(Type type, const roo::byte *payload, uint8_t size)
Definition in_buffer.h:21
size_t tryRead(roo::byte *buf, size_t count, bool &outgoing_data_ready)
Definition receiver.cpp:54
void setConnected(SeqNum peer_seq_num, bool control_bit)
Definition receiver.cpp:25
size_t updateRecvHimark(roo::byte *buf, long &next_send_micros)
Definition receiver.cpp:174
void markInputClosed(bool &outgoing_data_ready)
Definition receiver.cpp:101
bool handleDataPacket(bool control_bit, uint16_t seq_id, const roo::byte *payload, size_t len, bool is_final, bool &has_new_data_to_read)
Definition receiver.cpp:231
void init(uint32_t my_stream_id)
Definition receiver.cpp:163
Receiver(unsigned int recvbuf_log2)
Definition receiver.cpp:9
size_t ack(roo::byte *buf)
Definition receiver.cpp:195
uint32_t my_stream_id() const
Definition receiver.h:62
bool contains(SeqNum seq) const
Definition ring_buffer.h:47
SeqNum restorePosHighBits(uint16_t truncated_pos, int pos_bits)
Definition ring_buffer.h:58
uint16_t FormatPacketHeader(SeqNum seq, PacketType type, bool control_bit)
Definition protocol.h:121