roo_monitoring
API Documentation for roo_monitoring
Loading...
Searching...
No Matches
log.cpp
Go to the documentation of this file.
1#include "log.h"
2
3#include <Arduino.h>
4
5#include <algorithm>
6
7#include "common.h"
8#include "roo_io/fs/fsutil.h"
9#include "roo_logging.h"
10
11#ifndef MLOG_roo_monitoring_compaction
12#define MLOG_roo_monitoring_compaction 0
13#endif
14
15#ifndef MLOG_roo_logging_writer
16#define MLOG_roo_logging_writer 0
17#endif
18
19namespace roo_monitoring {
20
22
23bool LogFileReader::open(const char* path, int64_t checkpoint) {
24 MLOG(roo_monitoring_compaction)
25 << "Opening log file " << path << " at " << checkpoint;
26 reader_.reset(fs_.fopen(path));
27 if (!reader_.isOpen()) {
28 LOG(ERROR) << "Failed to open log file " << path << ": "
29 << roo_io::StatusAsString(reader_.status());
30 return false;
31 }
32 if (checkpoint > 0) {
33 reader_.seek(checkpoint);
34 if (!reader_.ok()) {
35 LOG(ERROR) << "Failed to seek in the log file " << path << ": "
36 << roo_io::StatusAsString(reader_.status());
37 return false;
38 }
39 }
40 checkpoint_ = checkpoint;
41 lookahead_entry_type_ = reader_.readU8();
42 return true;
43}
44
45bool LogFileReader::next(int64_t* timestamp, std::vector<LogSample>* data,
46 bool is_hot) {
47 data->clear();
48 if (checkpoint_ < 0) return false;
49 if (!reader_.ok()) return false;
50 if (lookahead_entry_type_ != CODE_TIMESTAMP) {
51 LOG(ERROR) << "Unexpected content in the log file: "
52 << (int)lookahead_entry_type_;
53 return false;
54 }
55 *timestamp = reader_.readVarU64();
56 lookahead_entry_type_ = reader_.readU8();
57 if (!reader_.ok()) return false;
58
59 // LOG(INFO) << "Read log data at timestamp: " << roo_logging::hex <<
60 // *timestamp;
61 while (true) {
62 if (reader_.status() == roo_io::kEndOfStream) {
63 if (is_hot) {
64 // Indicate that the data isn't complete yet. Not updating the
65 // source checkpoint in this case.
66 return false;
67 } else {
68 // Indicating that we have reached the end of a 'historical' log file.
69 MLOG(roo_monitoring_compaction)
70 << "Reached EOF of a historical log file ";
71 checkpoint_ = -1;
72 break;
73 }
74 } else if (!reader_.ok()) {
75 // Must be I/O error.
76 LOG(ERROR) << "Failed to read timestamped data from a log file: "
77 << roo_io::StatusAsString(reader_.status());
78 return false;
79 } else if (lookahead_entry_type_ == CODE_DATUM) {
80 uint64_t stream_id;
81 uint16_t datum;
82 stream_id = reader_.readVarU64();
83 datum = reader_.readBeU16();
84 if (!reader_.ok()) return false;
85 data->emplace_back(stream_id, datum);
86 lookahead_entry_type_ = reader_.readU8();
87 continue;
88 } else if (lookahead_entry_type_ == CODE_TIMESTAMP) {
89 checkpoint_ = (int64_t)reader_.position() - 1;
90 break;
91 } else {
92 LOG(ERROR) << "Unexpected entry type " << (int)lookahead_entry_type_;
93 return false;
94 }
95 }
96 std::sort(data->begin(), data->end());
97 return true;
98}
99
100std::vector<int64_t> CachedLogDir::list() {
101 sync();
102 std::vector<int64_t> result;
103 for (int64_t e : entries_) {
104 result.push_back(e);
105 }
106 std::sort(result.begin(), result.end());
107 return result;
108}
109
110void CachedLogDir::sync() {
111 if (synced_) return;
112 entries_.clear();
113 roo_io::Mount fs = fs_.mount();
114 if (!fs.ok()) return;
115 std::vector<int64_t> entries = listFiles(fs, log_dir_);
116 for (int64_t e : entries) {
117 entries_.insert(e);
118 }
119 synced_ = true;
120}
121
122LogReader::LogReader(roo_io::Mount& fs, const char* log_dir,
123 CachedLogDir& cache, Resolution resolution,
124 int64_t hot_file)
125 : fs_(fs),
126 log_dir_(log_dir),
127 cache_(cache),
128 resolution_(resolution),
129 entries_(cache_.list()),
130 group_begin_(entries_.begin()),
131 cursor_(entries_.begin()),
132 group_end_(entries_.begin()),
133 hot_file_(hot_file >= 0 ? hot_file
134 : entries_.empty() ? 0
135 : entries_.back()),
136 reached_hot_file_(false),
137 range_floor_(0),
138 range_ceil_(0),
139 reader_(fs) {
140 // Ensure that the hot file is at the end of the list, even if it is not, for
141 // some reason, chronologically the newest. This way, we will always delete
142 // the non-hot logs and leave the hot file be.
143}
144
146 if (group_end_ == entries_.end() || reached_hot_file_) {
147 MLOG(roo_monitoring_compaction) << "No more log files to process.";
148 return false;
149 }
150 cursor_ = group_begin_ = group_end_;
151 Resolution range_resolution = Resolution(resolution_ + kRangeLength);
152 range_floor_ = timestamp_ms_floor(*cursor_, range_resolution);
153 range_ceil_ = timestamp_ms_ceil(*cursor_, range_resolution);
154 while (!reached_hot_file_ && group_end_ != entries_.end() &&
155 *group_end_ <= range_ceil_) {
156 if (*group_end_ == hot_file_) {
157 reached_hot_file_ = true;
158 }
159 ++group_end_;
160 }
161 MLOG(roo_monitoring_compaction)
162 << "Processing log files for the range starting at " << roo_logging::hex
163 << *group_begin_;
164 return true;
165}
166
167bool LogReader::isHotRange() { return hot_file_ < range_ceil_; }
168
169bool LogReader::open(int64_t file, uint64_t position) {
170 return reader_.open(filepath(log_dir_, file).c_str(), position);
171}
172
173bool LogReader::nextSample(int64_t* timestamp, std::vector<LogSample>* data) {
174 for (; cursor_ != group_end_; ++cursor_) {
175 if (!reader_.is_open()) {
176 if (!open(*cursor_, 0)) {
177 LOG(ERROR) << "Failed to open log file " << *cursor_;
178 continue;
179 }
180 }
181 CHECK(reader_.is_open());
182 if (reader_.next(timestamp, data, *cursor_ == hot_file_)) return true;
183 reader_.close();
184 }
185 return false;
186}
187
189 auto i = std::lower_bound(group_begin_, group_end_, cursor.file());
190 if (i == group_end_ || *i != cursor.file()) {
191 LOG(WARNING) << "Seek failed; file not found: " << roo_logging::hex
192 << cursor.file();
193 return false;
194 }
195 if (!reader_.open(filepath(log_dir_, cursor.file()).c_str(),
196 cursor.position())) {
197 LOG(WARNING) << "Seek failed; could not open: " << roo_logging::hex
198 << cursor.file();
199 return false;
200 }
201 cursor_ = i;
202 return true;
203}
204
206 CHECK(isHotRange() && cursor_ == group_end_);
207 return LogCursor(hot_file_, reader_.checkpoint());
208}
209
211 CHECK(!isHotRange());
212 for (auto i = group_begin_; i != group_end_; ++i) {
213 MLOG(roo_monitoring_compaction)
214 << "Removing processed log file " << roo_logging::hex << *i;
215 if (fs_.remove(filepath(log_dir_, *i).c_str()) != roo_io::kOk) {
216 LOG(ERROR) << "Failed to remove processed log file " << roo_logging::hex
217 << *i;
218 }
219 cache_.erase(*i);
220 }
221}
222
223LogWriter::LogWriter(roo_io::Filesystem& fs, const char* log_dir,
224 CachedLogDir& cache, Resolution resolution)
225 : log_dir_(log_dir),
226 cache_(cache),
227 resolution_(resolution),
228 fs_(fs),
229 mount_(),
230 first_timestamp_(-1),
231 last_timestamp_(-1),
232 range_ceil_(-1) {}
233
234void writeTimestamp(roo_io::OutputStreamWriter& writer, int64_t timestamp) {
235 writer.writeU8(CODE_TIMESTAMP);
236 writer.writeVarU64(timestamp);
237}
238
239void writeDatum(roo_io::OutputStreamWriter& writer, uint64_t stream_id,
240 uint16_t transformed_datum) {
241 writer.writeU8(CODE_DATUM);
242 writer.writeVarU64(stream_id);
243 writer.writeBeU16(transformed_datum);
244 // write_float(file, datum);
245}
246
247void LogWriter::open(roo_io::FileUpdatePolicy update_policy) {
248 String path = log_dir_;
249 path += "/";
250 path += Filename::forTimestamp(first_timestamp_).filename();
251 mount_ = fs_.mount();
252 roo_io::Status status = roo_io::MkParentDirRecursively(mount_, path.c_str());
253 if (status != roo_io::kOk && status != roo_io::kDirectoryExists) return;
254 // last_log_file_path_ = path;
255 writer_.reset(mount_.fopenForWrite(path.c_str(), update_policy));
256 cache_.insert(first_timestamp_);
257}
258
260 writer_.close();
261 mount_.close();
262}
263
264bool LogWriter::can_skip_write(int64_t timestamp, uint64_t stream_id) {
265 return timestamp == last_timestamp_ &&
266 streams_.find(stream_id) != streams_.end();
267}
268
269void LogWriter::write(int64_t timestamp, uint64_t stream_id, uint16_t datum) {
270 // Need to handle various cases:
271 // 1. Log file not yet initiated since process start
272 // 2. Log file initiated, but timestamp falls outside its range
273 // 3. Log file initiated, and timestamp in range, but not yet opened
274 // 4. Log file initiated, timestamp in range, file opened
275 if (timestamp < last_timestamp_ || timestamp > range_ceil_) {
276 // Log file either not yet created after start, or the timestamp
277 // falls outside its range.
278 close();
279 Resolution range_resolution = Resolution(resolution_ + kRangeLength);
280 first_timestamp_ = timestamp;
281 range_ceil_ = timestamp_ms_ceil(timestamp, range_resolution);
282 streams_.clear();
283 open(roo_io::kFailIfExists);
284 } else {
285 if (!writer_.ok()) {
286 open(roo_io::kAppendIfExists);
287 }
288 }
289
290 if (timestamp != last_timestamp_) {
291 last_timestamp_ = timestamp;
292 streams_.clear();
293 writeTimestamp(writer_, timestamp);
294 }
295 if (streams_.insert(stream_id).second) {
296 // Did not exist.
297 writeDatum(writer_, stream_id, datum);
298 }
299}
300
301} // namespace roo_monitoring
In-memory cache of log directory entries.
Definition log.h:35
void insert(int64_t entry)
Inserts an entry into the cache.
Definition log.h:42
void erase(int64_t entry)
Removes an entry from the cache.
Definition log.h:48
std::vector< int64_t > list()
Returns the cached entries sorted by timestamp.
Definition log.cpp:100
const char * filename() const
Returns the generated filename as a null-terminated string.
Definition common.h:50
static Filename forTimestamp(int64_t nanosSinceEpoch)
Creates a filename for the specified timestamp.
Definition common.cpp:69
Cursor used when seeking through multiple log files.
Definition log.h:95
int64_t file() const
Returns the file timestamp associated with the cursor.
Definition log.h:104
int64_t position() const
Returns the byte position within the file.
Definition log.h:106
void close()
Closes the reader.
Definition log.h:79
int64_t checkpoint() const
Returns the current checkpoint position.
Definition log.h:82
bool next(int64_t *timestamp, std::vector< LogSample > *data, bool is_hot)
Reads the next entry from the file.
Definition log.cpp:45
bool is_open() const
Returns true if a file is currently open.
Definition log.h:76
bool open(const char *path, int64_t checkpoint)
Opens the log file at path and seeks to checkpoint.
Definition log.cpp:23
LogCursor tell()
Returns the current cursor.
Definition log.cpp:205
bool seek(LogCursor cursor)
Seeks to the specified cursor.
Definition log.cpp:188
bool nextSample(int64_t *timestamp, std::vector< LogSample > *data)
Reads the next sample in the current range.
Definition log.cpp:173
bool isHotRange()
Returns true if the current range is hot (still being written).
Definition log.cpp:167
bool nextRange()
Advances to the next time range.
Definition log.cpp:145
void deleteRange()
Deletes the current range files.
Definition log.cpp:210
LogReader(roo_io::Mount &fs, const char *log_dir, CachedLogDir &cache, Resolution resolution, int64_t hot_file=-1)
Creates a reader for the specified log directory and resolution.
Definition log.cpp:122
void open(roo_io::FileUpdatePolicy update_policy)
Opens the log file according to the update policy.
Definition log.cpp:247
void write(int64_t timestamp, uint64_t stream_id, uint16_t datum)
Writes a single log sample.
Definition log.cpp:269
void close()
Closes the log file.
Definition log.cpp:259
bool can_skip_write(int64_t timestamp, uint64_t stream_id)
Returns true if a write can be skipped for this bucket.
Definition log.cpp:264
LogWriter(roo_io::Filesystem &fs, const char *log_dir, CachedLogDir &cache, Resolution resolution)
Creates a log writer for the specified directory and resolution.
Definition log.cpp:223
Umbrella header for the roo_monitoring module.
static constexpr int64_t timestamp_ms_floor(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp down to the specified resolution bucket.
Definition resolution.h:34
Resolution
Time resolution used for log and vault files.
Definition resolution.h:8
static const int kRangeLength
Number of base-4 digits used per range.
Definition common.h:19
@ CODE_TIMESTAMP
Definition log.cpp:21
void writeDatum(roo_io::OutputStreamWriter &writer, uint64_t stream_id, uint16_t transformed_datum)
Definition log.cpp:239
String filepath(String dir, int64_t file)
Returns a file path for the given directory and timestamp-like value.
Definition common.cpp:19
std::vector< int64_t > listFiles(roo_io::Mount &fs, const char *dirname)
Lists timestamp-named files in the directory and returns their timestamps.
Definition common.cpp:45
void writeTimestamp(roo_io::OutputStreamWriter &writer, int64_t timestamp)
Definition log.cpp:234
static constexpr int64_t timestamp_ms_ceil(int64_t timestamp_ms, Resolution resolution)
Rounds the timestamp up to the specified resolution bucket.
Definition resolution.h:43