XRootD
Loading...
Searching...
No Matches
XrdOssStatsFileSystem.cc
Go to the documentation of this file.
1
5#include "XrdOssStatsFile.hh"
9
10#include <inttypes.h>
11#include <stdexcept>
12#include <thread>
13
14using namespace XrdOssStats;
15using namespace XrdOssStats::detail;
16
17FileSystem::FileSystem(XrdOss *oss, XrdSysLogger *lp, const char *configfn, XrdOucEnv *envP) :
18 XrdOssWrapper(*oss),
19 m_oss(oss),
20 m_env(envP),
21 m_log(lp, "fsstat_"),
22 m_slow_duration(std::chrono::seconds(1))
23{
24 m_log.Say("------ Initializing the storage statistics plugin.");
25 if (!Config(configfn)) {
26 m_failure = "Failed to configure the storage statistics plugin.";
27 return;
28 }
29
30 // While the plugin _does_ print its activity to the debugging facility (if enabled), its relatively useless
31 // unless the g-stream is available. Hence, if it's _not_ available, we stop the OSS initialization but do
32 // not cause the server startup to fail.
33 if (envP) {
34 m_gstream = reinterpret_cast<XrdXrootdGStream*>(envP->GetPtr("oss.gStream*"));
35 if (m_gstream) {
36 m_log.Say("Config", "Stats monitoring has been configured via xrootd.mongstream directive");
37 } else {
38 m_log.Say("Config", "XrdOssStats plugin is loaded but it requires the oss monitoring g-stream to also be enabled to be useful; try adding `xrootd.mongstream oss ...` to your configuration");
39 return;
40 }
41 } else {
42 m_failure = "XrdOssStats plugin invoked without a configured environment; likely an internal error";
43 return;
44 }
45
46 pthread_t tid;
47 int rc;
48 if ((rc = XrdSysThread::Run(&tid, FileSystem::AggregateBootstrap, static_cast<void *>(this), 0, "FS Stats Compute Thread"))) {
49 m_log.Emsg("FileSystem", rc, "create stats compute thread");
50 m_failure = "Failed to create the statistics computing thread.";
51 return;
52 }
53
54 m_ready = true;
55}
56
58
59bool
60FileSystem::InitSuccessful(std::string &errMsg) {
61 if (m_ready) return true;
62
63 errMsg = m_failure;
64 if (errMsg.empty()) {
65 m_oss.release();
66 }
67 return false;
68}
69
70void *
71FileSystem::AggregateBootstrap(void *me) {
72 auto myself = static_cast<FileSystem*>(me);
73 while (1) {
74 std::this_thread::sleep_for(std::chrono::seconds(1));
75 myself->AggregateStats();
76 }
77 return nullptr;
78}
79
80bool
81FileSystem::Config(const char *configfn)
82{
83 m_log.setMsgMask(LogMask::Error | LogMask::Warning);
84
85 XrdOucGatherConf statsConf("fsstats.trace fsstats.slowop", &m_log);
86 int result;
87 if ((result = statsConf.Gather(configfn, XrdOucGatherConf::trim_lines)) < 0) {
88 m_log.Emsg("Config", -result, "parsing config file", configfn);
89 return false;
90 }
91
92 char *val;
93 while (statsConf.GetLine()) {
94 val = statsConf.GetToken(); // Ignore -- we asked for a single value
95 if (!strcmp(val, "trace")) {
96 m_log.setMsgMask(0);
97 if (!(val = statsConf.GetToken())) {
98 m_log.Emsg("Config", "fsstats.trace requires an argument. Usage: fsstats.trace [all|err|warning|info|debug|none]");
99 return false;
100 }
101 do {
102 if (!strcmp(val, "all")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::All);}
103 else if (!strcmp(val, "error")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error);}
104 else if (!strcmp(val, "warning")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning);}
105 else if (!strcmp(val, "info")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info);}
106 else if (!strcmp(val, "debug")) {m_log.setMsgMask(m_log.getMsgMask() | LogMask::Error | LogMask::Warning | LogMask::Info | LogMask::Debug);}
107 else if (!strcmp(val, "none")) {m_log.setMsgMask(0);}
108 } while ((val = statsConf.GetToken()));
109 } else if (!strcmp(val, "slowop")) {
110 if (!(val = statsConf.GetToken())) {
111 m_log.Emsg("Config", "fsstats.slowop requires an argument. Usage: fsstats.slowop [duration]");
112 return false;
113 }
114 std::string errmsg;
115 if (!ParseDuration(val, m_slow_duration, errmsg)) {
116 m_log.Emsg("Config", "fsstats.slowop couldn't parse duration", val, errmsg.c_str());
117 return false;
118 }
119 }
120 }
121 m_log.Emsg("Config", "Logging levels enabled", LogMaskToString(m_log.getMsgMask()).c_str());
122
123 return true;
124}
125
126XrdOssDF *FileSystem::newDir(const char *user)
127{
128 // Call the underlying OSS newDir
129 std::unique_ptr<XrdOssDF> wrapped(wrapPI.newDir(user));
130 return new Directory(std::move(wrapped), m_log, *this);
131}
132
134{
135 // Call the underlying OSS newFile
136 std::unique_ptr<XrdOssDF> wrapped(wrapPI.newFile(user));
137 return new File(std::move(wrapped), m_log, *this);
138}
139
140int FileSystem::Chmod(const char * path, mode_t mode, XrdOucEnv *env)
141{
142 OpTimer op(m_ops.m_chmod_ops, m_slow_ops.m_chmod_ops, m_times.m_chmod, m_slow_times.m_chmod, m_slow_duration);
143 return wrapPI.Chmod(path, mode, env);
144}
145
146int FileSystem::Rename(const char *oPath, const char *nPath,
147 XrdOucEnv *oEnvP, XrdOucEnv *nEnvP)
148{
149 OpTimer op(m_ops.m_rename_ops, m_slow_ops.m_rename_ops, m_times.m_rename, m_slow_times.m_rename, m_slow_duration);
150 return wrapPI.Rename(oPath, nPath, oEnvP, nEnvP);
151}
152
153int FileSystem::Stat(const char *path, struct stat *buff,
154 int opts, XrdOucEnv *env)
155{
156 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
157 return wrapPI.Stat(path, buff, opts, env);
158}
159
160int FileSystem::StatFS(const char *path, char *buff, int &blen,
161 XrdOucEnv *env)
162{
163 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
164 return wrapPI.StatFS(path, buff, blen, env);
165}
166
167int FileSystem::StatLS(XrdOucEnv &env, const char *path,
168 char *buff, int &blen)
169{
170 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
171 return wrapPI.StatLS(env, path, buff, blen);
172}
173
174int FileSystem::StatPF(const char *path, struct stat *buff, int opts)
175{
176 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
177 return wrapPI.StatPF(path, buff, opts);
178}
179
180int FileSystem::StatPF(const char *path, struct stat *buff)
181{
182 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
183 return wrapPI.StatPF(path, buff, 0);
184}
185
186int FileSystem::StatVS(XrdOssVSInfo *vsP, const char *sname, int updt)
187{
188 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
189 return wrapPI.StatVS(vsP, sname, updt);
190}
191
192int FileSystem::StatXA(const char *path, char *buff, int &blen,
193 XrdOucEnv *env)
194{
195 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
196 return wrapPI.StatXA(path, buff, blen, env);
197}
198
199int FileSystem::StatXP(const char *path, unsigned long long &attr,
200 XrdOucEnv *env)
201{
202 OpTimer op(m_ops.m_stat_ops, m_slow_ops.m_stat_ops, m_times.m_stat, m_slow_times.m_stat, m_slow_duration);
203 return wrapPI.StatXP(path, attr, env);
204}
205
206int FileSystem::Truncate(const char *path, unsigned long long fsize,
207 XrdOucEnv *env)
208{
209 OpTimer op(m_ops.m_truncate_ops, m_slow_ops.m_truncate_ops, m_times.m_truncate, m_slow_times.m_truncate, m_slow_duration);
210 return wrapPI.Truncate(path, fsize, env);
211}
212
213int FileSystem::Unlink(const char *path, int Opts, XrdOucEnv *env)
214{
215 OpTimer op(m_ops.m_unlink_ops, m_slow_ops.m_unlink_ops, m_times.m_unlink, m_slow_times.m_unlink, m_slow_duration);
216 return wrapPI.Unlink(path, Opts, env);
217}
218
219void FileSystem::AggregateStats()
220{
221 char buf[1500];
222 auto len = snprintf(buf, 1500,
223 "{"
224 "\"event\":\"oss_stats\"," \
225 "\"reads\":%" PRIu64 ",\"writes\":%" PRIu64 ",\"stats\":%" PRIu64 "," \
226 "\"pgreads\":%" PRIu64 ",\"pgwrites\":%" PRIu64 ",\"readvs\":%" PRIu64 "," \
227 "\"readv_segs\":%" PRIu64 ",\"dirlists\":%" PRIu64 ",\"dirlist_ents\":%" PRIu64 ","
228 "\"truncates\":%" PRIu64 ",\"unlinks\":%" PRIu64 ",\"chmods\":%" PRIu64 ","
229 "\"opens\":%" PRIu64 ",\"renames\":%" PRIu64 ","
230 "\"slow_reads\":%" PRIu64 ",\"slow_writes\":%" PRIu64 ",\"slow_stats\":%" PRIu64 ","
231 "\"slow_pgreads\":%" PRIu64 ",\"slow_pgwrites\":%" PRIu64 ",\"slow_readvs\":%" PRIu64 ","
232 "\"slow_readv_segs\":%" PRIu64 ",\"slow_dirlists\":%" PRIu64 ",\"slow_dirlist_ents\":%" PRIu64 ","
233 "\"slow_truncates\":%" PRIu64 ",\"slow_unlinks\":%" PRIu64 ",\"slow_chmods\":%" PRIu64 ","
234 "\"slow_opens\":%" PRIu64 ",\"slow_renames\":%" PRIu64 ","
235 "\"open_t\":%.4f,\"read_t\":%.4f,\"readv_t\":%.4f,"
236 "\"pgread_t\":%.4f,\"write_t\":%.4f,\"pgwrite_t\":%.4f,"
237 "\"dirlist_t\":%.4f,\"stat_t\":%.4f,\"truncate_t\":%.4f,"
238 "\"unlink_t\":%.4f,\"rename_t\":%.4f,\"chmod_t\":%.4f,"
239 "\"slow_open_t\":%.4f,\"slow_read_t\":%.4f,\"slow_readv_t\":%.4f,"
240 "\"slow_pgread_t\":%.4f,\"slow_write_t\":%.4f,\"slow_pgwrite_t\":%.4f,"
241 "\"slow_dirlist_t\":%.4f,\"slow_stat_t\":%.4f,\"slow_truncate_t\":%.4f,"
242 "\"slow_unlink_t\":%.4f,\"slow_rename_t\":%.4f,\"slow_chmod_t\":%.4f"
243 "}",
244 static_cast<uint64_t>(m_ops.m_read_ops), static_cast<uint64_t>(m_ops.m_write_ops), static_cast<uint64_t>(m_ops.m_stat_ops),
245 static_cast<uint64_t>(m_ops.m_pgread_ops), static_cast<uint64_t>(m_ops.m_pgwrite_ops), static_cast<uint64_t>(m_ops.m_readv_ops),
246 static_cast<uint64_t>(m_ops.m_readv_segs), static_cast<uint64_t>(m_ops.m_dirlist_ops), static_cast<uint64_t>(m_ops.m_dirlist_entries),
247 static_cast<uint64_t>(m_ops.m_truncate_ops), static_cast<uint64_t>(m_ops.m_unlink_ops), static_cast<uint64_t>(m_ops.m_chmod_ops),
248 static_cast<uint64_t>(m_ops.m_open_ops), static_cast<uint64_t>(m_ops.m_rename_ops),
249 static_cast<uint64_t>(m_slow_ops.m_read_ops), static_cast<uint64_t>(m_slow_ops.m_write_ops), static_cast<uint64_t>(m_slow_ops.m_stat_ops),
250 static_cast<uint64_t>(m_slow_ops.m_pgread_ops), static_cast<uint64_t>(m_slow_ops.m_pgwrite_ops), static_cast<uint64_t>(m_slow_ops.m_readv_ops),
251 static_cast<uint64_t>(m_slow_ops.m_readv_segs), static_cast<uint64_t>(m_slow_ops.m_dirlist_ops), static_cast<uint64_t>(m_slow_ops.m_dirlist_entries),
252 static_cast<uint64_t>(m_slow_ops.m_truncate_ops), static_cast<uint64_t>(m_slow_ops.m_unlink_ops), static_cast<uint64_t>(m_slow_ops.m_chmod_ops),
253 static_cast<uint64_t>(m_slow_ops.m_open_ops), static_cast<uint64_t>(m_slow_ops.m_rename_ops),
254 static_cast<float>(m_times.m_open)/1e9, static_cast<float>(m_times.m_read)/1e9, static_cast<float>(m_times.m_readv)/1e9,
255 static_cast<float>(m_times.m_pgread)/1e9, static_cast<float>(m_times.m_write)/1e9, static_cast<float>(m_times.m_pgwrite)/1e9,
256 static_cast<float>(m_times.m_dirlist)/1e9, static_cast<float>(m_times.m_stat)/1e9, static_cast<float>(m_times.m_truncate)/1e9,
257 static_cast<float>(m_times.m_unlink)/1e9, static_cast<float>(m_times.m_rename)/1e9, static_cast<float>(m_times.m_chmod)/1e9,
258 static_cast<float>(m_slow_times.m_open)/1e9, static_cast<float>(m_slow_times.m_read)/1e9, static_cast<float>(m_slow_times.m_readv)/1e9,
259 static_cast<float>(m_slow_times.m_pgread)/1e9, static_cast<float>(m_slow_times.m_write)/1e9, static_cast<float>(m_slow_times.m_pgwrite)/1e9,
260 static_cast<float>(m_slow_times.m_dirlist)/1e9, static_cast<float>(m_slow_times.m_stat)/1e9, static_cast<float>(m_slow_times.m_truncate)/1e9,
261 static_cast<float>(m_slow_times.m_unlink)/1e9, static_cast<float>(m_slow_times.m_rename)/1e9, static_cast<float>(m_slow_times.m_chmod)/1e9
262
263 );
264 if (len >= 1500) {
265 m_log.Log(LogMask::Error, "Aggregate", "Failed to generate g-stream statistics packet");
266 return;
267 }
268 m_log.Log(LogMask::Debug, "Aggregate", buf);
269 if (m_gstream && !m_gstream->Insert(buf, len + 1)) {
270 m_log.Log(LogMask::Error, "Aggregate", "Failed to send g-stream statistics packet");
271 return;
272 }
273}
274
275FileSystem::OpTimer::OpTimer(RAtomic_uint64_t &op_count, RAtomic_uint64_t &slow_op_count, RAtomic_uint64_t &timing, RAtomic_uint64_t &slow_timing, std::chrono::steady_clock::duration duration)
276 : m_op_count(op_count),
277 m_slow_op_count(slow_op_count),
278 m_timing(timing),
279 m_slow_timing(slow_timing),
280 m_start(std::chrono::steady_clock::now()),
281 m_slow_duration(duration)
282{}
283
284FileSystem::OpTimer::~OpTimer()
285{
286 auto dur = std::chrono::steady_clock::now() - m_start;
287 m_op_count++;
288 m_timing += std::chrono::nanoseconds(dur).count();
289 if (dur > m_slow_duration) {
290 m_slow_op_count++;
291 m_slow_timing += std::chrono::nanoseconds(dur).count();
292 }
293}
#define stat(a, b)
Definition XrdPosix.hh:101
struct myOpts opts
int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *env=0) override
FileSystem(XrdOss *oss, XrdSysLogger *log, const char *configName, XrdOucEnv *envP)
int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0) override
int StatPF(const char *path, struct stat *buff, int opts) override
int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0) override
XrdOssDF * newDir(const char *user=0) override
int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
XrdOssDF * newFile(const char *user=0) override
int Chmod(const char *path, mode_t mode, XrdOucEnv *env=0) override
int Unlink(const char *path, int Opts=0, XrdOucEnv *env=0) override
bool InitSuccessful(std::string &errMsg)
int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *env=0) override
int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *env=0) override
bool Config(const char *configfn)
int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *env=0) override
int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen) override
virtual int StatLS(XrdOucEnv &env, const char *path, char *buff, int &blen)
Definition XrdOss.cc:97
virtual int StatXA(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition XrdOss.cc:127
virtual int StatXP(const char *path, unsigned long long &attr, XrdOucEnv *envP=0)
Definition XrdOss.cc:137
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Chmod(const char *path, mode_t mode, XrdOucEnv *envP=0)=0
virtual int StatPF(const char *path, struct stat *buff, int opts)
Definition XrdOss.cc:107
virtual int StatVS(XrdOssVSInfo *vsP, const char *sname=0, int updt=0)
Definition XrdOss.cc:117
virtual int StatFS(const char *path, char *buff, int &blen, XrdOucEnv *envP=0)
Definition XrdOss.cc:87
virtual int Rename(const char *oPath, const char *nPath, XrdOucEnv *oEnvP=0, XrdOucEnv *nEnvP=0)=0
virtual int Truncate(const char *path, unsigned long long fsize, XrdOucEnv *envP=0)=0
virtual XrdOssDF * newDir(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual int Unlink(const char *path, int Opts=0, XrdOucEnv *envP=0)=0
void * GetPtr(const char *varname)
Definition XrdOucEnv.cc:263
char * GetToken(char **rest=0, int lowcase=0)
int Gather(const char *cfname, Level lvl, const char *parms=0)
@ trim_lines
Prefix trimmed lines.
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
void Say(const char *text1, const char *text2=0, const char *txt3=0, const char *text4=0, const char *text5=0, const char *txt6=0)
void setMsgMask(int mask)
void Log(int mask, const char *esfx, const char *text1, const char *text2=0, const char *text3=0)
static int Run(pthread_t *, void *(*proc)(void *), void *arg, int opts=0, const char *desc=0)
bool Insert(const char *data, int dlen)
bool ParseDuration(const std::string &duration, std::chrono::steady_clock::duration &result, std::string &errmsg)
std::string LogMaskToString(int mask)