XrdPfcFile.cc
//----------------------------------------------------------------------------------
// Copyright (c) 2014 by Board of Trustees of the Leland Stanford, Jr., University
// Author: Alja Mrak-Tadel, Matevz Tadel
//----------------------------------------------------------------------------------
// XRootD is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// XRootD is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
//----------------------------------------------------------------------------------


#include "XrdPfcFile.hh"
#include "XrdPfc.hh"
#include "XrdPfcResourceMonitor.hh"
#include "XrdPfcIO.hh"
#include "XrdPfcTrace.hh"

#include "XrdSys/XrdSysE2T.hh"
#include "XrdSys/XrdSysTimer.hh"
#include "XrdOss/XrdOss.hh"
#include "XrdOuc/XrdOucEnv.hh"

#include <cstdio>
#include <sstream>
#include <fcntl.h>
#include <cassert>

using namespace XrdPfc;

namespace
{

const int BLOCK_WRITE_MAX_ATTEMPTS = 4;

Cache* cache() { return &Cache::GetInstance(); }

}

const char *File::m_traceID = "File";

//------------------------------------------------------------------------------

File::File(const std::string& path, long long iOffset, long long iFileSize) :
   m_ref_cnt(0),
   m_data_file(0),
   m_info_file(0),
   m_cfi(Cache::GetInstance().GetTrace(), Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks > 0),
   m_filename(path),
   m_offset(iOffset),
   m_file_size(iFileSize),
   m_current_io(m_io_set.end()),
   m_ios_in_detach(0),
   m_non_flushed_cnt(0),
   m_in_sync(false),
   m_detach_time_logged(false),
   m_in_shutdown(false),
   m_state_cond(0),
   m_block_size(0),
   m_num_blocks(0),
   m_resmon_token(-1),
   m_prefetch_state(kOff),
   m_prefetch_bytes(0),
   m_prefetch_read_cnt(0),
   m_prefetch_hit_cnt(0),
   m_prefetch_score(0)
{}

File::~File()
{
   TRACEF(Debug, "~File() for ");
}
void File::Close()
{
   // Close is called while nullptr is put into the Cache::m_active map, see Cache::dec_ref_count(File*).
   // A stat is called after close to re-check that m_st_blocks has been reported correctly
   // to the resource-monitor. Note that the reporting is already clamped down to m_file_size
   // in report_and_merge_delta_stats() below.
   //
   // XFS can pre-allocate a significant number of blocks (1 GB at the 1 GB mark, 4 GB above 4 GB)
   // and those get reported in stat.st_blocks.
   // The reported number is correct in a stat immediately following a close.
   // If one starts off by writing the last byte of the file, this pre-allocation does not get
   // triggered up to that point -- but it comes back with a vengeance right after.
   //
   // To be determined if other FSes do something similar (Ceph, ZFS, ...). Ext4 doesn't.

   if (m_info_file)
   {
      TRACEF(Debug, "Close() closing info-file ");
      m_info_file->Close();
      delete m_info_file;
      m_info_file = nullptr;
   }

   if (m_data_file)
   {
      TRACEF(Debug, "Close() closing data-file ");
      m_data_file->Close();
      delete m_data_file;
      m_data_file = nullptr;
   }

   if (m_resmon_token >= 0)
   {
      // The last update of file stats has been sent from the final Sync unless we are in_shutdown --
      // but in that case the file will get unlinked by the cache and reported as a purge event.
      // We check if the st_blocks reported so far is correct.
      if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
         struct stat s;
         int sr = Cache::GetInstance().GetOss()->Stat(m_filename.c_str(), &s);
         if (sr == 0 && s.st_blocks != m_st_blocks) {
            Stats stats;
            stats.m_StBlocksAdded = s.st_blocks - m_st_blocks;
            m_st_blocks = s.st_blocks;
            Cache::ResMon().register_file_update_stats(m_resmon_token, stats);
         }
      }

      Cache::ResMon().register_file_close(m_resmon_token, time(0), m_stats);
   }

   TRACEF(Debug, "Close() finished, prefetch score = " << m_prefetch_score);
}

//------------------------------------------------------------------------------

File* File::FileOpen(const std::string &path, long long offset, long long fileSize)
{
   File *file = new File(path, offset, fileSize);
   if ( ! file->Open())
   {
      delete file;
      file = 0;
   }
   return file;
}

//------------------------------------------------------------------------------

long long File::initiate_emergency_shutdown()
{
   // Called from Cache::Unlink() when the file is currently open.
   // Cache::Unlink is also called on FSync error and when a wrong number of bytes
   // is received from a remote read.
   //
   // From this point onward the file will not be written to, the cinfo file will
   // not be updated, and all new read requests will return -ENOENT.
   //
   // The File's entry in the Cache's active map is set to nullptr and will be
   // removed from there shortly, in any case, well before this File object
   // shuts down. Cache::Unlink() also reports the appropriate purge event.

   XrdSysCondVarHelper _lck(m_state_cond);

   m_in_shutdown = true;

   if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
   {
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   report_and_merge_delta_stats();

   return m_st_blocks;
}

//------------------------------------------------------------------------------

void File::check_delta_stats()
{
   // Called under m_state_cond lock.
   // BytesWritten indirectly triggers an unconditional merge through the periodic Sync().
   if (m_delta_stats.BytesReadAndWritten() >= m_resmon_report_threshold && ! m_in_shutdown)
      report_and_merge_delta_stats();
}

void File::report_and_merge_delta_stats()
{
   // Called under m_state_cond lock.
   struct stat s;
   m_data_file->Fstat(&s);
   // Do not report st_blocks beyond the 4 kB round-up over m_file_size. Some FSs report
   // aggressive pre-allocation in this field (XFS, 4 GB).
   long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
                                                             : m_file_size >> 9;
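   // E.g., m_file_size = 10000 B is not 4 kB aligned, so ((10000 >> 12) + 1) << 3 = 3 * 8 = 24
   // 512-byte blocks (12 kB); an aligned 8192 B file reports at most 8192 >> 9 = 16 blocks.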
   long long st_blocks_to_report = std::min((long long) s.st_blocks, max_st_blocks_to_report);
   m_delta_stats.m_StBlocksAdded = st_blocks_to_report - m_st_blocks;
   m_st_blocks = st_blocks_to_report;
   Cache::ResMon().register_file_update_stats(m_resmon_token, m_delta_stats);
   m_stats.AddUp(m_delta_stats);
   m_delta_stats.Reset();
}

//------------------------------------------------------------------------------

void File::BlockRemovedFromWriteQ(Block *b)
{
   TRACEF(Dump, "BlockRemovedFromWriteQ() block = " << (void*) b << " idx= " << b->m_offset/m_block_size);

   XrdSysCondVarHelper _lck(m_state_cond);
   dec_ref_count(b);
}

void File::BlocksRemovedFromWriteQ(std::list<Block*>& blocks)
{
   TRACEF(Dump, "BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());

   XrdSysCondVarHelper _lck(m_state_cond);

   for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
   {
      dec_ref_count(*i);
   }
}

//------------------------------------------------------------------------------

void File::ioUpdated(IO *io)
{
   std::string loc(io->GetLocation());
   XrdSysCondVarHelper _lck(m_state_cond);
   insert_remote_location(loc);
}

//------------------------------------------------------------------------------

bool File::ioActive(IO *io)
{
   // Returns true if delay is needed.

   TRACEF(Debug, "ioActive start for io " << io);

   std::string loc(io->GetLocation());

   {
      XrdSysCondVarHelper _lck(m_state_cond);

      IoSet_i mi = m_io_set.find(io);

      if (mi != m_io_set.end())
      {
         unsigned int n_active_reads = io->m_active_read_reqs;

         TRACE(Info, "ioActive for io " << io <<
               ", active_reads " << n_active_reads <<
               ", active_prefetches " << io->m_active_prefetches <<
               ", allow_prefetching " << io->m_allow_prefetching <<
               ", ios_in_detach " << m_ios_in_detach);
         TRACEF(Info,
                "\tio_map.size() " << m_io_set.size() <<
                ", block_map.size() " << m_block_map.size() << ", file");

         insert_remote_location(loc);

         io->m_allow_prefetching = false;
         io->m_in_detach = true;

         // Check if any IO is still available for prefetching. If not, stop it.
         if (m_prefetch_state == kOn || m_prefetch_state == kHold)
         {
            if ( ! select_current_io_or_disable_prefetching(false) )
            {
               TRACEF(Debug, "ioActive stopping prefetching after io " << io << " retreat.");
            }
         }

         // On the last IO, consider write queue blocks. Note, this also contains
         // blocks being prefetched.

         bool io_active_result;

         if (n_active_reads > 0)
         {
            io_active_result = true;
         }
         else if (m_io_set.size() - m_ios_in_detach == 1)
         {
            io_active_result = ! m_block_map.empty();
         }
         else
         {
            io_active_result = io->m_active_prefetches > 0;
         }

         if ( ! io_active_result)
         {
            ++m_ios_in_detach;
         }

         TRACEF(Info, "ioActive for io " << io << " returning " << io_active_result << ", file");

         return io_active_result;
      }
      else
      {
         TRACEF(Error, "ioActive io " << io << " not found in IoSet. This should not happen.");
         return false;
      }
   }
}

//------------------------------------------------------------------------------

void File::RequestSyncOfDetachStats()
{
   XrdSysCondVarHelper _lck(m_state_cond);
   m_detach_time_logged = false;
}

bool File::FinalizeSyncBeforeExit()
{
   // Returns true if sync is required.
   // This method is called after the corresponding IO is detached from PosixCache.

   XrdSysCondVarHelper _lck(m_state_cond);
   if ( ! m_in_shutdown)
   {
      if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
      {
         report_and_merge_delta_stats();
         m_cfi.WriteIOStatDetach(m_stats);
         m_detach_time_logged = true;
         m_in_sync = true;
         TRACEF(Debug, "FinalizeSyncBeforeExit requesting sync to write detach stats");
         return true;
      }
   }
   TRACEF(Debug, "FinalizeSyncBeforeExit sync not required");
   return false;
}

//------------------------------------------------------------------------------

void File::AddIO(IO *io)
{
   // Called from Cache::GetFile() when a new IO asks for the file.

   TRACEF(Debug, "AddIO() io = " << (void*)io);

   time_t now = time(0);
   std::string loc(io->GetLocation());

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi == m_io_set.end())
   {
      m_io_set.insert(io);
      io->m_attach_time = now;
      m_delta_stats.IoAttach();

      insert_remote_location(loc);

      if (m_prefetch_state == kStopped)
      {
         m_prefetch_state = kOn;
         cache()->RegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "AddIO() io = " << (void*)io << " already registered.");
   }

   m_state_cond.UnLock();
}

//------------------------------------------------------------------------------

void File::RemoveIO(IO *io)
{
   // Called from Cache::ReleaseFile.

   TRACEF(Debug, "RemoveIO() io = " << (void*)io);

   time_t now = time(0);

   m_state_cond.Lock();

   IoSet_i mi = m_io_set.find(io);

   if (mi != m_io_set.end())
   {
      if (mi == m_current_io)
      {
         ++m_current_io;
      }

      m_delta_stats.IoDetach(now - io->m_attach_time);
      m_io_set.erase(mi);
      --m_ios_in_detach;

      if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
      {
         TRACEF(Error, "RemoveIO() io = " << (void*)io << " Prefetching is not stopped/complete -- it should be by now.");
         m_prefetch_state = kStopped;
         cache()->DeRegisterPrefetchFile(this);
      }
   }
   else
   {
      TRACEF(Error, "RemoveIO() io = " << (void*)io << " is NOT registered.");
   }

   m_state_cond.UnLock();
}

//------------------------------------------------------------------------------

bool File::Open()
{
   // Sets errno accordingly.

   static const char *tpfx = "Open() ";

   TRACEF(Dump, tpfx << "entered");

   // Before touching anything, check with the ResourceMonitor if a scan is in progress.
   // This function will wait internally if needed until it is safe to proceed.
   Cache::ResMon().CrossCheckIfScanIsInProgress(m_filename, m_state_cond);

   const Configuration &conf = Cache::GetInstance().RefConfiguration();

   XrdOss &myOss = * Cache::GetInstance().GetOss();
   const char *myUser = conf.m_username.c_str();
   XrdOucEnv myEnv;
   struct stat data_stat, info_stat;

   std::string ifn = m_filename + Info::s_infoExtension;

   bool data_existed = (myOss.Stat(m_filename.c_str(), &data_stat) == XrdOssOK);
   bool info_existed = (myOss.Stat(ifn.c_str(), &info_stat) == XrdOssOK);

   // Create the data file itself.
   char size_str[32]; sprintf(size_str, "%lld", m_file_size);
   myEnv.Put("oss.asize", size_str);
   myEnv.Put("oss.cgroup", conf.m_data_space.c_str());

   int res;

   if ((res = myOss.Create(myUser, m_filename.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Create failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      return false;
   }

   m_data_file = myOss.newFile(myUser);
   if ((res = m_data_file->Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed " << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_data_file; m_data_file = 0;
      return false;
   }

   myEnv.Put("oss.asize", "64k"); // Advisory, block-map and access list lengths vary.
   myEnv.Put("oss.cgroup", conf.m_meta_space.c_str());
   if ((res = myOss.Create(myUser, ifn.c_str(), 0600, myEnv, XRDOSS_mkpath)) != XrdOssOK)
   {
      TRACE(Error, tpfx << "Create failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   m_info_file = myOss.newFile(myUser);
   if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) != XrdOssOK)
   {
      TRACEF(Error, tpfx << "Open failed for info file " << ifn << ERRNO_AND_ERRSTR(-res));
      errno = -res;
      delete m_info_file; m_info_file = 0;
      m_data_file->Close(); delete m_data_file; m_data_file = 0;
      return false;
   }

   bool initialize_info_file = true;

   if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
   {
      TRACEF(Debug, tpfx << "Reading existing info file. (data_existed=" << data_existed <<
             ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
             ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() << ")");

      // Check if the data file exists and is of reasonable size.
      if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
      {
         initialize_info_file = false;
      } else {
         TRACEF(Warning, tpfx << "Basic sanity checks on data file failed, resetting info file, truncating data file.");
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      }
   }

   if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.get_cs_Chk())
   {
      if (conf.does_cschk_have_missing_bits(m_cfi.GetCkSumState()) &&
          conf.should_uvkeep_purge(time(0) - m_cfi.GetNoCkSumTimeForUVKeep()))
      {
         TRACEF(Info, tpfx << "Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
         initialize_info_file = true;
         m_cfi.ResetAllAccessStats();
         m_data_file->Ftruncate(0);
         Cache::ResMon().register_file_purge(m_filename, data_stat.st_blocks);
      } else {
         // TODO: If the file is complete, we don't need to reset net cksums.
         m_cfi.DowngradeCkSumState(conf.get_cs_Chk());
      }
   }

   if (initialize_info_file)
   {
      m_cfi.SetBufferSizeFileSizeAndCreationTime(conf.m_bufferSize, m_file_size);
      m_cfi.SetCkSumState(conf.get_cs_Chk());
      m_cfi.ResetNoCkSumTime();
      m_cfi.Write(m_info_file, ifn.c_str());
      m_info_file->Fsync();
      cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
      TRACEF(Debug, tpfx << "Creating new file info, data size = " << m_file_size << " num blocks = " << m_cfi.GetNBlocks());
   }
   else
   {
      if (futimens(m_info_file->getFD(), NULL)) {
         TRACEF(Error, tpfx << "failed setting modification time " << ERRNO_AND_ERRSTR(errno));
      }
   }

   m_cfi.WriteIOStatAttach();
   m_state_cond.Lock();
   m_block_size = m_cfi.GetBufferSize();
   m_num_blocks = m_cfi.GetNBlocks();
   m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped; // Will engage in AddIO().

   m_data_file->Fstat(&data_stat);
   m_st_blocks = data_stat.st_blocks;

   m_resmon_token = Cache::ResMon().register_file_open(m_filename, time(0), data_existed);
   constexpr long long MB = 1024 * 1024;
   m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
   // m_resmon_report_threshold_scaler; // something like 10% of the original threshold, to adjust
   // the actual threshold based on return values from register_file_update_stats().
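   // E.g., a 1 GB file reports after every ~51 MB of bytes read+written (m_file_size / 20);
   // small files clamp to the 10 MB floor, very large ones to the 500 MB cap.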

   m_state_cond.UnLock();

   return true;
}

int File::Fstat(struct stat &sbuff)
{
   // Stat on an open file.
   // Corrects size to actual full size of the file.
   // Sets atime to 0 if the file is only partially downloaded, in accordance
   // with pfc.onlyifcached settings.
   // Called from IO::Fstat() and Cache::Stat() when the file is active.
   // Returns 0 on success, -errno on error.

   int res;

   if ((res = m_data_file->Fstat(&sbuff))) return res;

   sbuff.st_size = m_file_size;

   bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
   if ( ! is_cached)
      sbuff.st_atime = 0;

   return 0;
}

//==============================================================================
// Read and helpers
//==============================================================================

bool File::overlap(int blk, // block to query
                   long long blk_size, //
                   long long req_off, // offset of user request
                   int req_size, // size of user request
                   // output:
                   long long &off, // offset in user buffer
                   long long &blk_off, // offset in block
                   int &size) // size to copy
{
   const long long beg = blk * blk_size;
   const long long end = beg + blk_size;
   const long long req_end = req_off + req_size;

   if (req_off < end && req_end > beg)
   {
      const long long ovlp_beg = std::max(beg, req_off);
      const long long ovlp_end = std::min(end, req_end);

      off = ovlp_beg - req_off;
      blk_off = ovlp_beg - beg;
      size = (int) (ovlp_end - ovlp_beg);

      assert(size <= blk_size);
      return true;
   }
   else
   {
      return false;
   }
}
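// E.g., with blk_size = 1 MB, block 3 spans [3 MB, 4 MB); a request with req_off = 3.5 MB and
// req_size = 1 MB overlaps it with off = 0, blk_off = 0.5 MB, size = 0.5 MB -- the remaining
// half of the request falls into block 4.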

//------------------------------------------------------------------------------

Block* File::PrepareBlockRequest(int i, IO *io, void *req_id, bool prefetch)
{
   // Must be called w/ state_cond locked.
   // Checks on size etc. should be done before.
   //
   // The reference count is 0, so increase it in the calling function if you want to
   // catch the block while still in memory.

   const long long off = i * m_block_size;
   const int last_block = m_num_blocks - 1;
   const bool cs_net = cache()->RefConfiguration().is_cschk_net();

   int blk_size, req_size;
   if (i == last_block) {
      blk_size = req_size = m_file_size - off;
      if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
   } else {
      blk_size = req_size = m_block_size;
   }
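   // With cs_net, the last block's request size is rounded up to the next 4 kB multiple so
   // that pgRead checksums cover whole pages, e.g. a 5000 B tail block is requested as 8192 B.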

   Block *b = 0;
   char *buf = cache()->RequestRAM(req_size);

   if (buf)
   {
      b = new (std::nothrow) Block(this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);

      if (b)
      {
         m_block_map[i] = b;

         // Actual Read request is issued in ProcessBlockRequests().

         if (m_prefetch_state == kOn && (int) m_block_map.size() >= Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
         {
            m_prefetch_state = kHold;
            cache()->DeRegisterPrefetchFile(this);
         }
      }
      else
      {
         TRACEF(Dump, "PrepareBlockRequest() " << i << " prefetch " << prefetch << ", allocation failed.");
      }
   }

   return b;
}

void File::ProcessBlockRequest(Block *b)
{
   // This *must not* be called with block_map locked.

   BlockResponseHandler *brh = new BlockResponseHandler(b);

   if (XRD_TRACE What >= TRACE_Dump) {
      char buf[256];
      snprintf(buf, 256, "idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
               b->get_offset()/m_block_size, b, b->m_prefetch, b->get_offset(), b->get_req_size(), b->get_buff(), brh);
      TRACEF(Dump, "ProcessBlockRequest() " << buf);
   }

   if (b->req_cksum_net())
   {
      b->get_io()->GetInput()->pgRead(*brh, b->get_buff(), b->get_offset(), b->get_req_size(),
                                      b->ref_cksum_vec(), 0, b->ptr_n_cksum_errors());
   } else {
      b->get_io()->GetInput()->Read(*brh, b->get_buff(), b->get_offset(), b->get_size());
   }
}

void File::ProcessBlockRequests(BlockList_t& blks)
{
   // This *must not* be called with block_map locked.

   for (BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
   {
      ProcessBlockRequest(*bi);
   }
}

//------------------------------------------------------------------------------

void File::RequestBlocksDirect(IO *io, ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec, int expected_size)
{
   int n_chunks = ioVec.size();
   int n_vec_reads = (n_chunks - 1) / XrdProto::maxRvecsz + 1;
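   // E.g., assuming XrdProto::maxRvecsz is 1024, a request with 2500 chunks is issued as
   // three ReadVs of 1024, 1024 and 452 chunks, all reporting to one handler with m_to_wait = 3.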

   TRACEF(DumpXL, "RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
          ", total_size = " << expected_size << ", n_vec_reads = " << n_vec_reads);

   DirectResponseHandler *handler = new DirectResponseHandler(this, read_req, n_vec_reads);

   int pos = 0;
   while (n_chunks > XrdProto::maxRvecsz) {
      io->GetInput()->ReadV( *handler, ioVec.data() + pos, XrdProto::maxRvecsz);
      pos += XrdProto::maxRvecsz;
      n_chunks -= XrdProto::maxRvecsz;
   }
   io->GetInput()->ReadV( *handler, ioVec.data() + pos, n_chunks);
}

//------------------------------------------------------------------------------

int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec, int expected_size)
{
   TRACEF(DumpXL, "ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (int) ioVec.size() << ", total_size = " << expected_size);

   long long rs = m_data_file->ReadV(ioVec.data(), (int) ioVec.size());

   if (rs < 0)
   {
      TRACEF(Error, "ReadBlocksFromDisk neg retval = " << rs);
      return rs;
   }

   if (rs != expected_size)
   {
      TRACEF(Error, "ReadBlocksFromDisk incomplete size = " << rs);
      return -EIO;
   }

   return (int) rs;
}

//------------------------------------------------------------------------------

int File::Read(IO *io, char* iUserBuff, long long iUserOff, int iUserSize, ReadReqRH *rh)
{
   // rh->Done() is ONLY called from async processing.
   // If this function returns anything other than -EWOULDBLOCK, rh->Done() needs to be
   // called by the caller.
   // This streamlines the implementation of synchronous IO::Read().

   TRACEF(Dump, "Read() sid: " << Xrd::hex1 << rh->m_seq_id << " size: " << iUserSize);

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );

   return ReadOpusCoalescere(io, &readV, 1, rh, "Read() ");
}

//------------------------------------------------------------------------------

int File::ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
{
   TRACEF(Dump, "ReadV() for " << readVnum << " chunks.");

   m_state_cond.Lock();

   if (m_in_shutdown || io->m_in_detach)
   {
      m_state_cond.UnLock();
      return m_in_shutdown ? -ENOENT : -EBADF;
   }

   // Shortcut -- file is fully downloaded.

   if (m_cfi.IsComplete())
   {
      m_state_cond.UnLock();
      int ret = m_data_file->ReadV(const_cast<XrdOucIOVec*>(readV), readVnum);
      if (ret > 0) {
         XrdSysCondVarHelper _lck(m_state_cond);
         m_delta_stats.AddBytesHit(ret);
         check_delta_stats();
      }
      return ret;
   }

   return ReadOpusCoalescere(io, readV, readVnum, rh, "ReadV() ");
}

//------------------------------------------------------------------------------

int File::ReadOpusCoalescere(IO *io, const XrdOucIOVec *readV, int readVnum,
                             ReadReqRH *rh, const char *tpfx)
{
   // Non-trivial processing for Read and ReadV.
   // Entered under lock.
   //
   // Loop over required blocks:
   // - if on disk, ok;
   // - if in ram or incoming, inc ref-count;
   // - otherwise request and inc ref-count (unless RAM is full => request direct).
   // Unlock.

   int prefetch_cnt = 0;

   ReadRequest *read_req = nullptr;
   BlockList_t blks_to_request; // blocks we are issuing a new remote request for

   std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;

   std::vector<XrdOucIOVec> iovec_disk;
   std::vector<XrdOucIOVec> iovec_direct;
   int iovec_disk_total = 0;
   int iovec_direct_total = 0;

   for (int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
   {
      const XrdOucIOVec &iov = readV[iov_idx];
      long long iUserOff = iov.offset;
      int iUserSize = iov.size;
      char *iUserBuff = iov.data;

      const int idx_first = iUserOff / m_block_size;
      const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
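      // E.g., with 1 MB blocks, a 2 MB read at offset 1.5 MB covers bytes [1.5 MB, 3.5 MB)
      // and thus touches blocks 1 through 3.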

      TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx_first: " << idx_first << " idx_last: " << idx_last);

      enum LastBlock_e { LB_other, LB_disk, LB_direct };

      LastBlock_e lbe = LB_other;

      for (int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
      {
         TRACEF(DumpXL, tpfx << "sid: " << Xrd::hex1 << rh->m_seq_id << " idx: " << block_idx);
         BlockMap_i bi = m_block_map.find(block_idx);

         // overlap and read
         long long off; // offset in user buffer
         long long blk_off; // offset in block
         int size; // size to copy

         overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);

         // In RAM or incoming?
         if (bi != m_block_map.end())
         {
            inc_ref_count(bi->second);
            TRACEF(Dump, tpfx << (void*) iUserBuff << " inc_ref_count for existing block " << bi->second << " idx = " << block_idx);

            if (bi->second->is_finished())
            {
               // Note, blocks with an error should not be here!
               // They should be either removed or reissued in ProcessBlockResponse().
               assert(bi->second->is_ok());

               blks_ready[bi->second].emplace_back( ChunkRequest(nullptr, iUserBuff + off, blk_off, size) );

               if (bi->second->m_prefetch)
                  ++prefetch_cnt;
            }
            else
            {
               if ( ! read_req)
                  read_req = new ReadRequest(io, rh);

               // We have a lock on state_cond --> as we register the request before releasing the lock,
               // we are sure to get a call-in via the ChunkRequest handling when this block arrives.

               bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
               ++read_req->m_n_chunk_reqs;
            }

            lbe = LB_other;
         }
         // On disk?
         else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
         {
            TRACEF(DumpXL, tpfx << "read from disk " << (void*)iUserBuff << " idx = " << block_idx);

            if (lbe == LB_disk)
               iovec_disk.back().size += size;
            else
               iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
            iovec_disk_total += size;

            if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
               ++prefetch_cnt;

            lbe = LB_disk;
         }
         // Neither ... then we have to go get it ...
         else
         {
            if ( ! read_req)
               read_req = new ReadRequest(io, rh);

            // Is there room for one more RAM Block?
            Block *b = PrepareBlockRequest(block_idx, io, read_req, false);
            if (b)
            {
               TRACEF(Dump, tpfx << "inc_ref_count new " << (void*)iUserBuff << " idx = " << block_idx);
               inc_ref_count(b);
               blks_to_request.push_back(b);

               b->m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
               ++read_req->m_n_chunk_reqs;

               lbe = LB_other;
            }
            else // Nope ... read this directly without caching.
            {
               TRACEF(DumpXL, tpfx << "direct block " << block_idx << ", blk_off " << blk_off << ", size " << size);

               iovec_direct_total += size;
               read_req->m_direct_done = false;

               // Make sure we do not issue a ReadV with a chunk size above XrdProto::maxRVdsz.
               // The number of actual ReadVs issued, so as to not exceed the XrdProto::maxRvecsz
               // limit, is determined in RequestBlocksDirect().
               if (lbe == LB_direct && iovec_direct.back().size + size <= XrdProto::maxRVdsz) {
                  iovec_direct.back().size += size;
               } else {
                  long long in_offset = block_idx * m_block_size + blk_off;
                  char *out_pos = iUserBuff + off;
                  while (size > XrdProto::maxRVdsz) {
                     iovec_direct.push_back( { in_offset, XrdProto::maxRVdsz, 0, out_pos } );
                     in_offset += XrdProto::maxRVdsz;
                     out_pos += XrdProto::maxRVdsz;
                     size -= XrdProto::maxRVdsz;
                  }
                  iovec_direct.push_back( { in_offset, size, 0, out_pos } );
               }
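               // E.g., assuming XrdProto::maxRVdsz is 4 MB, a 10 MB direct span becomes three
               // iovec entries of 4 MB, 4 MB and 2 MB.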

               lbe = LB_direct;
            }
         }
      } // end for over blocks in an IOVec
   } // end for over readV IOVec

   inc_prefetch_hit_cnt(prefetch_cnt);

   m_state_cond.UnLock();

   // First, send out remote requests for new blocks.
   if ( ! blks_to_request.empty())
   {
      ProcessBlockRequests(blks_to_request);
      blks_to_request.clear();
   }

   // Second, send out remote direct read requests.
   if ( ! iovec_direct.empty())
   {
      RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);

      TRACEF(Dump, tpfx << "direct read requests sent out, n_chunks = " << (int) iovec_direct.size() << ", total_size = " << iovec_direct_total);
   }

   // Begin synchronous part where we process data that is already in RAM or on disk.

   long long bytes_read = 0;
   int error_cond = 0; // to be set to -errno

   // Third, process blocks that are available in RAM.
   if ( ! blks_ready.empty())
   {
      for (auto &bvi : blks_ready)
      {
         for (auto &cr : bvi.second)
         {
            TRACEF(DumpXL, tpfx << "ub=" << (void*)cr.m_buf << " from pre-finished block " << bvi.first->m_offset/m_block_size << " size " << cr.m_size);
            memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
            bytes_read += cr.m_size;
         }
      }
   }

   // Fourth, read blocks from disk.
   if ( ! iovec_disk.empty())
   {
      int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
      TRACEF(DumpXL, tpfx << "from disk finished size = " << rc);
      if (rc >= 0)
      {
         bytes_read += rc;
      }
      else
      {
         error_cond = rc;
         TRACEF(Error, tpfx << "failed read from disk");
      }
   }

   // End synchronous part -- update with sync stats and determine actual state of this read.
   // Note: remote reads might have already finished during the disk-read!

   m_state_cond.Lock();

   for (auto &bvi : blks_ready)
      dec_ref_count(bvi.first, (int) bvi.second.size());

   if (read_req)
   {
      read_req->m_bytes_read += bytes_read;
      if (error_cond)
         read_req->update_error_cond(error_cond);
      read_req->m_stats.m_BytesHit += bytes_read;
      read_req->m_sync_done = true;

      if (read_req->is_complete())
      {
         // Almost like FinalizeReadRequest(read_req) -- but no callout!
         m_delta_stats.AddReadStats(read_req->m_stats);
         check_delta_stats();
         m_state_cond.UnLock();

         int ret = read_req->return_value();
         delete read_req;
         return ret;
      }
      else
      {
         m_state_cond.UnLock();
         return -EWOULDBLOCK;
      }
   }
   else
   {
      m_delta_stats.m_BytesHit += bytes_read;
      check_delta_stats();
      m_state_cond.UnLock();

      // !!! No callout.

      return error_cond ? error_cond : bytes_read;
   }
}


//==============================================================================
// WriteBlock and Sync
//==============================================================================

void File::WriteBlockToDisk(Block *b)
{
   // Write the block buffer into the disk file.
   long long offset = b->m_offset - m_offset;
   long long size = b->get_size();
   ssize_t retval;

   if (m_cfi.IsCkSumCache())
      if (b->has_cksums())
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, b->ref_cksum_vec().data(), 0);
      else
         retval = m_data_file->pgWrite(b->get_buff(), offset, size, 0, 0);
   else
      retval = m_data_file->Write(b->get_buff(), offset, size);

   if (retval < size)
   {
      if (retval < 0) {
         TRACEF(Error, "WriteToDisk() write error " << retval);
      } else {
         TRACEF(Error, "WriteToDisk() incomplete block write ret=" << retval << " (should be " << size << ")");
      }

      XrdSysCondVarHelper _lck(m_state_cond);

      dec_ref_count(b);

      return;
   }

   const int blk_idx = (b->m_offset - m_offset) / m_block_size;

   // Set the written bit.
   TRACEF(Dump, "WriteToDisk() success set bit for block " << b->m_offset << " size=" << size);

   bool schedule_sync = false;
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      m_cfi.SetBitWritten(blk_idx);

      if (b->m_prefetch)
      {
         m_cfi.SetBitPrefetch(blk_idx);
      }
      if (b->req_cksum_net() && ! b->has_cksums() && m_cfi.IsCkSumNet())
      {
         m_cfi.ResetCkSumNet();
      }

      dec_ref_count(b);

      // Set the synced bit or stash the block index if a sync is in progress.
      // Synced state is only written out to the cinfo file when the data file is synced.
      if (m_in_sync)
      {
         m_writes_during_sync.push_back(blk_idx);
      }
      else
      {
         m_cfi.SetBitSynced(blk_idx);
         ++m_non_flushed_cnt;
         if ((m_cfi.IsComplete() || m_non_flushed_cnt >= Cache::GetInstance().RefConfiguration().m_flushCnt) &&
             ! m_in_shutdown)
         {
            schedule_sync = true;
            m_in_sync = true;
            m_non_flushed_cnt = 0;
         }
      }
   }

   if (schedule_sync)
   {
      cache()->ScheduleFileSync(this);
   }
}

//------------------------------------------------------------------------------

void File::Sync()
{
   TRACEF(Dump, "Sync()");

   int ret = m_data_file->Fsync();
   bool errorp = false;
   if (ret == XrdOssOK)
   {
      Stats loc_stats;
      {
         XrdSysCondVarHelper _lck(&m_state_cond);
         report_and_merge_delta_stats();
         loc_stats = m_stats;
      }
      m_cfi.WriteIOStat(loc_stats);
      m_cfi.Write(m_info_file, m_filename.c_str());
      int cret = m_info_file->Fsync();
      if (cret != XrdOssOK)
      {
         TRACEF(Error, "Sync cinfo file sync error " << cret);
         errorp = true;
      }
   }
   else
   {
      TRACEF(Error, "Sync data file sync error " << ret << ", cinfo file has not been updated");
      errorp = true;
   }

   if (errorp)
   {
      TRACEF(Error, "Sync failed, unlinking local files and initiating shutdown of File object");

      // Unlink will also call this->initiate_emergency_shutdown()
      Cache::GetInstance().UnlinkFile(m_filename, false);

      XrdSysCondVarHelper _lck(&m_state_cond);

      m_writes_during_sync.clear();
      m_in_sync = false;

      return;
   }

   int written_while_in_sync;
   bool resync = false;
   {
      XrdSysCondVarHelper _lck(&m_state_cond);
      for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
      {
         m_cfi.SetBitSynced(*i);
      }
      written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
      m_writes_during_sync.clear();

      // If there were writes during sync and the file is now complete,
      // let us call Sync again without resetting the m_in_sync flag.
      if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
         resync = true;
      else
         m_in_sync = false;
   }
   TRACEF(Dump, "Sync " << written_while_in_sync << " blocks written during sync." << (resync ? " File is now complete - resyncing." : ""));

   if (resync)
      Sync();
}


//==============================================================================
// Block processing
//==============================================================================

void File::free_block(Block* b)
{
   // Method always called under lock.
   int i = b->m_offset / m_block_size;
   TRACEF(Dump, "free_block block " << b << " idx = " << i);
   size_t ret = m_block_map.erase(i);
   if (ret != 1)
   {
      // An assert might be a better option than a warning.
      TRACEF(Error, "free_block did not erase " << i << " from map");
   }
   else
   {
      cache()->ReleaseRAM(b->m_buff, b->m_req_size);
      delete b;
   }

   if (m_prefetch_state == kHold && (int) m_block_map.size() < Cache::GetInstance().RefConfiguration().m_prefetch_max_blocks)
   {
      m_prefetch_state = kOn;
      cache()->RegisterPrefetchFile(this);
   }
}

//------------------------------------------------------------------------------

bool File::select_current_io_or_disable_prefetching(bool skip_current)
{
   // Method always called under lock. It also expects prefetch to be active.

   int io_size = (int) m_io_set.size();
   bool io_ok = false;

   if (io_size == 1)
   {
      io_ok = (*m_io_set.begin())->m_allow_prefetching;
      if (io_ok)
      {
         m_current_io = m_io_set.begin();
      }
   }
   else if (io_size > 1)
   {
      IoSet_i mi = m_current_io;
      if (skip_current && mi != m_io_set.end()) ++mi;

      for (int i = 0; i < io_size; ++i)
      {
         if (mi == m_io_set.end()) mi = m_io_set.begin();

         if ((*mi)->m_allow_prefetching)
         {
            m_current_io = mi;
            io_ok = true;
            break;
         }
         ++mi;
      }
   }

   if ( ! io_ok)
   {
      m_current_io = m_io_set.end();
      m_prefetch_state = kStopped;
      cache()->DeRegisterPrefetchFile(this);
   }

   return io_ok;
}

//------------------------------------------------------------------------------

void File::ProcessDirectReadFinished(ReadRequest *rreq, int bytes_read, int error_cond)
{
   // Called from DirectResponseHandler.
   // NOT under lock.

   if (error_cond)
      TRACEF(Error, "Read(), direct read finished with error " << -error_cond << " " << XrdSysE2T(-error_cond));

   m_state_cond.Lock();

   if (error_cond)
      rreq->update_error_cond(error_cond);
   else {
      rreq->m_stats.m_BytesBypassed += bytes_read;
      rreq->m_bytes_read += bytes_read;
   }

   rreq->m_direct_done = true;

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   if (rreq_complete)
      FinalizeReadRequest(rreq);
}

void File::ProcessBlockError(Block *b, ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // YES under lock -- we have to protect m_block_map for recovery through multiple IOs.
   // Does not manage m_read_req.
   // Will not complete the request.

   TRACEF(Debug, "ProcessBlockError() io " << b->m_io << ", block " << b->m_offset/m_block_size <<
          " finished with error " << -b->get_error() << " " << XrdSysE2T(-b->get_error()));

   rreq->update_error_cond(b->get_error());
   --rreq->m_n_chunk_reqs;

   dec_ref_count(b);
}

void File::ProcessBlockSuccess(Block *b, ChunkRequest &creq)
{
   // Called from ProcessBlockResponse().
   // NOT under lock, as it does a memcpy of existing block data.
   // Acquires the lock for block, m_read_req and rreq state updates.

   ReadRequest *rreq = creq.m_read_req;

   TRACEF(Dump, "ProcessBlockSuccess() ub=" << (void*)creq.m_buf << " from finished block " << b->m_offset/m_block_size << " size " << creq.m_size);
   memcpy(creq.m_buf, b->m_buff + creq.m_off, creq.m_size);

   m_state_cond.Lock();

   rreq->m_bytes_read += creq.m_size;

   if (b->get_req_id() == (void*) rreq)
      rreq->m_stats.m_BytesMissed += creq.m_size;
   else
      rreq->m_stats.m_BytesHit += creq.m_size;

   --rreq->m_n_chunk_reqs;

   if (b->m_prefetch)
      inc_prefetch_hit_cnt(1);

   dec_ref_count(b);

   bool rreq_complete = rreq->is_complete();

   m_state_cond.UnLock();

   if (rreq_complete)
      FinalizeReadRequest(rreq);
}

void File::FinalizeReadRequest(ReadRequest *rreq)
{
   // Called from ProcessBlockResponse().
   // NOT under lock -- does the callout.
   {
      XrdSysCondVarHelper _lck(m_state_cond);
      m_delta_stats.AddReadStats(rreq->m_stats);
      check_delta_stats();
   }

   rreq->m_rh->Done(rreq->return_value());
   delete rreq;
}

void File::ProcessBlockResponse(Block *b, int res)
{
   static const char* tpfx = "ProcessBlockResponse ";

   TRACEF(Dump, tpfx << "block=" << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset << ", res=" << res);

   if (res >= 0 && res != b->get_size())
   {
      // An incorrect number of bytes was received; apparently the size of the file on the remote
      // is different from what the cache expects it to be.
      TRACEF(Error, tpfx << "Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
      Cache::GetInstance().UnlinkFile(m_filename, false);
   }

   m_state_cond.Lock();

   // Deregister the block from the IO's prefetch count, if needed.
   if (b->m_prefetch)
   {
      IO *io = b->get_io();
      IoSet_i mi = m_io_set.find(io);
      if (mi != m_io_set.end())
      {
         --io->m_active_prefetches;

         // If failed and the IO is still prefetching -- disable prefetching on this IO.
         if (res < 0 && io->m_allow_prefetching)
         {
            TRACEF(Debug, tpfx << "after failed prefetch on io " << io << " disabling prefetching on this io.");
            io->m_allow_prefetching = false;

            // Check if any IO is still available for prefetching. If not, stop it.
            if (m_prefetch_state == kOn || m_prefetch_state == kHold)
            {
               if ( ! select_current_io_or_disable_prefetching(false) )
               {
                  TRACEF(Debug, tpfx << "stopping prefetching after io " << b->get_io() << " marked as bad.");
               }
            }
         }

         // If failed with no subscribers -- delete the block and exit.
         if (b->m_refcnt == 0 && (res < 0 || m_in_shutdown))
         {
            free_block(b);
            m_state_cond.UnLock();
            return;
         }
         m_prefetch_bytes += b->get_size();
      }
      else
      {
         TRACEF(Error, tpfx << "io " << b->get_io() << " not found in IoSet.");
      }
   }

   if (res == b->get_size())
   {
      b->set_downloaded();
      TRACEF(Dump, tpfx << "inc_ref_count idx=" << b->m_offset/m_block_size);
      if ( ! m_in_shutdown)
      {
         // Increase the ref-count for the writer.
         inc_ref_count(b);
         m_delta_stats.AddWriteStats(b->get_size(), b->get_n_cksum_errors());
         // No check for writes, report-and-merge is forced during Sync().
         cache()->AddWriteTask(b, true);
      }

      // Swap the chunk-reqs vector out of the Block, it will be processed outside of the lock.
      vChunkRequest_t creqs_to_notify;
      creqs_to_notify.swap( b->m_chunk_reqs );

      m_state_cond.UnLock();

      for (auto &creq : creqs_to_notify)
      {
         ProcessBlockSuccess(b, creq);
      }
   }
   else
   {
      if (res < 0) {
         bool new_error = b->get_io()->register_block_error(res);
         int tlvl = new_error ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << ", error=" << res);
      } else {
         bool first_p = b->get_io()->register_incomplete_read();
         int tlvl = first_p ? TRACE_Error : TRACE_Debug;
         TRACEF_INT(tlvl, tpfx << "block " << b << ", idx=" << b->m_offset/m_block_size << ", off=" << b->m_offset
                    << ", io=" << b->get_io() << " incomplete, got " << res << " expected " << b->get_size());
#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
         res = -EIO;
#else
         res = -EREMOTEIO;
#endif
      }
      b->set_error(res);

      // Loop over the Block's chunk-reqs vector, error out the ones with the same IO.
      // Collect others with a different IO; the first of them will be used to reissue the request.
      // This is then done outside of the lock.
      std::list<ReadRequest*> rreqs_to_complete;
      vChunkRequest_t creqs_to_keep;

      for (ChunkRequest &creq : b->m_chunk_reqs)
      {
         ReadRequest *rreq = creq.m_read_req;

         if (rreq->m_io == b->get_io())
         {
            ProcessBlockError(b, rreq);
            if (rreq->is_complete())
            {
               rreqs_to_complete.push_back(rreq);
            }
         }
         else
         {
            creqs_to_keep.push_back(creq);
         }
      }

      bool reissue = false;
      if ( ! creqs_to_keep.empty())
      {
         ReadRequest *rreq = creqs_to_keep.front().m_read_req;

         TRACEF(Debug, "ProcessBlockResponse() requested block " << (void*)b << " failed with another io " <<
                b->get_io() << " - reissuing request with my io " << rreq->m_io);

         b->reset_error_and_set_io(rreq->m_io, rreq);
         b->m_chunk_reqs.swap( creqs_to_keep );
         reissue = true;
      }

      m_state_cond.UnLock();

      for (auto rreq : rreqs_to_complete)
         FinalizeReadRequest(rreq);

      if (reissue)
         ProcessBlockRequest(b);
   }
}

//------------------------------------------------------------------------------

const char* File::lPath() const
{
   return m_filename.c_str();
}

//------------------------------------------------------------------------------

int File::offsetIdx(int iIdx) const
{
   return iIdx - m_offset/m_block_size;
}
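// E.g., for a file fragment cached with m_offset = 128 MB and 1 MB blocks, the absolute
// block index 130 maps to local cinfo bit-vector index 2.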


//------------------------------------------------------------------------------

void File::Prefetch()
{
   // Check that the block is not on disk and not in RAM.
   // TODO: Could prefetch several blocks at once!
   // blks_max could be an argument.

   BlockList_t blks;

   TRACEF(DumpXL, "Prefetch() entering.");
   {
      XrdSysCondVarHelper _lck(m_state_cond);

      if (m_prefetch_state != kOn)
      {
         return;
      }

      if ( ! select_current_io_or_disable_prefetching(true) )
      {
         TRACEF(Error, "Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should have been stopped before.");
         return;
      }

      // Select block(s) to fetch.
      for (int f = 0; f < m_num_blocks; ++f)
      {
         if ( ! m_cfi.TestBitWritten(f))
         {
            int f_act = f + m_offset / m_block_size;

            BlockMap_i bi = m_block_map.find(f_act);
            if (bi == m_block_map.end())
            {
               Block *b = PrepareBlockRequest(f_act, *m_current_io, nullptr, true);
               if (b)
               {
                  TRACEF(Dump, "Prefetch take block " << f_act);
                  blks.push_back(b);
                  // Note: the block ref_cnt is not increased, it will be when placed into the write queue.

                  inc_prefetch_read_cnt(1);
               }
               else
               {
                  // This shouldn't happen as prefetching stops when RAM is 70% full.
                  TRACEF(Warning, "Prefetch allocation failed for block " << f_act);
               }
               break;
            }
         }
      }

      if (blks.empty())
      {
         TRACEF(Debug, "Prefetch file is complete, stopping prefetch.");
         m_prefetch_state = kComplete;
         cache()->DeRegisterPrefetchFile(this);
      }
      else
      {
         (*m_current_io)->m_active_prefetches += (int) blks.size();
      }
   }

   if ( ! blks.empty())
   {
      ProcessBlockRequests(blks);
   }
}


//------------------------------------------------------------------------------

float File::GetPrefetchScore() const
{
   return m_prefetch_score;
}

XrdSysError* File::GetLog()
{
   return Cache::GetInstance().GetLog();
}

XrdSysTrace* File::GetTrace()
{
   return Cache::GetInstance().GetTrace();
}
void File::insert_remote_location(const std::string &loc)
{
   if ( ! loc.empty())
   {
      size_t p = loc.find_first_of('@');
      m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
   }
}
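// E.g., a location of the form "user@host:port" is stored as "host:port"; a location
// without an '@' is stored unchanged.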

std::string File::GetRemoteLocations() const
{
   std::string s;
   if ( ! m_remote_locations.empty())
   {
      size_t sl = 0;
      int nl = 0;
      for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
      {
         sl += i->size();
      }
      s.reserve(2 + sl + 2*nl + nl - 1 + 1);
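      // Reserve: 2 brackets + the location strings + 2 quotes per location + (nl - 1) commas,
      // plus one spare byte.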
      s = '[';
      int j = 1;
      for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
      {
         s += '"'; s += *i; s += '"';
         if (j < nl) s += ',';
      }
      s += ']';
   }
   else
   {
      s = "[]";
   }
   return s;
}

//==============================================================================
//======================= RESPONSE HANDLERS ==============================
//==============================================================================

void BlockResponseHandler::Done(int res)
{
   m_block->m_file->ProcessBlockResponse(m_block, res);
   delete this;
}

//------------------------------------------------------------------------------

void DirectResponseHandler::Done(int res)
{
   m_mutex.Lock();

   int n_left = --m_to_wait;

   if (res < 0) {
      if (m_errno == 0) m_errno = res; // store the first reported error
   } else {
      m_bytes_read += res;
   }

   m_mutex.UnLock();

   if (n_left == 0)
   {
      m_file->ProcessDirectReadFinished(m_read_req, m_bytes_read, m_errno);
      delete this;
   }
}