19#ifndef __XRD_CL_STREAM_HH__
20#define __XRD_CL_STREAM_HH__
68 Stream( std::shared_ptr<URL> url,
const URL &prefer =
URL() );
93 pTransport = transport;
109 pIncomingQueue = incomingQueue;
117 pChannelData = channelData;
125 pTaskManager = taskManager;
133 pJobManager = jobManager;
152 void Tick( time_t now );
184 std::shared_ptr<Message> msg,
185 uint32_t bytesReceived );
190 std::pair<Message *, MsgHandler *>
198 uint32_t bytesSent );
266 pOnDataConnJob = onConnJob;
285 static bool IsPartial(
Message &msg );
290 inline static bool HasNetAddr(
const XrdNetAddr &addr,
291 std::vector<XrdNetAddr> &addresses )
293 auto itr = addresses.begin();
294 for( ; itr != addresses.end() ; ++itr )
296 if( itr->Same( &addr ) )
return true;
305 class ForceDisconnectJob:
public Job
308 ForceDisconnectJob( std::shared_ptr<URL> url ) : pUrl( url ) {}
309 virtual ~ForceDisconnectJob() {}
310 virtual void Run(
void* )
316 std::shared_ptr<URL> pUrl;
322 class HandleIncMsgJob:
public Job
325 HandleIncMsgJob( MsgHandler *handler ): pHandler( handler ) {};
326 virtual ~HandleIncMsgJob() {};
327 virtual void Run(
void* )
333 MsgHandler *pHandler;
339 void OnFatalError( uint16_t subStream,
341 XrdSysMutexHelper &lock );
346 void MonitorDisconnection( XRootDStatus status );
351 XRootDStatus RequestClose( Message &resp );
353 typedef std::vector<SubStreamData*> SubStreamList;
358 std::shared_ptr<URL> pUrl;
360 std::string pStreamName;
361 TransportHandler *pTransport;
363 TaskManager *pTaskManager;
364 JobManager *pJobManager;
365 XrdSysRecMutex pMutex;
366 InQueue *pIncomingQueue;
367 AnyObject *pChannelData;
368 uint32_t pLastStreamError;
369 XRootDStatus pLastFatalError;
370 uint16_t pStreamErrorWindow;
371 uint16_t pConnectionCount;
372 uint16_t pConnectionRetry;
373 time_t pConnectionInitTime;
374 uint16_t pConnectionWindow;
375 SubStreamList pSubStreams;
376 std::vector<XrdNetAddr> pAddresses;
378 ChannelHandlerList pChannelEvHandlers;
380 ForceDisconnectJob *pTTLDiscJob;
381 std::atomic<int> pSubsWaitingClose;
393 XrdSysCondVar pDiscCV;
399 timeval pConnectionStarted;
400 timeval pConnectionDone;
401 std::atomic<uint64_t> pBytesSent;
402 std::atomic<uint64_t> pBytesReceived;
407 std::shared_ptr<Job> pOnDataConnJob;
#define XRD_WARN_UNUSED_RESULT
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
A communication channel between the client and the server.
static PostMaster * GetPostMaster()
Get default post master.
A synchronize queue for incoming data.
The message representation used throughout the system.
Interface for socket pollers.
Status ForceDisconnect(const URL &url)
Shut down a channel.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
void SetTransport(TransportHandler *transport)
Set the transport.
StreamStatus
Status of the stream.
@ Disconnected
Not connected.
@ Connecting
In the process of being connected.
void SetIncomingQueue(InQueue *incomingQueue)
Set the incoming queue.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void SetPoller(Poller *poller)
Set the poller.
void ForceConnect()
Force connection.
Stream(std::shared_ptr< URL > url, const URL &prefer=URL())
Constructor.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
void SetTaskManager(TaskManager *taskManager)
Set task manager.
void SetOnDataConnectHandler(std::shared_ptr< Job > &onConnJob)
Set the on-connect handler for data streams.
void SetJobManager(JobManager *jobManager)
Set job manager.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
void OnConnect(uint16_t subStream)
On connect: callback invoked when the given sub-stream has connected.
const std::string & GetName() const
Return stream name.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
const URL * GetURL() const
Get the URL.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
void SetChannelData(AnyObject *channelData)
Set the channel data.
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Perform the handshake and the authentication for each physical stream.
Procedure execution status.