9#include "roo_io/data/multipass_input_stream_reader.h"
10#include "roo_logging.h"
13#ifndef MLOG_roo_monitoring_vault_reader
14#define MLOG_roo_monitoring_vault_reader 0
21bool read_header(roo_io::MultipassInputStreamReader& is) {
22 uint8_t major = is.readU8();
23 uint8_t minor = is.readU8();
25 LOG(ERROR) <<
"Failed to read vault file header: "
26 << roo_io::StatusAsString(is.status());
29 if (major != 1 || minor != 1) {
30 LOG(ERROR) <<
"Invalid content of vault file header: " << major <<
", "
37roo_io::Status read_data(roo_io::MultipassInputStreamReader& is,
38 std::vector<Sample>* data,
bool ignore_fill) {
40 uint64_t sample_count = roo_io::ReadVarU64(is);
42 if (is.status() != roo_io::kEndOfStream) {
43 LOG(ERROR) <<
"Failed to read data from the vault file: "
44 << roo_io::StatusAsString(is.status());
48 for (uint64_t i = 0; i < sample_count; ++i) {
49 uint64_t stream_id = is.readVarU64();
50 uint16_t avg = is.readBeU16();
51 uint16_t min = is.readBeU16();
52 uint16_t max = is.readBeU16();
53 uint16_t fill = is.readBeU16();
58 LOG(ERROR) <<
"Failed to read a sample from the vault file: "
59 << roo_io::StatusAsString(is.status());
62 data->emplace_back(stream_id, avg, min, max, fill);
70 : collection_(collection),
82 fs_ = collection_->
fs().mount();
86 reader_.reset(fs_.fopen(path.c_str()));
89 if (!reader_.isOpen()) {
90 if (reader_.status() == roo_io::kNotFound) {
91 MLOG(roo_monitoring_vault_reader)
92 <<
"Vault file " << path.c_str()
93 <<
" doesn't exist; treating as-if empty";
95 LOG(ERROR) <<
"Failed to open vault file for read: " << path.c_str()
96 <<
": " << roo_io::StatusAsString(reader_.status());
101 if (!read_header(reader_)) {
105 position_ = reader_.position();
106 }
else if (offset < 0) {
107 LOG(ERROR) <<
"Invalid offset: " << offset;
110 reader_.seek(offset);
111 if (reader_.status() != roo_io::kOk) {
112 LOG(ERROR) <<
"Error seeking in the vault file " << path.c_str() <<
": "
113 << roo_io::StatusAsString(reader_.status());
118 MLOG(roo_monitoring_vault_reader)
119 <<
"Vault file " << path.c_str() <<
" opened for read at index " << index_
120 <<
" and position " << offset;
121 return reader_.status() == roo_io::kOk;
133 LOG(FATAL) <<
"Attempt to read a position in a file that has been fully "
134 "read and is now closed.";
135 }
else if (reader_.ok()) {
136 position_ = reader_.position();
137 }
else if (reader_.status() == roo_io::kClosed) {
138 LOG(FATAL) <<
"Attempt to read a position in a file that has been "
139 "unexpectedly closed at index "
156 if (read_data(reader_, sample, ignore_fill) == roo_io::kOk) {
159 MLOG(roo_monitoring_vault_reader)
160 <<
"End of file reached after successfully scanning the entire "
162 position_ = reader_.position();
167 if (reader_.status() == roo_io::kEndOfStream) {
168 MLOG(roo_monitoring_vault_reader)
169 <<
"End of file reached prematurely, while reading data at index "
173 position_ = reader_.position();
174 LOG(ERROR) <<
"Error reading data at index " << index_;
183 if (skip <= 0)
return;
185 MLOG(roo_monitoring_vault_reader) <<
"Skipping " << skip <<
" steps";
192 std::vector<Sample> ignored;
193 for (; !
past_eof() && skip > 0; --skip) {
205 os <<
"[" << file_ref.
resolution() <<
", " << roo_logging::hex
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Cursor used when seeking through multiple log files.
VaultFileReader(const Collection *collection)
Creates a reader bound to the specified collection.
void seekForward(int64_t timestamp)
Advances the cursor to the first entry at or after the timestamp.
LogCursor tell()
Returns the current log cursor.
int index() const
Returns the current entry index.
bool past_eof() const
Returns true if the reader has passed the end of file.
bool next(std::vector< Sample > *sample)
Reads the next entry and fills the sample vector.
bool open(const VaultFileRef &ref, int index, int64_t offset)
Opens the file and seeks to the specified index and byte offset.
const VaultFileRef & vault_ref() const
Returns the vault file reference for this reader.
Identifies a specific file in the monitoring vault.
int64_t timestamp() const
Returns the start timestamp for this vault file.
int64_t time_span() const
Returns the total time span covered by the file.
Resolution resolution() const
Returns the resolution for this vault file.
int64_t time_step() const
Returns the time step between entries.
Umbrella header for the roo_monitoring module.
static const int kRangeElementCount
Number of items in a range (4^kRangeLength).
roo_logging::Stream & operator<<(roo_logging::Stream &os, const VaultFileRef &file_ref)
Writes a human-readable representation of the vault file reference.