8 : messaging_(messaging), dispatcher_(*this), next_stream_id_(1) {}
15 const roo::byte* payload,
size_t payload_size,
17 uint32_t stream_id = new_stream(std::move(cb));
22 CHECK(header_size > 0);
24 return messaging_.
send(header_bytes, header_size, payload, payload_size,
31 const roo::byte* payload,
35 uint32_t stream_id = new_stream(std::move(cb));
41 CHECK(header_size > 0);
43 return messaging_.
send(header_bytes, header_size, payload, payload_size,
50 roo::lock_guard<roo::mutex> guard(mutex_);
52 if (next_stream_id_ > 0x00FFFFFF) {
55 outgoing_calls_.insert({stream_id, std::move(cb)});
60 const roo::byte* data,
size_t len) {
62 size_t header_len = header.deserialize(data, len);
63 if (header_len == 0) {
64 LOG(WARNING) <<
"RpcClient: received invalid RPC header";
71 LOG(WARNING) <<
"RpcClient: received non-response RPC message";
77 roo::lock_guard<roo::mutex> guard(mutex_);
78 auto it = outgoing_calls_.find(header.streamId());
79 if (it == outgoing_calls_.end()) {
80 LOG(WARNING) <<
"RpcClient: received response for unknown stream ID "
84 cb = std::move(it->second);
85 if (header.isLastMessage()) {
86 outgoing_calls_.erase(it);
91 if (header.isLastMessage()) {
92 status = header.responseStatus();
95 cb(data, len, status);
99 OutgoingCalls calls_to_cancel;
101 roo::lock_guard<roo::mutex> guard(mutex_);
102 calls_to_cancel = std::move(outgoing_calls_);
103 outgoing_calls_ = OutgoingCalls();
105 for (
auto it = calls_to_cancel.begin(); it != calls_to_cancel.end(); ++it) {
Abstract interface for message exchange over a reliable channel.
void unsetReceiver()
Unregisters receiver.
virtual bool send(const roo::byte *header, size_t header_size, const roo::byte *payload, size_t payload_size, ConnectionId *connection_id)=0
Sends message with optional header and payload.
void setReceiver(Receiver &receiver)
Registers message receiver. Call before channel initialization.
RpcClient(Messaging &messaging)
RpcStatus sendUnaryRpcWithTimeout(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, uint32_t timeout_ms, UnaryCompletionCb cb)
std::function< void(const roo::byte *data, size_t data_size, RpcStatus status)> UnaryCompletionCb
RpcStatus sendUnaryRpc(RpcFunctionId function_id, const roo::byte *payload, size_t payload_size, UnaryCompletionCb cb)