xrootd
XrdClientPhyConnection.hh
Go to the documentation of this file.
00001 
00002 //                                                                      //
00003 // XrdClientPhyConnection                                               //
00004 // Author: Fabrizio Furano (INFN Padova, 2004)                          //
00005 // Adapted from TXNetFile (root.cern.ch) originally done by             //
00006 //  Alvise Dorigo, Fabrizio Furano                                      //
00007 //          INFN Padova, 2003                                           //
00008 //                                                                      //
00009 // Class handling physical connections to xrootd servers                //
00010 //                                                                      //
00012 
00013 //       $Id$
00014 
00015 #ifndef _XrdClientPhyConnection
00016 #define _XrdClientPhyConnection
00017 
00018 #include "XrdClient/XrdClientPSock.hh"
00019 #include "XrdClient/XrdClientMessage.hh"
00020 #include "XrdClient/XrdClientUnsolMsg.hh"
00021 #include "XrdClient/XrdClientInputBuffer.hh"
00022 #include "XrdClient/XrdClientUrlInfo.hh"
00023 #include "XrdClient/XrdClientThread.hh"
00024 #include "XrdSys/XrdSysPthread.hh"
00025 #include "XrdSys/XrdSysSemWait.hh"
00026 
00027 #include <time.h> // for time_t data type
00028 
// Login status of a connection. This is the type returned by
// XrdClientPhyConnection::IsLogged() and accepted by SetLogged().
// NOTE(review): the per-value meanings below are inferred from the
// enumerator names only -- confirm against the login code.
enum ELoginState {
    // presumably: no login has been performed
    kNo = 0,

    // presumably: login completed
    kYes = 1,

    // presumably: login started but not yet completed
    kPending = 2
};
00034 
// Kind of server found at the remote end of a connection.
// This is the type returned by XrdClientPhyConnection::DoHandShake().
enum ERemoteServerType {
    // Some error occurred: server type undetermined
    kSTError = -1,

    // Remote server type un-recognized
    kSTNone = 0,

    // Old rootd server
    kSTRootd = 1,

    // xrootd dynamic load balancer
    kSTBaseXrootd = 2,

    // xrootd data server
    kSTDataXrootd = 3,

    // xrootd meta manager
    kSTMetaXrootd = 4
};
00043 
00044 class XrdClientSid;
00045 class XrdSecProtocol;
00046 
00047 class XrdClientPhyConnection: public XrdClientUnsolMsgSender {
00048 
00049 private:
00050     time_t              fLastUseTimestamp;
00051     enum ELoginState    fLogged;       // only 1 login/auth is needed for physical  
00052     XrdSecProtocol     *fSecProtocol;  // authentication protocol
00053 
00054     XrdClientInputBuffer
00055     fMsgQ;         // The queue used to hold incoming messages
00056 
00057     int                 fRequestTimeout;
00058     bool                fMStreamsGoing;
00059     XrdSysRecMutex         fRwMutex;     // Lock before using the physical channel 
00060     // (for reading and/or writing)
00061 
00062     XrdSysRecMutex         fMutex;
00063     XrdSysRecMutex         fMultireadMutex; // Used to arbitrate between multiple
00064                                             // threads reading msgs from the same conn
00065 
00066     XrdClientThread     *fReaderthreadhandler[64]; // The thread which is going to pump
00067     // out the data from the socket
00068 
00069     int                  fReaderthreadrunning;
00070 
00071     XrdClientUrlInfo          fServer;
00072 
00073     XrdClientSock       *fSocket;
00074 
00075     UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m);
00076 
00077     XrdSysSemWait       fReaderCV;
00078 
00079     short               fLogConnCnt; // Number of logical connections using this phyconn
00080 
00081     XrdClientSid       *fSidManager;
00082 
00083 public:
00084     long                fServerProto;        // The server protocol
00085     ERemoteServerType   fServerType;
00086     long                fTTLsec;
00087 
00088     XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid);
00089     ~XrdClientPhyConnection();
00090 
00091     XrdClientMessage     *BuildMessage(bool IgnoreTimeouts, bool Enqueue);
00092     bool                  CheckAutoTerm();
00093 
00094     bool           Connect(XrdClientUrlInfo RemoteHost, bool isUnix = 0);
00095 
00096     //--------------------------------------------------------------------------
00104     //--------------------------------------------------------------------------
00105     bool Connect( XrdClientUrlInfo RemoteHost, bool isUnix , int fd );
00106 
00107     void           CountLogConn(int d = 1);
00108     void           Disconnect();
00109 
00110     ERemoteServerType
00111     DoHandShake(ServerInitHandShake &xbody,
00112                 int substreamid = 0);
00113 
00114     bool           ExpiredTTL();
00115     short          GetLogConnCnt() const { return fLogConnCnt; }
00116     int            GetReaderThreadsCnt() { XrdSysMutexHelper l(fMutex); return fReaderthreadrunning; }
00117 
00118     long           GetTTL() { return fTTLsec; }
00119 
00120     XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; }
00121     int            GetSocket() { return fSocket ? fSocket->fSocket : -1; }
00122 
00123     // Tells to the sock to rebuild the list of interesting selectors
00124     void           ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); }
00125 
00126     int            SaveSocket() { fTTLsec = 0; return fSocket ? (fSocket->SaveSocket()) : -1; }
00127     void           SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); }
00128     void           SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; }
00129 
00130     void           StartedReader();
00131 
00132     bool           IsAddress(const XrdOucString &addr) {
00133         return ( (fServer.Host == addr) ||
00134                  (fServer.HostAddr == addr) );
00135     }
00136 
00137     ELoginState    IsLogged();
00138 
00139     bool           IsPort(int port) { return (fServer.Port == port); };
00140     bool           IsUser(const XrdOucString &usr) { return (fServer.User == usr); };
00141     bool           IsValid();
00142     
00143 
00144     void           LockChannel();
00145 
00146     // see XrdClientSock for the meaning of the parameters
00147     int            ReadRaw(void *buffer, int BufferLength, int substreamid = -1,
00148                            int *usedsubstreamid = 0);
00149 
00150     XrdClientMessage     *ReadMessage(int streamid);
00151     bool           ReConnect(XrdClientUrlInfo RemoteHost);
00152     void           SetLogged(ELoginState status) { fLogged = status; }
00153     inline void    SetTTL(long ttl) { fTTLsec = ttl; }
00154     void           StartReader();
00155     void           Touch();
00156     void           UnlockChannel();
00157     int            WriteRaw(const void *buffer, int BufferLength, int substreamid = 0);
00158 
00159     int TryConnectParallelStream(int port, int windowsz, int sockid) { return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1); }
00160     int EstablishPendingParallelStream(int tmpid, int newid) { return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1); }
00161     void RemoveParallelStream(int substreamid) { if (fSocket) fSocket->RemoveParallelSock(substreamid); }
00162     // Tells if the attempt to establish the parallel streams is ongoing or was done
00163     //  and mark it as ongoing or done
00164     bool TestAndSetMStreamsGoing();
00165 
00166     int GetSockIdHint(int reqsperstream) { return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0); }
00167     int GetSockIdCount() {return ( fSocket ? fSocket->GetSockIdCount() : 0); }
00168     void PauseSelectOnSubstream(int substreamid) { if (fSocket) fSocket->PauseSelectOnSubstream(substreamid); }
00169     void RestartSelectOnSubstream(int substreamid) { if (fSocket) fSocket->RestartSelectOnSubstream(substreamid); }
00170 
00171     // To prohibit/re-enable a socket descriptor from being looked at by the reader threads
00172     virtual void BanSockDescr(int sockdescr, int sockid) { if (fSocket) fSocket->BanSockDescr(sockdescr, sockid); }
00173     virtual void UnBanSockDescr(int sockdescr) { if (fSocket) fSocket->UnBanSockDescr(sockdescr); }
00174 
00175     void ReadLock() { fMultireadMutex.Lock(); }
00176     void ReadUnLock() { fMultireadMutex.UnLock(); }
00177 
00178    int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); }
00179 };
00180 
00181 
00182 
00183 
00184 //
00185 // Class implementing a trick to automatically unlock an XrdClientPhyConnection
00186 //
00187 class XrdClientPhyConnLocker {
00188 private:
00189     XrdClientPhyConnection *phyconn;
00190 
00191 public:
00192     XrdClientPhyConnLocker(XrdClientPhyConnection *phyc) {
00193         // Constructor
00194         phyconn = phyc;
00195         phyconn->LockChannel();
00196     }
00197 
00198     ~XrdClientPhyConnLocker(){
00199         // Destructor. 
00200         phyconn->UnlockChannel();
00201     }
00202 
00203 };
00204 
00205 
00206 #endif