roo_transport
API Documentation for roo_transport
Loading...
Searching...
No Matches
packet_receiver_over_stream.cpp
Go to the documentation of this file.
2
3#include <algorithm>
4
5#include "roo_backport.h"
6#include "roo_backport/byte.h"
7#include "roo_collections.h"
8#include "roo_collections/hash.h"
9#include "roo_io.h"
10#include "roo_io/memory/load.h"
11#include "roo_io/third_party/nanocobs/cobs.h"
12#include "roo_logging.h"
14
15namespace roo_transport {
16
18 : in_(in),
19 buf_(new roo::byte[256]),
20 tmp_(new roo::byte[256]),
21 pos_(0),
22 bytes_received_(0),
23 bytes_accepted_(0) {}
24
26 while (true) {
27 size_t len = in_.read(tmp_.get(), 256);
28 if (len == 0) return 0;
29 size_t packets = processIncoming(len, receiver_fn);
30 if (packets > 0) return packets;
31 }
32}
33
35 size_t len = in_.tryRead(tmp_.get(), 256);
36 return processIncoming(len, receiver_fn);
37}
38
39size_t PacketReceiverOverStream::processIncoming(
40 size_t len, const ReceiverFn& receiver_fn) {
41 bytes_received_ += len;
42 size_t received = 0;
43 roo::byte* data = &tmp_[0];
44 while (len > 0) {
45 // Find the possible packet delimiter (zero byte).
46 const roo::byte* delim = std::find(data, data + len, roo::byte{0});
47 size_t increment = delim - data;
48 bool finished = (increment < len);
49 if (finished) {
50 ++increment;
51 if (pos_ + increment <= 256 && pos_ + increment >= 6) {
52 // Packet is of an acceptable size.
53 if (pos_ == 0) {
54 // Fast path: the entire packet fits within the buffer, and we have no
55 // partial packet pending. Process it directly from the temporary
56 // buffer.
57 received += (processPacket(data, increment, receiver_fn) ? 1 : 0);
58 } else {
59 memcpy(&buf_[pos_], data, increment);
60 received +=
61 (processPacket(buf_.get(), pos_ + increment, receiver_fn) ? 1
62 : 0);
63 }
64 }
65 pos_ = 0;
66 } else {
67 if (pos_ + increment < 256) {
68 memcpy(&buf_[pos_], data, increment);
69 pos_ += increment;
70 } else {
71 pos_ = 256;
72 }
73 }
74 data += increment;
75 len -= increment;
76
77 // Alternative implementation:
78 // if (pos_ + increment <= 256 && pos_ + increment >= 6) {
79 // // Packet is of an acceptable size.
80 // memcpy(&buf_[pos_], data, increment);
81 // if (finished) {
82 // buf_[pos_ + increment] = 0;
83 // processPacket(buf_.get(), pos_ + increment + 1);
84 // pos_ = 0;
85 // // Skip the zero byte itself.
86 // increment++;
87 // } else {
88 // pos_ += increment;
89 // }
90 // } else {
91 // // Ignore all bytes up to the next packet.
92 // if (finished) {
93 // pos_ = 0;
94 // increment++;
95 // } else {
96 // pos_ = 256;
97 // }
98 // }
99 // data += increment;
100 // len -= increment;
101 }
102 return received;
103}
104
105bool PacketReceiverOverStream::processPacket(roo::byte* buf, size_t size,
106 const ReceiverFn& receiver_fn) {
107 if (cobs_decode_tinyframe(buf, size) != COBS_RET_SUCCESS) {
108 // Invalid payload (COBS decoding failed). Dropping packet.
109 return false;
110 }
111 {
112 // Verify the checksum.
113 uint32_t computed_hash =
114 roo_collections::murmur3_32(&buf[1], size - 6, kPacketOverStreamSeed);
115 uint32_t received_hash = roo_io::LoadBeU32(&buf[size - 5]);
116 if (computed_hash != received_hash) {
117 // Invalid checksum. Dropping packet.
118 return false;
119 }
120 bytes_accepted_ += size;
121 }
122 if (receiver_fn != nullptr) receiver_fn(&buf[1], size - 6);
123 return true;
124}
125
126} // namespace roo_transport
size_t receive(const ReceiverFn &receiver_fn) override
Receives packets, blocking as needed until at least one packet is delivered, or until stream end/error.
PacketReceiverOverStream(roo_io::InputStream &in)
Creates a receiver reading framed bytes from in.
size_t tryReceive(const ReceiverFn &receiver_fn) override
Receives currently available packets without indefinite blocking.
std::function< void(const roo::byte *, size_t)> ReceiverFn
Callback invoked for each received packet.
static const uint32_t kPacketOverStreamSeed
Definition seed.h:9