XRootD
XrdClClassicCopyJob.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
26 #include "XrdCl/XrdClConstants.hh"
27 #include "XrdCl/XrdClLog.hh"
28 #include "XrdCl/XrdClDefaultEnv.hh"
29 #include "XrdCl/XrdClFile.hh"
30 #include "XrdCl/XrdClMonitor.hh"
31 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCks/XrdCksCalc.hh"
35 #include "XrdCl/XrdClZipArchive.hh"
37 #include "XrdCl/XrdClPostMaster.hh"
38 #include "XrdCl/XrdClJobManager.hh"
40 #include "XrdClXCpCtx.hh"
42 #include "XrdSys/XrdSysE2T.hh"
43 #include "XrdSys/XrdSysPthread.hh"
44 
45 #include <memory>
46 #include <mutex>
47 #include <queue>
48 #include <algorithm>
49 #include <chrono>
50 #include <thread>
51 #include <vector>
52 
53 #include <sys/types.h>
54 #include <sys/stat.h>
55 #include <fcntl.h>
56 #include <cerrno>
57 #include <unistd.h>
58 
59 #if __cplusplus < 201103L
60 #include <ctime>
61 #endif
62 
63 namespace
64 {
65  //----------------------------------------------------------------------------
67  //----------------------------------------------------------------------------
68  template<typename U = std::ratio<1, 1>>
69  class mytimer_t
70  {
71  public:
72  mytimer_t() : start( clock_t::now() ){ }
73  void reset(){ start = clock_t::now(); }
74  uint64_t elapsed() const
75  {
76  return std::chrono::duration_cast<unit_t>( clock_t::now() - start ).count();
77  }
78  private:
79  typedef std::chrono::high_resolution_clock clock_t;
80  typedef std::chrono::duration<uint64_t, U> unit_t;
81  std::chrono::time_point<clock_t> start;
82  };
83 
84  using timer_sec_t = mytimer_t<>;
85  using timer_nsec_t = mytimer_t<std::nano>;
86 
87 
88  inline XrdCl::XRootDStatus Translate( std::vector<XrdCl::XAttr> &in,
89  std::vector<XrdCl::xattr_t> &out )
90  {
91  std::vector<XrdCl::xattr_t> ret;
92  ret.reserve( in.size() );
93  std::vector<XrdCl::XAttr>::iterator itr = in.begin();
94  for( ; itr != in.end() ; ++itr )
95  {
96  if( !itr->status.IsOK() ) return itr->status;
97  XrdCl::xattr_t xa( itr->name, itr->value );
98  ret.push_back( std::move( xa ) );
99  }
100  out.swap( ret );
101  return XrdCl::XRootDStatus();
102  }
103 
104  //----------------------------------------------------------------------------
106  //----------------------------------------------------------------------------
108  std::vector<XrdCl::xattr_t> &xattrs )
109  {
110  std::vector<XrdCl::XAttr> rsp;
111  XrdCl::XRootDStatus st = file.ListXAttr( rsp );
112  if( !st.IsOK() ) return st;
113  return Translate( rsp, xattrs );
114  }
115 
116  //----------------------------------------------------------------------------
118  //----------------------------------------------------------------------------
119  inline XrdCl::XRootDStatus GetXAttr( const std::string &url,
120  std::vector<XrdCl::xattr_t> &xattrs )
121  {
122  XrdCl::URL u( url );
123  XrdCl::FileSystem fs( u );
124  std::vector<XrdCl::XAttr> rsp;
125  XrdCl::XRootDStatus st = fs.ListXAttr( u.GetPath(), rsp );
126  if( !st.IsOK() ) return st;
127  return Translate( rsp, xattrs );
128  }
129 
131  const std::vector<XrdCl::xattr_t> &xattrs )
132  {
133  std::vector<XrdCl::XAttrStatus> rsp;
134  file.SetXAttr( xattrs, rsp );
135  std::vector<XrdCl::XAttrStatus>::iterator itr = rsp.begin();
136  for( ; itr != rsp.end() ; ++itr )
137  if( !itr->status.IsOK() ) return itr->status;
138  return XrdCl::XRootDStatus();
139  }
140 
141  //----------------------------------------------------------------------------
143  //----------------------------------------------------------------------------
144  class Source
145  {
146  public:
147  //------------------------------------------------------------------------
148  // Destructor
149  //------------------------------------------------------------------------
150  Source( const std::string &checkSumType = "",
151  const std::vector<std::string> &addcks = std::vector<std::string>() ) :
152  pCkSumHelper( 0 ),
153  pContinue( false )
154  {
155  if( !checkSumType.empty() )
156  pCkSumHelper = new XrdCl::CheckSumHelper( "source", checkSumType );
157 
158  for( auto &type : addcks )
159  pAddCksHelpers.push_back( new XrdCl::CheckSumHelper( "source", type ) );
160  };
161 
162  virtual ~Source()
163  {
164  delete pCkSumHelper;
165  for( auto ptr : pAddCksHelpers )
166  delete ptr;
167  }
168 
169  //------------------------------------------------------------------------
171  //------------------------------------------------------------------------
172  virtual XrdCl::XRootDStatus Initialize() = 0;
173 
174  //------------------------------------------------------------------------
176  //------------------------------------------------------------------------
177  virtual int64_t GetSize() = 0;
178 
179  //------------------------------------------------------------------------
181  //------------------------------------------------------------------------
182  virtual XrdCl::XRootDStatus StartAt( uint64_t offset ) = 0;
183 
184  //------------------------------------------------------------------------
191  //------------------------------------------------------------------------
192  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci ) = 0;
193 
194  //------------------------------------------------------------------------
196  //------------------------------------------------------------------------
197  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
198  std::string &checkSumType ) = 0;
199 
200  //------------------------------------------------------------------------
202  //------------------------------------------------------------------------
203  virtual std::vector<std::string> GetAddCks() = 0;
204 
205  //------------------------------------------------------------------------
207  //------------------------------------------------------------------------
208  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs ) = 0;
209 
210  //------------------------------------------------------------------------
212  //------------------------------------------------------------------------
213  virtual XrdCl::XRootDStatus TryOtherServer()
214  {
216  }
217 
218  protected:
219 
220  XrdCl::CheckSumHelper *pCkSumHelper;
221  std::vector<XrdCl::CheckSumHelper*> pAddCksHelpers;
222  bool pContinue;
223  };
224 
225  //----------------------------------------------------------------------------
227  //----------------------------------------------------------------------------
228  class Destination
229  {
230  public:
231  //------------------------------------------------------------------------
233  //------------------------------------------------------------------------
234  Destination( const std::string &checkSumType = "" ):
235  pPosc( false ), pForce( false ), pCoerce( false ), pMakeDir( false ),
236  pContinue( false ), pCkSumHelper( 0 )
237  {
238  if( !checkSumType.empty() )
239  pCkSumHelper = new XrdCl::CheckSumHelper( "destination", checkSumType );
240  }
241 
242  //------------------------------------------------------------------------
244  //------------------------------------------------------------------------
245  virtual ~Destination()
246  {
247  delete pCkSumHelper;
248  }
249 
250  //------------------------------------------------------------------------
252  //------------------------------------------------------------------------
253  virtual XrdCl::XRootDStatus Initialize() = 0;
254 
255  //------------------------------------------------------------------------
257  //------------------------------------------------------------------------
258  virtual XrdCl::XRootDStatus Finalize() = 0;
259 
260  //------------------------------------------------------------------------
265  //------------------------------------------------------------------------
266  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci ) = 0;
267 
268  //------------------------------------------------------------------------
270  //------------------------------------------------------------------------
271  virtual XrdCl::XRootDStatus Flush() = 0;
272 
273  //------------------------------------------------------------------------
275  //------------------------------------------------------------------------
276  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
277  std::string &checkSumType ) = 0;
278 
279  //------------------------------------------------------------------------
281  //------------------------------------------------------------------------
282  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs ) = 0;
283 
284  //------------------------------------------------------------------------
286  //------------------------------------------------------------------------
287  virtual int64_t GetSize() = 0;
288 
289  //------------------------------------------------------------------------
291  //------------------------------------------------------------------------
292  void SetPOSC( bool posc )
293  {
294  pPosc = posc;
295  }
296 
297  //------------------------------------------------------------------------
299  //------------------------------------------------------------------------
300  void SetForce( bool force )
301  {
302  pForce = force;
303  }
304 
305  //------------------------------------------------------------------------
307  //------------------------------------------------------------------------
308  void SetContinue( bool continue_ )
309  {
310  pContinue = continue_;
311  }
312 
313  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
316  void SetCoerce( bool coerce )
317  {
318  pCoerce = coerce;
319  }
320 
321  //------------------------------------------------------------------------
323  //------------------------------------------------------------------------
324  void SetMakeDir( bool makedir )
325  {
326  pMakeDir = makedir;
327  }
328 
329  //------------------------------------------------------------------------
331  //------------------------------------------------------------------------
332  virtual const std::string& GetLastURL() const
333  {
334  static const std::string empty;
335  return empty;
336  }
337 
338  //------------------------------------------------------------------------
340  //------------------------------------------------------------------------
341  virtual const std::string& GetWrtRecoveryRedir() const
342  {
343  static const std::string empty;
344  return empty;
345  }
346 
347  protected:
348  bool pPosc;
349  bool pForce;
350  bool pCoerce;
351  bool pMakeDir;
352  bool pContinue;
353 
354  XrdCl::CheckSumHelper *pCkSumHelper;
355  };
356 
357  //----------------------------------------------------------------------------
359  //----------------------------------------------------------------------------
360  class StdInSource: public Source
361  {
362  public:
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
366  StdInSource( const std::string &ckSumType, uint32_t chunkSize, const std::vector<std::string> &addcks ):
367  Source( ckSumType, addcks ),
368  pCurrentOffset(0),
369  pChunkSize( chunkSize )
370  {
371 
372  }
373 
374  //------------------------------------------------------------------------
376  //------------------------------------------------------------------------
377  virtual ~StdInSource()
378  {
379 
380  }
381 
382  //------------------------------------------------------------------------
384  //------------------------------------------------------------------------
385  virtual XrdCl::XRootDStatus Initialize()
386  {
387  if( pCkSumHelper )
388  {
389  auto st = pCkSumHelper->Initialize();
390  if( !st.IsOK() ) return st;
391  for( auto cksHelper : pAddCksHelpers )
392  {
393  st = cksHelper->Initialize();
394  if( !st.IsOK() ) return st;
395  }
396  }
397  return XrdCl::XRootDStatus();
398  }
399 
400  //------------------------------------------------------------------------
402  //------------------------------------------------------------------------
403  virtual int64_t GetSize()
404  {
405  return -1;
406  }
407 
408  //------------------------------------------------------------------------
410  //------------------------------------------------------------------------
411  virtual XrdCl::XRootDStatus StartAt( uint64_t )
412  {
414  "Cannot continue from stdin!" );
415  }
416 
417  //------------------------------------------------------------------------
419  //------------------------------------------------------------------------
420  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
421  {
422  using namespace XrdCl;
423  Log *log = DefaultEnv::GetLog();
424 
425  uint32_t toRead = pChunkSize;
426  char *buffer = new char[toRead];
427 
428  int64_t bytesRead = 0;
429  uint32_t offset = 0;
430  while( toRead )
431  {
432  int64_t bRead = read( 0, buffer+offset, toRead );
433  if( bRead == -1 )
434  {
435  log->Debug( UtilityMsg, "Unable to read from stdin: %s",
436  XrdSysE2T( errno ) );
437  delete [] buffer;
438  return XRootDStatus( stError, errOSError, errno );
439  }
440 
441  if( bRead == 0 )
442  break;
443 
444  bytesRead += bRead;
445  offset += bRead;
446  toRead -= bRead;
447  }
448 
449  if( bytesRead == 0 )
450  {
451  delete [] buffer;
452  return XRootDStatus( stOK, suDone );
453  }
454 
455  if( pCkSumHelper )
456  pCkSumHelper->Update( buffer, bytesRead );
457 
458  for( auto cksHelper : pAddCksHelpers )
459  cksHelper->Update( buffer, bytesRead );
460 
461  ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
462  pCurrentOffset += bytesRead;
463  return XRootDStatus( stOK, suContinue );
464  }
465 
466  //------------------------------------------------------------------------
468  //------------------------------------------------------------------------
469  virtual XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
470  std::string &checkSum,
471  std::string &checkSumType )
472  {
473  using namespace XrdCl;
474  if( cksHelper )
475  return cksHelper->GetCheckSum( checkSum, checkSumType );
477  }
478 
479  //------------------------------------------------------------------------
481  //------------------------------------------------------------------------
482  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
483  std::string &checkSumType )
484  {
485  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
486  }
487 
488  //------------------------------------------------------------------------
490  //------------------------------------------------------------------------
491  std::vector<std::string> GetAddCks()
492  {
493  std::vector<std::string> ret;
494  for( auto cksHelper : pAddCksHelpers )
495  {
496  std::string type = cksHelper->GetType();
497  std::string cks;
498  GetCheckSumImpl( cksHelper, cks, type );
499  ret.push_back( type + ":" + cks );
500  }
501  return ret;
502  }
503 
504  //------------------------------------------------------------------------
506  //------------------------------------------------------------------------
507  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
508  {
509  return XrdCl::XRootDStatus();
510  }
511 
512  private:
513  StdInSource(const StdInSource &other);
514  StdInSource &operator = (const StdInSource &other);
515 
516  uint64_t pCurrentOffset;
517  uint32_t pChunkSize;
518  };
519 
520  //----------------------------------------------------------------------------
522  //----------------------------------------------------------------------------
523  class XRootDSource: public Source
524  {
525  struct CancellableJob : public XrdCl::Job
526  {
527  virtual void Cancel() = 0;
528 
529  std::mutex mtx;
530  };
531 
532  //----------------------------------------------------------------------------
533  // On-connect callback job, a lambda would be more elegant, but we still have
534  // to support SLC6
535  //----------------------------------------------------------------------------
536  template<typename READER>
537  struct OnConnJob : public CancellableJob
538  {
539  OnConnJob( XRootDSource *self, READER *reader ) : self( self ), reader( reader )
540  {
541  }
542 
543  void Run( void* )
544  {
545  std::unique_lock<std::mutex> lck( mtx );
546  if( !self || !reader ) return;
547  // add new chunks to the queue
548  if( self->pNbConn < self->pMaxNbConn )
549  self->FillQueue( reader );
550  }
551 
552  void Cancel()
553  {
554  std::unique_lock<std::mutex> lck( mtx );
555  self = 0;
556  reader = 0;
557  }
558 
559  private:
560  XRootDSource *self;
561  READER *reader;
562 
563  };
564 
565  public:
566 
567  //------------------------------------------------------------------------
569  //------------------------------------------------------------------------
570  XrdCl::XRootDStatus TryOtherServer()
571  {
572  return pFile->TryOtherServer();
573  }
574 
575  //------------------------------------------------------------------------
577  //------------------------------------------------------------------------
578  XRootDSource( const XrdCl::URL *url,
579  uint32_t chunkSize,
580  uint8_t parallelChunks,
581  const std::string &ckSumType,
582  const std::vector<std::string> &addcks,
583  bool doserver ):
584  Source( ckSumType, addcks ),
585  pUrl( url ), pFile( new XrdCl::File() ), pSize( -1 ),
586  pCurrentOffset( 0 ), pChunkSize( chunkSize ),
587  pParallel( parallelChunks ),
588  pNbConn( 0 ), pUsePgRead( false ),
589  pDoServer( doserver )
590  {
592  XrdCl::DefaultEnv::GetEnv()->GetInt( "SubStreamsPerChannel", val );
593  pMaxNbConn = val - 1; // account for the control stream
594  }
595 
596  //------------------------------------------------------------------------
598  //------------------------------------------------------------------------
599  virtual ~XRootDSource()
600  {
601  if( pDataConnCB )
602  pDataConnCB->Cancel();
603 
604  CleanUpChunks();
605  if( pFile->IsOpen() )
606  XrdCl::XRootDStatus status = pFile->Close();
607  delete pFile;
608  }
609 
610  //------------------------------------------------------------------------
612  //------------------------------------------------------------------------
613  virtual XrdCl::XRootDStatus Initialize()
614  {
615  using namespace XrdCl;
616  Log *log = DefaultEnv::GetLog();
617  log->Debug( UtilityMsg, "Opening %s for reading",
618  pUrl->GetObfuscatedURL().c_str() );
619 
620  std::string value;
621  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
622  pFile->SetProperty( "ReadRecovery", value );
623 
624  XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
625  if( !st.IsOK() )
626  return st;
627 
628  StatInfo *statInfo;
629  st = pFile->Stat( false, statInfo );
630  if( !st.IsOK() )
631  return st;
632 
633  pSize = statInfo->GetSize();
634  delete statInfo;
635 
636  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
637  {
638  st = pCkSumHelper->Initialize();
639  if( !st.IsOK() ) return st;
640 
641  for( auto cksHelper : pAddCksHelpers )
642  {
643  st = cksHelper->Initialize();
644  if( !st.IsOK() ) return st;
645  }
646  }
647 
648  //----------------------------------------------------------------------
649  // Figere out the actual data server we are talking to
650  //----------------------------------------------------------------------
651  if( !pUrl->IsLocalFile() ||
652  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
653  {
654  pFile->GetProperty( "LastURL", pDataServer );
655  }
656 
657 
658  if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
659  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
660  {
661  //--------------------------------------------------------------------
662  // Decide whether we can use PgRead
663  //--------------------------------------------------------------------
664  int val = XrdCl::DefaultCpUsePgWrtRd;
665  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
666  pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
667  }
668 
669  //----------------------------------------------------------------------
670  // Print the IPv4/IPv6 stack to the stderr if we are running in server
671  // mode
672  //----------------------------------------------------------------------
673  if( pDoServer && !pUrl->IsLocalFile() )
674  {
675  AnyObject obj;
676  DefaultEnv::GetPostMaster()->QueryTransport( pDataServer, StreamQuery::IpStack, obj );
677  std::string *ipstack = nullptr;
678  obj.Get( ipstack );
679  std::cerr << "!-!" << *ipstack << std::endl;
680  delete ipstack;
681  }
682 
683  SetOnDataConnectHandler( pFile );
684 
685  return XRootDStatus();
686  }
687 
688  //------------------------------------------------------------------------
690  //------------------------------------------------------------------------
691  virtual int64_t GetSize()
692  {
693  return pSize;
694  }
695 
696  //------------------------------------------------------------------------
698  //------------------------------------------------------------------------
699  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
700  {
701  pCurrentOffset = offset;
702  pContinue = true;
703  return XrdCl::XRootDStatus();
704  }
705 
706  //------------------------------------------------------------------------
713  //------------------------------------------------------------------------
714  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
715  {
716  return GetChunkImpl( pFile, ci );
717  }
718 
719  //------------------------------------------------------------------------
721  //------------------------------------------------------------------------
722  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
723  {
724  return ::GetXAttr( *pFile, xattrs );
725  }
726 
727  //------------------------------------------------------------------------
728  // Clean up the chunks that are flying
729  //------------------------------------------------------------------------
730  void CleanUpChunks()
731  {
732  while( !pChunks.empty() )
733  {
734  ChunkHandler *ch = pChunks.front();
735  pChunks.pop();
736  ch->sem->Wait();
737  delete [] (char *)ch->chunk.GetBuffer();
738  delete ch;
739  }
740  }
741 
742  //------------------------------------------------------------------------
743  // Get check sum
744  //------------------------------------------------------------------------
745  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
746  std::string &checkSumType )
747  {
748  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
749  }
750 
751  XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
752  std::string &checkSum,
753  std::string &checkSumType )
754  {
755  if( pUrl->IsMetalink() )
756  {
758  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
759  checkSum = redirector->GetCheckSum( checkSumType );
760  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
761  }
762 
763  if( pUrl->IsLocalFile() )
764  {
765  if( pContinue )
766  // in case of --continue option we have to calculate the checksum from scratch
767  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
768 
769  if( cksHelper )
770  return cksHelper->GetCheckSum( checkSum, checkSumType );
771 
773  }
774 
775  std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
776  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
777  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
778  }
779 
780  //------------------------------------------------------------------------
782  //------------------------------------------------------------------------
783  std::vector<std::string> GetAddCks()
784  {
785  std::vector<std::string> ret;
786  for( auto cksHelper : pAddCksHelpers )
787  {
788  std::string type = cksHelper->GetType();
789  std::string cks;
790  GetCheckSumImpl( cksHelper, cks, type );
791  ret.push_back( cks );
792  }
793  return ret;
794  }
795 
796  private:
797  XRootDSource(const XRootDSource &other);
798  XRootDSource &operator = (const XRootDSource &other);
799 
800  protected:
801 
802  //------------------------------------------------------------------------
803  // Fill the queue with in-the-fly read requests
804  //------------------------------------------------------------------------
805  template<typename READER>
806  inline void FillQueue( READER *reader )
807  {
808  //----------------------------------------------------------------------
809  // Get the number of connected streams
810  //----------------------------------------------------------------------
811  uint16_t parallel = pParallel;
812  if( pNbConn < pMaxNbConn )
813  {
815  NbConnectedStrm( pDataServer );
816  }
817  if( pNbConn ) parallel *= pNbConn;
818 
819  while( pChunks.size() < parallel && pCurrentOffset < pSize )
820  {
821  uint64_t chunkSize = pChunkSize;
822  if( pCurrentOffset + chunkSize > (uint64_t)pSize )
823  chunkSize = pSize - pCurrentOffset;
824 
825  char *buffer = new char[chunkSize];
826  ChunkHandler *ch = new ChunkHandler();
827  auto st = pUsePgRead
828  ? reader->PgRead( pCurrentOffset, chunkSize, buffer, ch )
829  : reader->Read( pCurrentOffset, chunkSize, buffer, ch );
830  pChunks.push( ch );
831  pCurrentOffset += chunkSize;
832  if( !st.IsOK() )
833  {
834  ch->status = st;
835  ch->sem->Post();
836  break;
837  }
838  }
839  }
840 
841  //------------------------------------------------------------------------
842  // Set the on-connect handler for data streams
843  //------------------------------------------------------------------------
844  template<typename READER>
845  void SetOnDataConnectHandler( READER *reader )
846  {
847  // we need to create the object anyway as it contains our mutex now
848  pDataConnCB.reset( new OnConnJob<READER>( this, reader ) );
849 
850  // check if it is a local file
851  if( pDataServer.empty() ) return;
852 
853  XrdCl::DefaultEnv::GetPostMaster()->SetOnDataConnectHandler( pDataServer, pDataConnCB );
854  }
855 
856  //------------------------------------------------------------------------
864  //------------------------------------------------------------------------
865  template<typename READER>
866  XrdCl::XRootDStatus GetChunkImpl( READER *reader, XrdCl::PageInfo &ci )
867  {
868  //----------------------------------------------------------------------
869  // Sanity check
870  //----------------------------------------------------------------------
871  using namespace XrdCl;
872  Log *log = DefaultEnv::GetLog();
873 
874  //----------------------------------------------------------------------
875  // Fill the queue
876  //----------------------------------------------------------------------
877  std::unique_lock<std::mutex> lck( pDataConnCB->mtx );
878  FillQueue( reader );
879 
880  //----------------------------------------------------------------------
881  // Pick up a chunk from the front and wait for status
882  //----------------------------------------------------------------------
883  if( pChunks.empty() )
884  return XRootDStatus( stOK, suDone );
885 
886  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
887  pChunks.pop();
888  lck.unlock();
889 
890  ch->sem->Wait();
891 
892  if( !ch->status.IsOK() )
893  {
894  log->Debug( UtilityMsg, "Unable read %d bytes at %llu from %s: %s",
895  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
896  pUrl->GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
897  delete [] (char *)ch->chunk.GetBuffer();
898  CleanUpChunks();
899  return ch->status;
900  }
901 
902  ci = std::move( ch->chunk );
903  // if it is a local file update the checksum
904  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
905  {
906  if( pCkSumHelper )
907  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
908 
909  for( auto cksHelper : pAddCksHelpers )
910  cksHelper->Update( ci.GetBuffer(), ci.GetLength() );
911  }
912 
913  return XRootDStatus( stOK, suContinue );
914  }
915 
916  //------------------------------------------------------------------------
917  // Asynchronous chunk handler
918  //------------------------------------------------------------------------
920  {
921  public:
922  ChunkHandler(): sem( new XrdSysSemaphore(0) ) {}
923  virtual ~ChunkHandler() { delete sem; }
924  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
925  XrdCl::AnyObject *response )
926  {
927  this->status = *statusval;
928  delete statusval;
929  if( response )
930  {
931  chunk = ToChunk( response );
932  delete response;
933  }
934  sem->Post();
935  }
936 
937  XrdCl::PageInfo ToChunk( XrdCl::AnyObject *response )
938  {
939  if( response->Has<XrdCl::PageInfo>() )
940  {
941  XrdCl::PageInfo *resp = nullptr;
942  response->Get( resp );
943  return std::move( *resp );
944  }
945  else
946  {
947  XrdCl::ChunkInfo *resp = nullptr;
948  response->Get( resp );
949  return XrdCl::PageInfo( resp->GetOffset(), resp->GetLength(),
950  resp->GetBuffer() );
951  }
952  }
953 
954  XrdSysSemaphore *sem;
955  XrdCl::PageInfo chunk;
956  XrdCl::XRootDStatus status;
957  };
958 
959  const XrdCl::URL *pUrl;
960  XrdCl::File *pFile;
961  int64_t pSize;
962  int64_t pCurrentOffset;
963  uint32_t pChunkSize;
964  uint16_t pParallel;
965  std::queue<ChunkHandler*> pChunks;
966  std::string pDataServer;
967  uint16_t pNbConn;
968  uint16_t pMaxNbConn;
969  bool pUsePgRead;
970  bool pDoServer;
971 
972  std::shared_ptr<CancellableJob> pDataConnCB;
973  };
974 
975  //----------------------------------------------------------------------------
977  //----------------------------------------------------------------------------
978  class XRootDSourceZip: public XRootDSource
979  {
980  public:
981  //------------------------------------------------------------------------
983  //------------------------------------------------------------------------
984  XRootDSourceZip( const std::string &filename,
985  const XrdCl::URL *archive,
986  uint32_t chunkSize,
987  uint8_t parallelChunks,
988  const std::string &ckSumType,
989  const std::vector<std::string> &addcks,
990  bool doserver ):
991  XRootDSource( archive, chunkSize, parallelChunks, ckSumType,
992  addcks, doserver ),
993  pFilename( filename ),
994  pZipArchive( new XrdCl::ZipArchive() )
995  {
996  }
997 
998  //------------------------------------------------------------------------
1000  //------------------------------------------------------------------------
1001  virtual ~XRootDSourceZip()
1002  {
1003  CleanUpChunks();
1004 
1005  XrdCl::WaitFor( XrdCl::CloseArchive( pZipArchive ) );
1006  delete pZipArchive;
1007  }
1008 
1009  //------------------------------------------------------------------------
1011  //------------------------------------------------------------------------
1012  virtual XrdCl::XRootDStatus Initialize()
1013  {
1014  using namespace XrdCl;
1015  Log *log = DefaultEnv::GetLog();
1016  log->Debug( UtilityMsg, "Opening %s for reading",
1017  pUrl->GetObfuscatedURL().c_str() );
1018 
1019  std::string value;
1020  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1021  pZipArchive->SetProperty( "ReadRecovery", value );
1022 
1023  XRootDStatus st = XrdCl::WaitFor( XrdCl::OpenArchive( pZipArchive, pUrl->GetURL(), XrdCl::OpenFlags::Read ) );
1024  if( !st.IsOK() )
1025  return st;
1026 
1027  st = pZipArchive->OpenFile( pFilename );
1028  if( !st.IsOK() )
1029  return st;
1030 
1031  XrdCl::StatInfo *info = 0;
1032  st = pZipArchive->Stat( info );
1033  if( st.IsOK() )
1034  {
1035  pSize = info->GetSize();
1036  delete info;
1037  }
1038  else
1039  return st;
1040 
1041  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper )
1042  {
1043  auto st = pCkSumHelper->Initialize();
1044  if( !st.IsOK() ) return st;
1045  for( auto cksHelper : pAddCksHelpers )
1046  {
1047  st = cksHelper->Initialize();
1048  if( !st.IsOK() ) return st;
1049  }
1050  }
1051 
1052  if( ( !pUrl->IsLocalFile() && !pZipArchive->IsSecure() ) ||
1053  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1054  {
1055  pZipArchive->GetProperty( "DataServer", pDataServer );
1056  //--------------------------------------------------------------------
1057  // Decide whether we can use PgRead
1058  //--------------------------------------------------------------------
1059  int val = XrdCl::DefaultCpUsePgWrtRd;
1060  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1061  pUsePgRead = XrdCl::Utils::HasPgRW( pDataServer ) && ( val == 1 );
1062  }
1063 
1064  SetOnDataConnectHandler( pZipArchive );
1065 
1066  return XrdCl::XRootDStatus();
1067  }
1068 
1069  //------------------------------------------------------------------------
1077  //------------------------------------------------------------------------
1078  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1079  {
1080  return GetChunkImpl( pZipArchive, ci );
1081  }
1082 
1083  //------------------------------------------------------------------------
1084  // Get check sum
1085  //------------------------------------------------------------------------
1086  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1087  std::string &checkSumType )
1088  {
1089  return GetCheckSumImpl( checkSum, checkSumType, pCkSumHelper );
1090  }
1091 
1092  //------------------------------------------------------------------------
1093  // Get check sum implementation
1094  //------------------------------------------------------------------------
1095  virtual XrdCl::XRootDStatus GetCheckSumImpl( std::string &checkSum,
1096  std::string &checkSumType,
1097  XrdCl::CheckSumHelper *cksHelper )
1098  {
1099  // The ZIP archive by default contains a ZCRC32 checksum
1100  if( checkSumType == "zcrc32" )
1101  {
1102  uint32_t cksum = 0;
1103  auto st = pZipArchive->GetCRC32( pFilename, cksum );
1104  if( !st.IsOK() ) return st;
1105 
1106  XrdCksData ckSum;
1107  ckSum.Set( "zcrc32" );
1108  ckSum.Set( reinterpret_cast<void*>( &cksum ), sizeof( uint32_t ) );
1109  char cksBuffer[265];
1110  ckSum.Get( cksBuffer, 256 );
1111  checkSum = "zcrc32:";
1112  checkSum += XrdCl::Utils::NormalizeChecksum( "zcrc32", cksBuffer );
1113  return st;
1114  }
1115 
1116  int useMtlnCksum = XrdCl::DefaultZipMtlnCksum;
1118  env->GetInt( "ZipMtlnCksum", useMtlnCksum );
1119  if( useMtlnCksum && pUrl->IsMetalink() )
1120  {
1122  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1123  checkSum = redirector->GetCheckSum( checkSumType );
1124  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1125  }
1126 
1127  // if it is a local file we can calculate the checksum ourself
1128  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && cksHelper && !pContinue )
1129  return cksHelper->GetCheckSum( checkSum, checkSumType );
1130 
1131  // if it is a remote file other types of checksum are not supported
1133  }
1134 
1135  //------------------------------------------------------------------------
1137  //------------------------------------------------------------------------
1138  std::vector<std::string> GetAddCks()
1139  {
1140  std::vector<std::string> ret;
1141  for( auto cksHelper : pAddCksHelpers )
1142  {
1143  std::string type = cksHelper->GetType();
1144  std::string cks;
1145  GetCheckSumImpl( cks, type, cksHelper );
1146  ret.push_back( cks );
1147  }
1148  return ret;
1149  }
1150 
1151  //------------------------------------------------------------------------
1153  //------------------------------------------------------------------------
1154  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1155  {
1156  return XrdCl::XRootDStatus();
1157  }
1158 
1159  private:
1160 
1161  XRootDSourceZip(const XRootDSourceZip &other);
1162  XRootDSourceZip &operator = (const XRootDSourceZip &other);
1163 
1164  const std::string pFilename;
1165  XrdCl::ZipArchive *pZipArchive;
1166  };
1167 
1168  //----------------------------------------------------------------------------
1170  //----------------------------------------------------------------------------
1171  class XRootDSourceDynamic: public Source
1172  {
1173  public:
1174 
1175  //------------------------------------------------------------------------
1177  //------------------------------------------------------------------------
1178  XrdCl::XRootDStatus TryOtherServer()
1179  {
1180  return pFile->TryOtherServer();
1181  }
1182 
1183  //------------------------------------------------------------------------
1185  //------------------------------------------------------------------------
1186  XRootDSourceDynamic( const XrdCl::URL *url,
1187  uint32_t chunkSize,
1188  const std::string &ckSumType,
1189  const std::vector<std::string> &addcks ):
1190  Source( ckSumType, addcks ),
1191  pUrl( url ), pFile( new XrdCl::File() ), pCurrentOffset( 0 ),
1192  pChunkSize( chunkSize ), pDone( false ), pUsePgRead( false )
1193  {
1194  }
1195 
1196  //------------------------------------------------------------------------
1198  //------------------------------------------------------------------------
1199  virtual ~XRootDSourceDynamic()
1200  {
1201  XrdCl::XRootDStatus status = pFile->Close();
1202  delete pFile;
1203  }
1204 
1205  //------------------------------------------------------------------------
1207  //------------------------------------------------------------------------
1208  virtual XrdCl::XRootDStatus Initialize()
1209  {
1210  using namespace XrdCl;
1211  Log *log = DefaultEnv::GetLog();
1212  log->Debug( UtilityMsg, "Opening %s for reading",
1213  pUrl->GetObfuscatedURL().c_str() );
1214 
1215  std::string value;
1216  DefaultEnv::GetEnv()->GetString( "ReadRecovery", value );
1217  pFile->SetProperty( "ReadRecovery", value );
1218 
1219  XRootDStatus st = pFile->Open( pUrl->GetURL(), OpenFlags::Read );
1220  if( !st.IsOK() )
1221  return st;
1222 
1223  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && pCkSumHelper && !pContinue )
1224  {
1225  auto st = pCkSumHelper->Initialize();
1226  if( !st.IsOK() ) return st;
1227  for( auto cksHelper : pAddCksHelpers )
1228  {
1229  st = cksHelper->Initialize();
1230  if( !st.IsOK() ) return st;
1231  }
1232  }
1233 
1234  if( ( !pUrl->IsLocalFile() && !pFile->IsSecure() ) ||
1235  ( pUrl->IsLocalFile() && pUrl->IsMetalink() ) )
1236  {
1237  std::string datasrv;
1238  pFile->GetProperty( "DataServer", datasrv );
1239  //--------------------------------------------------------------------
1240  // Decide whether we can use PgRead
1241  //--------------------------------------------------------------------
1242  int val = XrdCl::DefaultCpUsePgWrtRd;
1243  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1244  pUsePgRead = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1245  }
1246 
1247  return XRootDStatus();
1248  }
1249 
1250  //------------------------------------------------------------------------
1252  //------------------------------------------------------------------------
1253  virtual int64_t GetSize()
1254  {
1255  return -1;
1256  }
1257 
1258  //------------------------------------------------------------------------
1260  //------------------------------------------------------------------------
1261  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1262  {
1263  pCurrentOffset = offset;
1264  pContinue = true;
1265  return XrdCl::XRootDStatus();
1266  }
1267 
1268  //------------------------------------------------------------------------
1276  //------------------------------------------------------------------------
1277  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1278  {
1279  //----------------------------------------------------------------------
1280  // Sanity check
1281  //----------------------------------------------------------------------
1282  using namespace XrdCl;
1283 
1284  if( pDone )
1285  return XRootDStatus( stOK, suDone );
1286 
1287  //----------------------------------------------------------------------
1288  // Fill the queue
1289  //----------------------------------------------------------------------
1290  char *buffer = new char[pChunkSize];
1291  uint32_t bytesRead = 0;
1292 
1293  std::vector<uint32_t> cksums;
1294  XRootDStatus st = pUsePgRead
1295  ? pFile->PgRead( pCurrentOffset, pChunkSize, buffer, cksums, bytesRead )
1296  : pFile->Read( pCurrentOffset, pChunkSize, buffer, bytesRead );
1297 
1298  if( !st.IsOK() )
1299  {
1300  delete [] buffer;
1301  return st;
1302  }
1303 
1304  if( !bytesRead )
1305  {
1306  delete [] buffer;
1307  return XRootDStatus( stOK, suDone );
1308  }
1309 
1310  if( bytesRead < pChunkSize )
1311  pDone = true;
1312 
1313  // if it is a local file update the checksum
1314  if( pUrl->IsLocalFile() && !pUrl->IsMetalink() && !pContinue )
1315  {
1316  if( pCkSumHelper )
1317  pCkSumHelper->Update( buffer, bytesRead );
1318 
1319  for( auto cksHelper : pAddCksHelpers )
1320  cksHelper->Update( buffer, bytesRead );
1321  }
1322 
1323  ci = XrdCl::PageInfo( pCurrentOffset, bytesRead, buffer );
1324  pCurrentOffset += bytesRead;
1325 
1326  return XRootDStatus( stOK, suContinue );
1327  }
1328 
1329  //------------------------------------------------------------------------
1330  // Get check sum
1331  //------------------------------------------------------------------------
1332  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1333  std::string &checkSumType )
1334  {
1335  return GetCheckSumImpl( pCkSumHelper, checkSum, checkSumType );
1336  }
1337 
1338  XrdCl::XRootDStatus GetCheckSumImpl( XrdCl::CheckSumHelper *cksHelper,
1339  std::string &checkSum,
1340  std::string &checkSumType )
1341  {
1342  if( pUrl->IsMetalink() )
1343  {
1345  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1346  checkSum = redirector->GetCheckSum( checkSumType );
1347  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1348  }
1349 
1350  if( pUrl->IsLocalFile() )
1351  {
1352  if( pContinue)
1353  // in case of --continue option we have to calculate the checksum from scratch
1354  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl->GetPath() );
1355 
1356  if( cksHelper )
1357  return cksHelper->GetCheckSum( checkSum, checkSumType );
1358 
1360  }
1361 
1362  std::string dataServer; pFile->GetProperty( "DataServer", dataServer );
1363  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1364  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType, XrdCl::URL( lastUrl ) );
1365  }
1366 
1367  //------------------------------------------------------------------------
1369  //------------------------------------------------------------------------
1370  std::vector<std::string> GetAddCks()
1371  {
1372  std::vector<std::string> ret;
1373  for( auto cksHelper : pAddCksHelpers )
1374  {
1375  std::string type = cksHelper->GetType();
1376  std::string cks;
1377  GetCheckSumImpl( cksHelper, cks, type );
1378  ret.push_back( cks );
1379  }
1380  return ret;
1381  }
1382 
1383  //------------------------------------------------------------------------
1385  //------------------------------------------------------------------------
1386  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1387  {
1388  return ::GetXAttr( *pFile, xattrs );
1389  }
1390 
1391  private:
1392  XRootDSourceDynamic(const XRootDSourceDynamic &other);
1393  XRootDSourceDynamic &operator = (const XRootDSourceDynamic &other);
1394  const XrdCl::URL *pUrl;
1395  XrdCl::File *pFile;
1396  int64_t pCurrentOffset;
1397  uint32_t pChunkSize;
1398  bool pDone;
1399  bool pUsePgRead;
1400  };
1401 
1402  //----------------------------------------------------------------------------
1404  //----------------------------------------------------------------------------
1405  class XRootDSourceXCp: public Source
1406  {
1407  public:
1408  //------------------------------------------------------------------------
1410  //------------------------------------------------------------------------
1411  XRootDSourceXCp( const XrdCl::URL* url, uint32_t chunkSize, uint16_t parallelChunks, int32_t nbSrc, uint64_t blockSize ):
1412  pXCpCtx( 0 ), pUrl( url ), pChunkSize( chunkSize ), pParallelChunks( parallelChunks ), pNbSrc( nbSrc ), pBlockSize( blockSize )
1413  {
1414  }
1415 
1416  ~XRootDSourceXCp()
1417  {
1418  if( pXCpCtx )
1419  pXCpCtx->Delete();
1420  }
1421 
1422  //------------------------------------------------------------------------
1424  //------------------------------------------------------------------------
1425  virtual XrdCl::XRootDStatus Initialize()
1426  {
1428  int64_t fileSize = -1;
1429 
1430  if( pUrl->IsMetalink() )
1431  {
1433  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1434  fileSize = redirector->GetSize();
1435  pReplicas = redirector->GetReplicas();
1436  }
1437  else
1438  {
1439  XrdCl::LocationInfo *li = 0;
1440  XrdCl::FileSystem fs( *pUrl );
1441  XrdCl::XRootDStatus st = fs.DeepLocate( pUrl->GetPath(), XrdCl::OpenFlags::Compress | XrdCl::OpenFlags::PrefName, li );
1442  if( !st.IsOK() ) return st;
1443 
1445  for( itr = li->Begin(); itr != li->End(); ++itr)
1446  {
1447  std::string url = "root://" + itr->GetAddress() + "/" + pUrl->GetPath();
1448  pReplicas.push_back( url );
1449  }
1450 
1451  delete li;
1452  }
1453 
1454  std::stringstream ss;
1455  ss << "XCp sources: ";
1456 
1457  std::vector<std::string>::iterator itr;
1458  for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1459  {
1460  ss << *itr << ", ";
1461  }
1462  log->Debug( XrdCl::UtilityMsg, "%s", ss.str().c_str() );
1463 
1464  pXCpCtx = new XrdCl::XCpCtx( pReplicas, pBlockSize, pNbSrc, pChunkSize, pParallelChunks, fileSize );
1465 
1466  return pXCpCtx->Initialize();
1467  }
1468 
1469  //------------------------------------------------------------------------
1471  //------------------------------------------------------------------------
1472  virtual int64_t GetSize()
1473  {
1474  return pXCpCtx->GetSize();
1475  }
1476 
1477  //------------------------------------------------------------------------
1479  //------------------------------------------------------------------------
1480  virtual XrdCl::XRootDStatus StartAt( uint64_t offset )
1481  {
1483  }
1484 
1485  //------------------------------------------------------------------------
1493  //------------------------------------------------------------------------
1494  virtual XrdCl::XRootDStatus GetChunk( XrdCl::PageInfo &ci )
1495  {
1497  do
1498  {
1499  st = pXCpCtx->GetChunk( ci );
1500  }
1501  while( st.IsOK() && st.code == XrdCl::suRetry );
1502  return st;
1503  }
1504 
1505  //------------------------------------------------------------------------
1506  // Get check sum
1507  //------------------------------------------------------------------------
1508  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1509  std::string &checkSumType )
1510  {
1511  if( pUrl->IsMetalink() )
1512  {
1514  XrdCl::VirtualRedirector *redirector = registry.Get( *pUrl );
1515  checkSum = redirector->GetCheckSum( checkSumType );
1516  if( !checkSum.empty() ) return XrdCl::XRootDStatus();
1517  }
1518 
1519  std::vector<std::string>::iterator itr;
1520  for( itr = pReplicas.begin() ; itr != pReplicas.end() ; ++itr )
1521  {
1522  XrdCl::URL url( *itr );
1524  checkSumType, url );
1525  if( st.IsOK() ) return st;
1526  }
1527 
1529  }
1530 
1531  //------------------------------------------------------------------------
1533  //------------------------------------------------------------------------
1534  std::vector<std::string> GetAddCks()
1535  {
1536  return std::vector<std::string>();
1537  }
1538 
1539  //------------------------------------------------------------------------
1541  //------------------------------------------------------------------------
1542  virtual XrdCl::XRootDStatus GetXAttr( std::vector<XrdCl::xattr_t> &xattrs )
1543  {
1545  std::vector<std::string>::iterator itr = pReplicas.begin();
1546  for( ; itr < pReplicas.end() ; ++itr )
1547  {
1548  st = ::GetXAttr( *itr, xattrs );
1549  if( st.IsOK() ) return st;
1550  }
1551  return st;
1552  }
1553 
1554  private:
1555 
1556 
1557  XrdCl::XCpCtx *pXCpCtx;
1558  const XrdCl::URL *pUrl;
1559  std::vector<std::string> pReplicas;
1560  uint32_t pChunkSize;
1561  uint16_t pParallelChunks;
1562  int32_t pNbSrc;
1563  uint64_t pBlockSize;
1564  };
1565 
1566  //----------------------------------------------------------------------------
1568  //----------------------------------------------------------------------------
1569  class StdOutDestination: public Destination
1570  {
1571  public:
1572  //------------------------------------------------------------------------
1574  //------------------------------------------------------------------------
1575  StdOutDestination( const std::string &ckSumType ):
1576  Destination( ckSumType ), pCurrentOffset(0)
1577  {
1578  }
1579 
1580  //------------------------------------------------------------------------
1582  //------------------------------------------------------------------------
1583  virtual ~StdOutDestination()
1584  {
1585  }
1586 
1587  //------------------------------------------------------------------------
1589  //------------------------------------------------------------------------
1590  virtual XrdCl::XRootDStatus Initialize()
1591  {
1592  if( pContinue )
1594  ENOTSUP, "Cannot continue to stdout." );
1595 
1596  if( pCkSumHelper )
1597  return pCkSumHelper->Initialize();
1598  return XrdCl::XRootDStatus();
1599  }
1600 
1601  //------------------------------------------------------------------------
1603  //------------------------------------------------------------------------
1604  virtual XrdCl::XRootDStatus Finalize()
1605  {
1606  return XrdCl::XRootDStatus();
1607  }
1608 
1609  //------------------------------------------------------------------------
1614  //------------------------------------------------------------------------
1615  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1616  {
1617  using namespace XrdCl;
1618  Log *log = DefaultEnv::GetLog();
1619 
1620  if( pCurrentOffset != ci.GetOffset() )
1621  {
1622  log->Error( UtilityMsg, "Got out-of-bounds chunk, expected offset:"
1623  " %llu, got %llu", (unsigned long long) pCurrentOffset, (unsigned long long) ci.GetOffset() );
1624  return XRootDStatus( stError, errInternal );
1625  }
1626 
1627  int64_t wr = 0;
1628  uint32_t length = ci.GetLength();
1629  char *cursor = (char*)ci.GetBuffer();
1630  do
1631  {
1632  wr = write( 1, cursor, length );
1633  if( wr == -1 )
1634  {
1635  log->Debug( UtilityMsg, "Unable to write to stdout: %s",
1636  XrdSysE2T( errno ) );
1637  delete [] (char*)ci.GetBuffer();
1638  return XRootDStatus( stError, errOSError, errno );
1639  }
1640  pCurrentOffset += wr;
1641  cursor += wr;
1642  length -= wr;
1643  }
1644  while( length );
1645 
1646  if( pCkSumHelper )
1647  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1648  delete [] (char*)ci.GetBuffer();
1649  return XRootDStatus();
1650  }
1651 
1652  //------------------------------------------------------------------------
1654  //------------------------------------------------------------------------
1655  virtual XrdCl::XRootDStatus Flush()
1656  {
1657  return XrdCl::XRootDStatus();
1658  }
1659 
1660  //------------------------------------------------------------------------
1662  //------------------------------------------------------------------------
1663  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1664  std::string &checkSumType )
1665  {
1666  if( pCkSumHelper )
1667  return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1669  }
1670 
1671  //------------------------------------------------------------------------
1673  //------------------------------------------------------------------------
1674  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1675  {
1676  return XrdCl::XRootDStatus();
1677  }
1678 
1679  //------------------------------------------------------------------------
1681  //------------------------------------------------------------------------
1682  virtual int64_t GetSize()
1683  {
1684  return -1;
1685  }
1686 
1687  private:
1688  StdOutDestination(const StdOutDestination &other);
1689  StdOutDestination &operator = (const StdOutDestination &other);
1690  uint64_t pCurrentOffset;
1691  };
1692 
1693  //----------------------------------------------------------------------------
1695  //----------------------------------------------------------------------------
1696  class XRootDDestination: public Destination
1697  {
1698  public:
1699  //------------------------------------------------------------------------
1701  //------------------------------------------------------------------------
1702  XRootDDestination( const XrdCl::URL &url, uint8_t parallelChunks,
1703  const std::string &ckSumType, const XrdCl::ClassicCopyJob &cpjob ):
1704  Destination( ckSumType ),
1705  pUrl( url ), pFile( new XrdCl::File( XrdCl::File::DisableVirtRedirect ) ),
1706  pParallel( parallelChunks ), pSize( -1 ), pUsePgWrt( false ), cpjob( cpjob )
1707  {
1708  }
1709 
1710  //------------------------------------------------------------------------
1712  //------------------------------------------------------------------------
1713  virtual ~XRootDDestination()
1714  {
1715  CleanUpChunks();
1716  delete pFile;
1717 
1719 
1720  //----------------------------------------------------------------------
1721  // Make sure we clean up the cp-target symlink
1722  //----------------------------------------------------------------------
1723  std::string cptarget = XrdCl::DefaultCpTarget;
1724  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1725  if( !cptarget.empty() )
1726  {
1727  XrdCl::FileSystem fs( "file://localhost" );
1728  XrdCl::XRootDStatus st = fs.Rm( cptarget );
1729  if( !st.IsOK() )
1730  log->Warning( XrdCl::UtilityMsg, "Could not delete cp-target symlink: %s",
1731  st.ToString().c_str() );
1732  }
1733 
1734  //----------------------------------------------------------------------
1735  // If the copy failed and user requested posc and we are dealing with
1736  // a local destination, remove the file
1737  //----------------------------------------------------------------------
1738  if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
1739  {
1740  XrdCl::FileSystem fs( pUrl );
1741  XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
1742  if( !st.IsOK() )
1743  log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
1744  " on failure: %s", st.ToString().c_str() );
1745  }
1746  }
1747 
1748  //------------------------------------------------------------------------
1750  //------------------------------------------------------------------------
1751  virtual XrdCl::XRootDStatus Initialize()
1752  {
1753  using namespace XrdCl;
1754  Log *log = DefaultEnv::GetLog();
1755  log->Debug( UtilityMsg, "Opening %s for writing",
1756  pUrl.GetObfuscatedURL().c_str() );
1757 
1758  std::string value;
1759  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
1760  pFile->SetProperty( "WriteRecovery", value );
1761 
1762  OpenFlags::Flags flags = OpenFlags::Update;
1763  if( pForce )
1764  flags |= OpenFlags::Delete;
1765  else if( !pContinue )
1766  flags |= OpenFlags::New;
1767 
1768  if( pPosc )
1769  flags |= OpenFlags::POSC;
1770 
1771  if( pCoerce )
1772  flags |= OpenFlags::Force;
1773 
1774  if( pMakeDir)
1775  flags |= OpenFlags::MakePath;
1776 
1777  Access::Mode mode = Access::UR|Access::UW|Access::GR|Access::OR;
1778 
1779  XrdCl::XRootDStatus st = pFile->Open( pUrl.GetURL(), flags, mode );
1780  if( !st.IsOK() )
1781  return st;
1782 
1783  if( ( !pUrl.IsLocalFile() && !pFile->IsSecure() ) ||
1784  ( pUrl.IsLocalFile() && pUrl.IsMetalink() ) )
1785  {
1786  std::string datasrv;
1787  pFile->GetProperty( "DataServer", datasrv );
1788  //--------------------------------------------------------------------
1789  // Decide whether we can use PgRead
1790  //--------------------------------------------------------------------
1791  int val = XrdCl::DefaultCpUsePgWrtRd;
1792  XrdCl::DefaultEnv::GetEnv()->GetInt( "CpUsePgWrtRd", val );
1793  pUsePgWrt = XrdCl::Utils::HasPgRW( datasrv ) && ( val == 1 );
1794  }
1795 
1796  std::string cptarget = XrdCl::DefaultCpTarget;
1797  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
1798  if( !cptarget.empty() )
1799  {
1800  std::string targeturl;
1801  pFile->GetProperty( "LastURL", targeturl );
1802  targeturl = URL( targeturl ).GetLocation();
1803  if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
1804  log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
1805  XrdSysE2T( errno ) );
1806  else
1807  log->Info( UtilityMsg, "Created cp-target symlink: %s -> %s",
1808  cptarget.c_str(), targeturl.c_str() );
1809  }
1810 
1811  StatInfo *info = 0;
1812  st = pFile->Stat( false, info );
1813  if( !st.IsOK() )
1814  return st;
1815  pSize = info->GetSize();
1816  delete info;
1817 
1818  if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1819  return pCkSumHelper->Initialize();
1820 
1821  return XRootDStatus();
1822  }
1823 
1824  //------------------------------------------------------------------------
1826  //------------------------------------------------------------------------
1827  virtual XrdCl::XRootDStatus Finalize()
1828  {
1829  return pFile->Close();
1830  }
1831 
1832  //------------------------------------------------------------------------
1837  //------------------------------------------------------------------------
1838  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
1839  {
1840  using namespace XrdCl;
1841  if( !pFile->IsOpen() )
1842  {
1843  delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1845  }
1846 
1847  //----------------------------------------------------------------------
1848  // If there is still place for this chunk to be sent send it
1849  //----------------------------------------------------------------------
1850  if( pChunks.size() < pParallel )
1851  return QueueChunk( std::move( ci ) );
1852 
1853  //----------------------------------------------------------------------
1854  // We wait for a chunk to be sent so that we have space for the current
1855  // one
1856  //----------------------------------------------------------------------
1857  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
1858  pChunks.pop();
1859  ch->sem->Wait();
1860  delete [] (char*)ch->chunk.GetBuffer();
1861  if( !ch->status.IsOK() )
1862  {
1863  Log *log = DefaultEnv::GetLog();
1864  log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
1865  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
1866  pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
1867  delete[] (char*)ci.GetBuffer(); // we took the ownership of the buffer
1868  CleanUpChunks();
1869 
1870  //--------------------------------------------------------------------
1871  // Check if we should re-try the transfer from scratch at a different
1872  // data server
1873  //--------------------------------------------------------------------
1874  return CheckIfRetriable( ch->status );
1875  }
1876 
1877  return QueueChunk( std::move( ci ) );
1878  }
1879 
1880  //------------------------------------------------------------------------
1882  //------------------------------------------------------------------------
1883  virtual int64_t GetSize()
1884  {
1885  return pSize;
1886  }
1887 
1888  //------------------------------------------------------------------------
1890  //------------------------------------------------------------------------
1891  void CleanUpChunks()
1892  {
1893  while( !pChunks.empty() )
1894  {
1895  ChunkHandler *ch = pChunks.front();
1896  pChunks.pop();
1897  ch->sem->Wait();
1898  delete [] (char *)ch->chunk.GetBuffer();
1899  delete ch;
1900  }
1901  }
1902 
1903  //------------------------------------------------------------------------
1905  //------------------------------------------------------------------------
1906  XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
1907  {
1908  // we are writing chunks in order so we can calc the checksum
1909  // in case of local files
1910  if( pUrl.IsLocalFile() && pCkSumHelper && !pContinue )
1911  pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
1912 
1913  ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
1915  st = pUsePgWrt
1916  ? pFile->PgWrite(ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch->chunk.GetCksums(), ch)
1917  : pFile->Write( ch->chunk.GetOffset(), ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
1918  if( !st.IsOK() )
1919  {
1920  CleanUpChunks();
1921  delete [] (char*)ch->chunk.GetBuffer();
1922  delete ch;
1923  return st;
1924  }
1925  pChunks.push( ch );
1926  return XrdCl::XRootDStatus();
1927  }
1928 
1929  //------------------------------------------------------------------------
1931  //------------------------------------------------------------------------
1932  virtual XrdCl::XRootDStatus Flush()
1933  {
1935  while( !pChunks.empty() )
1936  {
1937  ChunkHandler *ch = pChunks.front();
1938  pChunks.pop();
1939  ch->sem->Wait();
1940  if( !ch->status.IsOK() )
1941  {
1942  //--------------------------------------------------------------------
1943  // Check if we should re-try the transfer from scratch at a different
1944  // data server
1945  //--------------------------------------------------------------------
1946  st = CheckIfRetriable( ch->status );
1947  }
1948  delete [] (char *)ch->chunk.GetBuffer();
1949  delete ch;
1950  }
1951  return st;
1952  }
1953 
1954  //------------------------------------------------------------------------
1956  //------------------------------------------------------------------------
1957  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
1958  std::string &checkSumType )
1959  {
1960  if( pUrl.IsLocalFile() )
1961  {
1962  if( pContinue )
1963  // in case of --continue option we have to calculate the checksum from scratch
1964  return XrdCl::Utils::GetLocalCheckSum( checkSum, checkSumType, pUrl.GetPath() );
1965 
1966  if( pCkSumHelper )
1967  return pCkSumHelper->GetCheckSum( checkSum, checkSumType );
1968 
1970  }
1971 
1972  std::string lastUrl; pFile->GetProperty( "LastURL", lastUrl );
1973  return XrdCl::Utils::GetRemoteCheckSum( checkSum, checkSumType,
1974  XrdCl::URL( lastUrl ) );
1975  }
1976 
1977  //------------------------------------------------------------------------
1979  //------------------------------------------------------------------------
1980  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
1981  {
1982  return ::SetXAttr( *pFile, xattrs );
1983  }
1984 
1985  //------------------------------------------------------------------------
1987  //------------------------------------------------------------------------
1988  const std::string& GetLastURL() const
1989  {
1990  return pLastURL;
1991  }
1992 
1993  //------------------------------------------------------------------------
1995  //------------------------------------------------------------------------
1996  const std::string& GetWrtRecoveryRedir() const
1997  {
1998  return pWrtRecoveryRedir;
1999  }
2000 
2001  private:
2002  XRootDDestination(const XRootDDestination &other);
2003  XRootDDestination &operator = (const XRootDDestination &other);
2004 
2005  //------------------------------------------------------------------------
2006  // Asynchronous chunk handler
2007  //------------------------------------------------------------------------
2008  class ChunkHandler: public XrdCl::ResponseHandler
2009  {
2010  public:
2011  ChunkHandler( XrdCl::PageInfo &&ci ):
2012  sem( new XrdSysSemaphore(0) ),
2013  chunk(std::move( ci ) ) {}
2014  virtual ~ChunkHandler() { delete sem; }
2015  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2016  XrdCl::AnyObject */*response*/ )
2017  {
2018  this->status = *statusval;
2019  delete statusval;
2020  sem->Post();
2021  }
2022 
2023  XrdSysSemaphore *sem;
2024  XrdCl::PageInfo chunk;
2025  XrdCl::XRootDStatus status;
2026  };
2027 
2028  inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2029  {
2030  if( status.IsOK() ) return status;
2031 
2032  //--------------------------------------------------------------------
2033  // Check if we should re-try the transfer from scratch at a different
2034  // data server
2035  //--------------------------------------------------------------------
2036  std::string value;
2037  if( pFile->GetProperty( "WrtRecoveryRedir", value ) )
2038  {
2039  pWrtRecoveryRedir = value;
2040  if( pFile->GetProperty( "LastURL", value ) ) pLastURL = value;
2042  }
2043 
2044  return status;
2045  }
2046 
2047  const XrdCl::URL pUrl;
2048  XrdCl::File *pFile;
2049  uint8_t pParallel;
2050  std::queue<ChunkHandler *> pChunks;
2051  int64_t pSize;
2052 
2053  std::string pWrtRecoveryRedir;
2054  std::string pLastURL;
2055  bool pUsePgWrt;
2056  const XrdCl::ClassicCopyJob &cpjob;
2057  };
2058 
2059  //----------------------------------------------------------------------------
2061  //----------------------------------------------------------------------------
2062  class XRootDZipDestination: public Destination
2063  {
2064  public:
2065  //------------------------------------------------------------------------
2067  //------------------------------------------------------------------------
2068  XRootDZipDestination( const XrdCl::URL &url, const std::string &fn,
2069  int64_t size, uint8_t parallelChunks, XrdCl::ClassicCopyJob &cpjob ):
2070  Destination( "zcrc32" ),
2071  pUrl( url ), pFilename( fn ), pZip( new XrdCl::ZipArchive() ),
2072  pParallel( parallelChunks ), pSize( size ), cpjob( cpjob )
2073  {
2074  }
2075 
2076  //------------------------------------------------------------------------
2078  //------------------------------------------------------------------------
2079  virtual ~XRootDZipDestination()
2080  {
2081  CleanUpChunks();
2082  delete pZip;
2083 
2084  //----------------------------------------------------------------------
2085  // If the copy failed and user requested posc and we are dealing with
2086  // a local destination, remove the file
2087  //----------------------------------------------------------------------
2088  if( pUrl.IsLocalFile() && pPosc && !cpjob.GetResult().IsOK() )
2089  {
2090  XrdCl::FileSystem fs( pUrl );
2091  XrdCl::XRootDStatus st = fs.Rm( pUrl.GetPath() );
2092  if( !st.IsOK() )
2093  {
2095  log->Error( XrdCl::UtilityMsg, "Failed to remove local destination"
2096  " on failure: %s", st.ToString().c_str() );
2097  }
2098  }
2099  }
2100 
2101  //------------------------------------------------------------------------
2103  //------------------------------------------------------------------------
2104  virtual XrdCl::XRootDStatus Initialize()
2105  {
2106  using namespace XrdCl;
2107  Log *log = DefaultEnv::GetLog();
2108  log->Debug( UtilityMsg, "Opening %s for writing",
2109  pUrl.GetObfuscatedURL().c_str() );
2110 
2111  std::string value;
2112  DefaultEnv::GetEnv()->GetString( "WriteRecovery", value );
2113  pZip->SetProperty( "WriteRecovery", value );
2114 
2115  OpenFlags::Flags flags = OpenFlags::Update;
2116 
2117  FileSystem fs( pUrl );
2118  StatInfo *info = nullptr;
2119  auto st = fs.Stat( pUrl.GetPath(), info );
2120  if( !st.IsOK() && st.code == errErrorResponse && st.errNo == kXR_NotFound )
2121  flags |= OpenFlags::New;
2122 
2123  if( pPosc )
2124  flags |= OpenFlags::POSC;
2125 
2126  if( pCoerce )
2127  flags |= OpenFlags::Force;
2128 
2129  if( pMakeDir)
2130  flags |= OpenFlags::MakePath;
2131 
2132  st = XrdCl::WaitFor( XrdCl::OpenArchive( pZip, pUrl.GetURL(), flags ) );
2133  if( !st.IsOK() )
2134  return st;
2135 
2136  std::string cptarget = XrdCl::DefaultCpTarget;
2137  XrdCl::DefaultEnv::GetEnv()->GetString( "CpTarget", cptarget );
2138  if( !cptarget.empty() )
2139  {
2140  std::string targeturl;
2141  pZip->GetProperty( "LastURL", targeturl );
2142  if( symlink( targeturl.c_str(), cptarget.c_str() ) == -1 )
2143  log->Warning( UtilityMsg, "Could not create cp-target symlink: %s",
2144  XrdSysE2T( errno ) );
2145  }
2146 
2147  st = pZip->OpenFile( pFilename, XrdCl::OpenFlags::New | XrdCl::OpenFlags::Write, pSize );
2148  if( !st.IsOK() )
2149  return st;
2150 
2151  return pCkSumHelper->Initialize();
2152  }
2153 
2154  //------------------------------------------------------------------------
2156  //------------------------------------------------------------------------
2157  virtual XrdCl::XRootDStatus Finalize()
2158  {
2159  uint32_t crc32 = 0;
2160  auto st = pCkSumHelper->GetRawCheckSum( "zcrc32", crc32 );
2161  if( !st.IsOK() ) return st;
2162  pZip->UpdateMetadata( crc32 );
2163  pZip->CloseFile();
2164  return XrdCl::WaitFor( XrdCl::CloseArchive( pZip ) );
2165  }
2166 
2167  //------------------------------------------------------------------------
2172  //------------------------------------------------------------------------
2173  virtual XrdCl::XRootDStatus PutChunk( XrdCl::PageInfo &&ci )
2174  {
2175  using namespace XrdCl;
2176 
2177  //----------------------------------------------------------------------
2178  // If there is still place for this chunk to be sent send it
2179  //----------------------------------------------------------------------
2180  if( pChunks.size() < pParallel )
2181  return QueueChunk( std::move( ci ) );
2182 
2183  //----------------------------------------------------------------------
2184  // We wait for a chunk to be sent so that we have space for the current
2185  // one
2186  //----------------------------------------------------------------------
2187  std::unique_ptr<ChunkHandler> ch( pChunks.front() );
2188  pChunks.pop();
2189  ch->sem->Wait();
2190  delete [] (char*)ch->chunk.GetBuffer();
2191  if( !ch->status.IsOK() )
2192  {
2193  Log *log = DefaultEnv::GetLog();
2194  log->Debug( UtilityMsg, "Unable write %d bytes at %llu from %s: %s",
2195  ch->chunk.GetLength(), (unsigned long long) ch->chunk.GetOffset(),
2196  pUrl.GetObfuscatedURL().c_str(), ch->status.ToStr().c_str() );
2197  CleanUpChunks();
2198 
2199  //--------------------------------------------------------------------
2200  // Check if we should re-try the transfer from scratch at a different
2201  // data server
2202  //--------------------------------------------------------------------
2203  return CheckIfRetriable( ch->status );
2204  }
2205 
2206  return QueueChunk( std::move( ci ) );
2207  }
2208 
2209  //------------------------------------------------------------------------
2211  //------------------------------------------------------------------------
2212  virtual int64_t GetSize()
2213  {
2214  return -1;
2215  }
2216 
2217  //------------------------------------------------------------------------
2219  //------------------------------------------------------------------------
2220  void CleanUpChunks()
2221  {
2222  while( !pChunks.empty() )
2223  {
2224  ChunkHandler *ch = pChunks.front();
2225  pChunks.pop();
2226  ch->sem->Wait();
2227  delete [] (char *)ch->chunk.GetBuffer();
2228  delete ch;
2229  }
2230  }
2231 
2232  //------------------------------------------------------------------------
2233  // Queue a chunk
2234  //------------------------------------------------------------------------
2235  XrdCl::XRootDStatus QueueChunk( XrdCl::PageInfo &&ci )
2236  {
2237  // we are writing chunks in order so we can calc the checksum
2238  // in case of local files
2239  if( pCkSumHelper ) pCkSumHelper->Update( ci.GetBuffer(), ci.GetLength() );
2240 
2241  ChunkHandler *ch = new ChunkHandler( std::move( ci ) );
2243 
2244  //----------------------------------------------------------------------
2245  // TODO
2246  // In order to use PgWrite with ZIP append we need first to implement
2247  // PgWriteV!!!
2248  //----------------------------------------------------------------------
2249  st = pZip->Write( ch->chunk.GetLength(), ch->chunk.GetBuffer(), ch );
2250  if( !st.IsOK() )
2251  {
2252  CleanUpChunks();
2253  delete [] (char*)ch->chunk.GetBuffer();
2254  delete ch;
2255  return st;
2256  }
2257  pChunks.push( ch );
2258  return XrdCl::XRootDStatus();
2259  }
2260 
2261  //------------------------------------------------------------------------
2263  //------------------------------------------------------------------------
2264  virtual XrdCl::XRootDStatus Flush()
2265  {
2267  while( !pChunks.empty() )
2268  {
2269  ChunkHandler *ch = pChunks.front();
2270  pChunks.pop();
2271  ch->sem->Wait();
2272  if( !ch->status.IsOK() )
2273  {
2274  //--------------------------------------------------------------------
2275  // Check if we should re-try the transfer from scratch at a different
2276  // data server
2277  //--------------------------------------------------------------------
2278  st = CheckIfRetriable( ch->status );
2279  }
2280  delete [] (char *)ch->chunk.GetBuffer();
2281  delete ch;
2282  }
2283  return st;
2284  }
2285 
2286  //------------------------------------------------------------------------
2288  //------------------------------------------------------------------------
2289  virtual XrdCl::XRootDStatus GetCheckSum( std::string &checkSum,
2290  std::string &checkSumType )
2291  {
2293  }
2294 
2295  //------------------------------------------------------------------------
2297  //------------------------------------------------------------------------
2298  virtual XrdCl::XRootDStatus SetXAttr( const std::vector<XrdCl::xattr_t> &xattrs )
2299  {
2301  }
2302 
2303  //------------------------------------------------------------------------
2305  //------------------------------------------------------------------------
2306  const std::string& GetLastURL() const
2307  {
2308  return pLastURL;
2309  }
2310 
2311  //------------------------------------------------------------------------
2313  //------------------------------------------------------------------------
2314  const std::string& GetWrtRecoveryRedir() const
2315  {
2316  return pWrtRecoveryRedir;
2317  }
2318 
2319  private:
2320  XRootDZipDestination(const XRootDDestination &other);
2321  XRootDZipDestination &operator = (const XRootDDestination &other);
2322 
2323  //------------------------------------------------------------------------
2324  // Asynchronous chunk handler
2325  //------------------------------------------------------------------------
2326  class ChunkHandler: public XrdCl::ResponseHandler
2327  {
2328  public:
2329  ChunkHandler( XrdCl::PageInfo &&ci ):
2330  sem( new XrdSysSemaphore(0) ),
2331  chunk( std::move( ci ) ) {}
2332  virtual ~ChunkHandler() { delete sem; }
2333  virtual void HandleResponse( XrdCl::XRootDStatus *statusval,
2334  XrdCl::AnyObject */*response*/ )
2335  {
2336  this->status = *statusval;
2337  delete statusval;
2338  sem->Post();
2339  }
2340 
2341  XrdSysSemaphore *sem;
2342  XrdCl::PageInfo chunk;
2343  XrdCl::XRootDStatus status;
2344  };
2345 
2346  inline XrdCl::XRootDStatus CheckIfRetriable( XrdCl::XRootDStatus &status )
2347  {
2348  if( status.IsOK() ) return status;
2349 
2350  //--------------------------------------------------------------------
2351  // Check if we should re-try the transfer from scratch at a different
2352  // data server
2353  //--------------------------------------------------------------------
2354  std::string value;
2355  if( pZip->GetProperty( "WrtRecoveryRedir", value ) )
2356  {
2357  pWrtRecoveryRedir = value;
2358  if( pZip->GetProperty( "LastURL", value ) ) pLastURL = value;
2360  }
2361 
2362  return status;
2363  }
2364 
2365  const XrdCl::URL pUrl;
2366  std::string pFilename;
2367  XrdCl::ZipArchive *pZip;
2368  uint8_t pParallel;
2369  std::queue<ChunkHandler *> pChunks;
2370  int64_t pSize;
2371 
2372  std::string pWrtRecoveryRedir;
2373  std::string pLastURL;
2374  XrdCl::ClassicCopyJob &cpjob;
2375  };
2376 }
2377 
2378 //------------------------------------------------------------------------------
2379 // Get current time in nanoseconds
2380 //------------------------------------------------------------------------------
2381 inline std::chrono::nanoseconds time_nsec()
2382 {
2383  using namespace std::chrono;
2384  auto since_epoch = high_resolution_clock::now().time_since_epoch();
2385  return duration_cast<nanoseconds>( since_epoch );
2386 }
2387 
2388 //------------------------------------------------------------------------------
2389 // Convert seconds to nanoseconds
2390 //------------------------------------------------------------------------------
2391 inline long long to_nsec( long long sec )
2392 {
2393  return sec * 1000000000;
2394 }
2395 
2396 //------------------------------------------------------------------------------
2397 // Sleep for # nanoseconds
2398 //------------------------------------------------------------------------------
2399 inline void sleep_nsec( long long nsec )
2400 {
2401 #if __cplusplus >= 201103L
2402  using namespace std::chrono;
2403  std::this_thread::sleep_for( nanoseconds( nsec ) );
2404 #else
2405  timespec req;
2406  req.tv_sec = nsec / to_nsec( 1 );
2407  req.tv_nsec = nsec % to_nsec( 1 );
2408  nanosleep( &req, 0 );
2409 #endif
2410 }
2411 
2412 namespace XrdCl
2413 {
2414  //----------------------------------------------------------------------------
2415  // Constructor
2416  //----------------------------------------------------------------------------
2417  ClassicCopyJob::ClassicCopyJob( uint16_t jobId,
2418  PropertyList *jobProperties,
2419  PropertyList *jobResults ):
2420  CopyJob( jobId, jobProperties, jobResults )
2421  {
2422  Log *log = DefaultEnv::GetLog();
2423  log->Debug( UtilityMsg, "Creating a classic copy job, from %s to %s",
2424  GetSource().GetObfuscatedURL().c_str(), GetTarget().GetObfuscatedURL().c_str() );
2425  }
2426 
2427  //----------------------------------------------------------------------------
2428  // Run the copy job
2429  //----------------------------------------------------------------------------
2431  {
2432  Log *log = DefaultEnv::GetLog();
2433 
2434  std::string checkSumMode;
2435  std::string checkSumType;
2436  std::string checkSumPreset;
2437  std::string zipSource;
2438  uint16_t parallelChunks;
2439  uint32_t chunkSize;
2440  uint64_t blockSize;
2441  bool posc, force, coerce, makeDir, dynamicSource, zip, xcp, preserveXAttr,
2442  rmOnBadCksum, continue_, zipappend, doserver;
2443  int32_t nbXcpSources;
2444  long long xRate;
2445  long long xRateThreshold;
2446  uint16_t cpTimeout;
2447  std::vector<std::string> addcksums;
2448 
2449  pProperties->Get( "checkSumMode", checkSumMode );
2450  pProperties->Get( "checkSumType", checkSumType );
2451  pProperties->Get( "checkSumPreset", checkSumPreset );
2452  pProperties->Get( "parallelChunks", parallelChunks );
2453  pProperties->Get( "chunkSize", chunkSize );
2454  pProperties->Get( "posc", posc );
2455  pProperties->Get( "force", force );
2456  pProperties->Get( "coerce", coerce );
2457  pProperties->Get( "makeDir", makeDir );
2458  pProperties->Get( "dynamicSource", dynamicSource );
2459  pProperties->Get( "zipArchive", zip );
2460  pProperties->Get( "xcp", xcp );
2461  pProperties->Get( "xcpBlockSize", blockSize );
2462  pProperties->Get( "preserveXAttr", preserveXAttr );
2463  pProperties->Get( "xrate", xRate );
2464  pProperties->Get( "xrateThreshold", xRateThreshold );
2465  pProperties->Get( "rmOnBadCksum", rmOnBadCksum );
2466  pProperties->Get( "continue", continue_ );
2467  pProperties->Get( "cpTimeout", cpTimeout );
2468  pProperties->Get( "zipAppend", zipappend );
2469  pProperties->Get( "addcksums", addcksums );
2470  pProperties->Get( "doServer", doserver );
2471 
2472  if( zip )
2473  pProperties->Get( "zipSource", zipSource );
2474 
2475  if( xcp )
2476  pProperties->Get( "nbXcpSources", nbXcpSources );
2477 
2478  if( force && continue_ )
2479  return SetResult( stError, errInvalidArgs, EINVAL,
2480  "Invalid argument combination: continue + force." );
2481 
2482  if( zipappend && ( continue_ || force ) )
2483  return SetResult( stError, errInvalidArgs, EINVAL,
2484  "Invalid argument combination: ( continue | force ) + zip-append." );
2485 
2486  //--------------------------------------------------------------------------
2487  // Start the cp t/o timer if necessary
2488  //--------------------------------------------------------------------------
2489  std::unique_ptr<timer_sec_t> cptimer;
2490  if( cpTimeout ) cptimer.reset( new timer_sec_t() );
2491 
2492  //--------------------------------------------------------------------------
2493  // Remove on bad checksum implies that POSC semantics has to be enabled
2494  //--------------------------------------------------------------------------
2495  if( rmOnBadCksum ) posc = true;
2496 
2497  //--------------------------------------------------------------------------
2498  // Resolve the 'auto' checksum type.
2499  //--------------------------------------------------------------------------
2500  if( checkSumType == "auto" )
2501  {
2502  checkSumType = Utils::InferChecksumType( GetSource(), GetTarget(), zip );
2503  if( checkSumType.empty() )
2504  return SetResult( stError, errCheckSumError, ENOTSUP, "Could not infer checksum type." );
2505  else
2506  log->Info( UtilityMsg, "Using inferred checksum type: %s.", checkSumType.c_str() );
2507  }
2508 
2509  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2510  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2511 
2512  //--------------------------------------------------------------------------
2513  // Initialize the source and the destination
2514  //--------------------------------------------------------------------------
2515  std::unique_ptr<Source> src;
2516  if( xcp )
2517  src.reset( new XRootDSourceXCp( &GetSource(), chunkSize, parallelChunks, nbXcpSources, blockSize ) );
2518  else if( zip ) // TODO make zip work for xcp
2519  src.reset( new XRootDSourceZip( zipSource, &GetSource(), chunkSize, parallelChunks,
2520  checkSumType, addcksums , doserver) );
2521  else if( GetSource().GetProtocol() == "stdio" )
2522  src.reset( new StdInSource( checkSumType, chunkSize, addcksums ) );
2523  else
2524  {
2525  if( dynamicSource )
2526  src.reset( new XRootDSourceDynamic( &GetSource(), chunkSize, checkSumType, addcksums ) );
2527  else
2528  src.reset( new XRootDSource( &GetSource(), chunkSize, parallelChunks, checkSumType, addcksums, doserver ) );
2529  }
2530 
2531  XRootDStatus st = src->Initialize();
2532  if( !st.IsOK() ) return SourceError( st );
2533  uint64_t size = src->GetSize() >= 0 ? src->GetSize() : 0;
2534 
2535  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2536  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2537 
2538  std::unique_ptr<Destination> dest;
2539  URL newDestUrl( GetTarget() );
2540 
2541  if( GetTarget().GetProtocol() == "stdio" )
2542  dest.reset( new StdOutDestination( checkSumType ) );
2543  else if( zipappend )
2544  {
2545  std::string fn = GetSource().GetPath();
2546  size_t pos = fn.rfind( '/' );
2547  if( pos != std::string::npos )
2548  fn = fn.substr( pos + 1 );
2549  int64_t size = src->GetSize();
2550  dest.reset( new XRootDZipDestination( newDestUrl, fn, size, parallelChunks, *this ) );
2551  }
2552  //--------------------------------------------------------------------------
2553  // For xrootd destination build the oss.asize hint
2554  //--------------------------------------------------------------------------
2555  else
2556  {
2557  if( src->GetSize() >= 0 )
2558  {
2559  URL::ParamsMap params = newDestUrl.GetParams();
2560  std::ostringstream o; o << src->GetSize();
2561  params["oss.asize"] = o.str();
2562  newDestUrl.SetParams( params );
2563  // makeDir = true; // Backward compatibility for xroot destinations!!!
2564  }
2565  dest.reset( new XRootDDestination( newDestUrl, parallelChunks, checkSumType, *this ) );
2566  }
2567 
2568  dest->SetForce( force );
2569  dest->SetPOSC( posc );
2570  dest->SetCoerce( coerce );
2571  dest->SetMakeDir( makeDir );
2572  dest->SetContinue( continue_ );
2573  st = dest->Initialize();
2574  if( !st.IsOK() ) return DestinationError( st );
2575 
2576  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2577  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2578 
2579  //--------------------------------------------------------------------------
2580  // Copy the chunks
2581  //--------------------------------------------------------------------------
2582  if( continue_ )
2583  {
2584  size -= dest->GetSize();
2585  XrdCl::XRootDStatus st = src->StartAt( dest->GetSize() );
2586  if( !st.IsOK() ) return SetResult( st );
2587  }
2588 
2589  PageInfo pageInfo;
2590  uint64_t total_processed = 0;
2591  uint64_t processed = 0;
2592  auto start = time_nsec();
2593  uint16_t threshold_interval = parallelChunks;
2594  bool threshold_draining = false;
2595  timer_nsec_t threshold_timer;
2596  while( 1 )
2597  {
2598  st = src->GetChunk( pageInfo );
2599  if( !st.IsOK() )
2600  return SourceError( st);
2601 
2602  if( st.IsOK() && st.code == suDone )
2603  break;
2604 
2605  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2606  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2607 
2608  if( xRate )
2609  {
2610  auto elapsed = ( time_nsec() - start ).count();
2611  double transferred = total_processed + pageInfo.GetLength();
2612  double expected = double( xRate ) / to_nsec( 1 ) * elapsed;
2613  //----------------------------------------------------------------------
2614  // check if our transfer rate didn't exceeded the limit
2615  // (we are too fast)
2616  //----------------------------------------------------------------------
2617  if( elapsed && // make sure elapsed time is greater than 0
2618  transferred > expected )
2619  {
2620  auto nsec = ( transferred / xRate * to_nsec( 1 ) ) - elapsed;
2621  sleep_nsec( nsec );
2622  }
2623  }
2624 
2625  if( xRateThreshold )
2626  {
2627  auto elapsed = threshold_timer.elapsed();
2628  double transferred = processed + pageInfo.GetLength();
2629  double expected = double( xRateThreshold ) / to_nsec( 1 ) * elapsed;
2630  //----------------------------------------------------------------------
2631  // check if our transfer rate dropped below the threshold
2632  // (we are too slow)
2633  //----------------------------------------------------------------------
2634  if( elapsed && // make sure elapsed time is greater than 0
2635  transferred < expected &&
2636  threshold_interval == 0 ) // we check every # parallelChunks
2637  {
2638  if( !threshold_draining )
2639  {
2640  log->Warning( UtilityMsg, "Transfer rate dropped below requested ehreshold,"
2641  " trying different source!" );
2642  XRootDStatus st = src->TryOtherServer();
2643  if( !st.IsOK() ) return SetResult( stError, errThresholdExceeded, 0,
2644  "The transfer rate dropped below "
2645  "requested threshold!" );
2646  threshold_draining = true; // before the next measurement we need to drain
2647  // all the chunks that will come from the old server
2648  }
2649  else // now that all the chunks from the old server have
2650  { // been received we can start another measurement
2651  processed = 0;
2652  threshold_timer.reset();
2653  threshold_interval = parallelChunks;
2654  threshold_draining = false;
2655  }
2656  }
2657 
2658  threshold_interval = threshold_interval > 0 ? threshold_interval - 1 : parallelChunks;
2659  }
2660 
2661  total_processed += pageInfo.GetLength();
2662  processed += pageInfo.GetLength();
2663 
2664  st = dest->PutChunk( std::move( pageInfo ) );
2665  if( !st.IsOK() )
2666  {
2667  if( st.code == errRetry )
2668  {
2669  pResults->Set( "LastURL", dest->GetLastURL() );
2670  pResults->Set( "WrtRecoveryRedir", dest->GetWrtRecoveryRedir() );
2671  return SetResult( st );
2672  }
2673  return DestinationError( st );
2674  }
2675 
2676  if( progress )
2677  {
2678  progress->JobProgress( pJobId, total_processed, size );
2679  if( progress->ShouldCancel( pJobId ) )
2680  return SetResult( stError, errOperationInterrupted, kXR_Cancelled, "The copy-job has been cancelled!" );
2681  }
2682  }
2683 
2684  st = dest->Flush();
2685  if( !st.IsOK() )
2686  return DestinationError( st );
2687 
2688  //--------------------------------------------------------------------------
2689  // Copy extended attributes
2690  //--------------------------------------------------------------------------
2691  if( preserveXAttr && Utils::HasXAttr( GetSource() ) && Utils::HasXAttr( GetTarget() ) )
2692  {
2693  std::vector<xattr_t> xattrs;
2694  st = src->GetXAttr( xattrs );
2695  if( !st.IsOK() ) return SourceError( st );
2696  st = dest->SetXAttr( xattrs );
2697  if( !st.IsOK() ) return DestinationError( st );
2698  }
2699 
2700  //--------------------------------------------------------------------------
2701  // The size of the source is known and not enough data has been transferred
2702  // to the destination
2703  //--------------------------------------------------------------------------
2704  if( src->GetSize() >= 0 && size != total_processed )
2705  {
2706  log->Error( UtilityMsg, "The declared source size is %llu bytes, but "
2707  "received %llu bytes.", (unsigned long long) size, (unsigned long long) total_processed );
2708  return SetResult( stError, errDataError );
2709  }
2710  pResults->Set( "size", total_processed );
2711 
2712  //--------------------------------------------------------------------------
2713  // Finalize the destination
2714  //--------------------------------------------------------------------------
2715  st = dest->Finalize();
2716  if( !st.IsOK() )
2717  return DestinationError( st );
2718 
2719  //--------------------------------------------------------------------------
2720  // Verify the checksums if needed
2721  //--------------------------------------------------------------------------
2722  if( checkSumMode != "none" )
2723  {
2724  log->Debug( UtilityMsg, "Attempting checksum calculation, mode: %s.",
2725  checkSumMode.c_str() );
2726  std::string sourceCheckSum;
2727  std::string targetCheckSum;
2728 
2729  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2730  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2731 
2732  //------------------------------------------------------------------------
2733  // Get the check sum at source
2734  //------------------------------------------------------------------------
2735  timeval oStart, oEnd;
2736  XRootDStatus st;
2737 
2738  if( checkSumMode == "end2end" || checkSumMode == "source" ||
2739  !checkSumPreset.empty() )
2740  {
2741  gettimeofday( &oStart, 0 );
2742  if( !checkSumPreset.empty() )
2743  {
2744  sourceCheckSum = checkSumType + ":";
2745  sourceCheckSum += Utils::NormalizeChecksum( checkSumType,
2746  checkSumPreset );
2747  }
2748  else
2749  {
2750  st = src->GetCheckSum( sourceCheckSum, checkSumType );
2751  }
2752  gettimeofday( &oEnd, 0 );
2753 
2754  if( !st.IsOK() )
2755  return SourceError( st );
2756 
2757  pResults->Set( "sourceCheckSum", sourceCheckSum );
2758  }
2759 
2760  if( !addcksums.empty() )
2761  pResults->Set( "additionalCkeckSum", src->GetAddCks() );
2762 
2763  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2764  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2765 
2766  //------------------------------------------------------------------------
2767  // Get the check sum at destination
2768  //------------------------------------------------------------------------
2769  timeval tStart, tEnd;
2770 
2771  if( checkSumMode == "end2end" || checkSumMode == "target" )
2772  {
2773  gettimeofday( &tStart, 0 );
2774  st = dest->GetCheckSum( targetCheckSum, checkSumType );
2775  if( !st.IsOK() )
2776  return DestinationError( st );
2777  gettimeofday( &tEnd, 0 );
2778  pResults->Set( "targetCheckSum", targetCheckSum );
2779  }
2780 
2781  if( cptimer && cptimer->elapsed() > cpTimeout ) // check the CP timeout
2782  return SetResult( stError, errOperationExpired, 0, "CPTimeout exceeded." );
2783 
2784  //------------------------------------------------------------------------
2785  // Make sure the checksums are both lower case
2786  //------------------------------------------------------------------------
2787  auto sanitize_cksum = []( char c )
2788  {
2789  std::locale loc;
2790  if( std::isalpha( c ) ) return std::tolower( c, loc );
2791  return c;
2792  };
2793 
2794  std::transform( sourceCheckSum.begin(), sourceCheckSum.end(),
2795  sourceCheckSum.begin(), sanitize_cksum );
2796 
2797  std::transform( targetCheckSum.begin(), targetCheckSum.end(),
2798  targetCheckSum.begin(), sanitize_cksum );
2799 
2800  //------------------------------------------------------------------------
2801  // Compare and inform monitoring
2802  //------------------------------------------------------------------------
2803  if( !sourceCheckSum.empty() && !targetCheckSum.empty() )
2804  {
2805  bool match = false;
2806  if( sourceCheckSum == targetCheckSum )
2807  match = true;
2808 
2810  if( mon )
2811  {
2813  i.transfer.origin = &GetSource();
2814  i.transfer.target = &GetTarget();
2815  i.cksum = sourceCheckSum;
2816  i.oTime = Utils::GetElapsedMicroSecs( oStart, oEnd );
2817  i.tTime = Utils::GetElapsedMicroSecs( tStart, tEnd );
2818  i.isOK = match;
2819  mon->Event( Monitor::EvCheckSum, &i );
2820  }
2821 
2822  if( !match )
2823  {
2824  if( rmOnBadCksum )
2825  {
2826  FileSystem fs( newDestUrl );
2827  st = fs.Rm( newDestUrl.GetPath() );
2828  if( !st.IsOK() )
2829  log->Error( UtilityMsg, "Invalid checksum: failed to remove the target file: %s", st.ToString().c_str() );
2830  else
2831  log->Info( UtilityMsg, "Target file removed due to bad checksum!" );
2832  }
2833 
2834  st = dest->Finalize();
2835  if( !st.IsOK() )
2836  log->Error( UtilityMsg, "Failed to finalize the destination: %s", st.ToString().c_str() );
2837 
2838  return SetResult( stError, errCheckSumError, 0 );
2839  }
2840 
2841  log->Info( UtilityMsg, "Checksum verification: succeeded." );
2842  }
2843  }
2844 
2845  return SetResult();
2846  }
2847 }
@ kXR_NotFound
Definition: XProtocol.hh:1001
@ kXR_Cancelled
Definition: XProtocol.hh:1007
std::chrono::nanoseconds time_nsec()
long long to_nsec(long long sec)
void sleep_nsec(long long nsec)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t read(int fildes, void *buf, size_t nbyte)
bool Force
const char * XrdSysE2T(int errcode)
Definition: XrdSysE2T.cc:104
int Set(const char *csName)
Definition: XrdCksData.hh:81
int Get(char *Buff, int Blen)
Definition: XrdCksData.hh:69
void Get(Type &object)
Retrieve the object being held.
Check sum helper for stdio.
XRootDStatus Initialize()
Initialize.
XRootDStatus GetCheckSum(std::string &checkSum, std::string &checkSumType)
const std::string & GetType()
void Update(const void *buffer, uint32_t size)
virtual XRootDStatus Run(CopyProgressHandler *progress=0)
PropertyList * pResults
const URL & GetSource() const
Get source.
Definition: XrdClCopyJob.hh:94
const URL & GetTarget() const
Get target.
PropertyList * pProperties
Interface for copy progress notification.
virtual void JobProgress(uint16_t jobNum, uint64_t bytesProcessed, uint64_t bytesTotal)
virtual bool ShouldCancel(uint16_t jobNum)
Determine whether the job should be canceled.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
static Env * GetEnv()
Get default client environment.
bool GetString(const std::string &key, std::string &value)
Definition: XrdClEnv.cc:31
bool GetInt(const std::string &key, int &value)
Definition: XrdClEnv.cc:89
Send file/filesystem queries to an XRootD cluster.
XRootDStatus Rm(const std::string &path, ResponseHandler *handler, uint16_t timeout=0) XRD_WARN_UNUSED_RESULT
A file.
Definition: XrdClFile.hh:46
XRootDStatus ListXAttr(ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClFile.cc:764
XRootDStatus SetXAttr(const std::vector< xattr_t > &attrs, ResponseHandler *handler, uint16_t timeout=0)
Definition: XrdClFile.cc:665
Interface for a job to be run by the job manager.
Path location info.
Iterator Begin()
Get the location begin iterator.
LocationList::iterator Iterator
Iterator over locations.
Iterator End()
Get the location end iterator.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvCheckSum
CheckSumInfo: File checksummed.
virtual void Event(EventCode evCode, void *evData)=0
void SetOnDataConnectHandler(const URL &url, std::shared_ptr< Job > onConnJob)
Set the on-connect handler for data streams.
A key-value pair map storing both keys and values as strings.
void Set(const std::string &name, const Item &value)
bool Get(const std::string &name, Item &item) const
Singleton access to URL to virtual redirector mapping.
static RedirectorRegistry & Instance()
Returns reference to the single instance.
VirtualRedirector * Get(const URL &url) const
Get a virtual redirector associated with the given URL.
Handle an async response.
Object stat info.
uint64_t GetSize() const
Get size (in bytes)
URL representation.
Definition: XrdClURL.hh:31
std::map< std::string, std::string > ParamsMap
Definition: XrdClURL.hh:33
void SetParams(const std::string &params)
Set params.
Definition: XrdClURL.cc:402
std::string GetLocation() const
Get location (protocol://host:port/path)
Definition: XrdClURL.cc:344
const ParamsMap & GetParams() const
Get the URL params.
Definition: XrdClURL.hh:244
const std::string & GetPath() const
Get the path.
Definition: XrdClURL.hh:217
static std::string NormalizeChecksum(const std::string &name, const std::string &checksum)
Normalize checksum.
Definition: XrdClUtils.cc:648
static std::string InferChecksumType(const XrdCl::URL &source, const XrdCl::URL &destination, bool zip=false)
Automatically infer the right checksum type.
Definition: XrdClUtils.cc:771
static uint64_t GetElapsedMicroSecs(timeval start, timeval end)
Get the elapsed microseconds between two timevals.
Definition: XrdClUtils.cc:269
static XRootDStatus GetLocalCheckSum(std::string &checkSum, const std::string &checkSumType, const std::string &path)
Get a checksum from local file.
Definition: XrdClUtils.cc:330
static bool HasXAttr(const XrdCl::URL &url)
Definition: XrdClUtils.hh:255
static XRootDStatus GetRemoteCheckSum(std::string &checkSum, const std::string &checkSumType, const URL &url)
Get a checksum from a remote xrootd server.
Definition: XrdClUtils.cc:279
static bool HasPgRW(const XrdCl::URL &url)
Definition: XrdClUtils.hh:269
An interface for metadata redirectors.
virtual long long GetSize() const =0
virtual const std::vector< std::string > & GetReplicas()=0
Returns a vector with replicas as given in the meatlink file.
virtual std::string GetCheckSum(const std::string &type) const =0
const uint16_t suRetry
Definition: XrdClStatus.hh:40
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errErrorResponse
Definition: XrdClStatus.hh:105
const char *const DefaultCpTarget
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t errNotImplemented
Operation is not implemented.
Definition: XrdClStatus.hh:64
CloseArchiveImpl< false > CloseArchive(Ctx< ZipArchive > zip, uint16_t timeout=0)
Factory for creating CloseFileImpl objects.
SetXAttrImpl< false > SetXAttr(Ctx< File > file, Arg< std::string > name, Arg< std::string > value)
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
ReadImpl< false > Read(Ctx< File > file, Arg< uint64_t > offset, Arg< uint32_t > size, Arg< void * > buffer, uint16_t timeout=0)
Factory for creating ReadImpl objects.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
const uint16_t errDataError
data is corrupted
Definition: XrdClStatus.hh:63
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
GetXAttrImpl< false > GetXAttr(Ctx< File > file, Arg< std::string > name)
const uint16_t stOK
Everything went OK.
Definition: XrdClStatus.hh:31
const int DefaultSubStreamsPerChannel
const int DefaultCpUsePgWrtRd
const uint16_t errOSError
Definition: XrdClStatus.hh:61
const uint64_t UtilityMsg
const uint16_t errInvalidArgs
Definition: XrdClStatus.hh:58
std::tuple< std::string, std::string > xattr_t
Extended attribute key - value pair.
const uint16_t errNotSupported
Definition: XrdClStatus.hh:62
const uint16_t errRetry
Try again for whatever reason.
Definition: XrdClStatus.hh:49
const uint16_t errCheckSumError
Definition: XrdClStatus.hh:101
const uint16_t suDone
Definition: XrdClStatus.hh:38
const uint16_t errThresholdExceeded
Definition: XrdClStatus.hh:92
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t suContinue
Definition: XrdClStatus.hh:39
const uint16_t errNoMoreReplicas
No more replicas to try.
Definition: XrdClStatus.hh:65
OpenArchiveImpl< false > OpenArchive(Ctx< ZipArchive > zip, Arg< std::string > fn, Arg< OpenFlags::Flags > flags, uint16_t timeout=0)
Factory for creating OpenArchiveImpl objects.
const int DefaultZipMtlnCksum
Mode
Access mode.
Describe a data chunk for vector read.
uint64_t GetOffset() const
Get the offset.
uint32_t GetLength() const
Get the data length.
void * GetBuffer()
Get the buffer.
Describe a checksum event.
TransferInfo transfer
The transfer in question.
uint64_t tTime
Microseconds to obtain cksum from target.
bool isOK
True if checksum matched, false otherwise.
std::string cksum
Checksum as "type:value".
uint64_t oTime
Microseconds to obtain cksum from origin.
const URL * target
URL of the target.
const URL * origin
URL of the origin.
Flags
Open flags, may be or'd when appropriate.
@ Read
Open only for reading.
@ Write
Open only for writing.
uint32_t GetLength() const
Get the data length.
uint64_t GetOffset() const
Get the offset.
void * GetBuffer()
Get the buffer.
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
uint32_t errNo
Errno, if any.
Definition: XrdClStatus.hh:148