XRootD
XrdClOperations.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
28 
29 #include <memory>
30 #include <stdexcept>
31 #include <sstream>
32 #include <tuple>
33 #include <future>
36 #include "XrdCl/XrdClArg.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
42 #include "XrdCl/XrdClJobManager.hh"
43 #include "XrdCl/XrdClPostMaster.hh"
44 #include "XrdCl/XrdClDefaultEnv.hh"
45 
46 namespace XrdCl
47 {
48 
49  class Pipeline;
50  class PipelineHandler;
51 
52  //----------------------------------------------------------------------------
58  //----------------------------------------------------------------------------
59  template<bool HasHndl>
60  class Operation
61  {
62  // Declare friendship between templates
63  template<bool>
64  friend class Operation;
65 
66  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
67 
68  friend class Pipeline;
69  friend class PipelineHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
75  //------------------------------------------------------------------------
76  Operation() : valid( true )
77  {
78  }
79 
80  //------------------------------------------------------------------------
82  //------------------------------------------------------------------------
83  template<bool from>
85  handler( std::move( op.handler ) ), valid( true )
86  {
87  if( !op.valid ) throw std::invalid_argument( "Cannot construct "
88  "Operation from an invalid Operation!" );
89  op.valid = false;
90  }
91 
92  //------------------------------------------------------------------------
94  //------------------------------------------------------------------------
95  virtual ~Operation()
96  {
97  }
98 
99  //------------------------------------------------------------------------
101  //------------------------------------------------------------------------
102  virtual std::string ToString() = 0;
103 
104  //------------------------------------------------------------------------
108  //------------------------------------------------------------------------
109  virtual Operation<HasHndl>* Move() = 0;
110 
111  //------------------------------------------------------------------------
116  //------------------------------------------------------------------------
117  virtual Operation<true>* ToHandled() = 0;
118 
119  protected:
120 
121  //------------------------------------------------------------------------
126  //------------------------------------------------------------------------
127  void Run( Timeout timeout,
128  std::promise<XRootDStatus> prms,
129  std::function<void(const XRootDStatus&)> final );
130 
131  //------------------------------------------------------------------------
137  //------------------------------------------------------------------------
138  virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
139 
140  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  void AddOperation( Operation<true> *op );
146 
147  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  std::unique_ptr<PipelineHandler> handler;
151 
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  bool valid;
156  };
157 
158  //----------------------------------------------------------------------------
160  //----------------------------------------------------------------------------
161  typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
162 
163  //----------------------------------------------------------------------------
166  //----------------------------------------------------------------------------
168  {
169  template<bool> friend class Operation;
170 
171  public:
172 
173  //------------------------------------------------------------------------
177  //------------------------------------------------------------------------
178  PipelineHandler( ResponseHandler *handler );
179 
180  //------------------------------------------------------------------------
182  //------------------------------------------------------------------------
184  {
185  }
186 
187  //------------------------------------------------------------------------
189  //------------------------------------------------------------------------
190  void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
191  HostList *hostList );
192 
193  //------------------------------------------------------------------------
195  //------------------------------------------------------------------------
196  void HandleResponse( XRootDStatus *status, AnyObject *response );
197 
198  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
202  {
203  }
204 
205  //------------------------------------------------------------------------
209  //------------------------------------------------------------------------
210  void AddOperation( Operation<true> *operation );
211 
212  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  void Assign( const Timeout &timeout,
221  std::promise<XRootDStatus> prms,
222  std::function<void(const XRootDStatus&)> final,
223  Operation<true> *opr );
224 
225  //------------------------------------------------------------------------
227  //------------------------------------------------------------------------
228  void Assign( std::function<void(const XRootDStatus&)> final );
229 
230  //------------------------------------------------------------------------
232  //------------------------------------------------------------------------
233  void PreparePipelineStart();
234 
235  private:
236 
237  //------------------------------------------------------------------------
239  //------------------------------------------------------------------------
240  void HandleResponseImpl( XRootDStatus *status, AnyObject *response, HostList *hostList );
241 
242  inline void dealloc( XRootDStatus *status, AnyObject *response,
243  HostList *hostList )
244  {
245  delete status;
246  delete response;
247  delete hostList;
248  }
249 
250  //------------------------------------------------------------------------
252  //------------------------------------------------------------------------
253  std::unique_ptr<ResponseHandler> responseHandler;
254 
255  //------------------------------------------------------------------------
257  //------------------------------------------------------------------------
258  std::unique_ptr<Operation<true>> currentOperation;
259 
260  //------------------------------------------------------------------------
262  //------------------------------------------------------------------------
263  std::unique_ptr<Operation<true>> nextOperation;
264 
265  //------------------------------------------------------------------------
267  //------------------------------------------------------------------------
268  Timeout timeout;
269 
270  //------------------------------------------------------------------------
272  //------------------------------------------------------------------------
273  std::promise<XRootDStatus> prms;
274 
275  //------------------------------------------------------------------------
278  //------------------------------------------------------------------------
279  std::function<void(const XRootDStatus&)> final;
280  };
281 
282  //----------------------------------------------------------------------------
288  //----------------------------------------------------------------------------
289  class Pipeline
290  {
291  template<bool> friend class ParallelOperation;
292  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
293  friend class PipelineHandler;
294 
295  public:
296 
297  //------------------------------------------------------------------------
299  //------------------------------------------------------------------------
301  {
302  }
303 
304  //------------------------------------------------------------------------
306  //------------------------------------------------------------------------
308  operation( op->Move() )
309  {
310  }
311 
312  //------------------------------------------------------------------------
314  //------------------------------------------------------------------------
316  operation( op.Move() )
317  {
318  }
319 
320  //------------------------------------------------------------------------
322  //------------------------------------------------------------------------
324  operation( op.Move() )
325  {
326  }
327 
329  operation( op->ToHandled() )
330  {
331  }
332 
333  //------------------------------------------------------------------------
335  //------------------------------------------------------------------------
337  operation( op.ToHandled() )
338  {
339  }
340 
341  //------------------------------------------------------------------------
343  //------------------------------------------------------------------------
345  operation( op.ToHandled() )
346  {
347  }
348 
349  Pipeline( Pipeline &&pipe ) :
350  operation( std::move( pipe.operation ) )
351  {
352  }
353 
354  //------------------------------------------------------------------------
356  //------------------------------------------------------------------------
358  {
359  operation = std::move( pipe.operation );
360  return *this;
361  }
362 
363  //------------------------------------------------------------------------
365  //------------------------------------------------------------------------
367  {
368  operation->AddOperation( op.Move() );
369  return *this;
370  }
371 
372  //------------------------------------------------------------------------
374  //------------------------------------------------------------------------
376  {
377  operation->AddOperation( op.ToHandled() );
378  return *this;
379  }
380 
381  //------------------------------------------------------------------------
385  //------------------------------------------------------------------------
386  operator Operation<true>&()
387  {
388  if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
389  return *operation.get();
390  }
391 
392  //------------------------------------------------------------------------
396  //------------------------------------------------------------------------
397  operator bool()
398  {
399  return bool( operation );
400  }
401 
402  //------------------------------------------------------------------------
406  //------------------------------------------------------------------------
407  static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
408 
409  //------------------------------------------------------------------------
411  //------------------------------------------------------------------------
412  static void Repeat();
413 
414  //------------------------------------------------------------------------
416  //------------------------------------------------------------------------
417  static void Replace( Operation<false> &&opr );
418 
419  //------------------------------------------------------------------------
421  //------------------------------------------------------------------------
422  static void Replace( Pipeline p );
423 
424  //------------------------------------------------------------------------
426  //------------------------------------------------------------------------
427  static void Ignore();
428 
429  private:
430 
431  //------------------------------------------------------------------------
436  //------------------------------------------------------------------------
437  Operation<true>* operator->()
438  {
439  return operation.get();
440  }
441 
442  //------------------------------------------------------------------------
447  //------------------------------------------------------------------------
448  void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
449  {
450  if( ftr.valid() )
451  throw std::logic_error( "Pipeline is already running!" );
452 
453  // a promise that the pipe will have a result
454  std::promise<XRootDStatus> prms;
455  ftr = prms.get_future();
456 
457  if( !operation ) std::logic_error( "Empty pipeline!" );
458 
459  Operation<true> *opr = operation.release();
460  PipelineHandler *h = opr->handler.get();
461  if( h )
462  h->PreparePipelineStart();
463 
464  opr->Run( timeout, std::move( prms ), std::move( final ) );
465  }
466 
467  //------------------------------------------------------------------------
469  //------------------------------------------------------------------------
470  std::unique_ptr<Operation<true>> operation;
471 
472  //------------------------------------------------------------------------
474  //------------------------------------------------------------------------
475  std::future<XRootDStatus> ftr;
476 
477  };
478 
479  //----------------------------------------------------------------------------
486  //----------------------------------------------------------------------------
487  inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
488  {
489  pipeline.Run( timeout );
490  return std::move( pipeline.ftr );
491  }
492 
493  //----------------------------------------------------------------------------
501  //----------------------------------------------------------------------------
502  inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
503  {
504  return Async( std::move( pipeline ), timeout ).get();
505  }
506 
507  //----------------------------------------------------------------------------
514  //----------------------------------------------------------------------------
515  template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
516  class ConcreteOperation: public Operation<HasHndl>
517  {
518  template<template<bool> class, bool, typename, typename ...>
519  friend class ConcreteOperation;
520 
521  public:
522 
523  //------------------------------------------------------------------------
527  //------------------------------------------------------------------------
528  ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
529  timeout( 0 )
530  {
531  static_assert( !HasHndl, "It is only possible to construct operation without handler" );
532  }
533 
534  //------------------------------------------------------------------------
540  //------------------------------------------------------------------------
541  template<bool from>
543  Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
544  {
545  }
546 
547  //------------------------------------------------------------------------
555  //------------------------------------------------------------------------
556  template<typename Hdlr>
557  Derived<true> operator>>( Hdlr &&hdlr )
558  {
559  return this->StreamImpl( HdlrFactory::Create( hdlr ) );
560  }
561 
562  //------------------------------------------------------------------------
568  //------------------------------------------------------------------------
569  Derived<true> operator|( Operation<true> &op )
570  {
571  return PipeImpl( *this, op );
572  }
573 
574  //------------------------------------------------------------------------
580  //------------------------------------------------------------------------
581  Derived<true> operator|( Operation<true> &&op )
582  {
583  return PipeImpl( *this, op );
584  }
585 
586  //------------------------------------------------------------------------
592  //------------------------------------------------------------------------
593  Derived<true> operator|( Operation<false> &op )
594  {
595  return PipeImpl( *this, op );
596  }
597 
598  //------------------------------------------------------------------------
604  //------------------------------------------------------------------------
605  Derived<true> operator|( Operation<false> &&op )
606  {
607  return PipeImpl( *this, op );
608  }
609 
610  //------------------------------------------------------------------------
612  //------------------------------------------------------------------------
613  Derived<true> operator|( FinalOperation &&fo )
614  {
615  AllocHandler( *this );
616  this->handler->Assign( fo.final );
617  return this->template Transform<true>();
618  }
619 
620  //------------------------------------------------------------------------
624  //------------------------------------------------------------------------
626  {
627  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
628  return new Derived<HasHndl>( std::move( *me ) );
629  }
630 
631  //------------------------------------------------------------------------
635  //------------------------------------------------------------------------
637  {
638  this->handler.reset( new PipelineHandler() );
639  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
640  return new Derived<true>( std::move( *me ) );
641  }
642 
643  //------------------------------------------------------------------------
645  //------------------------------------------------------------------------
646  Derived<HasHndl> Timeout( uint16_t timeout )
647  {
648  this->timeout = timeout;
649  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
650  return std::move( *me );
651  }
652 
653  protected:
654 
655  //------------------------------------------------------------------------
659  //------------------------------------------------------------------------
660  template<bool to>
661  inline Derived<to> Transform()
662  {
663  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
664  return Derived<to>( std::move( *me ) );
665  }
666 
667  //------------------------------------------------------------------------
673  //------------------------------------------------------------------------
674  inline Derived<true> StreamImpl( ResponseHandler *handler )
675  {
676  static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
677  this->handler.reset( new PipelineHandler( handler ) );
678  return Transform<true>();
679  }
680 
681  //------------------------------------------------------------------------
682  // Allocate handler if necessary
683  //------------------------------------------------------------------------
684  inline static
686  {
687  // nothing to do
688  }
689 
690  //------------------------------------------------------------------------
691  // Allocate handler if necessary
692  //------------------------------------------------------------------------
693  inline static
695  {
696  me.handler.reset( new PipelineHandler() );
697  }
698 
699  //------------------------------------------------------------------------
706  //------------------------------------------------------------------------
707  inline static
708  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
709  Args...> &me, Operation<true> &op )
710  {
711  AllocHandler( me ); // if HasHndl is false allocate handler
712  me.AddOperation( op.Move() );
713  return me.template Transform<true>();
714  }
715 
716  //------------------------------------------------------------------------
723  //------------------------------------------------------------------------
724  inline static
725  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
726  Args...> &me, Operation<false> &op )
727  {
728  AllocHandler( me ); // if HasHndl is false allocate handler
729  me.AddOperation( op.ToHandled() );
730  return me.template Transform<true>();
731  }
732 
733  //------------------------------------------------------------------------
735  //------------------------------------------------------------------------
736  std::tuple<Args...> args;
737 
738  //------------------------------------------------------------------------
740  //------------------------------------------------------------------------
741  uint16_t timeout;
742  };
743 
744  // Out-of-line methods for class Operation
745 
746  template <bool HasHndl>
747  void Operation<HasHndl>::Run(Timeout timeout, std::promise<XRootDStatus> prms,
748  std::function<void(const XRootDStatus &)> f)
749  {
750  static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
751 
752  XRootDStatus st;
753  handler->Assign(timeout, std::move(prms), std::move(f), this);
754  PipelineHandler *h = handler.release();
755 
756  try {
757  st = RunImpl(h, timeout);
758  } catch (const operation_expired &ex) {
760  } catch (const PipelineException &ex) { // probably not needed
761  st = ex.GetError();
762  } catch (const std::exception &ex) {
763  st = XRootDStatus(stError, errInternal, 0, ex.what());
764  }
765 
766  if (!st.IsOK()) {
767  ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
769  }
770  }
771 
772  template <bool HasHndl>
774  {
775  if (handler)
776  handler->AddOperation(op);
777  }
778 }
779 
780 #endif // __XRD_CL_OPERATIONS_HH__
bool Create
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Operation< true > * ToHandled()
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator|(Operation< true > &&op)
Derived< true > StreamImpl(ResponseHandler *handler)
Derived< true > operator|(Operation< false > &op)
Derived< true > operator|(Operation< true > &op)
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Derived< true > operator>>(Hdlr &&hdlr)
Derived< true > operator|(Operation< false > &&op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
friend class PipelineHandler
Operation()
Constructor.
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
virtual Operation< true > * ToHandled()=0
virtual Operation< HasHndl > * Move()=0
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wraps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend class PipelineHandler
Pipeline & operator=(Pipeline &&pipe)
Move assignment operator.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
Pipeline(Pipeline &&pipe)
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
JobManager * GetJobManager()
Get the job manager object used by the post master.
Handle an async response.
Call the user callback.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124