roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
thread_safe_transmitter.cpp
Go to the documentation of this file.
2#ifdef ROO_USE_THREADS
3
5
6namespace roo_transport {
7namespace internal {
8
// Constructs the wrapper and forwards `sendbuf_log2` to the wrapped
// transmitter_. NOTE(review): judging by the name, the argument is the base-2
// logarithm of the send buffer size -- confirm against Transmitter.
ThreadSafeTransmitter::ThreadSafeTransmitter(unsigned int sendbuf_log2)
    : transmitter_(sendbuf_log2) {}
11
12bool ThreadSafeTransmitter::checkConnectionStatus(
13 uint32_t my_stream_id, roo_io::Status& status) const {
14 if ((transmitter_.state() == Transmitter::kConnected ||
15 transmitter_.state() == Transmitter::kConnecting) &&
16 my_stream_id == transmitter_.my_stream_id()) {
17 status = roo_io::kOk;
18 return true;
19 }
20 status = roo_io::kConnectionError;
21 return false;
22}
23
24size_t ThreadSafeTransmitter::write(const roo::byte* buf, size_t count,
25 uint32_t my_stream_id,
26 roo_io::Status& stream_status,
27 bool& outgoing_data_ready) {
28 roo::unique_lock<roo::mutex> guard(mutex_);
29 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
30 while (true) {
31 size_t total_written =
32 transmitter_.tryWrite(buf, count, outgoing_data_ready);
33 if (total_written > 0) {
34 return total_written;
35 }
36 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
37 // Wait for space to be available.
38 has_space_.wait(guard);
39 }
40}
41
42size_t ThreadSafeTransmitter::tryWrite(const roo::byte* buf, size_t count,
43 uint32_t my_stream_id,
44 roo_io::Status& stream_status,
45 bool& outgoing_data_ready) {
46 roo::lock_guard<roo::mutex> guard(mutex_);
47 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
48 size_t total_written = transmitter_.tryWrite(buf, count, outgoing_data_ready);
49 return total_written;
50}
51
52void ThreadSafeTransmitter::flush(uint32_t my_stream_id,
53 roo_io::Status& stream_status,
54 bool& outgoing_data_ready) {
55 roo::lock_guard<roo::mutex> guard(mutex_);
56 if (!checkConnectionStatus(my_stream_id, stream_status)) return;
57 if (transmitter_.flush()) {
58 outgoing_data_ready = true;
59 }
60}
61
62bool ThreadSafeTransmitter::hasPendingData(
63 uint32_t my_stream_id, roo_io::Status& stream_status) const {
64 if (!checkConnectionStatus(my_stream_id, stream_status)) return false;
65 roo::lock_guard<roo::mutex> guard(mutex_);
66 return transmitter_.hasPendingData();
67}
68
69void ThreadSafeTransmitter::close(uint32_t my_stream_id,
70 roo_io::Status& stream_status,
71 bool& outgoing_data_ready) {
72 roo::unique_lock<roo::mutex> guard(mutex_);
73 if (!checkConnectionStatus(my_stream_id, stream_status)) return;
74 transmitter_.close();
75 outgoing_data_ready = true;
76 if (!transmitter_.hasPendingData()) return;
77 while (true) {
78 all_acked_.wait(guard);
79 if (!transmitter_.hasPendingData()) return;
80 if (!checkConnectionStatus(my_stream_id, stream_status)) return;
81 }
82}
83
84size_t ThreadSafeTransmitter::availableForWrite(
85 uint32_t my_stream_id, roo_io::Status& stream_status) const {
86 roo::lock_guard<roo::mutex> guard(mutex_);
87 if (!checkConnectionStatus(my_stream_id, stream_status)) return 0;
88 return transmitter_.availableForWrite();
89}
90
91size_t ThreadSafeTransmitter::send(roo::byte* buf, long& next_send_micros) {
92 roo::lock_guard<roo::mutex> guard(mutex_);
93 const internal::OutBuffer* buf_to_send =
94 transmitter_.getBufferToSend(next_send_micros);
95 if (buf_to_send == nullptr) return 0;
96
97 const roo::byte* data = buf_to_send->data();
98 size_t size = buf_to_send->size();
99 memcpy(buf, data, size);
100 return size;
101}
102
103void ThreadSafeTransmitter::reset() {
104 roo::lock_guard<roo::mutex> guard(mutex_);
105 transmitter_.reset();
106 all_acked_.notify_all();
107}
108
109void ThreadSafeTransmitter::init(uint32_t my_stream_id, SeqNum new_start) {
110 roo::lock_guard<roo::mutex> guard(mutex_);
111 transmitter_.init(my_stream_id, new_start);
112 all_acked_.notify_all();
113}
114
115void ThreadSafeTransmitter::ack(bool control_bit, uint16_t seq_id,
116 const roo::byte* ack_bitmap,
117 size_t ack_bitmap_len,
118 bool& outgoing_data_ready) {
119 roo::lock_guard<roo::mutex> guard(mutex_);
120 if (transmitter_.ack(control_bit, seq_id, ack_bitmap, ack_bitmap_len)) {
121 // We have a new packet ready to be sent.
122 outgoing_data_ready = true;
123 }
124 if (!transmitter_.hasPendingData()) {
125 all_acked_.notify_all();
126 }
127}
128
129} // namespace internal
130} // namespace roo_transport
131
132#endif // ROO_USE_THREADS