roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
thread_safe_receiver.cpp
Go to the documentation of this file.
2#ifdef ROO_USE_THREADS
3
5
6namespace roo_transport {
7namespace internal {
8
9ThreadSafeReceiver::ThreadSafeReceiver(unsigned int recvbuf_log2)
10 : receiver_(recvbuf_log2) {}
11
12Receiver::State ThreadSafeReceiver::state() const {
13 roo::lock_guard<roo::mutex> guard(mutex_);
14 return receiver_.state();
15}
16
17void ThreadSafeReceiver::setConnected(SeqNum peer_seq_num, bool control_bit) {
18 roo::lock_guard<roo::mutex> guard(mutex_);
19 receiver_.setConnected(peer_seq_num, control_bit);
20}
21
22void ThreadSafeReceiver::setBroken() {
23 roo::lock_guard<roo::mutex> guard(mutex_);
24 receiver_.setBroken();
25 has_data_.notify_all();
26}
27
28bool ThreadSafeReceiver::checkConnectionStatus(uint32_t my_stream_id,
29 roo_io::Status& status) const {
30 if (my_stream_id != receiver_.my_stream_id()) {
31 // Peer reconnected with a new ID, and we already accepted. This connection
32 // is dead.
33 status = roo_io::kConnectionError;
34 return false;
35 }
36 if (receiver_.state() == Receiver::kIdle) {
37 // Peer reconnected with a new ID. We have not accepted yet, but this
38 // connection is dead.
39 status = roo_io::kConnectionError;
40 return false;
41 }
42 if (receiver_.eos()) {
43 status = roo_io::kEndOfStream;
44 return false;
45 }
46 status = roo_io::kOk;
47 return true;
48}
49
50size_t ThreadSafeReceiver::read(roo::byte* buf, size_t count,
51 uint32_t my_stream_id,
52 roo_io::Status& stream_status,
53 bool& outgoing_data_ready) {
54 if (count == 0) return 0;
55 roo::unique_lock<roo::mutex> guard(mutex_);
56 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
57 while (true) {
58 size_t total_read = receiver_.tryRead(buf, count, outgoing_data_ready);
59 if (total_read > 0) {
60 return total_read;
61 }
62 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
63 has_data_.wait(guard);
64 }
65}
66
67size_t ThreadSafeReceiver::tryRead(roo::byte* buf, size_t count,
68 uint32_t my_stream_id,
69 roo_io::Status& stream_status,
70 bool& outgoing_data_ready) {
71 if (count == 0) return 0;
72 roo::lock_guard<roo::mutex> guard(mutex_);
73 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
74 size_t total_read = receiver_.tryRead(buf, count, outgoing_data_ready);
75 return total_read;
76}
77
78int ThreadSafeReceiver::peek(uint32_t my_stream_id,
79 roo_io::Status& stream_status) {
80 roo::lock_guard<roo::mutex> guard(mutex_);
81 if (!checkConnectionStatus(my_stream_id, stream_status)) return -1;
82 return receiver_.peek();
83}
84
85size_t ThreadSafeReceiver::availableForRead(
86 uint32_t my_stream_id, roo_io::Status& stream_status) const {
87 roo::lock_guard<roo::mutex> guard(mutex_);
88 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
89 return receiver_.availableForRead();
90}
91
92void ThreadSafeReceiver::markInputClosed(uint32_t my_stream_id,
93 roo_io::Status& stream_status,
94 bool& outgoing_data_ready) {
95 roo::lock_guard<roo::mutex> guard(mutex_);
96 if (!checkConnectionStatus(my_stream_id, stream_status)) return;
97 receiver_.markInputClosed(outgoing_data_ready);
98}
99
100void ThreadSafeReceiver::reset() {
101 roo::lock_guard<roo::mutex> guard(mutex_);
102 receiver_.reset();
103 has_data_.notify_all();
104}
105
106void ThreadSafeReceiver::init(uint32_t my_stream_id) {
107 roo::lock_guard<roo::mutex> guard(mutex_);
108 receiver_.init(my_stream_id);
109 has_data_.notify_all();
110}
111
112size_t ThreadSafeReceiver::ack(roo::byte* buf) {
113 roo::lock_guard<roo::mutex> guard(mutex_);
114 return receiver_.ack(buf);
115}
116
117size_t ThreadSafeReceiver::updateRecvHimark(roo::byte* buf,
118 long& next_send_micros) {
119 roo::lock_guard<roo::mutex> guard(mutex_);
120 return receiver_.updateRecvHimark(buf, next_send_micros);
121}
122
123bool ThreadSafeReceiver::handleDataPacket(bool control_bit, uint16_t seq_id,
124 const roo::byte* payload, size_t len,
125 bool is_final) {
126 bool has_new_data_to_read = false;
127 bool has_ack_to_send;
128 roo::lock_guard<roo::mutex> guard(mutex_);
129 has_ack_to_send = receiver_.handleDataPacket(
130 control_bit, seq_id, payload, len, is_final, has_new_data_to_read);
131 if (has_new_data_to_read) {
132 has_data_.notify_all();
133 }
134 return has_ack_to_send;
135}
136
137unsigned int ThreadSafeReceiver::buffer_size_log2() const {
138 roo::lock_guard<roo::mutex> guard(mutex_);
139 return receiver_.buffer_size_log2();
140}
141
142} // namespace internal
143} // namespace roo_transport
144
145#endif // ROO_USE_THREADS