roo_monitoring
API Documentation for roo_monitoring
Loading...
Searching...
No Matches
vault.cpp
Go to the documentation of this file.
1
2#include "vault.h"
3
4#include <map>
5
6#include "common.h"
7#include "log.h"
8#include "resolution.h"
9#include "roo_io/data/multipass_input_stream_reader.h"
10#include "roo_logging.h"
11#include "roo_monitoring.h"
12
13#ifndef MLOG_roo_monitoring_vault_reader
14#define MLOG_roo_monitoring_vault_reader 0
15#endif
16
17namespace roo_monitoring {
18
19namespace {
20
21bool read_header(roo_io::MultipassInputStreamReader& is) {
22 uint8_t major = is.readU8();
23 uint8_t minor = is.readU8();
24 if (!is.ok()) {
25 LOG(ERROR) << "Failed to read vault file header: "
26 << roo_io::StatusAsString(is.status());
27 return false;
28 }
29 if (major != 1 || minor != 1) {
30 LOG(ERROR) << "Invalid content of vault file header: " << major << ", "
31 << minor;
32 return false;
33 }
34 return true;
35}
36
37roo_io::Status read_data(roo_io::MultipassInputStreamReader& is,
38 std::vector<Sample>* data, bool ignore_fill) {
39 data->clear();
40 uint64_t sample_count = roo_io::ReadVarU64(is);
41 if (!is.ok()) {
42 if (is.status() != roo_io::kEndOfStream) {
43 LOG(ERROR) << "Failed to read data from the vault file: "
44 << roo_io::StatusAsString(is.status());
45 }
46 return is.status();
47 }
48 for (uint64_t i = 0; i < sample_count; ++i) {
49 uint64_t stream_id = is.readVarU64();
50 uint16_t avg = is.readBeU16();
51 uint16_t min = is.readBeU16();
52 uint16_t max = is.readBeU16();
53 uint16_t fill = is.readBeU16();
54 if (ignore_fill) {
55 fill = 0x2000;
56 }
57 if (!is.ok()) {
58 LOG(ERROR) << "Failed to read a sample from the vault file: "
59 << roo_io::StatusAsString(is.status());
60 return is.status();
61 }
62 data->emplace_back(stream_id, avg, min, max, fill);
63 }
64 return roo_io::kOk;
65}
66
67} // namespace
68
70 : collection_(collection),
71 ref_(),
72 fs_(),
73 reader_(),
74 index_(0),
75 position_(0) {}
76
77bool VaultFileReader::open(const VaultFileRef& vault_ref, int index,
78 int64_t offset) {
79 String path;
80 ref_ = vault_ref;
81 collection_->getVaultFilePath(vault_ref, &path);
82 fs_ = collection_->fs().mount();
83 if (!fs_.ok()) {
84 return false;
85 }
86 reader_.reset(fs_.fopen(path.c_str()));
87 index_ = index;
88 position_ = 0;
89 if (!reader_.isOpen()) {
90 if (reader_.status() == roo_io::kNotFound) {
91 MLOG(roo_monitoring_vault_reader)
92 << "Vault file " << path.c_str()
93 << " doesn't exist; treating as-if empty";
94 } else {
95 LOG(ERROR) << "Failed to open vault file for read: " << path.c_str()
96 << ": " << roo_io::StatusAsString(reader_.status());
97 }
98 return false;
99 }
100 if (offset == 0) {
101 if (!read_header(reader_)) {
102 reader_.close();
103 return false;
104 }
105 position_ = reader_.position();
106 } else if (offset < 0) {
107 LOG(ERROR) << "Invalid offset: " << offset;
108 return false;
109 } else {
110 reader_.seek(offset);
111 if (reader_.status() != roo_io::kOk) {
112 LOG(ERROR) << "Error seeking in the vault file " << path.c_str() << ": "
113 << roo_io::StatusAsString(reader_.status());
114 return false;
115 }
116 position_ = offset;
117 }
118 MLOG(roo_monitoring_vault_reader)
119 << "Vault file " << path.c_str() << " opened for read at index " << index_
120 << " and position " << offset;
121 return reader_.status() == roo_io::kOk;
122}
123
125
127 if (index_ == 0) {
128 // In this case, the file might have not existed, but that's OK;
129 // we will just return that we're at the beginning of it.
130 return LogCursor(ref_.timestamp(), 0);
131 }
132 if (past_eof()) {
133 LOG(FATAL) << "Attempt to read a position in a file that has been fully "
134 "read and is now closed.";
135 } else if (reader_.ok()) {
136 position_ = reader_.position();
137 } else if (reader_.status() == roo_io::kClosed) {
138 LOG(FATAL) << "Attempt to read a position in a file that has been "
139 "unexpectedly closed at index "
140 << index_;
141 }
142 return LogCursor(ref_.timestamp(), position_);
143}
144
145bool VaultFileReader::next(std::vector<Sample>* sample) {
146 sample->clear();
147 if (past_eof()) {
148 return false;
149 }
150 if (!reader_.ok()) {
151 ++index_;
152 return false;
153 }
154 // TODO: make this configurable.
155 bool ignore_fill = (ref_.resolution() <= kResolution_65536_ms);
156 if (read_data(reader_, sample, ignore_fill) == roo_io::kOk) {
157 ++index_;
158 if (past_eof()) {
159 MLOG(roo_monitoring_vault_reader)
160 << "End of file reached after successfully scanning the entire "
161 "vault file ";
162 position_ = reader_.position();
163 reader_.close();
164 }
165 return true;
166 }
167 if (reader_.status() == roo_io::kEndOfStream) {
168 MLOG(roo_monitoring_vault_reader)
169 << "End of file reached prematurely, while reading data at index "
170 << index_;
171 position_ = 0;
172 } else {
173 position_ = reader_.position();
174 LOG(ERROR) << "Error reading data at index " << index_;
175 }
176 ++index_;
177 reader_.close();
178 return false;
179}
180
181void VaultFileReader::seekForward(int64_t timestamp) {
182 int skip = (timestamp - ref_.timestamp()) >> (ref_.resolution() << 1);
183 if (skip <= 0) return;
184 DCHECK_LE(skip + index_, kRangeElementCount);
185 MLOG(roo_monitoring_vault_reader) << "Skipping " << skip << " steps";
186 if (skip + index_ >= kRangeElementCount) {
187 index_ = kRangeElementCount;
188 reader_.close();
189 return;
190 }
191 if (reader_.ok()) {
192 std::vector<Sample> ignored;
193 for (; !past_eof() && skip > 0; --skip) {
194 next(&ignored);
195 }
196 } else {
197 index_ += skip;
198 }
199}
200
201bool VaultFileReader::past_eof() const { return index_ >= kRangeElementCount; }
202
203roo_logging::Stream& operator<<(roo_logging::Stream& os,
204 const VaultFileRef& file_ref) {
205 os << "[" << file_ref.resolution() << ", " << roo_logging::hex
206 << file_ref.timestamp() << ", " << file_ref.time_step() << ", "
207 << (file_ref.timestamp() + file_ref.time_span()) << "]";
208 return os;
209}
210
211} // namespace roo_monitoring
Collection of timeseries sharing transform and source resolution.
roo_io::Filesystem & fs() const
void getVaultFilePath(const VaultFileRef &ref, String *path) const
Cursor used when seeking through multiple log files.
Definition log.h:95
VaultFileReader(const Collection *collection)
Creates a reader bound to the specified collection.
Definition vault.cpp:69
void seekForward(int64_t timestamp)
Advances the cursor to the first entry at or after the timestamp.
Definition vault.cpp:181
LogCursor tell()
Returns the current log cursor.
Definition vault.cpp:126
int index() const
Returns the current entry index.
Definition vault.h:127
bool past_eof() const
Returns true if the reader has passed the end of file.
Definition vault.cpp:201
bool next(std::vector< Sample > *sample)
Reads the next entry and fills the sample vector.
Definition vault.cpp:145
bool open(const VaultFileRef &ref, int index, int64_t offset)
Opens the file and seeks to the specified index and byte offset.
Definition vault.cpp:77
const VaultFileRef & vault_ref() const
Returns the vault file reference for this reader.
Definition vault.h:144
Identifies a specific file in the monitoring vault.
Definition vault.h:16
int64_t timestamp() const
Returns the start timestamp for this vault file.
Definition vault.h:26
int64_t time_span() const
Returns the total time span covered by the file.
Definition vault.h:41
Resolution resolution() const
Returns the resolution for this vault file.
Definition vault.h:32
int64_t time_step() const
Returns the time step between entries.
Definition vault.h:35
Umbrella header for the roo_monitoring module.
static const int kRangeElementCount
Number of items in a range (4^(kRangeLength)).
Definition common.h:22
roo_logging::Stream & operator<<(roo_logging::Stream &os, const VaultFileRef &file_ref)
Writes a human-readable representation of the vault file reference.
Definition vault.cpp:203