roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
client.cpp
Go to the documentation of this file.
2
4
5namespace roo_transport {
6
8 : messaging_(messaging), dispatcher_(*this), next_stream_id_(1) {}
9
10void RpcClient::begin() { messaging_.setReceiver(dispatcher_); }
11
12void RpcClient::end() { messaging_.unsetReceiver(); }
13
15 const roo::byte* payload, size_t payload_size,
17 uint32_t stream_id = new_stream(std::move(cb));
18 RpcHeader header = RpcHeader::NewUnaryRequest(function_id, stream_id);
19 roo::byte header_bytes[RpcHeader::kMaxSerializedSize];
20 size_t header_size =
21 header.serialize(header_bytes, RpcHeader::kMaxSerializedSize);
22 CHECK(header_size > 0);
23 Messaging::ConnectionId connection_id;
24 return messaging_.send(header_bytes, header_size, payload, payload_size,
25 &connection_id)
28}
29
31 const roo::byte* payload,
32 size_t payload_size,
33 uint32_t timeout_ms,
35 uint32_t stream_id = new_stream(std::move(cb));
36 RpcHeader header =
37 RpcHeader::NewUnaryRequest(function_id, stream_id, timeout_ms);
38 roo::byte header_bytes[RpcHeader::kMaxSerializedSize];
39 size_t header_size =
40 header.serialize(header_bytes, RpcHeader::kMaxSerializedSize);
41 CHECK(header_size > 0);
42 Messaging::ConnectionId connection_id;
43 return messaging_.send(header_bytes, header_size, payload, payload_size,
44 &connection_id)
47}
48
49RpcStreamId RpcClient::new_stream(RpcClient::UnaryCompletionCb cb) {
50 roo::lock_guard<roo::mutex> guard(mutex_);
51 RpcStreamId stream_id = next_stream_id_++;
52 if (next_stream_id_ > 0x00FFFFFF) {
53 next_stream_id_ = 1;
54 }
55 outgoing_calls_.insert({stream_id, std::move(cb)});
56 return stream_id;
57}
58
// Handles one incoming message: parses the RPC header, looks up the pending
// call registered under the header's stream ID, and invokes its completion
// callback with the bytes that follow the header.
//
// Messages with an unparsable header, messages that are not responses, and
// responses for stream IDs with no registered call are logged and dropped.
// connection_id is not used in the visible body.
//
// NOTE(review): this listing is missing original source lines 75 and 90,
// which presumably declare `cb` and `status` - confirm against client.cpp.
59void RpcClient::handleResponse(Messaging::ConnectionId connection_id,
60 const roo::byte* data, size_t len) {
61 RpcHeader header;
62 size_t header_len = header.deserialize(data, len);
// A returned header length of 0 is treated as a parse failure.
63 if (header_len == 0) {
64 LOG(WARNING) << "RpcClient: received invalid RPC header";
65 return;
66 }
// Skip past the header; what remains in [data, data + len) is the payload.
67 data += header_len;
68 len -= header_len;
69
70 if (header.type() != RpcHeader::kResponse) {
71 LOG(WARNING) << "RpcClient: received non-response RPC message";
72 return;
73 }
74
// The lookup happens under mutex_; the callback itself is invoked only after
// this scope (and therefore the lock) has been left.
76 {
77 roo::lock_guard<roo::mutex> guard(mutex_);
78 auto it = outgoing_calls_.find(header.streamId());
79 if (it == outgoing_calls_.end()) {
80 LOG(WARNING) << "RpcClient: received response for unknown stream ID "
81 << header.streamId();
82 return;
83 }
// NOTE(review): the callback is moved out of the map entry even when the
// entry is kept (i.e. when this is not the last message). A later message on
// the same stream would then find a moved-from callback - confirm whether
// non-final responses can actually occur for these calls.
84 cb = std::move(it->second);
85 if (header.isLastMessage()) {
86 outgoing_calls_.erase(it);
87 }
88 }
89
// Only the last message overrides `status` with the status carried in the
// header; otherwise the value from its (missing) declaration is passed on.
91 if (header.isLastMessage()) {
92 status = header.responseStatus();
93 }
94 // Call the response callback.
95 cb(data, len, status);
96}
97
98void RpcClient::connectionReset(Messaging::ConnectionId connection_id) {
99 OutgoingCalls calls_to_cancel;
100 {
101 roo::lock_guard<roo::mutex> guard(mutex_);
102 calls_to_cancel = std::move(outgoing_calls_);
103 outgoing_calls_ = OutgoingCalls();
104 }
105 for (auto it = calls_to_cancel.begin(); it != calls_to_cancel.end(); ++it) {
106 // Call the response callback with cancelled status.
107 it->second(nullptr, 0, kUnavailable);
108 }
109}
110
111} // namespace roo_transport
Abstract interface for message exchange over a reliable channel.
Definition messaging.h:17
void unsetReceiver()
Unregisters receiver.
Definition messaging.h:30
virtual bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id)=0
Sends message with optional header and payload.
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
Definition messaging.h:27
RpcClient(Messaging &messaging)
Definition client.cpp:7
RpcStatus sendUnaryRpcWithTimeout(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, uint32_t timeout_ms, UnaryCompletionCb cb)
Definition client.cpp:30
std::function< void(const roo::byte *data, size_t data_size, RpcStatus status)> UnaryCompletionCb
Definition client.h:20
RpcStatus sendUnaryRpc(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, UnaryCompletionCb cb)
Definition client.cpp:14
size_t serialize(roo::byte *buffer, size_t buffer_size) const
Definition header.cpp:63
static constexpr size_t kMaxSerializedSize
Definition header.h:16
static RpcHeader NewUnaryRequest(RpcFunctionId function_id, RpcStreamId stream_id)
Definition header.cpp:25
uint32_t RpcFunctionId
Definition rpc.h:10
uint32_t RpcStreamId
Definition rpc.h:11