5#include "roo_backport.h"
6#include "roo_backport/byte.h"
7#include "roo_collections.h"
8#include "roo_collections/hash.h"
10#include "roo_io/memory/load.h"
11#include "roo_io/third_party/nanocobs/cobs.h"
12#include "roo_logging.h"
19 buf_(new roo::byte[256]),
20 tmp_(new roo::byte[256]),
27 size_t len = in_.read(tmp_.get(), 256);
28 if (len == 0)
return 0;
29 size_t packets = processIncoming(len, receiver_fn);
30 if (packets > 0)
return packets;
35 size_t len = in_.tryRead(tmp_.get(), 256);
36 return processIncoming(len, receiver_fn);
// Processes `len` freshly read bytes held in tmp_, splitting them at zero-byte
// delimiters and passing each candidate frame to processPacket().
// `receiver_fn` is forwarded unchanged to processPacket().
// NOTE(review): this listing omits several original lines (the loop/branch
// structure, the return statement and the closing brace), so the exact control
// flow between the statements below is not visible here. Presumably the return
// value is the number of packets accepted -- confirm against the full source.
39size_t PacketReceiverOverStream::processIncoming(
40 size_t len,
const ReceiverFn& receiver_fn) {
// Running total of all bytes seen, whether or not they end up in a valid packet.
41 bytes_received_ += len;
43 roo::byte* data = &tmp_[0];
// Look for the next zero byte within the unprocessed input.
46 const roo::byte* delim = std::find(data, data + len, roo::byte{0});
// Number of bytes before the delimiter; equals len when no delimiter was found.
47 size_t increment = delim - data;
// True when a delimiter was found within the `len` bytes.
48 bool finished = (increment < len);
// The candidate frame (bytes already buffered at pos_ plus this chunk) must fit
// in the 256-byte buffer. NOTE(review): the lower bound of 6 presumably matches
// the minimum framing overhead assumed by processPacket() -- confirm.
51 if (pos_ + increment <= 256 && pos_ + increment >= 6) {
// Frame is handed to processPacket() directly from the input buffer.
57 received += (processPacket(data, increment, receiver_fn) ? 1 : 0);
// Frame is appended to the bytes already accumulated in buf_, then processed
// from buf_ as a whole.
59 memcpy(&buf_[pos_], data, increment);
61 (processPacket(buf_.get(), pos_ + increment, receiver_fn) ? 1
// Stash the bytes in buf_ only while there is still room for them.
// NOTE(review): the enclosing branch is not visible; presumably this is the
// "no delimiter yet" path -- confirm.
67 if (pos_ + increment < 256) {
68 memcpy(&buf_[pos_], data, increment);
// Decodes and validates a single frame of `size` bytes starting at `buf`, and
// on success passes its payload to `receiver_fn`.
// The caller (processIncoming) counts a true result as one received packet.
// NOTE(review): this listing omits several original lines (the bodies of the
// failure branches, the hash computation and the return statements).
105bool PacketReceiverOverStream::processPacket(roo::byte* buf,
size_t size,
106 const ReceiverFn& receiver_fn) {
// COBS-decode the frame via nanocobs; anything other than COBS_RET_SUCCESS
// takes the failure branch (body not visible here).
107 if (cobs_decode_tinyframe(buf, size) != COBS_RET_SUCCESS) {
// NOTE(review): the right-hand side of this initialization is missing from the
// listing; the hashed range and seed cannot be confirmed from here.
113 uint32_t computed_hash =
// The received hash is read as a 32-bit value starting at buf[size - 5]
// (presumably big-endian, going by the helper's name).
115 uint32_t received_hash = roo_io::LoadBeU32(&buf[size - 5]);
116 if (computed_hash != received_hash) {
// Both checks passed at this point: account for the whole frame.
120 bytes_accepted_ += size;
// Deliver the payload, skipping the first byte of buf and excluding 6 bytes of
// framing/hash overhead in total; an empty receiver_fn is silently skipped.
122 if (receiver_fn !=
nullptr) receiver_fn(&buf[1], size - 6);
size_t receive(const ReceiverFn &receiver_fn) override
Receives packets, blocking as needed until at least one packet is delivered, or until stream end/error.
PacketReceiverOverStream(roo_io::InputStream &in)
Creates a receiver reading framed bytes from in.
size_t tryReceive(const ReceiverFn &receiver_fn) override
Receives currently available packets without indefinite blocking.
std::function< void(const roo::byte *, size_t)> ReceiverFn
Callback invoked for each received packet.
static const uint32_t kPacketOverStreamSeed