7#include "roo_io/data/input_stream_reader.h"
8#include "roo_io/data/output_stream_writer.h"
9#include "roo_io/fs/fsutil.h"
10#include "roo_logging.h"
14const char* GetVfsRoot();
17#ifndef MLOG_roo_monitoring_compaction
18#define MLOG_roo_monitoring_compaction 0
21#ifndef MLOG_roo_monitoring_vault_reader
22#define MLOG_roo_monitoring_vault_reader 0
31 resolution_(resolution),
32 transform_(
Transform::Linear(256, 0x8000)) {
39 : collection_(collection),
41 cache_(collection->fs(), log_dir_.c_str()),
42 writer_(collection->fs(), log_dir_.c_str(), cache_,
43 collection->resolution()),
44 io_state_(
Writer::IOSTATE_OK),
45 compaction_head_index_end_(0),
47 flush_in_progress_(false) {}
50 : transform_(&writer->collection_->transform()),
51 writer_(&writer->writer_) {}
62 uint16_t transformed = transform_->
apply(datum);
63 writer_->
write(ts_rounded, stream_id, transformed);
80 uint8_t target_datum_index_;
// Builds the path of the cursor file that accompanies the vault file `ref`:
// the collection's vault file path for `ref`, with ".cursor" appended.
//
// NOTE(review): this listing has source line numbers fused into the text and
// does not show the function's closing brace — confirm against the original.
85String getLogCompactionCursorPath(
const Collection* collection,
86 const VaultFileRef& ref) {
87 String cursor_file_path;
// getVaultFilePath() delivers the path through its out-parameter.
88 collection->getVaultFilePath(ref, &cursor_file_path);
89 cursor_file_path +=
".cursor";
90 return cursor_file_path;
// Attempts to load a compaction cursor from the file `cursor_path` on `fs`
// and store it in `*result`.
//
// The payload is read as: target datum index (U8), source file (VarU64),
// source checkpoint (VarU64). The two visible error paths return
// reader.status().
//
// NOTE(review): several lines are absent from this listing (the conditions
// guarding both error branches, the tail of the `*result` assignment and the
// final return), so the exact control flow must be confirmed against the
// original file.
93roo_io::Status tryReadLogCompactionCursor(roo_io::Mount& fs,
94 const char* cursor_path,
95 LogCompactionCursor* result) {
96 auto reader = roo_io::OpenDataFile(fs, cursor_path);
// An open failure is only logged when the status is something other than
// kNotFound.
98 if (reader.status() != roo_io::kNotFound) {
99 LOG(ERROR) <<
"Failed to open cursor file " << cursor_path <<
": "
100 << roo_io::StatusAsString(reader.status());
102 return reader.status();
// Read the three cursor fields in their on-disk order.
105 uint8_t target_datum_index = reader.readU8();
106 uint64_t source_file = reader.readVarU64();
107 uint64_t source_checkpoint = reader.readVarU64();
// Read-failure path (its guard is on a line not shown here): log and return
// the reader's status.
109 LOG(ERROR) <<
"Error reading data from the cursor file: "
110 << roo_io::StatusAsString(reader.status())
111 <<
". Will ignore the cursor.";
112 return reader.status();
// Publish the decoded cursor to the caller; the remainder of this statement
// is missing from the listing. The verbose log that follows prints the source
// file in hex and the other two fields in decimal.
114 *result = LogCompactionCursor(LogCursor(source_file, source_checkpoint),
116 MLOG(roo_monitoring_compaction)
117 <<
"Successfully read the cursor content " << cursor_path <<
": "
118 << roo_logging::hex << source_file << roo_logging::dec <<
", "
119 << source_checkpoint <<
", " << (int)target_datum_index;
// Writes `cursor` to the file `cursor_path` on `fs`.
//
// The file is opened with roo_io::kFailIfExists. The payload is the target
// datum index (U8), followed by the log cursor's file and position (VarU64
// each).
//
// NOTE(review): the guard around the open-failure log, every return statement
// and the end of the function are absent from this listing, so the meaning of
// the bool result cannot be stated from here — confirm against the original.
123bool writeCursor(roo_io::Mount& fs,
const char* cursor_path,
124 const LogCompactionCursor cursor) {
125 auto writer = OpenDataFileForWrite(fs, cursor_path, roo_io::kFailIfExists);
// NOTE(review): the "for write: " literal below has no leading space, so the
// logged path runs straight into the word "for".
127 LOG(ERROR) <<
"Error opening the cursor file " << cursor_path
128 <<
"for write: " << roo_io::StatusAsString(writer.status());
// Verbose trace of the values about to be written (file in hex, the rest in
// decimal).
130 MLOG(roo_monitoring_compaction)
131 <<
"Writing cursor content " << cursor_path <<
": " << roo_logging::hex
132 << cursor.log_cursor().file() << roo_logging::dec <<
", "
133 << cursor.log_cursor().position() <<
", "
134 << (int)cursor.target_datum_index();
136 writer.writeU8(cursor.target_datum_index());
137 writer.writeVarU64(cursor.log_cursor().file());
// The position is checked to be non-negative before it is written as an
// unsigned varint.
138 CHECK_GE(cursor.log_cursor().position(), 0);
139 writer.writeVarU64(cursor.log_cursor().position());
// Any status other than kClosed is reported as a write error. Presumably the
// writer is closed on a line not shown in this listing — TODO confirm.
141 if (writer.status() != roo_io::kClosed) {
142 LOG(ERROR) <<
"Error writing to the cursor file " << cursor_path <<
": "
188 roo_io::Mount fs = collection_->
fs().mount();
189 if (!fs.ok())
return;
190 if (flush_in_progress_) {
191 Status status = compactVaultOneLevel();
193 flush_in_progress_ =
false;
201 compaction_head_index_end_ = writeToVault(fs, reader, compaction_head_);
204 flush_in_progress_ =
true;
207 LOG(ERROR) <<
"Vault compaction failed at resolution "
210 flush_in_progress_ =
false;
219 compaction_head_index_end_ = writeToVault(fs, reader, compaction_head_);
222 flush_in_progress_ =
true;
223 MLOG(roo_monitoring_compaction) <<
"Starting vault compaction.";
// Copies samples from the log `reader` into a vault file and returns the
// vault writer's write index at the point where copying stopped.
//
// Visible steps: look up the cursor file for `ref`, try to read a cursor from
// it, remove the cursor file, then copy samples (padding timestamp gaps with
// empty entries), write a new cursor, and return the write index.
//
// NOTE(review): the rest of the parameter list (including `ref`), the
// declarations of `cursor`, `writer`, `current`, `increment` and `timestamp`,
// and several branch bodies are absent from this listing — confirm details
// against the original file.
228int16_t Writer::writeToVault(roo_io::Mount& fs,
LogReader& reader,
233 String cursor_path = getLogCompactionCursorPath(collection_, ref);
235 roo_io::Status status;
237 status = tryReadLogCompactionCursor(fs, cursor_path.c_str(), &cursor);
// A cursor was read successfully; the lines that consume it are not shown.
238 if (status == roo_io::kOk) {
// The cursor file is removed after a successful read; the handling of a
// failed removal is on lines not shown.
244 if (fs.remove(cursor_path.c_str()) != roo_io::kOk) {
// Any read failure other than kNotFound also removes the cursor file; the
// result of that remove() call is ignored.
250 if (status != roo_io::kNotFound) {
251 fs.remove(cursor_path.c_str());
265 writer.vault_ref().timestamp() +
268 std::vector<LogSample> data;
// NOTE(review): "×tamp" below looks like a mis-encoded "&timestamp" (an HTML
// "&times" entity artifact) — confirm against the original file.
269 while (reader.
nextSample(×tamp, &data)) {
270 if (timestamp < current) {
// Pad with empty entries until the vault position catches up with the
// sample's timestamp.
274 while (current < timestamp) {
275 writer.writeEmptyData();
276 current += increment;
// After padding, the sample must land exactly on the current slot.
278 CHECK_EQ(current, timestamp);
279 writer.writeLogData(data);
280 current += increment;
289 fs, cursor_path.c_str(),
290 LogCompactionCursor(reader.
tell(), writer.write_index()))) {
296 writer.writeEmptyData();
297 current += increment;
// The writer's index is stored into an int16_t before being returned.
302 int16_t compaction_index_end = writer.write_index();
304 return compaction_index_end;
308 roo_io::Mount fs = collection_->
fs().mount();
310 VaultFileRef parent = compaction_head_.
parent();
311 compaction_head_index_end_ =
313 (compaction_head_index_end_ >> 2);
314 compaction_head_ = parent;
316 MLOG(roo_monitoring_compaction) <<
"Vault compacton finished.";
319 if (compaction_head_index_end_ == 0) {
320 MLOG(roo_monitoring_compaction) <<
"Compaction index = 0";
325 CHECK_GT(compaction_head_index_end_, 0);
328 VaultWriter writer(collection_, compaction_head_);
329 VaultFileReader reader(collection_);
330 MLOG(roo_monitoring_compaction)
331 <<
"Compacting " << roo_logging::hex << writer.vault_ref()
332 <<
", with end index " << roo_logging::dec << compaction_head_index_end_;
336 getLogCompactionCursorPath(collection_, compaction_head_);
338 LogCompactionCursor cursor;
339 roo_io::Status status =
340 tryReadLogCompactionCursor(fs, cursor_path.c_str(), &cursor);
341 if (status == roo_io::kOk) {
342 reader.open(compaction_head_.
child(cursor.target_datum_index() /
345 cursor.log_cursor().position());
347 writer.openExisting(cursor.target_datum_index());
351 roo_io::Status cursor_status = fs.remove(cursor_path.c_str());
352 if (cursor_status != roo_io::kOk) {
353 LOG(ERROR) <<
"Failed to delete cursor file " << cursor_path <<
": "
354 << roo_io::StatusAsString(cursor_status);
358 }
else if (status != roo_io::kNotFound) {
359 fs.remove(cursor_path.c_str());
362 reader.open(compaction_head_.
child(0), 0, 0);
365 if (writer.write_index() >= compaction_head_index_end_) {
372 std::vector<Sample> sample_group;
373 Aggregator aggregator;
376 for (
int i = 0; i < 4; ++i) {
378 reader.next(&sample_group);
379 for (
const Sample& sample : sample_group) {
380 if (sample.fill() > 0) {
381 aggregator.add(sample);
385 writer.writeAggregatedData(aggregator);
387 if (reader.past_eof()) {
388 reader.open(reader.vault_ref().next(), 0, 0);
390 }
while (writer.write_index() < compaction_head_index_end_);
393 writeCursor(fs, cursor_path.c_str(),
394 LogCompactionCursor(reader.
tell(), writer.write_index()));
398 if (reader.status() != roo_io::kClosed) {
399 LOG(ERROR) <<
"Failed to process the input vault file: "
400 << roo_io::StatusAsString(reader.status());
403 if (writer.status() != roo_io::kClosed) {
404 LOG(ERROR) <<
"Failed to process the output vault file: "
405 << roo_io::StatusAsString(writer.status());
408 MLOG(roo_monitoring_compaction)
409 <<
"Finished compacting " << roo_logging::hex << writer.vault_ref()
410 <<
", with end index " << roo_logging::dec << writer.write_index();
442 : collection_(collection),
444 current_(collection) {
445 current_.
open(current_ref_, 0, 0);
451 current_ref_ = current_ref_.
next();
452 MLOG(roo_monitoring_vault_reader)
453 <<
"Advancing to next file: " << roo_logging::hex
455 current_.
open(current_ref_, 0, 0);
457 current_.
next(sample);
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
Collection(roo_io::Filesystem &fs, String name, Resolution resolution=kResolution_1024_ms)
Resolution resolution() const
const String & name() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Helper class for generating filenames corresponding to timestamps.
const char * filename() const
Returns the generated filename as a null-terminated string.
static Filename forTimestamp(int64_t nanosSinceEpoch)
Creates a filename for the specified timestamp.
const LogCursor & log_cursor() const
LogCompactionCursor(LogCursor log_cursor, int16_t target_datum_index)
uint8_t target_datum_index() const
Cursor used when seeking through multiple log files.
Reader that walks across a sequence of log files.
LogCursor tell()
Returns the current cursor.
bool seek(LogCursor cursor)
Seeks to the specified cursor.
int64_t range_floor() const
Returns the lower bound of the current range.
bool nextSample(int64_t *timestamp, std::vector< LogSample > *data)
Reads the next sample in the current range.
bool isHotRange()
Returns true if the current range is hot (still being written).
bool nextRange()
Advances to the next time range.
void deleteRange()
Deletes the current range files.
int64_t first_timestamp() const
Returns the first timestamp recorded in the current file.
void write(int64_t timestamp, uint64_t stream_id, uint16_t datum)
Writes a single log sample.
void close()
Closes the log file.
Resolution resolution() const
Returns the resolution used for this writer.
bool can_skip_write(int64_t timestamp, uint64_t stream_id)
Returns true if a write can be skipped for this bucket.
void seekForward(int64_t timestamp)
Advances the cursor to the first entry at or after the timestamp.
int index() const
Returns the current entry index.
bool past_eof() const
Returns true if the reader has passed the end of file.
bool next(std::vector< Sample > *sample)
Reads the next entry and fills the sample vector.
bool open(const VaultFileRef &ref, int index, int64_t offset)
Opens the file and seeks to the specified index and byte offset.
Identifies a specific file in the monitoring vault.
int64_t timestamp() const
Returns the start timestamp for this vault file.
int64_t timestamp_at(int position) const
Returns the timestamp for the entry at the given position.
VaultFileRef parent() const
Returns the parent vault file at the next coarser resolution.
int sibling_index() const
Returns the index of this file within its parent range.
VaultFileRef next() const
Returns the next vault file at the same resolution.
Resolution resolution() const
Returns the resolution for this vault file.
VaultFileRef child(int index) const
Returns the child vault file at the next finer resolution.
static VaultFileRef Lookup(int64_t timestamp, Resolution resolution)
Creates a reference that encloses the timestamp at the given resolution.
int64_t cursor() const
Returns current iterator timestamp.
void next(std::vector< Sample > *sample)
Advances by one resolution step and fills sample.
VaultIterator(const Collection *collection, int64_t start, Resolution resolution)
Creates iterator over collection at resolution, starting at start.
Writes vault files for a collection at a specific resolution.
WriteTransaction(Writer *writer)
void write(int64_t timestamp, uint64_t stream_id, float data)
Write interface for a monitoring collection.
Writer(Collection *collection)
void flushAll()
Periodically flushes logged data into vault files.
Umbrella header for the roo_monitoring module.
static constexpr int64_t timestamp_ms_floor(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp down to the specified resolution bucket.
static const Resolution kMaxResolution
Maximum supported resolution.
static const int kRangeElementCount
Number of items in a range (4^(kRangeLength)).
Resolution
Time resolution used for log and vault files.
const char * kLogSubPath
Subdirectory name used for raw log files.
static const int kRangeLength
Number of base-4 digits used per range.
static constexpr int64_t timestamp_increment(int64_t steps, Resolution resolution)
Returns the timestamp delta for the given number of resolution steps.
const char * kMonitoringBasePath
Base directory for monitoring storage on the filesystem.
constexpr char toHexDigit(int d)
Converts a 0-15 value to an uppercase hex digit.
String subdir(String base, const String &sub)
Returns a path formed by joining the base directory and subdirectory.