XRootD
Loading...
Searching...
No Matches
XrdClStream.cc
Go to the documentation of this file.
1//------------------------------------------------------------------------------
2// Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3// Author: Lukasz Janyst <ljanyst@cern.ch>
4//------------------------------------------------------------------------------
5// This file is part of the XRootD software suite.
6//
7// XRootD is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// XRootD is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19//
20// In applying this licence, CERN does not waive the privileges and immunities
21// granted to it by virtue of its status as an Intergovernmental Organization
22// or submit itself to any jurisdiction.
23//------------------------------------------------------------------------------
24
25#include "XrdCl/XrdClStream.hh"
26#include "XrdCl/XrdClSocket.hh"
27#include "XrdCl/XrdClChannel.hh"
29#include "XrdCl/XrdClLog.hh"
30#include "XrdCl/XrdClMessage.hh"
32#include "XrdCl/XrdClUtils.hh"
34#include "XrdCl/XrdClMonitor.hh"
39
40#include <sys/types.h>
41#include <algorithm>
42#include <sys/socket.h>
43#include <sys/time.h>
44
45namespace XrdCl
46{
47 //----------------------------------------------------------------------------
48 // Statics
49 //----------------------------------------------------------------------------
50 RAtomic_uint64_t Stream::sSessCntGen{0};
51
52 //----------------------------------------------------------------------------
53 // Incoming message helper
54 //----------------------------------------------------------------------------
56 {
57 InMessageHelper( Message *message = 0,
58 MsgHandler *hndlr = 0,
59 time_t expir = 0,
60 uint16_t actio = 0 ):
61 msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
62 void Reset()
63 {
64 msg = 0; handler = 0; expires = 0; action = 0;
65 }
68 time_t expires;
69 uint16_t action;
70 };
71
72 //----------------------------------------------------------------------------
73 // Sub stream helper
74 //----------------------------------------------------------------------------
92
93 //----------------------------------------------------------------------------
94 // Constructor
95 //----------------------------------------------------------------------------
96 Stream::Stream( std::shared_ptr<URL> url, const URL &prefer ):
97 pUrl( url ),
98 pPrefer( prefer ),
99 pTransport( 0 ),
100 pPoller( 0 ),
101 pTaskManager( 0 ),
102 pJobManager( 0 ),
103 pIncomingQueue( 0 ),
104 pChannelData( 0 ),
105 pLastStreamError( 0 ),
106 pConnectionCount( 0 ),
107 pConnectionInitTime( 0 ),
108 pAddressType( Utils::IPAll ),
109 pSessionId( 0 ),
110 pTTLDiscJob( nullptr ),
111 pSubsWaitingClose( 0 ),
112 pDiscCV( 0 ),
113 pDiscAllCnt( 0 ),
114 pBytesSent( 0 ),
115 pBytesReceived( 0 )
116 {
117 pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
118 pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
119
120 std::ostringstream o;
121 o << pUrl->GetHostId();
122 pStreamName = o.str();
123
124 pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
126 pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
128 pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
130
131 std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
133
134 pAddressType = Utils::String2AddressType( netStack );
135 if( pAddressType == Utils::AddressType::IPAuto )
136 {
138 if( !( stacks & XrdNetUtils::hasIP64 ) )
139 {
140 if( stacks & XrdNetUtils::hasIPv4 )
141 pAddressType = Utils::AddressType::IPv4;
142 else if( stacks & XrdNetUtils::hasIPv6 )
143 pAddressType = Utils::AddressType::IPv6;
144 }
145 }
146
147 Log *log = DefaultEnv::GetLog();
148 log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
149 "Connection Window: %d, ConnectionRetry: %d, Stream Error "
150 "Window: %d", pStreamName.c_str(), netStack.c_str(),
151 pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
152 }
153
154 //----------------------------------------------------------------------------
155 // Destructor
156 //----------------------------------------------------------------------------
158 {
159 Disconnect( true );
160
161 Log *log = DefaultEnv::GetLog();
162 log->Debug( PostMasterMsg, "[%s] Destroying stream",
163 pStreamName.c_str() );
164
165 MonitorDisconnection( XRootDStatus() );
166
167 SubStreamList::iterator it;
168 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
169 delete *it;
170 }
171
172 //----------------------------------------------------------------------------
173 // Initializer
174 //----------------------------------------------------------------------------
176 {
177 if( !pTransport || !pPoller || !pChannelData )
179
180 AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
181 pChannelData, 0, this );
182 pSubStreams.push_back( new SubStreamData() );
183 pSubStreams[0]->socket = s;
184 return XRootDStatus();
185 }
186
187 //------------------------------------------------------------------------
188 // Make sure that the underlying socket handler gets write readiness
189 // events
190 //------------------------------------------------------------------------
192 {
193 XrdSysMutexHelper scopedLock( pMutex );
194
195 //--------------------------------------------------------------------------
196 // We are in the process of connecting the main stream, so we do nothing
197 // because when the main stream connection is established it will connect
198 // all the other streams
199 //--------------------------------------------------------------------------
200 if( pSubStreams[0]->status == Socket::Connecting )
201 return XRootDStatus();
202
203 //--------------------------------------------------------------------------
204 // The main stream is connected, so we can verify whether we have
205 // the up and the down stream connected and ready to handle data.
206 // If anything is not right we fall back to stream 0.
207 //--------------------------------------------------------------------------
208 if( pSubStreams[0]->status == Socket::Connected )
209 {
210 if( pSubStreams[path.down]->status != Socket::Connected )
211 path.down = 0;
212
213 if( pSubStreams[path.up]->status == Socket::Disconnected )
214 {
215 path.up = 0;
216 return pSubStreams[0]->socket->EnableUplink();
217 }
218
219 if( pSubStreams[path.up]->status == Socket::Connected )
220 return pSubStreams[path.up]->socket->EnableUplink();
221
222 return XRootDStatus();
223 }
224
225 //--------------------------------------------------------------------------
226 // The main stream is not connected, we need to check whether enough time
227 // has passed since we last encountered an error (if any) so that we could
228 // re-attempt the connection
229 //--------------------------------------------------------------------------
230 Log *log = DefaultEnv::GetLog();
231 time_t now = ::time(0);
232
233 if( now-pLastStreamError < pStreamErrorWindow )
234 return pLastFatalError;
235
236 gettimeofday( &pConnectionStarted, 0 );
237 ++pConnectionCount;
238
239 //--------------------------------------------------------------------------
240 // Resolve all the addresses of the host we're supposed to connect to
241 //--------------------------------------------------------------------------
242 XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
243 if( !st.IsOK() )
244 {
245 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
246 "the host", pStreamName.c_str() );
247 pLastStreamError = now;
248 st.status = stFatal;
249 pLastFatalError = st;
250 return st;
251 }
252
253 if( pPrefer.IsValid() )
254 {
255 std::vector<XrdNetAddr> addrresses;
256 XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
257 if( !st.IsOK() )
258 {
259 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
260 pStreamName.c_str(), pPrefer.GetHostName().c_str() );
261 }
262 else
263 {
264 std::vector<XrdNetAddr> tmp;
265 tmp.reserve( pAddresses.size() );
266 // first add all remaining addresses
267 auto itr = pAddresses.begin();
268 for( ; itr != pAddresses.end() ; ++itr )
269 {
270 if( !HasNetAddr( *itr, addrresses ) )
271 tmp.push_back( *itr );
272 }
273 // then copy all 'preferred' addresses
274 std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
275 // and keep the result
276 pAddresses.swap( tmp );
277 }
278 }
279
280 Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
281 pAddresses );
282
283 while( !pAddresses.empty() )
284 {
285 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
286 pAddresses.pop_back();
287 pConnectionInitTime = ::time( 0 );
288 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
289 if( st.IsOK() )
290 {
291 pSubStreams[0]->status = Socket::Connecting;
292 break;
293 }
294 }
295 return st;
296 }
297
298 //----------------------------------------------------------------------------
299 // Queue the message for sending
300 //----------------------------------------------------------------------------
302 MsgHandler *handler,
303 bool stateful,
304 time_t expires )
305 {
306 XrdSysMutexHelper scopedLock( pMutex );
307 Log *log = DefaultEnv::GetLog();
308
309 //--------------------------------------------------------------------------
310 // Check the session ID and bounce if needed
311 //--------------------------------------------------------------------------
312 if( msg->GetSessionId() &&
313 (pSubStreams[0]->status != Socket::Connected ||
314 pSessionId != msg->GetSessionId()) )
316
317 //--------------------------------------------------------------------------
318 // Decide on the path to send the message
319 //--------------------------------------------------------------------------
320 PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
321 if( pSubStreams.size() <= path.up )
322 {
323 log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
324 "substream %d, using 0 instead", pStreamName.c_str(),
325 msg->GetObfuscatedDescription().c_str(), path.up );
326 path.up = 0;
327 }
328
329 log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
330 "substream %d expecting answer at %d", pStreamName.c_str(),
331 msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
332
333 //--------------------------------------------------------------------------
334 // Enable *a* path and insert the message to the right queue
335 //--------------------------------------------------------------------------
336 XRootDStatus st = EnableLink( path );
337 if( st.IsOK() )
338 {
339 pTransport->MultiplexSubStream( msg, *pChannelData, &path );
340 pSubStreams[path.up]->outQueue->PushBack( msg, handler,
341 expires, stateful );
342 }
343 else
344 st.status = stFatal;
345 return st;
346 }
347
348 //----------------------------------------------------------------------------
349 // Force connection
350 //----------------------------------------------------------------------------
352 {
353 XrdSysMutexHelper scopedLock( pMutex );
354 if( pSubStreams[0]->status == Socket::Connecting )
355 {
356 pSubStreams[0]->status = Socket::Disconnected;
357 XrdCl::PathID path( 0, 0 );
358 XrdCl::XRootDStatus st = EnableLink( path );
359 if( !st.IsOK() )
360 OnConnectError( 0, st );
361 }
362 }
363
364 //----------------------------------------------------------------------------
365 // Disconnect the stream
366 //----------------------------------------------------------------------------
367 void Stream::Disconnect( bool /*force*/ )
368 {
369 //--------------------------------------------------------------------------
370 // See comment about deadlocks in ForceError() method. We don't expect
371 // to be called from a callback thread.
372 //--------------------------------------------------------------------------
373 XrdSysCondVarHelper discLock( pDiscCV );
374 while ( pDiscAllCnt )
375 {
376 pDiscCV.Wait();
377 }
378 ++pDiscAllCnt;
379 discLock.UnLock();
380
381 XrdSysMutexHelper scopedLock( pMutex );
382 SubStreamList::iterator it;
383 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
384 {
385 (*it)->socket->Close();
386 (*it)->status = Socket::Disconnected;
387 }
388 pSubsWaitingClose = 0;
389
390 scopedLock.UnLock();
391 discLock.Lock( &pDiscCV );
392 --pDiscAllCnt;
393 pDiscCV.Signal();
394 }
395
396 //----------------------------------------------------------------------------
397 // Handle a clock event
398 //----------------------------------------------------------------------------
399 void Stream::Tick( time_t now )
400 {
401 //--------------------------------------------------------------------------
402 // Check for timed-out requests and incoming handlers
403 //--------------------------------------------------------------------------
404 pMutex.Lock();
405 OutQueue q;
406 SubStreamList::iterator it;
407 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
408 q.GrabExpired( *(*it)->outQueue, now );
409 pMutex.UnLock();
410
412 pIncomingQueue->ReportTimeout( now );
413 }
414}
415
416//------------------------------------------------------------------------------
417// Handle message timeouts and reconnection in the future
418//------------------------------------------------------------------------------
419namespace
420{
421 class StreamConnectorTask: public XrdCl::Task
422 {
423 public:
424 //------------------------------------------------------------------------
425 // Constructor
426 //------------------------------------------------------------------------
427 StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
428 url( url )
429 {
430 std::string name = "StreamConnectorTask for ";
431 name += n;
432 SetName( name );
433 }
434
435 //------------------------------------------------------------------------
436 // Run the task
437 //------------------------------------------------------------------------
438 time_t Run( time_t )
439 {
441 return 0;
442 }
443
444 private:
445 XrdCl::URL url;
446 };
447}
448
449namespace XrdCl
450{
451 XRootDStatus Stream::RequestClose( Message &response )
452 {
453 ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
454 if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
455 Message *msg;
456 ClientCloseRequest *req;
457 MessageUtils::CreateRequest( msg, req );
458 req->requestid = kXR_close;
459 memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
461 msg->SetSessionId( pSessionId );
462 NullResponseHandler *handler = new NullResponseHandler();
463 MessageSendParams params;
464 params.timeout = 0;
465 params.followRedirects = false;
466 params.stateful = true;
468 return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
469 }
470
471 //------------------------------------------------------------------------
472 // Check if message is a partial response
473 //------------------------------------------------------------------------
474 bool Stream::IsPartial( Message &msg )
475 {
476 ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
477 if( rsphdr->status == kXR_oksofar )
478 return true;
479
480 if( rsphdr->status == kXR_status )
481 {
482 ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
484 return true;
485 }
486
487 return false;
488 }
489
490 //----------------------------------------------------------------------------
491 // Call back when a message has been reconstructed
492 //----------------------------------------------------------------------------
493 void Stream::OnIncoming( uint16_t subStream,
494 std::shared_ptr<Message> msg,
495 uint32_t bytesReceived )
496 {
497 msg->SetSessionId( pSessionId );
498 pBytesReceived += bytesReceived;
499
500 MsgHandler *handler = nullptr;
501 uint16_t action = 0;
502 {
503 InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
504 handler = mh.handler;
505 action = mh.action;
506 mh.Reset();
507 }
508
509 if( !IsPartial( *msg ) )
510 {
511 uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
512 *pChannelData );
513 if( streamAction & TransportHandler::DigestMsg )
514 return;
515
516 if( streamAction & TransportHandler::RequestClose )
517 {
518 RequestClose( *msg );
519 return;
520 }
521 }
522
523 Log *log = DefaultEnv::GetLog();
524
525 //--------------------------------------------------------------------------
526 // No handler, we discard the message ...
527 //--------------------------------------------------------------------------
528 if( !handler )
529 {
530 ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
531 log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
532 "(status=%d, SID=[%d,%d]), no MsgHandler found.",
533 pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
534 rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
535 return;
536 }
537
538 //--------------------------------------------------------------------------
539 // We have a handler, so we call the callback
540 //--------------------------------------------------------------------------
541 log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
542 pStreamName.c_str(), (void*)msg.get() );
543
545 {
546 log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
547 pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
548
549 // if we are handling partial response we have to take down the timeout fence
550 if( IsPartial( *msg ) )
551 {
552 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
553 if( xrdHandler ) xrdHandler->PartialReceived();
554 }
555
556 return;
557 }
558
559 Job *job = new HandleIncMsgJob( handler );
560 pJobManager->QueueJob( job );
561 }
562
563 //----------------------------------------------------------------------------
564 // Call when one of the sockets is ready to accept a new message
565 //----------------------------------------------------------------------------
566 std::pair<Message *, MsgHandler *>
567 Stream::OnReadyToWrite( uint16_t subStream )
568 {
569 XrdSysMutexHelper scopedLock( pMutex );
570 Log *log = DefaultEnv::GetLog();
571 if( pSubStreams[subStream]->outQueue->IsEmpty() )
572 {
573 log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
574 pSubStreams[subStream]->socket->GetStreamName().c_str() );
575
576 pSubStreams[subStream]->socket->DisableUplink();
577 return std::make_pair( (Message *)0, (MsgHandler *)0 );
578 }
579
580 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
581 h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
582 h.expires,
583 h.stateful );
584
585 log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
586 "from out-queue to in-queue, starting to send outgoing.",
587 pUrl->GetHostId().c_str(), (void*)h.handler,
588 h.msg->GetObfuscatedDescription().c_str() );
589
590 scopedLock.UnLock();
591
592 if( h.handler )
593 {
594 bool rmMsg = false;
595 pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
596 if( rmMsg )
597 {
598 Log *log = DefaultEnv::GetLog();
599 log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
600 pStreamName.c_str() );
601 }
602 h.handler->OnReadyToSend( h.msg );
603 }
604 return std::make_pair( h.msg, h.handler );
605 }
606
607 void Stream::DisableIfEmpty( uint16_t subStream )
608 {
609 XrdSysMutexHelper scopedLock( pMutex );
610 Log *log = DefaultEnv::GetLog();
611
612 if( pSubStreams[subStream]->outQueue->IsEmpty() )
613 {
614 log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
615 pSubStreams[subStream]->socket->GetStreamName().c_str() );
616 pSubStreams[subStream]->socket->DisableUplink();
617 }
618 }
619
620 //----------------------------------------------------------------------------
621 // Call when a message is written to the socket
622 //----------------------------------------------------------------------------
623 void Stream::OnMessageSent( uint16_t subStream,
624 Message *msg,
625 uint32_t bytesSent )
626 {
627 pTransport->MessageSent( msg, subStream, bytesSent,
628 *pChannelData );
629 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
630 pBytesSent += bytesSent;
631 if( h.handler )
632 {
633 // ensure expiration time is assigned if still in queue
634 pIncomingQueue->AssignTimeout( h.handler );
635 // OnStatusReady may cause the handler to delete itself, in
636 // which case the handler or the user callback may also delete msg
638 }
639 pSubStreams[subStream]->outMsgHelper.Reset();
640 }
641
642 //----------------------------------------------------------------------------
643 // Call back when a message has been reconstructed
644 //----------------------------------------------------------------------------
645 void Stream::OnConnect( uint16_t subStream )
646 {
647 XrdSysMutexHelper scopedLock( pMutex );
648 if( subStream == 0 )
649 {
650 int nsubconn = 0;
651 if( pSubStreams.size() > 1 )
652 {
653 for( size_t i = 1; i < pSubStreams.size(); ++i )
654 if( pSubStreams[i]->status != Socket::Disconnected ) nsubconn++;
655 }
656 if( nsubconn )
657 {
658 pSubsWaitingClose = nsubconn;
659 pSubStreams[0]->socket->DisableUplink();
660 return;
661 }
662 else
663 pSubStreams[0]->socket->EnableUplink();
664 }
665 else
666 {
667 if( pSubsWaitingClose > 0 )
668 {
669 pSubStreams[subStream]->socket->Close();
670 pSubStreams[subStream]->status = Socket::Disconnected;
671 if( --pSubsWaitingClose == 0 )
672 {
673 scopedLock.UnLock();
674 OnConnect( 0 );
675 }
676 return;
677 }
678 }
679
680 pSubStreams[subStream]->status = Socket::Connected;
681
682 std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
683 Log *log = DefaultEnv::GetLog();
684 log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
685 subStream, ipstack.c_str() );
686
687 if( subStream == 0 )
688 {
689 pLastStreamError = 0;
690 pLastFatalError = XRootDStatus();
691 pConnectionCount = 0;
692 uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
693 pSessionId = ++sSessCntGen;
694
695 //------------------------------------------------------------------------
696 // Create the streams if they don't exist yet
697 //------------------------------------------------------------------------
698 if( pSubStreams.size() == 1 && numSub > 1 )
699 {
700 for( uint16_t i = 1; i < numSub; ++i )
701 {
702 URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
703 AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
704 pChannelData, i, this );
705 pSubStreams.push_back( new SubStreamData() );
706 pSubStreams[i]->socket = s;
707 }
708 }
709
710 //------------------------------------------------------------------------
711 // Connect the extra streams, if we fail we move all the outgoing items
712 // to stream 0, we don't need to enable the uplink here, because it
713 // should be already enabled after the handshaking process is completed.
714 //------------------------------------------------------------------------
715 if( pSubStreams.size() > 1 )
716 {
717 log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
718 pStreamName.c_str(), pSubStreams.size() - 1 );
719 for( size_t i = 1; i < pSubStreams.size(); ++i )
720 {
721 pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
722 XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
723 if( !st.IsOK() )
724 {
725 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
726 // mark as disconnected. We don't try to actively Close here as
727 // we're in a poller callback thread and the i'th substream here
728 // may be hadled by a different poller callback thread, raising
729 // the possibility of deadlock.
730 pSubStreams[i]->status = Socket::Disconnected;
731 }
732 else
733 {
734 pSubStreams[i]->status = Socket::Connecting;
735 }
736 }
737 }
738
739 //------------------------------------------------------------------------
740 // Inform monitoring
741 //------------------------------------------------------------------------
742 pBytesSent = 0;
743 pBytesReceived = 0;
744 gettimeofday( &pConnectionDone, 0 );
746 if( mon )
747 {
749 i.server = pUrl->GetHostId();
750 i.sTOD = pConnectionStarted;
751 i.eTOD = pConnectionDone;
752 i.streams = pSubStreams.size();
753
754 AnyObject qryResult;
755 std::string *qryResponse = nullptr;
756 pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
757 qryResult.Get( qryResponse );
758
759 if (qryResponse) {
760 i.auth = *qryResponse;
761 delete qryResponse;
762 } else {
763 i.auth = "";
764 }
765
766 mon->Event( Monitor::EvConnect, &i );
767 }
768
769 //------------------------------------------------------------------------
770 // For every connected control-stream call the global on-connect handler
771 //------------------------------------------------------------------------
773 }
774 else if( pOnDataConnJob )
775 {
776 //------------------------------------------------------------------------
777 // For every connected data-stream call the on-connect handler
778 //------------------------------------------------------------------------
779 pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
780 }
781 }
782
783 //----------------------------------------------------------------------------
784 // On connect error
785 //----------------------------------------------------------------------------
786 void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
787 {
788 XrdSysMutexHelper scopedLock( pMutex );
789 Log *log = DefaultEnv::GetLog();
790 pSubStreams[subStream]->socket->Close();
791 time_t now = ::time(0);
792
793 //--------------------------------------------------------------------------
794 // For every connection error call the global connection error handler
795 //--------------------------------------------------------------------------
797
798 //--------------------------------------------------------------------------
799 // If we connected subStream == 0 and cannot connect >0 then we just give
800 // up and move the outgoing messages to another queue
801 //--------------------------------------------------------------------------
802 if( subStream > 0 )
803 {
804 const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
805 pSubStreams[subStream]->status = Socket::Disconnected;
806
807 if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
808 {
809 if( --pSubsWaitingClose == 0 )
810 {
811 scopedLock.UnLock();
812 OnConnect( 0 );
813 }
814 return;
815 }
816
817 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
818 if( pSubStreams[0]->status == Socket::Connected )
819 {
820 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
821 if( !st.IsOK() )
822 OnFatalError( 0, st, scopedLock );
823 return;
824 }
825
826 if( pSubStreams[0]->status == Socket::Connecting )
827 return;
828
829 OnFatalError( subStream, status, scopedLock );
830 return;
831 }
832
833 //--------------------------------------------------------------------------
834 // Check if we still have time to try and do something in the current window
835 //--------------------------------------------------------------------------
836 time_t elapsed = now-pConnectionInitTime;
837 log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
838 pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
839
840 //------------------------------------------------------------------------
841 // If we have some IP addresses left we try them
842 //------------------------------------------------------------------------
843 if( !pAddresses.empty() )
844 {
845 XRootDStatus st;
846 do
847 {
848 pSubStreams[0]->socket->SetAddress( pAddresses.back() );
849 pAddresses.pop_back();
850 pConnectionInitTime = ::time( 0 );
851 st = pSubStreams[0]->socket->Connect( pConnectionWindow );
852 }
853 while( !pAddresses.empty() && !st.IsOK() );
854
855 if( !st.IsOK() )
856 OnFatalError( subStream, st, scopedLock );
857
858 return;
859 }
860 //------------------------------------------------------------------------
861 // If we still can retry with the same host name, we sleep until the end
862 // of the connection window and try
863 //------------------------------------------------------------------------
864 else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
865 && !status.IsFatal() )
866 {
867 log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
868 pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
869
870 Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
871 pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
872 return;
873 }
874 //--------------------------------------------------------------------------
875 // We are out of the connection window, the only thing we can do here
876 // is re-resolving the host name and retrying if we still can
877 //--------------------------------------------------------------------------
878 else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
879 {
880 pAddresses.clear();
881 pSubStreams[0]->status = Socket::Disconnected;
882 PathID path( 0, 0 );
883 XRootDStatus st = EnableLink( path );
884 if( !st.IsOK() )
885 OnFatalError( subStream, st, scopedLock );
886 return;
887 }
888
889 //--------------------------------------------------------------------------
890 // Else, we fail
891 //--------------------------------------------------------------------------
892 OnFatalError( subStream, status, scopedLock );
893 }
894
895 //----------------------------------------------------------------------------
896 // Call back when an error has occurred
897 //----------------------------------------------------------------------------
898 void Stream::OnError( uint16_t subStream, XRootDStatus status )
899 {
900 //--------------------------------------------------------------------------
901 // See comment about deadlocks in ForceError() method. We expect to be
902 // called form a callback thread. However we take care to only potentially
903 // disconnect the socket for our own subStream. We require no ongoing
904 // disconnect of all substreams and ensure that remains true throughout
905 // our execution by releasing discLock only after acquiring pMutex.
906 //--------------------------------------------------------------------------
907
908 XrdSysCondVarHelper discLock( pDiscCV );
909 if( pDiscAllCnt ) return;
910
911 XrdSysMutexHelper scopedLock( pMutex );
912 discLock.UnLock();
913 Log *log = DefaultEnv::GetLog();
914 const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
915 pSubStreams[subStream]->socket->Close();
916 pSubStreams[subStream]->status = Socket::Disconnected;
917
918 log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
919 pStreamName.c_str(), subStream, status.ToString().c_str() );
920
921 //--------------------------------------------------------------------------
922 // Reinsert the stuff that we have failed to sent
923 //--------------------------------------------------------------------------
924 if( pSubStreams[subStream]->outMsgHelper.msg )
925 {
926 OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
927 pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
928 h.stateful );
929 pIncomingQueue->RemoveMessageHandler(h.handler);
930 pSubStreams[subStream]->outMsgHelper.Reset();
931 }
932
933 //--------------------------------------------------------------------------
934 // Reinsert the receiving handler and reset any partially read partial
935 //--------------------------------------------------------------------------
936 if( pSubStreams[subStream]->inMsgHelper.handler )
937 {
938 InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
939 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
940 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
941 if( xrdHandler ) xrdHandler->PartialReceived();
942 h.Reset();
943 }
944
945 //--------------------------------------------------------------------------
946 // We are dealing with an error of a peripheral stream. If we don't have
947 // anything to send don't bother recovering. Otherwise move the requests
948 // to stream 0 if possible.
949 //--------------------------------------------------------------------------
950 if( subStream > 0 )
951 {
952 if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
953 {
954 if( --pSubsWaitingClose == 0 )
955 {
956 scopedLock.UnLock();
957 OnConnect( 0 );
958 }
959 return;
960 }
961
962 if( pSubStreams[0]->status != Socket::Disconnected )
963 {
964 pSubStreams[subStream]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
965 XRootDStatus st = pSubStreams[subStream]->socket->Connect( pConnectionWindow );
966 if( !st.IsOK() )
967 {
968 pSubStreams[subStream]->socket->Close();
969 if( pSubStreams[subStream]->outQueue->IsEmpty() )
970 return;
971 pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
972 if( pSubStreams[0]->status == Socket::Connected )
973 {
974 XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
975 if( !st.IsOK() )
976 OnFatalError( 0, st, scopedLock );
977 return;
978 }
979 if( pSubStreams[0]->status == Socket::Connecting )
980 return;
981 }
982 else
983 {
984 pSubStreams[subStream]->status = Socket::Connecting;
985 return;
986 }
987 OnFatalError( subStream, status, scopedLock );
988 return;
989 }
990 if( pSubStreams[subStream]->outQueue->IsEmpty() )
991 return;
992 OnFatalError( subStream, status, scopedLock );
993 return;
994 }
995
996 //--------------------------------------------------------------------------
997 // If we lost the stream 0 we have lost the session, we re-enable the
998 // stream if we still have things in one of the outgoing queues, otherwise
999 // there is not point to recover at this point.
1000 //--------------------------------------------------------------------------
1001 if( subStream == 0 )
1002 {
1003 MonitorDisconnection( status );
1004
1005 SubStreamList::iterator it;
1006 size_t outstanding = 0;
1007 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1008 outstanding += (*it)->outQueue->GetSizeStateless();
1009
1010 if( outstanding )
1011 {
1012 PathID path( 0, 0 );
1013 XRootDStatus st = EnableLink( path );
1014 if( !st.IsOK() )
1015 {
1016 OnFatalError( 0, st, scopedLock );
1017 return;
1018 }
1019 }
1020
1021 //------------------------------------------------------------------------
1022 // We're done here, unlock the stream mutex to avoid deadlocks and
1023 // report the disconnection event to the handlers
1024 //------------------------------------------------------------------------
1025 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1026 "message handlers.", pStreamName.c_str() );
1027 OutQueue q;
1028 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1029 q.GrabStateful( *(*it)->outQueue );
1030 scopedLock.UnLock();
1031
1032 q.Report( status );
1033 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1034 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1035 return;
1036 }
1037 }
1038
1039 //------------------------------------------------------------------------
1040 // Force error
1041 //------------------------------------------------------------------------
1042 void Stream::ForceError( XRootDStatus status, bool hush )
1043 {
1044 //----------------------------------------------------------------------
1045 // We can be called in two ways: first is by by a non-poller thread,
1046 // with errOperationInterrupted as error as part of ForceDisconnect. In
1047 // which case the Stream will be destoryed shortly after we return. The
1048 // second way is call by a poller thread with another type of error.
1049 // Further, when we call socket handler Close() for a socket handled a
1050 // callback running we wait for that to complete (unless it is
1051 // ourselves). This raises the possibility of a deadlock. We avoid this
1052 // by returning quickly if we detect we're in a callback thread and
1053 // there's already a disconnect affecting multiple streams in progress.
1054 //----------------------------------------------------------------------
1055 XrdSysCondVarHelper discLock( pDiscCV );
1056 if( pDiscAllCnt &&
1057 !( status.IsError() && status.code == errOperationInterrupted ) )
1058 {
1059 return;
1060 }
1061 while( pDiscAllCnt )
1062 {
1063 pDiscCV.Wait();
1064 }
1065 ++pDiscAllCnt;
1066 discLock.UnLock();
1067
1068 XrdSysMutexHelper scopedLock( pMutex );
1069 Log *log = DefaultEnv::GetLog();
1070 for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1071 {
1072 if( pSubStreams[substream]->status != Socket::Connected ) continue;
1073 pSubStreams[substream]->socket->Close();
1074 pSubStreams[substream]->status = Socket::Disconnected;
1075
1076 if( !hush )
1077 log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1078 pStreamName.c_str(), status.ToString().c_str() );
1079
1080 //--------------------------------------------------------------------
1081 // Reinsert the stuff that we have failed to sent
1082 //--------------------------------------------------------------------
1083 if( pSubStreams[substream]->outMsgHelper.msg )
1084 {
1085 OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
1086 pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1087 h.stateful );
1088 pIncomingQueue->RemoveMessageHandler(h.handler);
1089 pSubStreams[substream]->outMsgHelper.Reset();
1090 }
1091
1092 //--------------------------------------------------------------------
1093 // Reinsert the receiving handler and reset any partially read partial
1094 //--------------------------------------------------------------------
1095 if( pSubStreams[substream]->inMsgHelper.handler )
1096 {
1097 InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
1098 pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1099 XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1100 if( xrdHandler ) xrdHandler->PartialReceived();
1101 h.Reset();
1102 }
1103 }
1104
1105 pConnectionCount = 0;
1106 pSubsWaitingClose = 0;
1107
1108 //------------------------------------------------------------------------
1109 // We're done here, unlock the stream mutex to avoid deadlocks and
1110 // report the disconnection event to the handlers
1111 //------------------------------------------------------------------------
1112 log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1113 "message handlers.", pStreamName.c_str() );
1114
1115 SubStreamList::iterator it;
1116 OutQueue q;
1117 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1118 q.GrabItems( *(*it)->outQueue );
1119 scopedLock.UnLock();
1120
1121 q.Report( status );
1122
1123 pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1124 pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1125
1126 discLock.Lock( &pDiscCV );
1127 --pDiscAllCnt;
1128 pDiscCV.Signal();
1129 }
1130
1131 //----------------------------------------------------------------------------
1132 // On fatal error
1133 //----------------------------------------------------------------------------
1134 void Stream::OnFatalError( uint16_t subStream,
1135 XRootDStatus status,
1136 XrdSysMutexHelper &lock )
1137 {
1138 Log *log = DefaultEnv::GetLog();
1139 pSubStreams[subStream]->status = Socket::Disconnected;
1140 log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
1141 pStreamName.c_str(), status.ToString().c_str() );
1142
1143 //--------------------------------------------------------------------------
1144 // Don't set the stream error windows for authentication errors as the user
1145 // may refresh his credential at any time
1146 //--------------------------------------------------------------------------
1147 if( status.code != errAuthFailed )
1148 {
1149 pConnectionCount = 0;
1150 pLastStreamError = ::time(0);
1151 pLastFatalError = status;
1152 }
1153
1154 SubStreamList::iterator it;
1155 OutQueue q;
1156 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1157 q.GrabItems( *(*it)->outQueue );
1158 lock.UnLock();
1159
1160 status.status = stFatal;
1161 q.Report( status );
1162 pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1163 pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1164
1165 }
1166
1167 //----------------------------------------------------------------------------
1168 // Inform monitoring about disconnection
1169 //----------------------------------------------------------------------------
1170 void Stream::MonitorDisconnection( XRootDStatus status )
1171 {
1172 Monitor *mon = DefaultEnv::GetMonitor();
1173 if( mon )
1174 {
1175 Monitor::DisconnectInfo i;
1176 i.server = pUrl->GetHostId();
1177 i.rBytes = pBytesReceived;
1178 i.sBytes = pBytesSent;
1179 i.cTime = ::time(0) - pConnectionDone.tv_sec;
1180 i.status = status;
1181 mon->Event( Monitor::EvDisconnect, &i );
1182 }
1183 }
1184
1185 //----------------------------------------------------------------------------
1186 // Call back when a message has been reconstructed
1187 //----------------------------------------------------------------------------
1188 bool Stream::OnReadTimeout( uint16_t substream )
1189 {
1190 //--------------------------------------------------------------------------
1191 // We only take the main stream into account
1192 //--------------------------------------------------------------------------
1193 if( substream != 0 )
1194 {
1195 if( pSubsWaitingClose )
1196 {
1197 XrdSysMutexHelper scopedLock( pMutex );
1198 if( !pSubsWaitingClose ) return true;
1199 if( pSubStreams[substream]->status == Socket::Disconnected ) return true;
1200 pSubStreams[substream]->socket->Close();
1201 pSubStreams[substream]->status = Socket::Disconnected;
1202 if( --pSubsWaitingClose == 0 )
1203 {
1204 scopedLock.UnLock();
1205 OnConnect( 0 );
1206 }
1207 }
1208 return true;
1209 }
1210
1211 //--------------------------------------------------------------------------
1212 // Check if there is no outgoing messages and if the stream TTL is elapesed.
1213 // It is assumed that the underlying transport makes sure that there is no
1214 // pending requests that are not answered, ie. all possible virtual streams
1215 // are de-allocated
1216 //--------------------------------------------------------------------------
1217 Log *log = DefaultEnv::GetLog();
1218 SubStreamList::iterator it;
1219 time_t now = time(0);
1220
1221 XrdSysMutexHelper scopedLock( pMutex );
1222 uint32_t outgoingMessages = 0;
1223 time_t lastActivity = 0;
1224 for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1225 {
1226 outgoingMessages += (*it)->outQueue->GetSize();
1227 time_t sockLastActivity = (*it)->socket->GetLastActivity();
1228 if( lastActivity < sockLastActivity )
1229 lastActivity = sockLastActivity;
1230 }
1231
1232 if( !outgoingMessages )
1233 {
1234 bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1235 *pChannelData );
1236 if( disconnect )
1237 {
1238 log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1239 pStreamName.c_str() );
1240 //----------------------------------------------------------------------
1241 // Important note!
1242 //
1243 // This job destroys the Stream object itself, the underlying
1244 // AsyncSocketHandler object (that called this method) and the Channel
1245 // object that aggregates this Stream.
1246 //
1247 // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1248 // in a Channel that was previously collapsed in a redirect.
1249 //----------------------------------------------------------------------
1250 if( !pTTLDiscJob )
1251 {
1252 pTTLDiscJob = new ForceDisconnectJob( pUrl );
1253 pJobManager->QueueJob( pTTLDiscJob );
1254 }
1255 return false;
1256 }
1257 }
1258
1259 //--------------------------------------------------------------------------
1260 // Check if the stream is broken
1261 //--------------------------------------------------------------------------
1262 XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1263 *pChannelData );
1264 if( !st.IsOK() )
1265 {
1266 scopedLock.UnLock();
1267 OnError( substream, st );
1268 return false;
1269 }
1270 return true;
1271 }
1272
1273 //----------------------------------------------------------------------------
1274 // Call back when a message has been reconstru
1275 //----------------------------------------------------------------------------
1276 bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1277 {
1278 return true;
1279 }
1280
1281 //----------------------------------------------------------------------------
1282 // Register channel event handler
1283 //----------------------------------------------------------------------------
1285 {
1286 pChannelEvHandlers.AddHandler( handler );
1287 }
1288
1289 //----------------------------------------------------------------------------
1290 // Remove a channel event handler
1291 //----------------------------------------------------------------------------
1293 {
1294 pChannelEvHandlers.RemoveHandler( handler );
1295 }
1296
1297 //----------------------------------------------------------------------------
1298 // Install a incoming message handler
1299 //----------------------------------------------------------------------------
1300 MsgHandler*
1301 Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1302 {
1303 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1304 if( !mh.handler )
1305 mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1306 mh.expires,
1307 mh.action );
1308
1309 if( !mh.handler )
1310 return nullptr;
1311
1312 if( mh.action & MsgHandler::Raw )
1313 return mh.handler;
1314 return nullptr;
1315 }
1316
1317 //----------------------------------------------------------------------------
1321 //----------------------------------------------------------------------------
1322 uint16_t Stream::InspectStatusRsp( uint16_t stream,
1323 MsgHandler *&incHandler )
1324 {
1325 InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1326 if( !mh.handler )
1328
1329 uint16_t action = mh.handler->InspectStatusRsp();
1330 mh.action |= action;
1331
1332 if( action & MsgHandler::RemoveHandler )
1333 pIncomingQueue->RemoveMessageHandler( mh.handler );
1334
1335 if( action & MsgHandler::Raw )
1336 {
1337 incHandler = mh.handler;
1338 return MsgHandler::Raw;
1339 }
1340
1341 if( action & MsgHandler::Corrupted )
1342 return MsgHandler::Corrupted;
1343
1344 if( action & MsgHandler::More )
1345 return MsgHandler::More;
1346
1347 return MsgHandler::None;
1348 }
1349
1350 //----------------------------------------------------------------------------
1351 // Check if channel can be collapsed using given URL
1352 //----------------------------------------------------------------------------
1353 bool Stream::CanCollapse( const URL &url )
1354 {
1355 Log *log = DefaultEnv::GetLog();
1356
1357 //--------------------------------------------------------------------------
1358 // Resolve all the addresses of the host we're supposed to connect to
1359 //--------------------------------------------------------------------------
1360 std::vector<XrdNetAddr> prefaddrs;
1361 XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1362 if( !st.IsOK() )
1363 {
1364 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1365 , pStreamName.c_str(), url.GetHostName().c_str() );
1366 return false;
1367 }
1368
1369 //--------------------------------------------------------------------------
1370 // Resolve all the addresses of the alias
1371 //--------------------------------------------------------------------------
1372 std::vector<XrdNetAddr> aliasaddrs;
1373 st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1374 if( !st.IsOK() )
1375 {
1376 log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1377 , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1378 return false;
1379 }
1380
1381 //--------------------------------------------------------------------------
1382 // Now check if the preferred host is part of the alias
1383 //--------------------------------------------------------------------------
1384 auto itr = prefaddrs.begin();
1385 for( ; itr != prefaddrs.end() ; ++itr )
1386 {
1387 auto itr2 = aliasaddrs.begin();
1388 for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1389 if( itr->Same( &*itr2 ) ) return true;
1390 }
1391
1392 return false;
1393 }
1394
1395 //------------------------------------------------------------------------
1396 // Query the stream
1397 //------------------------------------------------------------------------
1398 Status Stream::Query( uint16_t query, AnyObject &result )
1399 {
1400 switch( query )
1401 {
1403 {
1404 result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1405 return Status();
1406 }
1407
1409 {
1410 result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1411 return Status();
1412 }
1413
1415 {
1416 result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1417 return Status();
1418 }
1419
1420 default:
1422 }
1423 }
1424
1425}
union ServerResponse::@040373375333017131300127053271011057331004327334 body
kXR_char streamid[2]
Definition XProtocol.hh:914
kXR_unt16 requestid
Definition XProtocol.hh:228
@ kXR_oksofar
Definition XProtocol.hh:900
@ kXR_status
Definition XProtocol.hh:907
struct ServerResponseBody_Status bdy
kXR_char fhandle[4]
Definition XProtocol.hh:229
@ kXR_close
Definition XProtocol.hh:115
ServerResponseHeader hdr
XrdSys::RAtomic< uint64_t > RAtomic_uint64_t
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
SocketStatus
Status of the socket.
@ Disconnected
The socket is disconnected.
@ Connected
The socket is connected.
@ Connecting
The connection process is in progress.
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Stream(std::shared_ptr< URL > url, const URL &prefer=URL())
Constructor.
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
XRootDStatus EnableLink(PathID &path)
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
void Tick(time_t now)
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
~Stream()
Destructor.
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
void OnError(uint16_t subStream, XRootDStatus status)
On error.
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
XRootDStatus Initialize()
Initializer.
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
Interface for a task to be run by the TaskManager.
@ RequestClose
Send a close request.
URL representation.
Definition XrdClURL.hh:31
const std::string & GetHostName() const
Get the name of the target host.
Definition XrdClURL.hh:170
Random utilities.
Definition XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
@ qryINIF
Only consider internet protocols via ifconfig.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
void Lock(XrdSysCondVar *CndVar)
const uint16_t errQueryNotSupported
const uint16_t errUninitialized
const uint16_t errOperationExpired
const uint16_t stFatal
Fatal error, it's still an error.
const uint16_t stError
An error occurred that could potentially be retried.
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errOperationInterrupted
const uint16_t errInvalidSession
const uint16_t errAuthFailed
@ kXR_PartialResult
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Describe a server login event.
std::string server
"user@host:port"
uint16_t streams
Number of streams.
timeval sTOD
gettimeofday() when login started
timeval eTOD
gettimeofday() when login ended
std::string auth
authentication protocol used or empty if none
Procedure execution status.
uint16_t code
Error type, or additional hints on what to do.
uint16_t status
Status of the execution.
bool IsOK() const
We're fine.
bool IsFatal() const
Fatal error.
std::string ToString() const
Create a string representation.
bool IsError() const
Error.
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
AsyncSocketHandler * socket
OutQueue::MsgHelper outMsgHelper
Socket::SocketStatus status
static const uint16_t Auth
Transport name, returns std::string *.