4#include "roo_io/data/output_stream_writer.h"
5#include "roo_io/fs/fsutil.h"
6#include "roo_logging.h"
8#ifndef MLOG_roo_monitoring_compaction
9#define MLOG_roo_monitoring_compaction 0
21 auto pos = index_.find(input.
stream_id());
22 if (pos == index_.end()) {
25 index_.insert(std::make_pair(input.
stream_id(), idx));
29 SampleAggregator& output = data_[idx];
31 output.weight += input.
fill();
32 if (output.min_value > input.
min_value()) {
35 if (output.max_value < input.
max_value()) {
41 : collection_(collection), ref_(ref), write_index_(0) {}
46 roo_io::Mount fs = collection_->
fs().mount();
47 if (!fs.ok())
return fs.status();
48 roo_io::Status result = roo_io::MkParentDirRecursively(fs, path.c_str());
49 if (result != roo_io::kOk && result != roo_io::kDirectoryExists) {
52 MLOG(roo_monitoring_compaction)
53 <<
"Opening a new vault file " << path.c_str() <<
" for write";
54 writer_.reset(fs.fopenForWrite(path.c_str(), roo_io::kTruncateIfExists));
58 LOG(ERROR) <<
"Failed to open vault file " << path.c_str()
59 <<
" for write: " << roo_io::StatusAsString(writer_.status());
61 return writer_.status();
69 MLOG(roo_monitoring_compaction)
70 <<
"Opening an existing vault file " << path.c_str() <<
" for append";
71 roo_io::Mount fs = collection_->
fs().mount();
72 if (!fs.ok())
return fs.status();
73 writer_.reset(fs.fopenForWrite(path.c_str(), roo_io::kAppendIfExists));
76 LOG(ERROR) <<
"Failed to open vault file " << path.c_str()
77 <<
" for append: " << roo_io::StatusAsString(writer_.status());
79 return writer_.status();
84 writer_.writeVarU64(0);
86 LOG(ERROR) <<
"Failed to write empty data at index " << write_index_ <<
": "
87 << roo_io::StatusAsString(writer_.status());
94 writer_.writeVarU64(data.size());
95 for (
const auto& sample : data) {
96 writer_.writeVarU64(sample.stream_id());
98 writer_.writeBeU16(sample.value());
100 writer_.writeBeU16(sample.value());
102 writer_.writeBeU16(sample.value());
104 writer_.writeBeU16(0x2000);
107 LOG(ERROR) <<
"Failed to write real data (" << data.size() <<
") at index "
108 << write_index_ <<
": "
109 << roo_io::StatusAsString(writer_.status());
116 writer_.writeVarU64(data.data_.size());
117 for (
const auto& entry : data.index_) {
118 const Aggregator::SampleAggregator& sample = data.data_[entry.second];
120 writer_.writeVarU64(entry.first);
122 writer_.writeBeU16(sample.weight > 0 ? sample.weighted_total / sample.weight
125 writer_.writeBeU16(sample.min_value);
127 writer_.writeBeU16(sample.max_value);
129 writer_.writeBeU16(sample.weight / 4);
131 LOG(ERROR) <<
"Failed to write aggregated data (" << data.data_.size()
132 <<
") at index " << write_index_ <<
": "
133 << roo_io::Status(writer_.status());
140void VaultWriter::writeHeader() {
141 CHECK_EQ(0, write_index_);
142 writer_.writeU8(0x01);
143 writer_.writeU8(0x01);
Aggregates samples for a vault file time bucket.
void clear()
Clears any accumulated data.
void add(const Sample &sample)
Adds a sample into the aggregation state.
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Represents a single data sample stored in a vault file.
uint16_t min_value() const
Returns the minimum value for the bucket.
uint16_t avg_value() const
Returns the average value for the bucket.
uint16_t max_value() const
Returns the maximum value for the bucket.
uint16_t fill() const
Returns the fill value (0x2000 == 100%).
uint64_t stream_id() const
Returns the stream identifier.
Identifies a specific file in the monitoring vault.
int write_index() const
Returns the current write index within the vault file.
roo_io::Status openExisting(int write_index)
Opens an existing vault file, seeking to the specified entry index.
VaultWriter(Collection *collection, VaultFileRef ref)
Creates a writer for the given collection and vault file.
void writeAggregatedData(const Aggregator &aggregator)
Writes aggregated samples into the vault file.
void writeEmptyData()
Writes an empty vault file payload.
roo_io::Status openNew()
Opens a new vault file for writing.
void writeLogData(const std::vector< LogSample > &data)
Writes raw log samples into the vault file.
Umbrella header for the roo_monitoring module.
static const int kRangeElementCount
Number of items in a range (4^(kRangeLength)).