roo_monitoring
API Documentation for roo_monitoring
Loading...
Searching...
No Matches
monitoring.cpp
Go to the documentation of this file.
1
2#include <map>
3
4#include "common.h"
5#include "compaction.h"
6#include "log.h"
7#include "roo_io/data/input_stream_reader.h"
8#include "roo_io/data/output_stream_writer.h"
9#include "roo_io/fs/fsutil.h"
10#include "roo_logging.h"
11#include "roo_monitoring.h"
12
13#ifdef ROO_TESTING
14const char* GetVfsRoot();
15#endif
16
17#ifndef MLOG_roo_monitoring_compaction
18#define MLOG_roo_monitoring_compaction 0
19#endif
20
21#ifndef MLOG_roo_monitoring_vault_reader
22#define MLOG_roo_monitoring_vault_reader 0
23#endif
24
25namespace roo_monitoring {
26
27Collection::Collection(roo_io::Filesystem& fs, String name,
28 Resolution resolution)
29 : fs_(fs),
30 name_(name),
31 resolution_(resolution),
32 transform_(Transform::Linear(256, 0x8000)) {
33 base_dir_ = kMonitoringBasePath;
34 base_dir_ += "/";
35 base_dir_ += name;
36}
37
39 : collection_(collection),
40 log_dir_(subdir(collection->base_dir_, kLogSubPath)),
41 cache_(collection->fs(), log_dir_.c_str()),
42 writer_(collection->fs(), log_dir_.c_str(), cache_,
43 collection->resolution()),
44 io_state_(Writer::IOSTATE_OK),
45 compaction_head_index_end_(0),
46 is_hot_range_(false),
47 flush_in_progress_(false) {}
48
50 : transform_(&writer->collection_->transform()),
51 writer_(&writer->writer_) {}
52
54
55void WriteTransaction::write(int64_t timestamp_ms, uint64_t stream_id,
56 float datum) {
57 int64_t ts_rounded = timestamp_ms_floor(timestamp_ms, writer_->resolution());
58 if (writer_->can_skip_write(ts_rounded, stream_id)) {
59 // Fast path: already written data for this bucket.
60 return;
61 }
62 uint16_t transformed = transform_->apply(datum);
63 writer_->write(ts_rounded, stream_id, transformed);
64}
65
67 public:
68 LogCompactionCursor() : log_cursor_(), target_datum_index_(0) {}
70 : log_cursor_(log_cursor), target_datum_index_(target_datum_index) {
71 CHECK_GE(target_datum_index, 0);
72 CHECK_LE(target_datum_index, 255);
73 }
74
75 const LogCursor& log_cursor() const { return log_cursor_; }
76 uint8_t target_datum_index() const { return target_datum_index_; }
77
78 private:
79 LogCursor log_cursor_;
80 uint8_t target_datum_index_;
81};
82
83namespace {
84
85String getLogCompactionCursorPath(const Collection* collection,
86 const VaultFileRef& ref) {
87 String cursor_file_path;
88 collection->getVaultFilePath(ref, &cursor_file_path);
89 cursor_file_path += ".cursor";
90 return cursor_file_path;
91}
92
93roo_io::Status tryReadLogCompactionCursor(roo_io::Mount& fs,
94 const char* cursor_path,
95 LogCompactionCursor* result) {
96 auto reader = roo_io::OpenDataFile(fs, cursor_path);
97 if (!reader.ok()) {
98 if (reader.status() != roo_io::kNotFound) {
99 LOG(ERROR) << "Failed to open cursor file " << cursor_path << ": "
100 << roo_io::StatusAsString(reader.status());
101 }
102 return reader.status();
103 }
104 // Maybe can append.
105 uint8_t target_datum_index = reader.readU8();
106 uint64_t source_file = reader.readVarU64();
107 uint64_t source_checkpoint = reader.readVarU64();
108 if (!reader.ok()) {
109 LOG(ERROR) << "Error reading data from the cursor file: "
110 << roo_io::StatusAsString(reader.status())
111 << ". Will ignore the cursor.";
112 return reader.status();
113 }
114 *result = LogCompactionCursor(LogCursor(source_file, source_checkpoint),
115 target_datum_index);
116 MLOG(roo_monitoring_compaction)
117 << "Successfully read the cursor content " << cursor_path << ": "
118 << roo_logging::hex << source_file << roo_logging::dec << ", "
119 << source_checkpoint << ", " << (int)target_datum_index;
120 return roo_io::kOk;
121}
122
123bool writeCursor(roo_io::Mount& fs, const char* cursor_path,
124 const LogCompactionCursor cursor) {
125 auto writer = OpenDataFileForWrite(fs, cursor_path, roo_io::kFailIfExists);
126 if (!writer.ok()) {
127 LOG(ERROR) << "Error opening the cursor file " << cursor_path
128 << "for write: " << roo_io::StatusAsString(writer.status());
129 }
130 MLOG(roo_monitoring_compaction)
131 << "Writing cursor content " << cursor_path << ": " << roo_logging::hex
132 << cursor.log_cursor().file() << roo_logging::dec << ", "
133 << cursor.log_cursor().position() << ", "
134 << (int)cursor.target_datum_index();
135
136 writer.writeU8(cursor.target_datum_index());
137 writer.writeVarU64(cursor.log_cursor().file());
138 CHECK_GE(cursor.log_cursor().position(), 0);
139 writer.writeVarU64(cursor.log_cursor().position());
140 writer.close();
141 if (writer.status() != roo_io::kClosed) {
142 LOG(ERROR) << "Error writing to the cursor file " << cursor_path << ": "
143 << strerror(errno);
144 return false;
145 }
146 return true;
147}
148
149} // namespace
150
151// Vault files form a hierarchy. Four vault files from a lower level cover the
152// same time span as a single vault file of a higher level, but with 4x time
153// resolution.
154//
155// Vault files are progressively compacted. Naively, when 4 lower-level vault
156// files are finished, they can be compacted to a single new higher-level vault
157// file.
158
159// In order to support more incremental compaction, we use a notion of 'hot'
160// vault files, which are only partially filled. Every time 4 new entries are
161// added to the lower-level 'hot' vault file, these new entries can be compacted
162// into one new entry in the higher level 'hot' vault file. In order to support
163// that, hot files are accompanied by 'compaction cursor' files. A compaction
164// cursor file has the following format:
165//
166// * target datum index (uint8): the current count of entries in the
167// higher-level vault file. Always within [0 - 255].
168// * source file (varint): the start_timestamp (thus filename) of the
169// lower-level hot file that is being compacted.
170// * source checkpoint (varint): byte offset in the lower level file up
171// to which the data has already been compacted.
172//
173// The compaction algorithm tries to pick up where it left off, by looking for
174// the cursor file and seeking in both the source and the destination files. If
175// the cursor file is missing or malformed, the compaction is simply done from
176// scratch (i.e. the destination file is rebuilt rather than appended to). After
177// the compaction, if the destination file is still hot (i.e. has fewer than 256
178// entries), a new cursor file is created to be used for the next compaction
179// run.
180
182 while (flush_in_progress_) flushSome();
183 flushSome();
184 while (flush_in_progress_) flushSome();
185}
186
188 roo_io::Mount fs = collection_->fs().mount();
189 if (!fs.ok()) return;
190 if (flush_in_progress_) {
191 Status status = compactVaultOneLevel();
192 if (status == Writer::OK) {
193 flush_in_progress_ = false;
194 // We're done compacting. Check if there is more to read?
195 LogReader reader(fs, log_dir_.c_str(), cache_, collection_->resolution(),
196 writer_.first_timestamp());
197 if (reader.nextRange() && !reader.isHotRange()) {
198 // Has some historic range; let's continue compacting.
199 compaction_head_ = VaultFileRef::Lookup(reader.range_floor(),
200 collection_->resolution());
201 compaction_head_index_end_ = writeToVault(fs, reader, compaction_head_);
202 is_hot_range_ = reader.isHotRange();
203 if (io_state() != IOSTATE_OK) return;
204 flush_in_progress_ = true;
205 }
206 } else if (status == Writer::FAILED) {
207 LOG(ERROR) << "Vault compaction failed at resolution "
208 << compaction_head_.resolution();
209 io_state_ = IOSTATE_ERROR;
210 flush_in_progress_ = false;
211 }
212 } else {
213 // flush not in progress.
214 LogReader reader(fs, log_dir_.c_str(), cache_, collection_->resolution(),
215 writer_.first_timestamp());
216 if (reader.nextRange()) {
217 compaction_head_ =
218 VaultFileRef::Lookup(reader.range_floor(), collection_->resolution());
219 compaction_head_index_end_ = writeToVault(fs, reader, compaction_head_);
220 is_hot_range_ = reader.isHotRange();
221 if (io_state() != IOSTATE_OK) return;
222 flush_in_progress_ = true;
223 MLOG(roo_monitoring_compaction) << "Starting vault compaction.";
224 }
225 }
226}
227
228int16_t Writer::writeToVault(roo_io::Mount& fs, LogReader& reader,
229 VaultFileRef ref) {
230 VaultWriter writer(collection_, ref);
231
232 // See if we can use cursor.
233 String cursor_path = getLogCompactionCursorPath(collection_, ref);
234 LogCompactionCursor cursor;
235 roo_io::Status status;
236 bool opened = false;
237 status = tryReadLogCompactionCursor(fs, cursor_path.c_str(), &cursor);
238 if (status == roo_io::kOk) {
239 if (reader.seek(cursor.log_cursor())) {
240 writer.openExisting(cursor.target_datum_index());
241 if (writer.ok()) {
242 opened = true;
243 }
244 if (fs.remove(cursor_path.c_str()) != roo_io::kOk) {
245 io_state_ = IOSTATE_ERROR;
246 return -1;
247 }
248 }
249 }
250 if (status != roo_io::kNotFound) {
251 fs.remove(cursor_path.c_str());
252 }
253 if (!opened) {
254 // Cursor not found.
255 writer.openNew();
256 if (!writer.ok()) {
257 io_state_ = IOSTATE_ERROR;
258 return -1;
259 }
260 }
261
262 // In any case, now just iterate and compact.
263 int64_t increment = timestamp_increment(1, collection_->resolution());
264 int64_t current =
265 writer.vault_ref().timestamp() +
266 timestamp_increment(writer.write_index(), collection_->resolution());
267 int64_t timestamp;
268 std::vector<LogSample> data;
269 while (reader.nextSample(&timestamp, &data)) {
270 if (timestamp < current) {
271 // Ignoring out-of-order log entries.
272 continue;
273 }
274 while (current < timestamp) {
275 writer.writeEmptyData();
276 current += increment;
277 }
278 CHECK_EQ(current, timestamp);
279 writer.writeLogData(data);
280 current += increment;
281 }
282 if (!writer.ok()) {
283 io_state_ = IOSTATE_ERROR;
284 return -1;
285 }
286
287 if (reader.isHotRange()) {
288 if (!writeCursor(
289 fs, cursor_path.c_str(),
290 LogCompactionCursor(reader.tell(), writer.write_index()))) {
291 io_state_ = IOSTATE_ERROR;
292 return -1;
293 }
294 } else {
295 while (writer.write_index() < kRangeElementCount) {
296 writer.writeEmptyData();
297 current += increment;
298 }
299 reader.deleteRange();
300 }
301 // compaction_range.index_begin = compaction_index_begin;
302 int16_t compaction_index_end = writer.write_index();
303 writer.close();
304 return compaction_index_end;
305}
306
307Writer::Status Writer::compactVaultOneLevel() {
308 roo_io::Mount fs = collection_->fs().mount();
309 if (!fs.ok()) return Writer::FAILED;
310 VaultFileRef parent = compaction_head_.parent();
311 compaction_head_index_end_ =
312 (kRangeElementCount / 4) * compaction_head_.sibling_index() +
313 (compaction_head_index_end_ >> 2);
314 compaction_head_ = parent;
315 if (compaction_head_.resolution() > kMaxResolution) {
316 MLOG(roo_monitoring_compaction) << "Vault compacton finished.";
317 return Writer::OK;
318 }
319 if (compaction_head_index_end_ == 0) {
320 MLOG(roo_monitoring_compaction) << "Compaction index = 0";
321 // We're definitely done compacting.
322 return Writer::OK;
323 }
324 CHECK_LE(compaction_head_index_end_, kRangeElementCount);
325 CHECK_GT(compaction_head_index_end_, 0);
326 is_hot_range_ |= (compaction_head_.sibling_index() < 3);
327
328 VaultWriter writer(collection_, compaction_head_);
329 VaultFileReader reader(collection_);
330 MLOG(roo_monitoring_compaction)
331 << "Compacting " << roo_logging::hex << writer.vault_ref()
332 << ", with end index " << roo_logging::dec << compaction_head_index_end_;
333
334 // See if we can use a cursor file.
335 String cursor_path =
336 getLogCompactionCursorPath(collection_, compaction_head_);
337 bool opened = false;
338 LogCompactionCursor cursor;
339 roo_io::Status status =
340 tryReadLogCompactionCursor(fs, cursor_path.c_str(), &cursor);
341 if (status == roo_io::kOk) {
342 reader.open(compaction_head_.child(cursor.target_datum_index() /
343 (kRangeElementCount / 4)),
344 (cursor.target_datum_index() % (kRangeElementCount / 4)) << 2,
345 cursor.log_cursor().position());
346 if (reader.ok()) {
347 writer.openExisting(cursor.target_datum_index());
348 if (writer.ok()) {
349 opened = true;
350 }
351 roo_io::Status cursor_status = fs.remove(cursor_path.c_str());
352 if (cursor_status != roo_io::kOk) {
353 LOG(ERROR) << "Failed to delete cursor file " << cursor_path << ": "
354 << roo_io::StatusAsString(cursor_status);
355 return Writer::FAILED;
356 }
357 }
358 } else if (status != roo_io::kNotFound) {
359 fs.remove(cursor_path.c_str());
360 }
361 if (!opened) {
362 reader.open(compaction_head_.child(0), 0, 0);
363 writer.openNew();
364 }
365 if (writer.write_index() >= compaction_head_index_end_) {
366 // The vault already has data past the current index. We will not be
367 // overwriting it. Nothing more to do.
368 return Writer::OK;
369 }
370
371 // Now iterate and compact.
372 std::vector<Sample> sample_group;
373 Aggregator aggregator;
374 do {
375 CHECK_LE(reader.index(), kRangeElementCount - 4);
376 for (int i = 0; i < 4; ++i) {
377 // Ignore missing input files when compacting.
378 reader.next(&sample_group);
379 for (const Sample& sample : sample_group) {
380 if (sample.fill() > 0) {
381 aggregator.add(sample);
382 }
383 }
384 }
385 writer.writeAggregatedData(aggregator);
386 aggregator.clear();
387 if (reader.past_eof()) {
388 reader.open(reader.vault_ref().next(), 0, 0);
389 }
390 } while (writer.write_index() < compaction_head_index_end_);
391 if (writer.write_index() > 0 && writer.write_index() < kRangeElementCount) {
392 // The vault file is unfinished; create a write cursor for it.
393 writeCursor(fs, cursor_path.c_str(),
394 LogCompactionCursor(reader.tell(), writer.write_index()));
395 }
396 reader.close();
397 writer.close();
398 if (reader.status() != roo_io::kClosed) {
399 LOG(ERROR) << "Failed to process the input vault file: "
400 << roo_io::StatusAsString(reader.status());
401 return Writer::FAILED;
402 }
403 if (writer.status() != roo_io::kClosed) {
404 LOG(ERROR) << "Failed to process the output vault file: "
405 << roo_io::StatusAsString(writer.status());
406 return Writer::FAILED;
407 }
408 MLOG(roo_monitoring_compaction)
409 << "Finished compacting " << roo_logging::hex << writer.vault_ref()
410 << ", with end index " << roo_logging::dec << writer.write_index();
411 return Writer::IN_PROGRESS;
412}
413
419
420void Collection::getVaultFilePath(const VaultFileRef& ref, String* path) const {
421 // Introduce a 2nd level directory structure with max 256 (4^4) files.
422 // Each file covers 256 (4 ^ range length) time steps, and each time step
423 // covers 4^resolution milliseconds.
424 Resolution group_range_resolution =
426 Filename filename = Filename::forTimestamp(ref.timestamp());
428 timestamp_ms_floor(ref.timestamp(), group_range_resolution));
429 *path = base_dir_;
430 *path += "/";
431 *path += "vault-";
432 *path += toHexDigit((ref.resolution() >> 4) & 0xF);
433 *path += toHexDigit((ref.resolution() >> 0) & 0xF);
434 *path += "/";
435 *path += dirname.filename();
436 *path += "/";
437 *path += filename.filename();
438}
439
440VaultIterator::VaultIterator(const Collection* collection, int64_t start,
441 Resolution resolution)
442 : collection_(collection),
443 current_ref_(VaultFileRef::Lookup(start, resolution)),
444 current_(collection) {
445 current_.open(current_ref_, 0, 0);
446 current_.seekForward(start);
447}
448
449void VaultIterator::next(std::vector<Sample>* sample) {
450 if (current_.past_eof()) {
451 current_ref_ = current_ref_.next();
452 MLOG(roo_monitoring_vault_reader)
453 << "Advancing to next file: " << roo_logging::hex
454 << current_ref_.timestamp();
455 current_.open(current_ref_, 0, 0);
456 }
457 current_.next(sample);
458}
459
460int64_t VaultIterator::cursor() const {
461 return current_ref_.timestamp_at(current_.index());
462}
463
464} // namespace roo_monitoring
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
Collection(roo_io::Filesystem &fs, String name, Resolution resolution=kResolution_1024_ms)
Resolution resolution() const
const String & name() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Helper class for generating filenames corresponding to timestamps.
Definition common.h:45
const char * filename() const
Returns the generated filename as a null-terminated string.
Definition common.h:50
static Filename forTimestamp(int64_t nanosSinceEpoch)
Creates a filename for the specified timestamp.
Definition common.cpp:69
const LogCursor & log_cursor() const
LogCompactionCursor(LogCursor log_cursor, int16_t target_datum_index)
Cursor used when seeking through multiple log files.
Definition log.h:95
Reader that walks across a sequence of log files.
Definition log.h:114
LogCursor tell()
Returns the current cursor.
Definition log.cpp:205
bool seek(LogCursor cursor)
Seeks to the specified cursor.
Definition log.cpp:188
int64_t range_floor() const
Returns the lower bound of the current range.
Definition log.h:123
bool nextSample(int64_t *timestamp, std::vector< LogSample > *data)
Reads the next sample in the current range.
Definition log.cpp:173
bool isHotRange()
Returns true if the current range is hot (still being written).
Definition log.cpp:167
bool nextRange()
Advances to the next time range.
Definition log.cpp:145
void deleteRange()
Deletes the current range files.
Definition log.cpp:210
int64_t first_timestamp() const
Returns the first timestamp recorded in the current file.
Definition log.h:176
void write(int64_t timestamp, uint64_t stream_id, uint16_t datum)
Writes a single log sample.
Definition log.cpp:269
void close()
Closes the log file.
Definition log.cpp:259
Resolution resolution() const
Returns the resolution used for this writer.
Definition log.h:163
bool can_skip_write(int64_t timestamp, uint64_t stream_id)
Returns true if a write can be skipped for this bucket.
Definition log.cpp:264
Maps application-domain floats to 16-bit stored values.
Definition transform.h:10
uint16_t apply(float value) const
Applies the transform and clamps to [0, 65535].
Definition transform.cpp:13
void seekForward(int64_t timestamp)
Advances the cursor to the first entry at or after the timestamp.
Definition vault.cpp:181
int index() const
Returns the current entry index.
Definition vault.h:127
bool past_eof() const
Returns true if the reader has passed the end of file.
Definition vault.cpp:201
bool next(std::vector< Sample > *sample)
Reads the next entry and fills the sample vector.
Definition vault.cpp:145
bool open(const VaultFileRef &ref, int index, int64_t offset)
Opens the file and seeks to the specified index and byte offset.
Definition vault.cpp:77
Identifies a specific file in the monitoring vault.
Definition vault.h:16
int64_t timestamp() const
Returns the start timestamp for this vault file.
Definition vault.h:26
int64_t timestamp_at(int position) const
Returns the timestamp for the entry at the given position.
Definition vault.h:28
VaultFileRef parent() const
Returns the parent vault file at the next coarser resolution.
Definition vault.h:46
int sibling_index() const
Returns the index of this file within its parent range.
Definition vault.h:71
VaultFileRef next() const
Returns the next vault file at the same resolution.
Definition vault.h:61
Resolution resolution() const
Returns the resolution for this vault file.
Definition vault.h:32
VaultFileRef child(int index) const
Returns the child vault file at the next finer resolution.
Definition vault.h:51
static VaultFileRef Lookup(int64_t timestamp, Resolution resolution)
Creates a reference that encloses the timestamp at the given resolution.
int64_t cursor() const
Returns current iterator timestamp.
void next(std::vector< Sample > *sample)
Advances by one resolution step and fills sample.
VaultIterator(const Collection *collection, int64_t start, Resolution resolution)
Creates iterator over collection at resolution, starting at start.
Writes vault files for a collection at a specific resolution.
Definition compaction.h:39
void write(int64_t timestamp, uint64_t stream_id, float data)
Write interface for a monitoring collection.
Writer(Collection *collection)
void flushAll()
Periodically flushes logged data into vault files.
IoState io_state() const
Umbrella header for the roo_monitoring module.
static constexpr int64_t timestamp_ms_floor(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp down to the specified resolution bucket.
Definition resolution.h:34
static const Resolution kMaxResolution
Maximum supported resolution.
Definition resolution.h:31
static const int kRangeElementCount
Number of items in a range (4^(kRangeLength)).
Definition common.h:22
Resolution
Time resolution used for log and vault files.
Definition resolution.h:8
const char * kLogSubPath
Subdirectory name used for raw log files.
Definition common.cpp:11
static const int kRangeLength
Number of base-4 digits used per range.
Definition common.h:19
static constexpr int64_t timestamp_increment(int64_t steps, Resolution resolution)
Returns the timestamp delta for the given number of resolution steps.
Definition resolution.h:51
const char * kMonitoringBasePath
Base directory for monitoring storage on the filesystem.
Definition common.cpp:10
constexpr char toHexDigit(int d)
Converts a 0-15 value to an uppercase hex digit.
Definition common.h:30
String subdir(String base, const String &sub)
Returns a path formed by joining the base directory and subdirectory.
Definition common.cpp:13