roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
server.cpp
Go to the documentation of this file.
2
4
5namespace roo_transport {
6
8 : messaging_(messaging),
9 dispatcher_(*this),
10 handlers_(function_table),
11 connection_id_(0) {}
12
13void RpcServer::begin() { messaging_.setReceiver(dispatcher_); }
14
15void RpcServer::end() { messaging_.unsetReceiver(); }
16
// Handles one incoming message received on the given connection.
//
// Parameters:
//   connection_id - connection the message arrived on. If it differs from the
//                   last seen connection_id_, the stored ID is updated and
//                   reconnected() is called before any further processing.
//   data, len     - raw message bytes: a serialized RpcHeader followed by the
//                   request payload.
//
// Messages with an undecodable header, a non-request type, or a stream ID that
// already has a pending call are logged and dropped. For the first message of
// a request, the handler registered for the function ID is looked up, the call
// is recorded in pending_calls_, and the handler is invoked with the payload.
//
// NOTE(review): the unknown-function path calls sendFailureResponse() before
// the call has been inserted into pending_calls_. sendFailureResponse() begins
// with prepForResponse(), which returns false when the stream ID has no
// pending entry, so this failure response appears never to be sent. Confirm,
// and consider registering the call before the handler lookup.
// NOTE(review): connection_id_ is read and written here without holding
// mutex_, while prepForResponse() reads it under mutex_. Confirm whether
// handlers may respond from a different thread.
17void RpcServer::handleRequest(Messaging::ConnectionId connection_id,
18 const roo::byte* data, size_t len) {
// A new connection ID means the peer has reconnected; forget stale state.
19 if (connection_id != connection_id_) {
20 connection_id_ = connection_id;
21 reconnected();
22 }
23 RpcHeader header;
// A zero return from deserialize() is treated as an invalid header.
24 size_t header_len = header.deserialize(data, len);
25 if (header_len == 0) {
26 LOG(WARNING) << "RpcServer: received invalid RPC header";
27 return;
28 }
// Skip past the header; data/len now describe the payload only.
29 data += header_len;
30 len -= header_len;
31
32 if (header.type() != RpcHeader::kRequest) {
33 LOG(WARNING) << "RpcServer: received non-request RPC message";
34 return;
35 }
36 if (header.isFirstMessage()) {
37 // New request.
38 RpcFunctionId function_id = header.functionId();
39 auto handler_it = handlers_->find(function_id);
40 if (handler_it == handlers_->end()) {
// NOTE(review): the status argument of this call (listing line 42) is missing
// from this rendering of the file.
41 sendFailureResponse(connection_id, header.streamId(),
43 roo::string_view("Unknown function ID"));
44 return;
45 }
46 const RpcHandlerFn& handler = handler_it->second;
47
// No deadline unless the header carries a timeout, in which case the deadline
// is the current uptime plus that many milliseconds.
48 roo_time::Uptime deadline = roo_time::Uptime::Max();
49 if (header.hasTimeout()) {
50 deadline = roo_time::Uptime::Now() + roo_time::Millis(header.timeoutMs());
51 }
52
// Record the call under the mutex; the lock is released before the handler
// runs (the guard's scope ends at the closing brace below).
53 {
54 roo::lock_guard<roo::mutex> guard(mutex_);
55 if (pending_calls_.find(header.streamId()) != pending_calls_.end()) {
56 LOG(WARNING) << "RpcServer: received duplicate request for stream ID "
57 << header.streamId();
58 return;
59 }
60 pending_calls_.insert(
61 {header.streamId(),
62 RpcRequest(connection_id, function_id, header.streamId(), deadline,
63 header.isLastMessage())});
64 }
65
66 // Invoke the handler.
67 handler(RequestHandle(*this, connection_id, header.streamId()), data, len,
68 header.isLastMessage());
69 } else {
// NOTE(review): a non-first message from the peer reaches LOG(FATAL), which
// presumably aborts the process. Confirm this is acceptable for remote input.
70 LOG(FATAL) << "Streaming RPC not yet supported";
71 }
72}
73
74void RpcServer::reconnected() {
75 roo::lock_guard<roo::mutex> guard(mutex_);
76 // Clear the info about pending requests, so that new requests don't clash
77 // when they use the same stream IDs.
78 pending_calls_.clear();
79}
80
81void RpcServer::sendSuccessResponse(Messaging::ConnectionId connection_id,
82 RpcStreamId stream_id,
83 const roo::byte* data, size_t len) {
84 if (!prepForResponse(connection_id, stream_id)) {
85 return;
86 }
87 RpcHeader header = RpcHeader::NewUnaryResponse(stream_id, RpcStatus::kOk);
88 roo::byte header_bytes[RpcHeader::kMaxSerializedSize];
89 size_t header_size =
90 header.serialize(header_bytes, RpcHeader::kMaxSerializedSize);
91 messaging_.sendContinuation(connection_id, header_bytes, header_size, data,
92 len);
93}
94
95void RpcServer::sendFailureResponse(Messaging::ConnectionId connection_id,
96 RpcStreamId stream_id, RpcStatus status,
97 roo::string_view msg) {
98 if (!prepForResponse(connection_id, stream_id)) {
99 return;
100 }
101 RpcHeader header = RpcHeader::NewUnaryResponse(stream_id, status);
102 roo::byte header_bytes[RpcHeader::kMaxSerializedSize];
103 size_t header_size =
104 header.serialize(header_bytes, RpcHeader::kMaxSerializedSize);
105 messaging_.sendContinuation(connection_id, header_bytes, header_size,
106 (const roo::byte*)msg.data(), msg.size());
107}
108
109bool RpcServer::prepForResponse(Messaging::ConnectionId connection_id,
110 RpcStreamId stream_id) {
111 // Look up the request.
112 roo::lock_guard<roo::mutex> guard(mutex_);
113 auto it = pending_calls_.find(stream_id);
114 if (it == pending_calls_.end()) {
115 LOG(WARNING) << "RpcServer: no pending request for stream ID " << stream_id;
116 return false;
117 }
118 RpcRequest& request = it->second;
119
120 bool ok = false;
121 if (request.serverFin()) {
122 LOG(WARNING) << "RpcServer: attempt to send response on closed stream ID "
123 << stream_id;
124 // } else if (request.isCancelled()) {
125 // LOG(WARNING) << "RpcServer: attempt to send response on cancelled "
126 // "request for stream ID "
127 // << stream_id;
128 } else if (connection_id != connection_id_) {
129 LOG(INFO) << "RpcServer: not sending the response because the connection "
130 "has been reset";
131 } else {
132 ok = true;
133 }
134 if (request.clientFin() || !ok) {
135 pending_calls_.erase(it);
136 }
137 return ok;
138}
139
140} // namespace roo_transport
Abstract interface for message exchange over a reliable channel.
Definition messaging.h:17
virtual bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size)=0
Sends continuation payload on an existing sender-side connection.
void unsetReceiver()
Unregisters receiver.
Definition messaging.h:30
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
Definition messaging.h:27
static RpcHeader NewUnaryResponse(RpcStreamId stream_id, RpcStatus status)
Definition header.cpp:52
static constexpr size_t kMaxSerializedSize
Definition header.h:16
RpcServer(Messaging &messaging, const FunctionTable *function_table)
Definition server.cpp:7
friend class RequestHandle
Definition server.h:99
uint32_t RpcFunctionId
Definition rpc.h:10
uint32_t RpcStreamId
Definition rpc.h:11
roo_collections::FlatSmallHashMap< RpcFunctionId, RpcHandlerFn > FunctionTable
Definition server.h:18
std::function< void(RequestHandle handle, const roo::byte *payload, size_t payload_size, bool fin)> RpcHandlerFn
Definition handler.h:11