18 const roo::byte* data,
size_t len) {
24 size_t header_len = header.deserialize(data, len);
25 if (header_len == 0) {
26 LOG(WARNING) <<
"RpcServer: received invalid RPC header";
33 LOG(WARNING) <<
"RpcServer: received non-request RPC message";
36 if (header.isFirstMessage()) {
39 auto handler_it = handlers_->find(function_id);
40 if (handler_it == handlers_->end()) {
41 sendFailureResponse(connection_id, header.streamId(),
43 roo::string_view(
"Unknown function ID"));
48 roo_time::Uptime deadline = roo_time::Uptime::Max();
49 if (header.hasTimeout()) {
50 deadline = roo_time::Uptime::Now() + roo_time::Millis(header.timeoutMs());
54 roo::lock_guard<roo::mutex> guard(mutex_);
55 if (pending_calls_.find(header.streamId()) != pending_calls_.end()) {
56 LOG(WARNING) <<
"RpcServer: received duplicate request for stream ID "
60 pending_calls_.insert(
62 RpcRequest(connection_id, function_id, header.streamId(), deadline,
63 header.isLastMessage())});
67 handler(
RequestHandle(*
this, connection_id, header.streamId()), data, len,
68 header.isLastMessage());
70 LOG(FATAL) <<
"Streaming RPC not yet supported";
// Drops every entry tracked in pending_calls_.
// NOTE(review): presumably invoked when the underlying messaging channel
// reconnects, so that calls from the previous connection are forgotten --
// confirm against the caller.
74void RpcServer::reconnected() {
// Hold mutex_ for the rest of the scope while pending_calls_ is modified.
75 roo::lock_guard<roo::mutex> guard(mutex_);
// Remove all pending call records in one step.
78 pending_calls_.clear();
83 const roo::byte* data,
size_t len) {
84 if (!prepForResponse(connection_id, stream_id)) {
97 roo::string_view msg) {
98 if (!prepForResponse(connection_id, stream_id)) {
106 (
const roo::byte*)msg.data(), msg.size());
112 roo::lock_guard<roo::mutex> guard(mutex_);
113 auto it = pending_calls_.find(stream_id);
114 if (it == pending_calls_.end()) {
115 LOG(WARNING) <<
"RpcServer: no pending request for stream ID " << stream_id;
118 RpcRequest& request = it->second;
121 if (request.serverFin()) {
122 LOG(WARNING) <<
"RpcServer: attempt to send response on closed stream ID "
128 }
else if (connection_id != connection_id_) {
129 LOG(INFO) <<
"RpcServer: not sending the response because the connection "
134 if (request.clientFin() || !ok) {
135 pending_calls_.erase(it);
Abstract interface for message exchange over a reliable channel.
virtual bool sendContinuation(ConnectionId connection_id, const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size)=0
Sends continuation payload on an existing sender-side connection.
void unsetReceiver()
Unregisters receiver.
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
RpcServer(Messaging &messaging, const FunctionTable *function_table)
friend class RequestHandle
roo_collections::FlatSmallHashMap< RpcFunctionId, RpcHandlerFn > FunctionTable
std::function< void(RequestHandle handle, const roo::byte *payload, size_t payload_size, bool fin)> RpcHandlerFn