42#include <sys/socket.h>
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType(
Utils::IPAll ),
110 pTTLDiscJob( nullptr ),
111 pSubsWaitingClose( 0 ),
117 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
118 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
120 std::ostringstream o;
121 o << pUrl->GetHostId();
122 pStreamName = o.str();
149 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
150 "Window: %d", pStreamName.c_str(), netStack.c_str(),
151 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
163 pStreamName.c_str() );
167 SubStreamList::iterator it;
168 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
177 if( !pTransport || !pPoller || !pChannelData )
181 pChannelData, 0,
this );
183 pSubStreams[0]->socket = s;
216 return pSubStreams[0]->socket->EnableUplink();
220 return pSubStreams[path.
up]->socket->EnableUplink();
231 time_t now = ::time(0);
233 if( now-pLastStreamError < pStreamErrorWindow )
234 return pLastFatalError;
236 gettimeofday( &pConnectionStarted, 0 );
246 "the host", pStreamName.c_str() );
247 pLastStreamError = now;
249 pLastFatalError = st;
253 if( pPrefer.IsValid() )
255 std::vector<XrdNetAddr> addrresses;
260 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
264 std::vector<XrdNetAddr> tmp;
265 tmp.reserve( pAddresses.size() );
267 auto itr = pAddresses.begin();
268 for( ; itr != pAddresses.end() ; ++itr )
270 if( !HasNetAddr( *itr, addrresses ) )
271 tmp.push_back( *itr );
274 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
276 pAddresses.swap( tmp );
283 while( !pAddresses.empty() )
285 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
286 pAddresses.pop_back();
287 pConnectionInitTime = ::time( 0 );
288 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
320 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
321 if( pSubStreams.size() <= path.
up )
324 "substream %d, using 0 instead", pStreamName.c_str(),
330 "substream %d expecting answer at %d", pStreamName.c_str(),
339 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
340 pSubStreams[path.
up]->outQueue->PushBack( msg, handler,
374 while ( pDiscAllCnt )
382 SubStreamList::iterator it;
383 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
385 (*it)->socket->Close();
388 pSubsWaitingClose = 0;
391 discLock.
Lock( &pDiscCV );
406 SubStreamList::iterator it;
407 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
412 pIncomingQueue->ReportTimeout( now );
427 StreamConnectorTask(
const XrdCl::URL &url,
const std::string &n ):
430 std::string name =
"StreamConnectorTask for ";
453 ServerResponse *rsp =
reinterpret_cast<ServerResponse*
>( response.GetBuffer() );
456 ClientCloseRequest *req;
459 memcpy( req->
fhandle,
reinterpret_cast<uint8_t*
>( rsp->
body.buffer.data ), 4 );
461 msg->SetSessionId( pSessionId );
462 NullResponseHandler *handler =
new NullResponseHandler();
463 MessageSendParams params;
465 params.followRedirects =
false;
466 params.stateful =
true;
474 bool Stream::IsPartial(
Message &msg )
476 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
482 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
494 std::shared_ptr<Message> msg,
495 uint32_t bytesReceived )
497 msg->SetSessionId( pSessionId );
498 pBytesReceived += bytesReceived;
509 if( !IsPartial( *msg ) )
511 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
518 RequestClose( *msg );
532 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
533 pStreamName.c_str(), (
void*)msg.get(), rsp->
hdr.
status,
542 pStreamName.c_str(), (
void*)msg.get() );
547 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
550 if( IsPartial( *msg ) )
559 Job *job =
new HandleIncMsgJob( handler );
560 pJobManager->QueueJob( job );
566 std::pair<Message *, MsgHandler *>
571 if( pSubStreams[subStream]->outQueue->IsEmpty() )
574 pSubStreams[subStream]->socket->GetStreamName().c_str() );
576 pSubStreams[subStream]->socket->DisableUplink();
581 h.
msg = pSubStreams[subStream]->outQueue->PopMessage( h.
handler,
586 "from out-queue to in-queue, starting to send outgoing.",
587 pUrl->GetHostId().c_str(), (
void*)h.
handler,
595 pIncomingQueue->AddMessageHandler( h.
handler, rmMsg );
600 pStreamName.c_str() );
612 if( pSubStreams[subStream]->outQueue->IsEmpty() )
615 pSubStreams[subStream]->socket->GetStreamName().c_str() );
616 pSubStreams[subStream]->socket->DisableUplink();
627 pTransport->MessageSent( msg, subStream, bytesSent,
630 pBytesSent += bytesSent;
634 pIncomingQueue->AssignTimeout( h.
handler );
639 pSubStreams[subStream]->outMsgHelper.
Reset();
651 if( pSubStreams.size() > 1 )
653 for(
size_t i = 1; i < pSubStreams.size(); ++i )
658 pSubsWaitingClose = nsubconn;
659 pSubStreams[0]->socket->DisableUplink();
663 pSubStreams[0]->socket->EnableUplink();
667 if( pSubsWaitingClose > 0 )
669 pSubStreams[subStream]->socket->Close();
671 if( --pSubsWaitingClose == 0 )
682 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
685 subStream, ipstack.c_str() );
689 pLastStreamError = 0;
691 pConnectionCount = 0;
692 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
693 pSessionId = ++sSessCntGen;
698 if( pSubStreams.size() == 1 && numSub > 1 )
700 for( uint16_t i = 1; i < numSub; ++i )
702 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
704 pChannelData, i,
this );
706 pSubStreams[i]->socket = s;
715 if( pSubStreams.size() > 1 )
718 pStreamName.c_str(), pSubStreams.size() - 1 );
719 for(
size_t i = 1; i < pSubStreams.size(); ++i )
721 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
722 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
725 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
744 gettimeofday( &pConnectionDone, 0 );
749 i.
server = pUrl->GetHostId();
750 i.
sTOD = pConnectionStarted;
751 i.
eTOD = pConnectionDone;
752 i.
streams = pSubStreams.size();
755 std::string *qryResponse =
nullptr;
757 qryResult.
Get( qryResponse );
760 i.
auth = *qryResponse;
774 else if( pOnDataConnJob )
779 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
790 pSubStreams[subStream]->socket->Close();
791 time_t now = ::time(0);
809 if( --pSubsWaitingClose == 0 )
817 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
820 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
822 OnFatalError( 0, st, scopedLock );
829 OnFatalError( subStream, status, scopedLock );
836 time_t elapsed = now-pConnectionInitTime;
838 pStreamName.c_str(), (
long long) elapsed, pConnectionWindow );
843 if( !pAddresses.empty() )
848 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
849 pAddresses.pop_back();
850 pConnectionInitTime = ::time( 0 );
851 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
853 while( !pAddresses.empty() && !st.
IsOK() );
856 OnFatalError( subStream, st, scopedLock );
864 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
868 pStreamName.c_str(), (
long long) (pConnectionWindow - elapsed) );
870 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
871 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
878 else if( pConnectionCount < pConnectionRetry && !status.
IsFatal() )
885 OnFatalError( subStream, st, scopedLock );
892 OnFatalError( subStream, status, scopedLock );
909 if( pDiscAllCnt )
return;
915 pSubStreams[subStream]->socket->Close();
919 pStreamName.c_str(), subStream, status.
ToString().c_str() );
924 if( pSubStreams[subStream]->outMsgHelper.msg )
929 pIncomingQueue->RemoveMessageHandler(h.
handler);
930 pSubStreams[subStream]->outMsgHelper.
Reset();
936 if( pSubStreams[subStream]->inMsgHelper.handler )
954 if( --pSubsWaitingClose == 0 )
964 pSubStreams[subStream]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
965 XRootDStatus st = pSubStreams[subStream]->socket->Connect( pConnectionWindow );
968 pSubStreams[subStream]->socket->Close();
969 if( pSubStreams[subStream]->outQueue->IsEmpty() )
971 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
974 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
976 OnFatalError( 0, st, scopedLock );
987 OnFatalError( subStream, status, scopedLock );
990 if( pSubStreams[subStream]->outQueue->IsEmpty() )
992 OnFatalError( subStream, status, scopedLock );
1001 if( subStream == 0 )
1003 MonitorDisconnection( status );
1005 SubStreamList::iterator it;
1006 size_t outstanding = 0;
1007 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1008 outstanding += (*it)->outQueue->GetSizeStateless();
1016 OnFatalError( 0, st, scopedLock );
1026 "message handlers.", pStreamName.c_str() );
1028 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1061 while( pDiscAllCnt )
1070 for(
size_t substream = 0; substream < pSubStreams.size(); ++substream )
1073 pSubStreams[substream]->socket->Close();
1078 pStreamName.c_str(), status.
ToString().c_str() );
1083 if( pSubStreams[substream]->outMsgHelper.msg )
1088 pIncomingQueue->RemoveMessageHandler(h.
handler);
1089 pSubStreams[substream]->outMsgHelper.
Reset();
1095 if( pSubStreams[substream]->inMsgHelper.handler )
1105 pConnectionCount = 0;
1106 pSubsWaitingClose = 0;
1113 "message handlers.", pStreamName.c_str() );
1115 SubStreamList::iterator it;
1117 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1126 discLock.
Lock( &pDiscCV );
1134 void Stream::OnFatalError( uint16_t subStream,
1141 pStreamName.c_str(), status.
ToString().c_str() );
1149 pConnectionCount = 0;
1150 pLastStreamError = ::time(0);
1151 pLastFatalError = status;
1154 SubStreamList::iterator it;
1156 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1157 q.GrabItems( *(*it)->outQueue );
1170 void Stream::MonitorDisconnection(
XRootDStatus status )
1175 Monitor::DisconnectInfo i;
1176 i.server = pUrl->GetHostId();
1177 i.rBytes = pBytesReceived;
1178 i.sBytes = pBytesSent;
1179 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1193 if( substream != 0 )
1195 if( pSubsWaitingClose )
1198 if( !pSubsWaitingClose )
return true;
1200 pSubStreams[substream]->socket->Close();
1202 if( --pSubsWaitingClose == 0 )
1218 SubStreamList::iterator it;
1219 time_t now = time(0);
1222 uint32_t outgoingMessages = 0;
1223 time_t lastActivity = 0;
1224 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1226 outgoingMessages += (*it)->outQueue->GetSize();
1227 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1228 if( lastActivity < sockLastActivity )
1229 lastActivity = sockLastActivity;
1232 if( !outgoingMessages )
1234 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1239 pStreamName.c_str() );
1252 pTTLDiscJob =
new ForceDisconnectJob( pUrl );
1253 pJobManager->QueueJob( pTTLDiscJob );
1262 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1286 pChannelEvHandlers.AddHandler( handler );
1294 pChannelEvHandlers.RemoveHandler( handler );
1305 mh.
handler = pIncomingQueue->GetHandlerForMessage( msg,
1333 pIncomingQueue->RemoveMessageHandler( mh.
handler );
1360 std::vector<XrdNetAddr> prefaddrs;
1365 , pStreamName.c_str(), url.
GetHostName().c_str() );
1372 std::vector<XrdNetAddr> aliasaddrs;
1377 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1384 auto itr = prefaddrs.begin();
1385 for( ; itr != prefaddrs.end() ; ++itr )
1387 auto itr2 = aliasaddrs.begin();
1388 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1389 if( itr->Same( &*itr2 ) )
return true;
1404 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpAddr() ),
false );
1410 result.
Set(
new std::string( pSubStreams[0]->socket->GetIpStack() ),
false );
1416 result.
Set(
new std::string( pSubStreams[0]->socket->GetHostName() ),
false );
union ServerResponse::@040373375333017131300127053271011057331004327334 body
struct ServerResponseBody_Status bdy
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ StreamBroken
The stream is broken.
@ FatalError
Stream has been broken and won't be recovered.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
Interface for a job to be run by the job manager.
void Error(uint64_t topic, const char *format,...)
Report an error.
void Warning(uint64_t topic, const char *format,...)
Report a warning.
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
void Info(uint64_t topic, const char *format,...)
Print an info message.
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Stream(std::shared_ptr< URL > url, const URL &prefer=URL())
Constructor.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
void OnConnect(uint16_t subStream)
Call back when a substream has been connected.
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
const std::string & GetHostName() const
Get the name of the target host.
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Set the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
void Lock(XrdSysCondVar *CndVar)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errAuthFailed
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
bool IsError() const
Error.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Authentication protocol name, returns std::string *.