roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
thread_safe_receiver.h
Go to the documentation of this file.
1#pragma once
2
4#ifdef ROO_USE_THREADS
5
6#include "roo_io/status.h"
9
10namespace roo_transport {
11namespace internal {
12
13class ThreadSafeReceiver {
14 public:
15 // Can be supplied to be notified when new data is available for read.
16 using RecvCb = std::function<void()>;
17
18 ThreadSafeReceiver(unsigned int recvbuf_log2);
19
20 Receiver::State state() const;
21
22 void setConnected(SeqNum peer_seq_num, bool control_bit);
23 void setBroken();
24
25 size_t read(roo::byte* buf, size_t count, uint32_t my_stream_id,
26 roo_io::Status& stream_status, bool& outgoing_data_ready);
27
28 size_t tryRead(roo::byte* buf, size_t count, uint32_t my_stream_id,
29 roo_io::Status& stream_status, bool& outgoing_data_ready);
30
31 int peek(uint32_t my_stream_id, roo_io::Status& stream_status);
32
33 size_t availableForRead(uint32_t my_stream_id,
34 roo_io::Status& stream_status) const;
35
36 void markInputClosed(uint32_t my_stream_id, roo_io::Status& stream_status,
37 bool& outgoing_data_ready);
38
39 void reset();
40 void init(uint32_t my_stream_id);
41
42 size_t ack(roo::byte* buf);
43 size_t updateRecvHimark(roo::byte* buf, long& next_send_micros);
44
45 bool handleDataPacket(bool control_bit, uint16_t seq_id,
46 const roo::byte* payload, size_t len, bool is_final);
47
48 bool empty() const {
49 roo::lock_guard<roo::mutex> guard(mutex_);
50 return receiver_.empty();
51 }
52
53 bool done() const {
54 roo::lock_guard<roo::mutex> guard(mutex_);
55 return receiver_.done();
56 }
57
58 uint32_t packets_received() const {
59 roo::lock_guard<roo::mutex> guard(mutex_);
60 return receiver_.packets_received();
61 }
62
63 unsigned int buffer_size_log2() const;
64
65 private:
66 // Checks the state of the underlying receiver, and whether its stream ID
67 // matches my_stream_id. If there is no match, it means that the connection
68 // has been interrupted. If there is a match but the receiver is in the
69 // 'closed' state, it means that EOF has been encountered. This method sets
70 // status accordingly, to either kOk (if match and not closed), kEndOfStream
71 // (if match and closed), or kConnectionError (if mismatch). It returns true
72 // when status is kOk; false otherwise.
73 //
74 // Must be called with mutex_ held.
75 bool checkConnectionStatus(uint32_t my_stream_id,
76 roo_io::Status& status) const;
77
78 internal::Receiver receiver_;
79
80 mutable roo::mutex mutex_;
81 roo::condition_variable has_data_;
82};
83
84} // namespace internal
85} // namespace roo_transport
86
87#endif // ROO_USE_THREADS