8#include "roo_io/fs/fsutil.h"
9#include "roo_logging.h"
11#ifndef MLOG_roo_monitoring_compaction
12#define MLOG_roo_monitoring_compaction 0
15#ifndef MLOG_roo_logging_writer
16#define MLOG_roo_logging_writer 0
24 MLOG(roo_monitoring_compaction)
25 <<
"Opening log file " << path <<
" at " <<
checkpoint;
26 reader_.reset(fs_.fopen(path));
27 if (!reader_.isOpen()) {
28 LOG(ERROR) <<
"Failed to open log file " << path <<
": "
29 << roo_io::StatusAsString(reader_.status());
35 LOG(ERROR) <<
"Failed to seek in the log file " << path <<
": "
36 << roo_io::StatusAsString(reader_.status());
41 lookahead_entry_type_ = reader_.readU8();
48 if (checkpoint_ < 0)
return false;
49 if (!reader_.ok())
return false;
51 LOG(ERROR) <<
"Unexpected content in the log file: "
52 << (int)lookahead_entry_type_;
55 *timestamp = reader_.readVarU64();
56 lookahead_entry_type_ = reader_.readU8();
57 if (!reader_.ok())
return false;
62 if (reader_.status() == roo_io::kEndOfStream) {
69 MLOG(roo_monitoring_compaction)
70 <<
"Reached EOF of a historical log file ";
74 }
else if (!reader_.ok()) {
76 LOG(ERROR) <<
"Failed to read timestamped data from a log file: "
77 << roo_io::StatusAsString(reader_.status());
79 }
else if (lookahead_entry_type_ ==
CODE_DATUM) {
82 stream_id = reader_.readVarU64();
83 datum = reader_.readBeU16();
84 if (!reader_.ok())
return false;
85 data->emplace_back(stream_id, datum);
86 lookahead_entry_type_ = reader_.readU8();
89 checkpoint_ = (int64_t)reader_.position() - 1;
92 LOG(ERROR) <<
"Unexpected entry type " << (int)lookahead_entry_type_;
96 std::sort(data->begin(), data->end());
102 std::vector<int64_t> result;
103 for (int64_t e : entries_) {
106 std::sort(result.begin(), result.end());
110void CachedLogDir::sync() {
113 roo_io::Mount fs = fs_.mount();
114 if (!fs.ok())
return;
115 std::vector<int64_t> entries =
listFiles(fs, log_dir_);
116 for (int64_t e : entries) {
128 resolution_(resolution),
129 entries_(cache_.list()),
130 group_begin_(entries_.begin()),
131 cursor_(entries_.begin()),
132 group_end_(entries_.begin()),
133 hot_file_(hot_file >= 0 ? hot_file
134 : entries_.empty() ? 0
136 reached_hot_file_(false),
146 if (group_end_ == entries_.end() || reached_hot_file_) {
147 MLOG(roo_monitoring_compaction) <<
"No more log files to process.";
150 cursor_ = group_begin_ = group_end_;
154 while (!reached_hot_file_ && group_end_ != entries_.end() &&
155 *group_end_ <= range_ceil_) {
156 if (*group_end_ == hot_file_) {
157 reached_hot_file_ =
true;
161 MLOG(roo_monitoring_compaction)
162 <<
"Processing log files for the range starting at " << roo_logging::hex
169bool LogReader::open(int64_t file, uint64_t position) {
170 return reader_.
open(
filepath(log_dir_, file).c_str(), position);
174 for (; cursor_ != group_end_; ++cursor_) {
176 if (!open(*cursor_, 0)) {
177 LOG(ERROR) <<
"Failed to open log file " << *cursor_;
182 if (reader_.
next(timestamp, data, *cursor_ == hot_file_))
return true;
189 auto i = std::lower_bound(group_begin_, group_end_, cursor.
file());
190 if (i == group_end_ || *i != cursor.
file()) {
191 LOG(WARNING) <<
"Seek failed; file not found: " << roo_logging::hex
197 LOG(WARNING) <<
"Seek failed; could not open: " << roo_logging::hex
212 for (
auto i = group_begin_; i != group_end_; ++i) {
213 MLOG(roo_monitoring_compaction)
214 <<
"Removing processed log file " << roo_logging::hex << *i;
215 if (fs_.remove(
filepath(log_dir_, *i).c_str()) != roo_io::kOk) {
216 LOG(ERROR) <<
"Failed to remove processed log file " << roo_logging::hex
227 resolution_(resolution),
230 first_timestamp_(-1),
236 writer.writeVarU64(timestamp);
239void writeDatum(roo_io::OutputStreamWriter& writer, uint64_t stream_id,
240 uint16_t transformed_datum) {
242 writer.writeVarU64(stream_id);
243 writer.writeBeU16(transformed_datum);
248 String path = log_dir_;
251 mount_ = fs_.mount();
252 roo_io::Status status = roo_io::MkParentDirRecursively(mount_, path.c_str());
253 if (status != roo_io::kOk && status != roo_io::kDirectoryExists)
return;
255 writer_.reset(mount_.fopenForWrite(path.c_str(), update_policy));
256 cache_.
insert(first_timestamp_);
265 return timestamp == last_timestamp_ &&
266 streams_.find(stream_id) != streams_.end();
275 if (timestamp < last_timestamp_ || timestamp > range_ceil_) {
280 first_timestamp_ = timestamp;
283 open(roo_io::kFailIfExists);
286 open(roo_io::kAppendIfExists);
290 if (timestamp != last_timestamp_) {
291 last_timestamp_ = timestamp;
295 if (streams_.insert(stream_id).second) {
In-memory cache of log directory entries.
void insert(int64_t entry)
Inserts an entry into the cache.
void erase(int64_t entry)
Removes an entry from the cache.
std::vector< int64_t > list()
Returns the cached entries sorted by timestamp.
const char * filename() const
Returns the generated filename as a null-terminated string.
static Filename forTimestamp(int64_t nanosSinceEpoch)
Creates a filename for the specified timestamp.
Cursor used when seeking through multiple log files.
int64_t file() const
Returns the file timestamp associated with the cursor.
int64_t position() const
Returns the byte position within the file.
void close()
Closes the reader.
int64_t checkpoint() const
Returns the current checkpoint position.
bool next(int64_t *timestamp, std::vector< LogSample > *data, bool is_hot)
Reads the next entry from the file.
bool is_open() const
Returns true if a file is currently open.
bool open(const char *path, int64_t checkpoint)
Opens the log file at path and seeks to checkpoint.
LogCursor tell()
Returns the current cursor.
bool seek(LogCursor cursor)
Seeks to the specified cursor.
bool nextSample(int64_t *timestamp, std::vector< LogSample > *data)
Reads the next sample in the current range.
bool isHotRange()
Returns true if the current range is hot (still being written).
bool nextRange()
Advances to the next time range.
void deleteRange()
Deletes the current range files.
LogReader(roo_io::Mount &fs, const char *log_dir, CachedLogDir &cache, Resolution resolution, int64_t hot_file=-1)
Creates a reader for the specified log directory and resolution.
void open(roo_io::FileUpdatePolicy update_policy)
Opens the log file according to the update policy.
void write(int64_t timestamp, uint64_t stream_id, uint16_t datum)
Writes a single log sample.
void close()
Closes the log file.
bool can_skip_write(int64_t timestamp, uint64_t stream_id)
Returns true if a write can be skipped for this bucket.
LogWriter(roo_io::Filesystem &fs, const char *log_dir, CachedLogDir &cache, Resolution resolution)
Creates a log writer for the specified directory and resolution.
Umbrella header for the roo_monitoring module.
static constexpr int64_t timestamp_ms_floor(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp down to the specified resolution bucket.
Resolution
Time resolution used for log and vault files.
static const int kRangeLength
Number of base-4 digits used per range.
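void writeDatum(roo_io::OutputStreamWriter &writer, uint64_t stream_id, uint16_t transformed_datum)
Writes a datum entry for the given stream: the stream ID as a variable-length integer, followed by the transformed datum as a big-endian 16-bit value.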
String filepath(String dir, int64_t file)
Returns a file path for the given directory and timestamp-like value.
std::vector< int64_t > listFiles(roo_io::Mount &fs, const char *dirname)
Lists timestamp-named files in the directory and returns their timestamps.
void writeTimestamp(roo_io::OutputStreamWriter &writer, int64_t timestamp)
Writes a timestamp entry, encoding the timestamp as a variable-length unsigned 64-bit integer.
static constexpr int64_t timestamp_ms_ceil(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp up to the specified resolution bucket.