9ThreadSafeReceiver::ThreadSafeReceiver(
unsigned int recvbuf_log2)
10 : receiver_(recvbuf_log2) {}
12Receiver::State ThreadSafeReceiver::state()
const {
13 roo::lock_guard<roo::mutex> guard(mutex_);
14 return receiver_.state();
17void ThreadSafeReceiver::setConnected(SeqNum peer_seq_num,
bool control_bit) {
18 roo::lock_guard<roo::mutex> guard(mutex_);
19 receiver_.setConnected(peer_seq_num, control_bit);
22void ThreadSafeReceiver::setBroken() {
23 roo::lock_guard<roo::mutex> guard(mutex_);
24 receiver_.setBroken();
25 has_data_.notify_all();
28bool ThreadSafeReceiver::checkConnectionStatus(uint32_t my_stream_id,
29 roo_io::Status& status)
const {
30 if (my_stream_id != receiver_.my_stream_id()) {
33 status = roo_io::kConnectionError;
36 if (receiver_.state() == Receiver::kIdle) {
39 status = roo_io::kConnectionError;
42 if (receiver_.eos()) {
43 status = roo_io::kEndOfStream;
50size_t ThreadSafeReceiver::read(roo::byte* buf,
size_t count,
51 uint32_t my_stream_id,
52 roo_io::Status& stream_status,
53 bool& outgoing_data_ready) {
54 if (count == 0)
return 0;
55 roo::unique_lock<roo::mutex> guard(mutex_);
56 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
58 size_t total_read = receiver_.tryRead(buf, count, outgoing_data_ready);
62 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
63 has_data_.wait(guard);
67size_t ThreadSafeReceiver::tryRead(roo::byte* buf,
size_t count,
68 uint32_t my_stream_id,
69 roo_io::Status& stream_status,
70 bool& outgoing_data_ready) {
71 if (count == 0)
return 0;
72 roo::lock_guard<roo::mutex> guard(mutex_);
73 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
74 size_t total_read = receiver_.tryRead(buf, count, outgoing_data_ready);
78int ThreadSafeReceiver::peek(uint32_t my_stream_id,
79 roo_io::Status& stream_status) {
80 roo::lock_guard<roo::mutex> guard(mutex_);
81 if (!checkConnectionStatus(my_stream_id, stream_status))
return -1;
82 return receiver_.peek();
85size_t ThreadSafeReceiver::availableForRead(
86 uint32_t my_stream_id, roo_io::Status& stream_status)
const {
87 roo::lock_guard<roo::mutex> guard(mutex_);
88 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
89 return receiver_.availableForRead();
92void ThreadSafeReceiver::markInputClosed(uint32_t my_stream_id,
93 roo_io::Status& stream_status,
94 bool& outgoing_data_ready) {
95 roo::lock_guard<roo::mutex> guard(mutex_);
96 if (!checkConnectionStatus(my_stream_id, stream_status))
return;
97 receiver_.markInputClosed(outgoing_data_ready);
100void ThreadSafeReceiver::reset() {
101 roo::lock_guard<roo::mutex> guard(mutex_);
103 has_data_.notify_all();
106void ThreadSafeReceiver::init(uint32_t my_stream_id) {
107 roo::lock_guard<roo::mutex> guard(mutex_);
108 receiver_.init(my_stream_id);
109 has_data_.notify_all();
112size_t ThreadSafeReceiver::ack(roo::byte* buf) {
113 roo::lock_guard<roo::mutex> guard(mutex_);
114 return receiver_.ack(buf);
117size_t ThreadSafeReceiver::updateRecvHimark(roo::byte* buf,
118 long& next_send_micros) {
119 roo::lock_guard<roo::mutex> guard(mutex_);
120 return receiver_.updateRecvHimark(buf, next_send_micros);
123bool ThreadSafeReceiver::handleDataPacket(
bool control_bit, uint16_t seq_id,
124 const roo::byte* payload,
size_t len,
126 bool has_new_data_to_read =
false;
127 bool has_ack_to_send;
128 roo::lock_guard<roo::mutex> guard(mutex_);
129 has_ack_to_send = receiver_.handleDataPacket(
130 control_bit, seq_id, payload, len, is_final, has_new_data_to_read);
131 if (has_new_data_to_read) {
132 has_data_.notify_all();
134 return has_ack_to_send;
137unsigned int ThreadSafeReceiver::buffer_size_log2()
const {
138 roo::lock_guard<roo::mutex> guard(mutex_);
139 return receiver_.buffer_size_log2();