37#include <unordered_map>
46const int BLOCK_WRITE_MAX_ATTEMPTS = 4;
52const char *File::m_traceID =
"File";
56File::File(
const std::string& path,
long long iOffset,
long long iFileSize) :
60 m_cfi(
Cache::TheOne().GetTrace(),
Cache::TheOne().is_prefetch_enabled()),
63 m_file_size(iFileSize),
64 m_current_io(m_io_set.end()),
68 m_detach_time_logged(false),
74 m_prefetch_state(kOff),
76 m_prefetch_read_cnt(0),
77 m_prefetch_hit_cnt(0),
104 m_info_file->Close();
106 m_info_file =
nullptr;
112 m_data_file->Close();
114 m_data_file =
nullptr;
117 if (m_resmon_token >= 0)
122 if (m_stats.m_BytesWritten > 0 && ! m_in_shutdown) {
125 if (sr == 0 && s.st_blocks != m_st_blocks) {
128 m_st_blocks = s.st_blocks;
136 TRACEF(
Debug,
"Close() finished, prefetch score = " << m_prefetch_score);
143 File *file =
new File(path, offset, fileSize);
144 if ( ! file->Open(inputIO))
169 m_in_shutdown =
true;
171 if (m_prefetch_state != kStopped && m_prefetch_state != kComplete)
173 m_prefetch_state = kStopped;
174 cache()->DeRegisterPrefetchFile(
this);
177 report_and_merge_delta_stats();
184void File::check_delta_stats()
189 report_and_merge_delta_stats();
192void File::report_and_merge_delta_stats()
196 m_data_file->
Fstat(&s);
199 long long max_st_blocks_to_report = (m_file_size & 0xfff) ? ((m_file_size >> 12) + 1) << 3
201 long long st_blocks_to_report = std::min((
long long) s.st_blocks, max_st_blocks_to_report);
203 m_st_blocks = st_blocks_to_report;
205 m_stats.
AddUp(m_delta_stats);
206 m_delta_stats.
Reset();
213 TRACEF(Dump,
"BlockRemovedFromWriteQ() block = " << (
void*) b <<
" idx= " << b->
m_offset/m_block_size);
221 TRACEF(Dump,
"BlocksRemovedFromWriteQ() n_blocks = " << blocks.size());
225 for (std::list<Block*>::iterator i = blocks.begin(); i != blocks.end(); ++i)
237 insert_remote_location(loc);
253 IoSet_i mi = m_io_set.find(io);
255 if (mi != m_io_set.end())
260 ", active_reads " << n_active_reads <<
261 ", active_prefetches " << io->m_active_prefetches <<
262 ", allow_prefetching " << io->m_allow_prefetching <<
263 ", ios_in_detach " << m_ios_in_detach);
265 "\tio_map.size() " << m_io_set.size() <<
266 ", block_map.size() " << m_block_map.size() <<
", file");
268 insert_remote_location(loc);
270 io->m_allow_prefetching =
false;
271 io->m_in_detach =
true;
274 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
276 if ( ! select_current_io_or_disable_prefetching(
false) )
278 TRACEF(
Debug,
"ioActive stopping prefetching after io " << io <<
" retreat.");
285 bool io_active_result;
287 if (n_active_reads > 0)
289 io_active_result =
true;
291 else if (m_io_set.size() - m_ios_in_detach == 1)
293 io_active_result = ! m_block_map.empty();
297 io_active_result = io->m_active_prefetches > 0;
300 if ( ! io_active_result)
305 TRACEF(
Info,
"ioActive for io " << io <<
" returning " << io_active_result <<
", file");
307 return io_active_result;
311 TRACEF(
Error,
"ioActive io " << io <<
" not found in IoSet. This should not happen.");
322 m_detach_time_logged =
false;
331 if ( ! m_in_shutdown)
333 if ( ! m_writes_during_sync.empty() || m_non_flushed_cnt > 0 || ! m_detach_time_logged)
335 report_and_merge_delta_stats();
336 m_cfi.WriteIOStatDetach(m_stats);
337 m_detach_time_logged =
true;
339 TRACEF(
Debug,
"FinalizeSyncBeforeExit requesting sync to write detach stats");
343 TRACEF(
Debug,
"FinalizeSyncBeforeExit sync not required");
355 time_t now = time(0);
360 IoSet_i mi = m_io_set.find(io);
362 if (mi == m_io_set.end())
365 io->m_attach_time = now;
366 m_delta_stats.IoAttach();
368 insert_remote_location(loc);
370 if (m_prefetch_state == kStopped)
372 m_prefetch_state = kOn;
373 cache()->RegisterPrefetchFile(
this);
378 TRACEF(
Error,
"AddIO() io = " << (
void*)io <<
" already registered.");
381 m_state_cond.UnLock();
392 time_t now = time(0);
396 IoSet_i mi = m_io_set.find(io);
398 if (mi != m_io_set.end())
400 if (mi == m_current_io)
405 m_delta_stats.IoDetach(now - io->m_attach_time);
409 if (m_io_set.empty() && m_prefetch_state != kStopped && m_prefetch_state != kComplete)
411 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" Prefetching is not stopped/complete -- it should be by now.");
412 m_prefetch_state = kStopped;
413 cache()->DeRegisterPrefetchFile(
this);
418 TRACEF(
Error,
"RemoveIO() io = " << (
void*)io <<
" is NOT registered.");
421 m_state_cond.UnLock();
430 static const char *tpfx =
"Open() ";
432 TRACEF(Dump, tpfx <<
"entered");
443 struct stat data_stat, info_stat;
447 bool data_existed = (myOss.
Stat(m_filename.c_str(), &data_stat) ==
XrdOssOK);
448 bool info_existed = (myOss.
Stat(ifn.c_str(), &info_stat) ==
XrdOssOK);
451 char size_str[32]; sprintf(size_str,
"%lld", m_file_size);
452 myEnv.
Put(
"oss.asize", size_str);
464 m_data_file = myOss.
newFile(myUser);
465 if ((res = m_data_file->
Open(m_filename.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
469 delete m_data_file; m_data_file = 0;
473 myEnv.
Put(
"oss.asize",
"64k");
479 m_data_file->Close();
delete m_data_file; m_data_file = 0;
483 m_info_file = myOss.
newFile(myUser);
484 if ((res = m_info_file->Open(ifn.c_str(), O_RDWR, 0600, myEnv)) !=
XrdOssOK)
488 delete m_info_file; m_info_file = 0;
489 m_data_file->Close();
delete m_data_file; m_data_file = 0;
493 bool initialize_info_file =
true;
495 if (info_existed && m_cfi.Read(m_info_file, ifn.c_str()))
497 TRACEF(
Debug, tpfx <<
"Reading existing info file. (data_existed=" << data_existed <<
498 ", data_size_stat=" << (data_existed ? data_stat.st_size : -1ll) <<
499 ", data_size_from_last_block=" << m_cfi.GetExpectedDataFileSize() <<
500 ", block_size=" << (m_cfi.GetBufferSize() >> 10) <<
"k)");
503 if (data_existed && data_stat.st_size >= m_cfi.GetExpectedDataFileSize())
505 initialize_info_file =
false;
507 TRACEF(Warning, tpfx <<
"Basic sanity checks on data file failed, resetting info file, truncating data file.");
508 m_cfi.ResetAllAccessStats();
509 m_data_file->Ftruncate(0);
516 if ( ! initialize_info_file && m_cfi.GetCkSumState() != conf.
get_cs_Chk())
521 TRACEF(Info, tpfx <<
"Cksum state of file insufficient, uvkeep test failed, resetting info file, truncating data file.");
522 initialize_info_file =
true;
523 m_cfi.ResetAllAccessStats();
524 m_data_file->Ftruncate(0);
538 parse_pfc_url_args(inputIO, pfc_blocksize, pfc_prefetch);
541 if (initialize_info_file)
543 m_cfi.SetBufferSizeFileSizeAndCreationTime(pfc_blocksize, m_file_size);
545 m_cfi.ResetNoCkSumTime();
546 m_cfi.Write(m_info_file, ifn.c_str());
547 m_info_file->Fsync();
548 cache()->WriteFileSizeXAttr(m_info_file->getFD(), m_file_size);
549 TRACEF(
Debug, tpfx <<
"Creating new file info, data size = " << m_file_size <<
" num blocks = " << m_cfi.GetNBlocks()
550 <<
" block size = " << pfc_blocksize);
554 if (futimens(m_info_file->getFD(), NULL)) {
558 TRACEF(Info, tpfx <<
"URL CGI pfc.blocksize ignored for an already existing file");
562 m_cfi.WriteIOStatAttach();
564 m_block_size = m_cfi.GetBufferSize();
565 m_num_blocks = m_cfi.GetNBlocks();
566 m_prefetch_state = (m_cfi.IsComplete()) ? kComplete : kStopped;
567 m_prefetch_max_blocks_in_flight = pfc_prefetch;
569 TRACEF(
Debug, tpfx <<
"pfc.prefetch set to " << pfc_prefetch <<
" via CGI parameter");
571 m_data_file->Fstat(&data_stat);
572 m_st_blocks = data_stat.st_blocks;
575 constexpr long long MB = 1024 * 1024;
576 m_resmon_report_threshold = std::min(std::max(10 * MB, m_file_size / 20), 500 * MB);
580 m_state_cond.UnLock();
585void File::parse_pfc_url_args(XrdOucCacheIO* inputIO,
long long &pfc_blocksize,
int &pfc_prefetch)
const
589 XrdCl::URL url(inputIO->
Path());
590 auto const & urlp = url.GetParams();
592 auto extract = [&](
const std::string &key, std::string &value) ->
bool {
593 auto it = urlp.find(key);
594 if (it != urlp.end()) {
606 const char *tpfx =
"File::Open::urlcgi pfc.blocksize ";
608 if (
Cache::TheOne().blocksize_str2value(tpfx, val.c_str(), bsize,
611 pfc_blocksize = bsize;
613 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
618 const char *tpfx =
"File::Open::urlcgi pfc.prefetch ";
620 if (
Cache::TheOne().prefetch_str2value(tpfx, val.c_str(), pref,
625 TRACEF(
Error, tpfx <<
"Error processing the parameter.");
643 if ((res = m_data_file->Fstat(&sbuff)))
return res;
645 sbuff.st_size = m_file_size;
647 bool is_cached = cache()->DecideIfConsideredCached(m_file_size, sbuff.st_blocks * 512ll);
658bool File::overlap(
int blk,
667 const long long beg = blk * blk_size;
668 const long long end = beg + blk_size;
669 const long long req_end = req_off + req_size;
671 if (req_off < end && req_end > beg)
673 const long long ovlp_beg = std::max(beg, req_off);
674 const long long ovlp_end = std::min(end, req_end);
676 off = ovlp_beg - req_off;
677 blk_off = ovlp_beg - beg;
678 size = (int) (ovlp_end - ovlp_beg);
680 assert(size <= blk_size);
691Block* File::PrepareBlockRequest(
int i,
IO *io,
void *req_id,
bool prefetch)
699 const long long off = i * m_block_size;
700 const int last_block = m_num_blocks - 1;
701 const bool cs_net = cache()->RefConfiguration().is_cschk_net();
703 int blk_size, req_size;
704 if (i == last_block) {
705 blk_size = req_size = m_file_size - off;
706 if (cs_net && req_size & 0xFFF) req_size = (req_size & ~0xFFF) + 0x1000;
708 blk_size = req_size = m_block_size;
712 char *buf = cache()->RequestRAM(req_size);
716 b =
new (std::nothrow) Block(
this, io, req_id, buf, off, blk_size, req_size, prefetch, cs_net);
724 if (m_prefetch_state == kOn && (
int) m_block_map.size() >= m_prefetch_max_blocks_in_flight)
726 m_prefetch_state = kHold;
727 cache()->DeRegisterPrefetchFile(
this);
732 TRACEF(Dump,
"PrepareBlockRequest() " << i <<
" prefetch " << prefetch <<
", allocation failed.");
739void File::ProcessBlockRequest(
Block *b)
747 snprintf(buf, 256,
"idx=%lld, block=%p, prefetch=%d, off=%lld, req_size=%d, buff=%p, resp_handler=%p ",
748 b->
get_offset()/m_block_size, (
void*)b, b->m_prefetch, b->get_offset(), b->get_req_size(), (
void*)b->get_buff(), (
void*)brh);
749 TRACEF(Dump,
"ProcessBlockRequest() " << buf);
765 for (
BlockList_i bi = blks.begin(); bi != blks.end(); ++bi)
767 ProcessBlockRequest(*bi);
773void File::RequestBlocksDirect(
IO *io,
ReadRequest *read_req, std::vector<XrdOucIOVec>& ioVec,
int expected_size)
775 int n_chunks = ioVec.size();
778 TRACEF(DumpXL,
"RequestBlocksDirect() issuing ReadV for n_chunks = " << n_chunks <<
779 ", total_size = " << expected_size <<
", n_vec_reads = " << n_vec_reads);
789 io->
GetInput()->
ReadV( *handler, ioVec.data() + pos, n_chunks);
794int File::ReadBlocksFromDisk(std::vector<XrdOucIOVec>& ioVec,
int expected_size)
796 TRACEF(DumpXL,
"ReadBlocksFromDisk() issuing ReadV for n_chunks = " << (
int) ioVec.size() <<
", total_size = " << expected_size);
798 long long rs = m_data_file->ReadV(ioVec.data(), (
int) ioVec.size());
802 TRACEF(
Error,
"ReadBlocksFromDisk neg retval = " << rs);
806 if (rs != expected_size)
808 TRACEF(
Error,
"ReadBlocksFromDisk incomplete size = " << rs);
827 if (m_in_shutdown || io->m_in_detach)
829 m_state_cond.UnLock();
830 return m_in_shutdown ? -ENOENT : -EBADF;
835 if (m_cfi.IsComplete())
837 m_state_cond.UnLock();
838 int ret = m_data_file->Read(iUserBuff, iUserOff, iUserSize);
841 m_delta_stats.AddBytesHit(ret);
847 XrdOucIOVec readV( { iUserOff, iUserSize, 0, iUserBuff } );
849 return ReadOpusCoalescere(io, &readV, 1, rh,
"Read() ");
856 TRACEF(Dump,
"ReadV() for " << readVnum <<
" chunks.");
860 if (m_in_shutdown || io->m_in_detach)
862 m_state_cond.UnLock();
863 return m_in_shutdown ? -ENOENT : -EBADF;
868 if (m_cfi.IsComplete())
870 m_state_cond.UnLock();
871 int ret = m_data_file->ReadV(
const_cast<XrdOucIOVec*
>(readV), readVnum);
874 m_delta_stats.AddBytesHit(ret);
880 return ReadOpusCoalescere(io, readV, readVnum, rh,
"ReadV() ");
885int File::ReadOpusCoalescere(
IO *io,
const XrdOucIOVec *readV,
int readVnum,
897 int prefetch_cnt = 0;
902 std::unordered_map<Block*, std::vector<ChunkRequest>> blks_ready;
904 std::vector<XrdOucIOVec> iovec_disk;
905 std::vector<XrdOucIOVec> iovec_direct;
906 int iovec_disk_total = 0;
907 int iovec_direct_total = 0;
909 for (
int iov_idx = 0; iov_idx < readVnum; ++iov_idx)
916 const int idx_first = iUserOff / m_block_size;
917 const int idx_last = (iUserOff + iUserSize - 1) / m_block_size;
919 TRACEF(DumpXL, tpfx <<
"sid: " <<
Xrd::hex1 << rh->
m_seq_id <<
" idx_first: " << idx_first <<
" idx_last: " << idx_last);
921 enum LastBlock_e { LB_other, LB_disk, LB_direct };
923 LastBlock_e lbe = LB_other;
925 for (
int block_idx = idx_first; block_idx <= idx_last; ++block_idx)
928 BlockMap_i bi = m_block_map.find(block_idx);
935 overlap(block_idx, m_block_size, iUserOff, iUserSize, off, blk_off, size);
938 if (bi != m_block_map.end())
940 inc_ref_count(bi->second);
941 TRACEF(Dump, tpfx << (
void*) iUserBuff <<
" inc_ref_count for existing block " << bi->second <<
" idx = " << block_idx);
943 if (bi->second->is_finished())
947 assert(bi->second->is_ok());
949 blks_ready[bi->second].emplace_back(
ChunkRequest(
nullptr, iUserBuff + off, blk_off, size) );
951 if (bi->second->m_prefetch)
957 read_req =
new ReadRequest(io, rh);
962 bi->second->m_chunk_reqs.emplace_back( ChunkRequest(read_req, iUserBuff + off, blk_off, size) );
969 else if (m_cfi.TestBitWritten(offsetIdx(block_idx)))
971 TRACEF(DumpXL, tpfx <<
"read from disk " << (
void*)iUserBuff <<
" idx = " << block_idx);
974 iovec_disk.back().size += size;
976 iovec_disk.push_back( { block_idx * m_block_size + blk_off, size, 0, iUserBuff + off } );
977 iovec_disk_total += size;
979 if (m_cfi.TestBitPrefetch(offsetIdx(block_idx)))
988 read_req =
new ReadRequest(io, rh);
991 Block *b = PrepareBlockRequest(block_idx, io, read_req,
false);
994 TRACEF(Dump, tpfx <<
"inc_ref_count new " << (
void*)iUserBuff <<
" idx = " << block_idx);
996 blks_to_request.push_back(b);
998 b->
m_chunk_reqs.emplace_back(ChunkRequest(read_req, iUserBuff + off, blk_off, size));
1005 TRACEF(DumpXL, tpfx <<
"direct block " << block_idx <<
", blk_off " << blk_off <<
", size " << size);
1007 iovec_direct_total += size;
1014 iovec_direct.back().size += size;
1016 long long in_offset = block_idx * m_block_size + blk_off;
1017 char *out_pos = iUserBuff + off;
1024 iovec_direct.push_back( { in_offset, size, 0, out_pos } );
1033 inc_prefetch_hit_cnt(prefetch_cnt);
1035 m_state_cond.UnLock();
1038 if ( ! blks_to_request.empty())
1040 ProcessBlockRequests(blks_to_request);
1041 blks_to_request.clear();
1045 if ( ! iovec_direct.empty())
1047 RequestBlocksDirect(io, read_req, iovec_direct, iovec_direct_total);
1049 TRACEF(Dump, tpfx <<
"direct read requests sent out, n_chunks = " << (
int) iovec_direct.size() <<
", total_size = " << iovec_direct_total);
1054 long long bytes_read = 0;
1058 if ( ! blks_ready.empty())
1060 for (
auto &bvi : blks_ready)
1062 for (
auto &cr : bvi.second)
1064 TRACEF(DumpXL, tpfx <<
"ub=" << (
void*)cr.m_buf <<
" from pre-finished block " << bvi.first->m_offset/m_block_size <<
" size " << cr.m_size);
1065 memcpy(cr.m_buf, bvi.first->m_buff + cr.m_off, cr.m_size);
1066 bytes_read += cr.m_size;
1072 if ( ! iovec_disk.empty())
1074 int rc = ReadBlocksFromDisk(iovec_disk, iovec_disk_total);
1075 TRACEF(DumpXL, tpfx <<
"from disk finished size = " << rc);
1090 m_state_cond.Lock();
1092 for (
auto &bvi : blks_ready)
1093 dec_ref_count(bvi.first, (
int) bvi.second.size());
1106 m_delta_stats.AddReadStats(read_req->
m_stats);
1107 check_delta_stats();
1108 m_state_cond.UnLock();
1116 m_state_cond.UnLock();
1117 return -EWOULDBLOCK;
1122 m_delta_stats.m_BytesHit += bytes_read;
1123 check_delta_stats();
1124 m_state_cond.UnLock();
1128 return error_cond ? error_cond : bytes_read;
1140 long long offset = b->
m_offset - m_offset;
1144 if (m_cfi.IsCkSumCache())
1148 retval = m_data_file->pgWrite(b->
get_buff(), offset, size, 0, 0);
1150 retval = m_data_file->Write(b->
get_buff(), offset, size);
1155 TRACEF(
Error,
"WriteToDisk() write error " << retval);
1157 TRACEF(
Error,
"WriteToDisk() incomplete block write ret=" << retval <<
" (should be " << size <<
")");
1167 const int blk_idx = (b->
m_offset - m_offset) / m_block_size;
1170 TRACEF(Dump,
"WriteToDisk() success set bit for block " << b->
m_offset <<
" size=" << size);
1172 bool schedule_sync =
false;
1176 m_cfi.SetBitWritten(blk_idx);
1180 m_cfi.SetBitPrefetch(blk_idx);
1184 m_cfi.ResetCkSumNet();
1191 m_writes_during_sync.push_back(blk_idx);
1195 m_cfi.SetBitSynced(blk_idx);
1196 ++m_non_flushed_cnt;
1197 if ((m_cfi.IsComplete() || m_non_flushed_cnt >=
Cache::GetInstance().RefConfiguration().m_flushCnt) &&
1200 schedule_sync =
true;
1202 m_non_flushed_cnt = 0;
1208 if (!schedule_sync) {
1215 cache()->ScheduleFileSync(
this);
1227 int ret = m_data_file->Fsync();
1228 bool errorp =
false;
1234 report_and_merge_delta_stats();
1235 loc_stats = m_stats;
1237 m_cfi.WriteIOStat(loc_stats);
1238 m_cfi.Write(m_info_file, m_filename.c_str());
1239 int cret = m_info_file->Fsync();
1242 TRACEF(
Error,
"Sync cinfo file sync error " << cret);
1248 TRACEF(
Error,
"Sync data file sync error " << ret <<
", cinfo file has not been updated");
1254 TRACEF(
Error,
"Sync failed, unlinking local files and initiating shutdown of File object");
1261 m_writes_during_sync.clear();
1267 int written_while_in_sync;
1268 bool resync =
false;
1271 for (std::vector<int>::iterator i = m_writes_during_sync.begin(); i != m_writes_during_sync.end(); ++i)
1273 m_cfi.SetBitSynced(*i);
1275 written_while_in_sync = m_non_flushed_cnt = (int) m_writes_during_sync.size();
1276 m_writes_during_sync.clear();
1280 if (written_while_in_sync > 0 && m_cfi.IsComplete() && ! m_in_shutdown)
1285 TRACEF(Dump,
"Sync "<< written_while_in_sync <<
" blocks written during sync." << (resync ?
" File is now complete - resyncing." :
""));
1296void File::free_block(
Block* b)
1299 int i = b->
m_offset / m_block_size;
1300 TRACEF(Dump,
"free_block block " << b <<
" idx = " << i);
1301 size_t ret = m_block_map.erase(i);
1305 TRACEF(
Error,
"free_block did not erase " << i <<
" from map");
1313 if (m_prefetch_state == kHold && (
int) m_block_map.size() < m_prefetch_max_blocks_in_flight)
1315 m_prefetch_state = kOn;
1316 cache()->RegisterPrefetchFile(
this);
1322bool File::select_current_io_or_disable_prefetching(
bool skip_current)
1326 int io_size = (int) m_io_set.size();
1331 io_ok = (*m_io_set.begin())->m_allow_prefetching;
1334 m_current_io = m_io_set.begin();
1337 else if (io_size > 1)
1339 IoSet_i mi = m_current_io;
1340 if (skip_current && mi != m_io_set.end()) ++mi;
1342 for (
int i = 0; i < io_size; ++i)
1344 if (mi == m_io_set.end()) mi = m_io_set.begin();
1346 if ((*mi)->m_allow_prefetching)
1358 m_current_io = m_io_set.end();
1359 m_prefetch_state = kStopped;
1360 cache()->DeRegisterPrefetchFile(
this);
1368void File::ProcessDirectReadFinished(
ReadRequest *rreq,
int bytes_read,
int error_cond)
1374 TRACEF(
Error,
"Read(), direct read finished with error " << -error_cond <<
" " <<
XrdSysE2T(-error_cond));
1376 m_state_cond.Lock();
1389 m_state_cond.UnLock();
1392 FinalizeReadRequest(rreq);
1419 TRACEF(Dump,
"ProcessBlockSuccess() ub=" << (
void*)creq.
m_buf <<
" from finished block " << b->
m_offset/m_block_size <<
" size " << creq.
m_size);
1422 m_state_cond.Lock();
1427 rreq->m_stats.m_BytesMissed += creq.
m_size;
1429 rreq->m_stats.m_BytesHit += creq.
m_size;
1431 --rreq->m_n_chunk_reqs;
1434 inc_prefetch_hit_cnt(1);
1438 bool rreq_complete = rreq->is_complete();
1440 m_state_cond.UnLock();
1443 FinalizeReadRequest(rreq);
1451 XrdSysCondVarHelper _lck(m_state_cond);
1452 m_delta_stats.AddReadStats(rreq->
m_stats);
1453 check_delta_stats();
1460void File::ProcessBlockResponse(
Block *b,
int res)
1462 static const char* tpfx =
"ProcessBlockResponse ";
1464 TRACEF(Dump, tpfx <<
"block=" << b <<
", idx=" << b->
m_offset/m_block_size <<
", off=" << b->
m_offset <<
", res=" << res);
1466 if (res >= 0 && res != b->
get_size())
1470 TRACEF(
Error, tpfx <<
"Wrong number of bytes received, assuming remote/local file size mismatch, unlinking local files and initiating shutdown of File object");
1474 m_state_cond.Lock();
1480 IoSet_i mi = m_io_set.find(io);
1481 if (mi != m_io_set.end())
1483 --io->m_active_prefetches;
1486 if (res < 0 && io->m_allow_prefetching)
1488 TRACEF(
Debug, tpfx <<
"after failed prefetch on io " << io <<
" disabling prefetching on this io.");
1489 io->m_allow_prefetching =
false;
1492 if (m_prefetch_state == kOn || m_prefetch_state == kHold)
1494 if ( ! select_current_io_or_disable_prefetching(
false) )
1496 TRACEF(
Debug, tpfx <<
"stopping prefetching after io " << b->
get_io() <<
" marked as bad.");
1502 if (b->
m_refcnt == 0 && (res < 0 || m_in_shutdown))
1505 m_state_cond.UnLock();
1519 TRACEF(Dump, tpfx <<
"inc_ref_count idx=" << b->
m_offset/m_block_size);
1520 if ( ! m_in_shutdown)
1526 cache()->AddWriteTask(b,
true);
1533 m_state_cond.UnLock();
1535 for (
auto &creq : creqs_to_notify)
1537 ProcessBlockSuccess(b, creq);
1546 <<
", io=" << b->
get_io() <<
", error=" << res);
1551 <<
", io=" << b->
get_io() <<
" incomplete, got " << res <<
" expected " << b->
get_size());
1552#if defined(__APPLE__) || defined(__GNU__) || (defined(__FreeBSD_kernel__) && defined(__GLIBC__)) || defined(__FreeBSD__)
1563 std::list<ReadRequest*> rreqs_to_complete;
1572 ProcessBlockError(b, rreq);
1575 rreqs_to_complete.push_back(rreq);
1580 creqs_to_keep.push_back(creq);
1584 bool reissue =
false;
1585 if ( ! creqs_to_keep.empty())
1587 ReadRequest *rreq = creqs_to_keep.front().m_read_req;
1589 TRACEF(
Debug,
"ProcessBlockResponse() requested block " << (
void*)b <<
" failed with another io " <<
1590 b->
get_io() <<
" - reissuing request with my io " << rreq->
m_io);
1597 m_state_cond.UnLock();
1599 for (
auto rreq : rreqs_to_complete)
1600 FinalizeReadRequest(rreq);
1603 ProcessBlockRequest(b);
1611 return m_filename.c_str();
1616int File::offsetIdx(
int iIdx)
const
1618 return iIdx - m_offset/m_block_size;
1632 TRACEF(DumpXL,
"Prefetch() entering.");
1636 if (m_prefetch_state != kOn)
1641 if ( ! select_current_io_or_disable_prefetching(
true) )
1643 TRACEF(
Error,
"Prefetch no available IO object found, prefetching stopped. This should not happen, i.e., prefetching should be stopped before.");
1648 for (
int f = 0; f < m_num_blocks; ++f)
1650 if ( ! m_cfi.TestBitWritten(f))
1652 int f_act = f + m_offset / m_block_size;
1654 BlockMap_i bi = m_block_map.find(f_act);
1655 if (bi == m_block_map.end())
1657 Block *b = PrepareBlockRequest(f_act, *m_current_io,
nullptr,
true);
1660 TRACEF(Dump,
"Prefetch take block " << f_act);
1664 inc_prefetch_read_cnt(1);
1669 TRACEF(Warning,
"Prefetch allocation failed for block " << f_act);
1678 TRACEF(
Debug,
"Prefetch file is complete, stopping prefetch.");
1679 m_prefetch_state = kComplete;
1680 cache()->DeRegisterPrefetchFile(
this);
1684 (*m_current_io)->m_active_prefetches += (int) blks.size();
1688 if ( ! blks.empty())
1690 ProcessBlockRequests(blks);
1699 return m_prefetch_score;
1712void File::insert_remote_location(
const std::string &loc)
1716 size_t p = loc.find_first_of(
'@');
1717 m_remote_locations.insert(&loc[p != std::string::npos ? p + 1 : 0]);
1724 if ( ! m_remote_locations.empty())
1728 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++nl)
1732 s.reserve(2 + sl + 2*nl + nl - 1 + 1);
1735 for (std::set<std::string>::iterator i = m_remote_locations.begin(); i != m_remote_locations.end(); ++i, ++j)
1737 s +=
'"'; s += *i; s +=
'"';
1738 if (j < nl) s +=
',';
#define ERRNO_AND_ERRSTR(err_code)
#define TRACEF_INT(act, x)
const char * XrdSysE2T(int errcode)
virtual int Fstat(struct stat *buf)
virtual int Open(const char *path, int Oflag, mode_t Mode, XrdOucEnv &env)
virtual int Create(const char *tid, const char *path, mode_t mode, XrdOucEnv &env, int opts=0)=0
virtual XrdOssDF * newFile(const char *tident)=0
virtual int Stat(const char *path, struct stat *buff, int opts=0, XrdOucEnv *envP=0)=0
virtual void Done(int result)=0
virtual const char * Path()=0
virtual int pgRead(char *buff, long long offs, int rdlen, std::vector< uint32_t > &csvec, uint64_t opts=0, int *csfix=0)
virtual int ReadV(const XrdOucIOVec *readV, int rnum)
void Put(const char *varname, const char *value)
void Done(int result) override
int * ptr_n_cksum_errors()
vCkSum_t & ref_cksum_vec()
long long get_offset() const
vChunkRequest_t m_chunk_reqs
void * get_req_id() const
bool req_cksum_net() const
void reset_error_and_set_io(IO *io, void *rid)
Attaches/creates and detaches/deletes cache-io objects for disk-based cache.
XrdSysError * GetLog() const
const Configuration & RefConfiguration() const
Reference XrdPfc configuration.
static ResourceMonitor & ResMon()
static Cache & GetInstance()
Singleton access.
XrdSysTrace * GetTrace() const
int UnlinkFile(const std::string &f_name, bool fail_if_open)
Remove cinfo and data files from cache.
static const Cache & TheOne()
void Done(int result) override
bool FinalizeSyncBeforeExit()
Returns true if any of the blocks need sync. Called from Cache::dec_ref_cnt on zero ref cnt.
const char * lPath() const
Log path.
int ReadV(IO *io, const XrdOucIOVec *readV, int readVnum, ReadReqRH *rh)
Vector read.
void WriteBlockToDisk(Block *b)
float GetPrefetchScore() const
friend class BlockResponseHandler
std::string GetRemoteLocations() const
int Fstat(struct stat &sbuff)
static File * FileOpen(const std::string &path, long long offset, long long fileSize, XrdOucCacheIO *inputIO)
Static constructor that also does Open. Returns null ptr if Open fails.
void RequestSyncOfDetachStats()
Flags that detach stats should be written out in final sync. Called from CacheIO upon Detach.
void BlocksRemovedFromWriteQ(std::list< Block * > &)
Handle removal of a set of blocks from Cache's write queue.
friend class DirectResponseHandler
void Sync()
Sync file cache info and output data with disk.
XrdSysTrace * GetTrace() const
int Read(IO *io, char *buff, long long offset, int size, ReadReqRH *rh)
Normal read.
void ioUpdated(IO *io)
Notification from IO that it has been updated (remote open).
long long initiate_emergency_shutdown()
void BlockRemovedFromWriteQ(Block *)
Handle removal of a block from Cache's write queue.
XrdSysError * GetLog() const
bool ioActive(IO *io)
Initiate close. Returns true if IO is still active. Used in XrdPosixXrootd::Close()
Base cache-io class that implements some XrdOucCacheIO abstract methods.
bool register_incomplete_read()
XrdOucCacheIO * GetInput()
bool register_block_error(int res)
RAtomic_int m_active_read_reqs
number of active read requests
const char * GetLocation()
Status of cached file. Can be read from and written into a binary file.
static const char * s_infoExtension
void CrossCheckIfScanIsInProgress(const std::string &lfn, XrdSysCondVar &cond)
int register_file_open(const std::string &filename, time_t open_timestamp, bool existing_file)
void register_file_purge(DirState *target, long long size_in_st_blocks)
void register_file_update_stats(int token_id, const Stats &stats)
void register_file_close(int token_id, time_t close_timestamp, const Stats &full_stats)
Statistics of cache utilisation by a File object.
long long m_StBlocksAdded
number of 512-byte blocks the file has grown by
long long m_BytesBypassed
number of bytes served directly through XrdCl
void AddUp(const Stats &s)
long long BytesReadAndWritten() const
long long m_BytesHit
number of bytes served from disk
std::list< Block * > BlockList_t
std::vector< ChunkRequest > vChunkRequest_t
std::list< Block * >::iterator BlockList_i
static const int maxRVdsz
static const int maxRvecsz
Contains parameters configurable from the xrootd config file.
long long m_cgi_max_bufferSize
max buffer size allowed in pfc.blocksize
int m_cgi_min_prefetch_max_blocks
min prefetch block count allowed in pfc.prefetch
bool does_cschk_have_missing_bits(CkSumCheck_e cks_on_file) const
bool m_cgi_prefetch_allowed
allow cgi setting of prefetch
CkSumCheck_e get_cs_Chk() const
int m_prefetch_max_blocks
default maximum number of blocks to prefetch per file
bool should_uvkeep_purge(time_t delta) const
std::string m_data_space
oss space for data files
long long m_bufferSize
cache block size, default 128 kB
long long m_cgi_min_bufferSize
min buffer size allowed in pfc.blocksize
std::string m_meta_space
oss space for metadata files (cinfo)
int m_cgi_max_prefetch_max_blocks
max prefetch block count allowed in pfc.prefetch
std::string m_username
username passed to oss plugin
bool m_cgi_blocksize_allowed
allow cgi setting of blocksize
void update_error_cond(int ec)