roo_monitoring
API Documentation for roo_monitoring
Loading...
Searching...
No Matches
log.h
Go to the documentation of this file.
1#pragma once
2
3#include <vector>
4
5#include "resolution.h"
6#include "roo_collections/flat_small_hash_set.h"
7#include "roo_io/data/multipass_input_stream_reader.h"
8#include "roo_io/data/output_stream_writer.h"
9#include "roo_io/fs/filesystem.h"
10
11namespace roo_monitoring {
12
/// Sample stored in log files before compaction.
///
/// Holds a stream identifier together with the encoded 16-bit value reported
/// for that stream.
class LogSample {
 public:
  /// Creates a log sample for a stream/value pair.
  ///
  /// @param stream_id identifier of the stream the sample belongs to.
  /// @param value encoded sample value.
  LogSample(uint64_t stream_id, uint16_t value)
      : stream_id_(stream_id), value_(value) {}

  /// Returns the stream identifier.
  uint64_t stream_id() const { return stream_id_; }

  /// Returns the encoded sample value.
  uint16_t value() const { return value_; }

 private:
  uint64_t stream_id_;
  uint16_t value_;
};
29
30inline bool operator<(const LogSample& a, const LogSample& b) {
31 return a.stream_id() < b.stream_id();
32}
33
34/// In-memory cache of log directory entries.
36 public:
37 /// Creates a cache for a specific filesystem and log directory.
38 CachedLogDir(roo_io::Filesystem& fs, const char* log_dir)
39 : fs_(fs), log_dir_(log_dir), synced_(false) {}
40
41 /// Inserts an entry into the cache.
42 void insert(int64_t entry) {
43 sync();
44 entries_.insert(entry);
45 }
46
47 /// Removes an entry from the cache.
48 void erase(int64_t entry) {
49 sync();
50 entries_.erase(entry);
51 }
52
53 /// Returns the cached entries sorted by timestamp.
54 std::vector<int64_t> list();
55
56 private:
57 void sync();
58
59 roo_io::Filesystem& fs_;
60 const char* log_dir_;
61 bool synced_;
62
63 roo_collections::FlatSmallHashSet<int64_t> entries_;
64};
65
66/// Reader for a single log file.
68 public:
69 /// Creates a reader over the specified mount.
70 LogFileReader(roo_io::Mount& mount) : fs_(mount) {}
71
72 /// Opens the log file at path and seeks to checkpoint.
73 bool open(const char* path, int64_t checkpoint);
74
75 /// Returns true if a file is currently open.
76 bool is_open() const { return reader_.isOpen(); }
77
78 /// Closes the reader.
79 void close() { reader_.close(); }
80
81 /// Returns the current checkpoint position.
82 int64_t checkpoint() const { return checkpoint_; }
83
84 /// Reads the next entry from the file.
85 bool next(int64_t* timestamp, std::vector<LogSample>* data, bool is_hot);
86
87 private:
88 roo_io::Mount& fs_;
89 roo_io::MultipassInputStreamReader reader_;
90 uint8_t lookahead_entry_type_;
91 int64_t checkpoint_;
92};
93
/// Cursor used when seeking through multiple log files.
///
/// Pairs a log file (identified by its timestamp) with a byte position inside
/// that file.
class LogCursor {
 public:
  /// Creates a cursor at the start of the log sequence (file 0, position 0).
  LogCursor() : file_(0), position_(0) {}

  /// Creates a cursor for a specific file and position.
  ///
  /// @param file timestamp identifying the log file.
  /// @param position byte position within that file.
  LogCursor(int64_t file, int64_t position)
      : file_(file), position_(position) {}

  /// Returns the file timestamp associated with the cursor.
  int64_t file() const { return file_; }

  /// Returns the byte position within the file.
  int64_t position() const { return position_; }

 private:
  int64_t file_;
  int64_t position_;
};
112
113/// Reader that walks across a sequence of log files.
115 public:
116 /// Creates a reader for the specified log directory and resolution.
117 LogReader(roo_io::Mount& fs, const char* log_dir, CachedLogDir& cache,
118 Resolution resolution, int64_t hot_file = -1);
119
120 /// Advances to the next time range.
121 bool nextRange();
122 /// Returns the lower bound of the current range.
123 int64_t range_floor() const { return range_floor_; }
124 /// Seeks to the specified cursor.
125 bool seek(LogCursor cursor);
126 /// Returns the current cursor.
127 LogCursor tell();
128
129 /// Returns true if the current range is hot (still being written).
130 bool isHotRange();
131 /// Deletes the current range files.
132 void deleteRange();
133
134 /// Reads the next sample in the current range.
135 bool nextSample(int64_t* timestamp, std::vector<LogSample>* data);
136
137 private:
138 bool open(int64_t file, uint64_t position);
139
140 roo_io::Mount& fs_;
141 const char* log_dir_;
142 CachedLogDir& cache_;
143 Resolution resolution_;
144 std::vector<int64_t> entries_;
145 std::vector<int64_t>::const_iterator group_begin_;
146 std::vector<int64_t>::const_iterator cursor_;
147 std::vector<int64_t>::const_iterator group_end_;
148 int64_t hot_file_;
149 bool reached_hot_file_;
150 int64_t range_floor_;
151 int64_t range_ceil_;
152 LogFileReader reader_;
153};
154
155/// Writer for log files at a fixed resolution.
157 public:
158 /// Creates a log writer for the specified directory and resolution.
159 LogWriter(roo_io::Filesystem& fs, const char* log_dir, CachedLogDir& cache,
161
162 /// Returns the resolution used for this writer.
163 Resolution resolution() const { return resolution_; }
164
165 /// Opens the log file according to the update policy.
166 void open(roo_io::FileUpdatePolicy update_policy);
167 /// Closes the log file.
168 void close();
169
170 /// Writes a single log sample.
171 void write(int64_t timestamp, uint64_t stream_id, uint16_t datum);
172 /// Returns true if a write can be skipped for this bucket.
173 bool can_skip_write(int64_t timestamp, uint64_t stream_id);
174
175 /// Returns the first timestamp recorded in the current file.
176 int64_t first_timestamp() const { return first_timestamp_; }
177
178 private:
179 // const that contains the path where log files are stored.
180 const char* log_dir_;
181 CachedLogDir& cache_;
182 Resolution resolution_;
183
184 roo_io::Filesystem& fs_;
185 roo_io::Mount mount_;
186 roo_io::OutputStreamWriter writer_;
187
188 // For tentatively deduplicating data reported in the same target
189 // resolution bucket.
190 roo_collections::FlatSmallHashSet<uint64_t> streams_;
191
192 int64_t first_timestamp_;
193 int64_t last_timestamp_;
194 int64_t range_ceil_;
195};
196
197} // namespace roo_monitoring
In-memory cache of log directory entries.
Definition log.h:35
void insert(int64_t entry)
Inserts an entry into the cache.
Definition log.h:42
void erase(int64_t entry)
Removes an entry from the cache.
Definition log.h:48
std::vector< int64_t > list()
Returns the cached entries sorted by timestamp.
Definition log.cpp:100
CachedLogDir(roo_io::Filesystem &fs, const char *log_dir)
Creates a cache for a specific filesystem and log directory.
Definition log.h:38
Cursor used when seeking through multiple log files.
Definition log.h:95
int64_t file() const
Returns the file timestamp associated with the cursor.
Definition log.h:104
LogCursor()
Creates a cursor at the start of the log sequence.
Definition log.h:98
LogCursor(int64_t file, int64_t position)
Creates a cursor for a specific file and position.
Definition log.h:100
int64_t position() const
Returns the byte position within the file.
Definition log.h:106
Reader for a single log file.
Definition log.h:67
void close()
Closes the reader.
Definition log.h:79
int64_t checkpoint() const
Returns the current checkpoint position.
Definition log.h:82
bool next(int64_t *timestamp, std::vector< LogSample > *data, bool is_hot)
Reads the next entry from the file.
Definition log.cpp:45
bool is_open() const
Returns true if a file is currently open.
Definition log.h:76
LogFileReader(roo_io::Mount &mount)
Creates a reader over the specified mount.
Definition log.h:70
bool open(const char *path, int64_t checkpoint)
Opens the log file at path and seeks to checkpoint.
Definition log.cpp:23
Reader that walks across a sequence of log files.
Definition log.h:114
LogCursor tell()
Returns the current cursor.
Definition log.cpp:205
bool seek(LogCursor cursor)
Seeks to the specified cursor.
Definition log.cpp:188
int64_t range_floor() const
Returns the lower bound of the current range.
Definition log.h:123
bool nextSample(int64_t *timestamp, std::vector< LogSample > *data)
Reads the next sample in the current range.
Definition log.cpp:173
bool isHotRange()
Returns true if the current range is hot (still being written).
Definition log.cpp:167
bool nextRange()
Advances to the next time range.
Definition log.cpp:145
void deleteRange()
Deletes the current range files.
Definition log.cpp:210
Sample stored in log files before compaction.
Definition log.h:14
LogSample(uint64_t stream_id, uint16_t value)
Creates a log sample for a stream/value pair.
Definition log.h:17
uint64_t stream_id() const
Returns the stream identifier.
Definition log.h:21
uint16_t value() const
Returns the encoded sample value.
Definition log.h:23
Writer for log files at a fixed resolution.
Definition log.h:156
int64_t first_timestamp() const
Returns the first timestamp recorded in the current file.
Definition log.h:176
void open(roo_io::FileUpdatePolicy update_policy)
Opens the log file according to the update policy.
Definition log.cpp:247
void write(int64_t timestamp, uint64_t stream_id, uint16_t datum)
Writes a single log sample.
Definition log.cpp:269
void close()
Closes the log file.
Definition log.cpp:259
Resolution resolution() const
Returns the resolution used for this writer.
Definition log.h:163
bool can_skip_write(int64_t timestamp, uint64_t stream_id)
Returns true if a write can be skipped for this bucket.
Definition log.cpp:264
Umbrella header for the roo_monitoring module.
Resolution
Time resolution used for log and vault files.
Definition resolution.h:8
bool operator<(const LogSample &a, const LogSample &b)
Definition log.h:30