roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
thread_safe_transmitter.h
Go to the documentation of this file.
1#pragma once
2
4#ifdef ROO_USE_THREADS
5
6#include "roo_io/status.h"
7#include "roo_threads.h"
8#include "roo_threads/mutex.h"
10
11namespace roo_transport {
12namespace internal {
13
14class ThreadSafeTransmitter {
15 public:
16 ThreadSafeTransmitter(unsigned int sendbuf_log2);
17
18 void reset();
19
20 void init(uint32_t my_stream_id, SeqNum new_start);
21
22 uint32_t packets_sent() const {
23 roo::lock_guard<roo::mutex> guard(mutex_);
24 return transmitter_.packets_sent();
25 }
26
27 uint32_t packets_delivered() const {
28 roo::lock_guard<roo::mutex> guard(mutex_);
29 return transmitter_.packets_delivered();
30 }
31
32 size_t write(const roo::byte* buf, size_t count, uint32_t my_stream_id,
33 roo_io::Status& stream_status, bool& outgoing_data_ready);
34
35 size_t tryWrite(const roo::byte* buf, size_t count, uint32_t my_stream_id,
36 roo_io::Status& stream_status, bool& outgoing_data_ready);
37
38 size_t availableForWrite(uint32_t my_stream_id,
39 roo_io::Status& stream_status) const;
40
41 void flush(uint32_t my_stream_id, roo_io::Status& stream_status,
42 bool& outgoing_data_ready);
43
44 bool hasPendingData(uint32_t my_stream_id,
45 roo_io::Status& stream_status) const;
46
47 // Closes the stream and blocks until all the data has been confirmed by the
48 // recipient.
49 void close(uint32_t my_stream_id, roo_io::Status& stream_status,
50 bool& outgoing_data_ready);
51
52 void setConnected(uint16_t peer_receive_buffer_size, bool control_bit) {
53 roo::lock_guard<roo::mutex> guard(mutex_);
54 transmitter_.setConnected(peer_receive_buffer_size, control_bit);
55 }
56
57 void setBroken() {
58 roo::lock_guard<roo::mutex> guard(mutex_);
59 transmitter_.setBroken();
60 all_acked_.notify_all();
61 has_space_.notify_all();
62 }
63
64 Transmitter::State state() const {
65 roo::lock_guard<roo::mutex> guard(mutex_);
66 return transmitter_.state();
67 }
68
69 size_t send(roo::byte* buf, long& next_send_micros);
70
71 SeqNum front() const {
72 roo::lock_guard<roo::mutex> guard(mutex_);
73 return transmitter_.front();
74 }
75
76 void ack(bool control_bit, uint16_t seq_id, const roo::byte* ack_bitmap,
77 size_t ack_bitmap_len, bool& outgoing_data_ready);
78
79 void updateRecvHimark(bool control_bit, uint16_t recv_himark) {
80 roo::lock_guard<roo::mutex> guard(mutex_);
81 if (transmitter_.updateRecvHimark(control_bit, recv_himark)) {
82 has_space_.notify_all();
83 }
84 }
85
86 private:
87 // Checks the state of the underlying receiver, and whether its stream ID
88 // matches my_stream_id. If there is no match, it means that the connection
89 // has been interrupted. This method sets
90 // status accordingly, to either kOk (if match), or kConnectionError (if
91 // mismatch). It returns true when status is kOk; false otherwise.
92 //
93 // Must be called with mutex_ held.
94 bool checkConnectionStatus(uint32_t my_stream_id,
95 roo_io::Status& status) const;
96
97 internal::Transmitter transmitter_;
98
99 mutable roo::mutex mutex_;
100
101 // Notifies the application writer thread that the output stream might have
102 // some space for writing new data.
103 roo::condition_variable has_space_;
104
105 // Notifies the application writer thread that the send buffer has been
106 // entirely acked (all data has been delivered).
107 roo::condition_variable all_acked_;
108};
109
110} // namespace internal
111} // namespace roo_transport
112
113#endif // ROO_USE_THREADS