roo_monitoring
API Documentation for roo_monitoring
Loading...
Searching...
No Matches
compaction.cpp
Go to the documentation of this file.
1#include "compaction.h"
2
3#include "common.h"
4#include "roo_io/data/output_stream_writer.h"
5#include "roo_io/fs/fsutil.h"
6#include "roo_logging.h"
7
8#ifndef MLOG_roo_monitoring_compaction
9#define MLOG_roo_monitoring_compaction 0
10#endif
11
12namespace roo_monitoring {
13
15 data_.clear();
16 index_.clear();
17}
18
19void Aggregator::add(const Sample& input) {
20 int idx;
21 auto pos = index_.find(input.stream_id());
22 if (pos == index_.end()) {
23 idx = data_.size();
24 data_.emplace_back();
25 index_.insert(std::make_pair(input.stream_id(), idx));
26 } else {
27 idx = pos->second;
28 }
29 SampleAggregator& output = data_[idx];
30 output.weighted_total += (input.avg_value() * input.fill());
31 output.weight += input.fill();
32 if (output.min_value > input.min_value()) {
33 output.min_value = input.min_value();
34 }
35 if (output.max_value < input.max_value()) {
36 output.max_value = input.max_value();
37 }
38}
39
41 : collection_(collection), ref_(ref), write_index_(0) {}
42
43roo_io::Status VaultWriter::openNew() {
44 String path;
45 collection_->getVaultFilePath(ref_, &path);
46 roo_io::Mount fs = collection_->fs().mount();
47 if (!fs.ok()) return fs.status();
48 roo_io::Status result = roo_io::MkParentDirRecursively(fs, path.c_str());
49 if (result != roo_io::kOk && result != roo_io::kDirectoryExists) {
50 return result;
51 }
52 MLOG(roo_monitoring_compaction)
53 << "Opening a new vault file " << path.c_str() << " for write";
54 writer_.reset(fs.fopenForWrite(path.c_str(), roo_io::kTruncateIfExists));
55 write_index_ = 0;
56 writeHeader();
57 if (!writer_.ok()) {
58 LOG(ERROR) << "Failed to open vault file " << path.c_str()
59 << " for write: " << roo_io::StatusAsString(writer_.status());
60 }
61 return writer_.status();
62}
63
64roo_io::Status VaultWriter::openExisting(int write_index) {
65 CHECK_GE(write_index, 0);
67 String path;
68 collection_->getVaultFilePath(ref_, &path);
69 MLOG(roo_monitoring_compaction)
70 << "Opening an existing vault file " << path.c_str() << " for append";
71 roo_io::Mount fs = collection_->fs().mount();
72 if (!fs.ok()) return fs.status();
73 writer_.reset(fs.fopenForWrite(path.c_str(), roo_io::kAppendIfExists));
74 write_index_ = write_index;
75 if (!writer_.ok()) {
76 LOG(ERROR) << "Failed to open vault file " << path.c_str()
77 << " for append: " << roo_io::StatusAsString(writer_.status());
78 }
79 return writer_.status();
80}
81
83 CHECK_LE(write_index_, kRangeElementCount);
84 writer_.writeVarU64(0);
85 if (!writer_.ok()) {
86 LOG(ERROR) << "Failed to write empty data at index " << write_index_ << ": "
87 << roo_io::StatusAsString(writer_.status());
88 }
89 ++write_index_;
90}
91
92void VaultWriter::writeLogData(const std::vector<LogSample>& data) {
93 CHECK_LE(write_index_, kRangeElementCount);
94 writer_.writeVarU64(data.size());
95 for (const auto& sample : data) {
96 writer_.writeVarU64(sample.stream_id());
97 // Write the 'average'
98 writer_.writeBeU16(sample.value());
99 // Write the 'min'
100 writer_.writeBeU16(sample.value());
101 // Write the 'max'
102 writer_.writeBeU16(sample.value());
103 // Write the 'fill ratio'
104 writer_.writeBeU16(0x2000);
105 }
106 if (!writer_.ok()) {
107 LOG(ERROR) << "Failed to write real data (" << data.size() << ") at index "
108 << write_index_ << ": "
109 << roo_io::StatusAsString(writer_.status());
110 }
111 ++write_index_;
112}
113
115 CHECK_LE(write_index_, kRangeElementCount);
116 writer_.writeVarU64(data.data_.size());
117 for (const auto& entry : data.index_) {
118 const Aggregator::SampleAggregator& sample = data.data_[entry.second];
119 // uint16_t fill = sample.weight / 4;
120 writer_.writeVarU64(entry.first);
121 // Write the 'average'
122 writer_.writeBeU16(sample.weight > 0 ? sample.weighted_total / sample.weight
123 : 0);
124 // Write the 'min'
125 writer_.writeBeU16(sample.min_value);
126 // Write the 'max'
127 writer_.writeBeU16(sample.max_value);
128 // Write the 'fill ratio'
129 writer_.writeBeU16(sample.weight / 4); // Can become zero.
130 if (!writer_.ok()) {
131 LOG(ERROR) << "Failed to write aggregated data (" << data.data_.size()
132 << ") at index " << write_index_ << ": "
133 << roo_io::Status(writer_.status());
134 return;
135 }
136 }
137 ++write_index_;
138}
139
140void VaultWriter::writeHeader() {
141 CHECK_EQ(0, write_index_);
142 writer_.writeU8(0x01);
143 writer_.writeU8(0x01);
144}
145
146} // namespace roo_monitoring
Aggregates samples for a vault file time bucket.
Definition compaction.h:14
void clear()
Clears any accumulated data.
void add(const Sample &sample)
Adds a sample into the aggregation state.
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Represents a single data sample stored in a vault file.
Definition sample.h:6
uint16_t min_value() const
Returns the minimum value for the bucket.
Definition sample.h:22
uint16_t avg_value() const
Returns the average value for the bucket.
Definition sample.h:20
uint16_t max_value() const
Returns the maximum value for the bucket.
Definition sample.h:24
uint16_t fill() const
Returns the fill value (0x2000 == 100%).
Definition sample.h:26
uint64_t stream_id() const
Returns the stream identifier.
Definition sample.h:18
Identifies a specific file in the monitoring vault.
Definition vault.h:16
int write_index() const
Returns the current write index within the vault file.
Definition compaction.h:56
roo_io::Status openExisting(int write_index)
Opens an existing vault file, seeking to the specified entry index.
VaultWriter(Collection *collection, VaultFileRef ref)
Creates a writer for the given collection and vault file.
void writeAggregatedData(const Aggregator &aggregator)
Writes aggregated samples into the vault file.
void writeEmptyData()
Writes an empty vault file payload.
roo_io::Status openNew()
Opens a new vault file for writing.
void writeLogData(const std::vector< LogSample > &data)
Writes raw log samples into the vault file.
Umbrella header for the roo_monitoring module.
static const int kRangeElementCount
Number of items in a range (4^(kRangeLength)).
Definition common.h:22