roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
server.h
Go to the documentation of this file.
1#pragma once
2
3#include <vector>
4
5#include "roo_collections.h"
6#include "roo_collections/flat_small_hash_map.h"
7#include "roo_threads.h"
8#include "roo_threads/mutex.h"
9#include "roo_time.h"
14
15namespace roo_transport {
16
18 roo_collections::FlatSmallHashMap<RpcFunctionId, RpcHandlerFn>;
19
20// Convenience wrapper for implementing synchronous unary RPC handlers.
21template <typename Request, typename Response,
25 public:
26 using Fn = std::function<RpcStatus(const Request&, Response&)>;
27
28 UnaryHandler(Fn fn) : fn_(std::move(fn)) {}
29
30 void operator()(RequestHandle handle, const roo::byte* payload,
31 size_t payload_size, bool fin) const {
34 RpcStatus status = deserializer.deserialize(payload, payload_size, req);
35 if (status != roo_transport::kOk) {
36 handle.sendFailureResponse(status, "request deserialization failed");
37 return;
38 }
40 status = fn_(req, resp);
41 if (status != roo_transport::kOk) {
42 handle.sendFailureResponse(status, "application error");
43 return;
44 }
46 auto serialized = serializer.serialize(resp);
47 handle.sendSuccessResponse(serialized.data(), serialized.size(), true);
48 }
49
50 private:
51 Fn fn_;
52};
53
54// Convenience wrapper for implementing asynchronous unary RPC handlers.
55template <typename Request, typename Response,
56 typename RequestDeserializer = Deserializer<Request>,
57 typename ResponseSerializer = Serializer<Response>>
59 public:
60 using Fn = std::function<void(const Request&,
61 std::function<void(RpcStatus, Response)>)>;
62
64
65 void operator()(RequestHandle handle, const roo::byte* payload,
66 size_t payload_size, bool fin) const {
69 RpcStatus status = deserializer.deserialize(payload, payload_size, req);
70 if (status != roo_transport::kOk) {
71 handle.sendFailureResponse(status, "request deserialization failed");
72 return;
73 }
76 handle.sendFailureResponse(resp_status, "application error");
77 return;
78 }
80 auto serialized = serializer.serialize(resp_val);
81 handle.sendSuccessResponse(serialized.data(), serialized.size(), true);
82 });
83 }
84
85 private:
86 Fn fn_;
87};
88
89class RpcServer {
90 public:
92
93 void begin();
94 void end();
95
96 ~RpcServer() { messaging_.unsetReceiver(); }
97
98 private:
99 friend class RequestHandle;
100
101 class Dispatcher : public Messaging::Receiver {
102 public:
103 explicit Dispatcher(RpcServer& rpc_server) : rpc_server_(rpc_server) {}
104
105 void received(Messaging::ConnectionId connection_id, const roo::byte* data,
106 size_t len) override {
107 rpc_server_.handleRequest(connection_id, data, len);
108 }
109
110 private:
111 RpcServer& rpc_server_;
112 };
113
114 void handleRequest(Messaging::ConnectionId connection_id,
115 const roo::byte* data, size_t len);
116
117 void sendSuccessResponse(Messaging::ConnectionId connection_id,
118 RpcStreamId stream_id, const roo::byte* data,
119 size_t len);
120
121 void sendFailureResponse(Messaging::ConnectionId connection_id,
122 RpcStreamId stream_id, RpcStatus status,
123 roo::string_view msg);
124
125 // Returns true if the response should be sent; false otherwise. Destroys the
126 // request if it has been finished.
127 bool prepForResponse(Messaging::ConnectionId connection_id,
128 RpcStreamId stream_id);
129
130 void reconnected();
131
132 Messaging& messaging_;
133 Dispatcher dispatcher_;
134
135 const FunctionTable* handlers_;
136
137 Messaging::ConnectionId connection_id_;
138
139 roo::mutex mutex_;
140
141 // Guarded by mutex_.
142 roo_collections::FlatSmallHashMap<RpcStreamId, RpcRequest> pending_calls_;
143};
144
145} // namespace roo_transport
std::function< void(const Request &, std::function< void(RpcStatus, Response)>)> Fn
Definition server.h:61
void operator()(RequestHandle handle, const roo::byte *payload, size_t payload_size, bool fin) const
Definition server.h:65
Abstract interface for message exchange over a reliable channel.
Definition messaging.h:17
void unsetReceiver()
Unregisters receiver.
Definition messaging.h:30
void operator()(RequestHandle handle, const roo::byte *payload, size_t payload_size, bool fin) const
Definition server.h:30
std::function< RpcStatus(const Request &, Response &)> Fn
Definition server.h:26
uint32_t RpcStreamId
Definition rpc.h:11
roo_collections::FlatSmallHashMap< RpcFunctionId, RpcHandlerFn > FunctionTable
Definition server.h:18