9ThreadSafeTransmitter::ThreadSafeTransmitter(
unsigned int sendbuf_log2)
10 : transmitter_(sendbuf_log2) {}
12bool ThreadSafeTransmitter::checkConnectionStatus(
13 uint32_t my_stream_id, roo_io::Status& status)
const {
14 if ((transmitter_.state() == Transmitter::kConnected ||
15 transmitter_.state() == Transmitter::kConnecting) &&
16 my_stream_id == transmitter_.my_stream_id()) {
20 status = roo_io::kConnectionError;
24size_t ThreadSafeTransmitter::write(
const roo::byte* buf,
size_t count,
25 uint32_t my_stream_id,
26 roo_io::Status& stream_status,
27 bool& outgoing_data_ready) {
28 roo::unique_lock<roo::mutex> guard(mutex_);
29 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
31 size_t total_written =
32 transmitter_.tryWrite(buf, count, outgoing_data_ready);
33 if (total_written > 0) {
36 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
38 has_space_.wait(guard);
42size_t ThreadSafeTransmitter::tryWrite(
const roo::byte* buf,
size_t count,
43 uint32_t my_stream_id,
44 roo_io::Status& stream_status,
45 bool& outgoing_data_ready) {
46 roo::lock_guard<roo::mutex> guard(mutex_);
47 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
48 size_t total_written = transmitter_.tryWrite(buf, count, outgoing_data_ready);
52void ThreadSafeTransmitter::flush(uint32_t my_stream_id,
53 roo_io::Status& stream_status,
54 bool& outgoing_data_ready) {
55 roo::lock_guard<roo::mutex> guard(mutex_);
56 if (!checkConnectionStatus(my_stream_id, stream_status))
return;
57 if (transmitter_.flush()) {
58 outgoing_data_ready =
true;
62bool ThreadSafeTransmitter::hasPendingData(
63 uint32_t my_stream_id, roo_io::Status& stream_status)
const {
64 if (!checkConnectionStatus(my_stream_id, stream_status))
return false;
65 roo::lock_guard<roo::mutex> guard(mutex_);
66 return transmitter_.hasPendingData();
69void ThreadSafeTransmitter::close(uint32_t my_stream_id,
70 roo_io::Status& stream_status,
71 bool& outgoing_data_ready) {
72 roo::unique_lock<roo::mutex> guard(mutex_);
73 if (!checkConnectionStatus(my_stream_id, stream_status))
return;
75 outgoing_data_ready =
true;
76 if (!transmitter_.hasPendingData())
return;
78 all_acked_.wait(guard);
79 if (!transmitter_.hasPendingData())
return;
80 if (!checkConnectionStatus(my_stream_id, stream_status))
return;
84size_t ThreadSafeTransmitter::availableForWrite(
85 uint32_t my_stream_id, roo_io::Status& stream_status)
const {
86 roo::lock_guard<roo::mutex> guard(mutex_);
87 if (!checkConnectionStatus(my_stream_id, stream_status))
return 0;
88 return transmitter_.availableForWrite();
91size_t ThreadSafeTransmitter::send(roo::byte* buf,
long& next_send_micros) {
92 roo::lock_guard<roo::mutex> guard(mutex_);
93 const internal::OutBuffer* buf_to_send =
94 transmitter_.getBufferToSend(next_send_micros);
95 if (buf_to_send ==
nullptr)
return 0;
97 const roo::byte* data = buf_to_send->data();
98 size_t size = buf_to_send->size();
99 memcpy(buf, data, size);
103void ThreadSafeTransmitter::reset() {
104 roo::lock_guard<roo::mutex> guard(mutex_);
105 transmitter_.reset();
106 all_acked_.notify_all();
109void ThreadSafeTransmitter::init(uint32_t my_stream_id, SeqNum new_start) {
110 roo::lock_guard<roo::mutex> guard(mutex_);
111 transmitter_.init(my_stream_id, new_start);
112 all_acked_.notify_all();
115void ThreadSafeTransmitter::ack(
bool control_bit, uint16_t seq_id,
116 const roo::byte* ack_bitmap,
117 size_t ack_bitmap_len,
118 bool& outgoing_data_ready) {
119 roo::lock_guard<roo::mutex> guard(mutex_);
120 if (transmitter_.ack(control_bit, seq_id, ack_bitmap, ack_bitmap_len)) {
122 outgoing_data_ready =
true;
124 if (!transmitter_.hasPendingData()) {
125 all_acked_.notify_all();