roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
client.h
Go to the documentation of this file.
1#pragma once
2
3#include <functional>
4
5#include "roo_collections.h"
6#include "roo_collections/flat_small_hash_map.h"
7#include "roo_threads.h"
8#include "roo_threads/latch.h"
9#include "roo_threads/mutex.h"
14
15namespace roo_transport {
16
17class RpcClient {
18 public:
19 using UnaryCompletionCb = std::function<void(
20 const roo::byte* data, size_t data_size, RpcStatus status)>;
21
22 explicit RpcClient(Messaging& messaging);
23
24 RpcStatus sendUnaryRpc(RpcFunctionId function_id, const roo::byte* payload,
25 size_t payload_size, UnaryCompletionCb cb);
26
28 const roo::byte* payload,
29 size_t payload_size, uint32_t timeout_ms,
31
32 ~RpcClient() = default;
33
34 void begin();
35 void end();
36
37 private:
38 class Dispatcher : public Messaging::Receiver {
39 public:
40 explicit Dispatcher(RpcClient& rpc_client) : rpc_client_(rpc_client) {}
41
42 void received(Messaging::ConnectionId connection_id, const roo::byte* data,
43 size_t len) override {
44 rpc_client_.handleResponse(connection_id, data, len);
45 }
46
47 void reset(Messaging::ConnectionId connection_id) override {
48 rpc_client_.connectionReset(connection_id);
49 }
50
51 private:
52 RpcClient& rpc_client_;
53 };
54
55 using OutgoingCalls =
56 roo_collections::FlatSmallHashMap<RpcStreamId, UnaryCompletionCb>;
57
58 // Called when we receive a response from the server. This method dispatches
59 // the response to the appropriate result callback.
60 void handleResponse(Messaging::ConnectionId connection_id,
61 const roo::byte* data, size_t len);
62
63 // Called when the transport layer detects reconnection, indicating that
64 // pending RPCs will never complete (and should thus be failed).
65 void connectionReset(Messaging::ConnectionId connection_id);
66
68
69 Messaging& messaging_;
70 Dispatcher dispatcher_;
71
72 roo::mutex mutex_;
73
74 // Guarded by mutex_.
75 uint32_t next_stream_id_ = 1;
76
77 OutgoingCalls outgoing_calls_;
78};
79
80// Convenience wrapper for implementing unary RPC stubs.
81template <typename Request, typename Response,
82 typename RequestSerializer = Serializer<Request>,
83 typename ResponseDeserializer = Deserializer<Response>>
84class UnaryStub {
85 public:
88
90 roo::latch completed(1);
92 // Serialize the request message.
93 auto serialized = serializer.serialize(request);
94 // Bail in case the argument serialization failed.
95 if (serialized.status() != kOk) {
96 return serialized.status();
97 }
98 RpcStatus status;
100 function_id_, serialized.data(), serialized.size(),
101 [&completed, &response, &status](const roo::byte* data, size_t len,
103 ResponseDeserializer deserializer;
104 if (resp_status == kOk) {
105 resp_status = deserializer.deserialize((const roo_io::byte*)data,
106 len, response);
107 }
108 status = resp_status;
109 completed.count_down();
110 });
111 if (req_status != kOk) {
112 return req_status;
113 }
114 completed.wait();
115 return status;
116 }
117
119 std::function<void(RpcStatus, Response)> completion_cb) {
121 // Serialize the request message.
122 auto serialized = serializer.serialize(request);
123 // Bail in case the argument serialization failed.
124 if (serialized.status() != kOk) {
125 return serialized.status();
126 }
127 return client_.sendUnaryRpc(
128 function_id_, serialized.data(), serialized.size(),
129 [completion_cb](const roo::byte* data, size_t len,
131 ResponseDeserializer deserializer;
132 Response resp;
133 if (resp_status == kOk) {
134 resp_status =
135 deserializer.deserialize((const roo_io::byte*)data, len, resp);
136 }
137 completion_cb(resp_status, std::move(resp));
138 });
139 }
140
141 private:
142 RpcClient& client_;
143 RpcFunctionId function_id_;
144};
145
146} // namespace roo_transport
Abstract interface for message exchange over a reliable channel.
Definition messaging.h:17
RpcStatus sendUnaryRpcWithTimeout(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, uint32_t timeout_ms, UnaryCompletionCb cb)
Definition client.cpp:30
std::function< void(const roo::byte *data, size_t data_size, RpcStatus status)> UnaryCompletionCb
Definition client.h:20
RpcStatus sendUnaryRpc(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, UnaryCompletionCb cb)
Definition client.cpp:14
RpcStatus callAsync(const Request &request, std::function< void(RpcStatus, Response)> completion_cb)
Definition client.h:118
UnaryStub(RpcClient &client, RpcFunctionId function_id)
Definition client.h:86
RpcStatus call(const Request &request, Response &response)
Definition client.h:89
uint32_t RpcFunctionId
Definition rpc.h:10
uint32_t RpcStreamId
Definition rpc.h:11