|
xrootd
|
00001 00002 // // 00003 // XrdClientPhyConnection // 00004 // Author: Fabrizio Furano (INFN Padova, 2004) // 00005 // Adapted from TXNetFile (root.cern.ch) originally done by // 00006 // Alvise Dorigo, Fabrizio Furano // 00007 // INFN Padova, 2003 // 00008 // // 00009 // Class handling physical connections to xrootd servers // 00010 // // 00012 00013 // $Id$ 00014 00015 #ifndef _XrdClientPhyConnection 00016 #define _XrdClientPhyConnection 00017 00018 #include "XrdClient/XrdClientPSock.hh" 00019 #include "XrdClient/XrdClientMessage.hh" 00020 #include "XrdClient/XrdClientUnsolMsg.hh" 00021 #include "XrdClient/XrdClientInputBuffer.hh" 00022 #include "XrdClient/XrdClientUrlInfo.hh" 00023 #include "XrdClient/XrdClientThread.hh" 00024 #include "XrdSys/XrdSysPthread.hh" 00025 #include "XrdSys/XrdSysSemWait.hh" 00026 00027 #include <time.h> // for time_t data type 00028 00029 enum ELoginState { 00030 kNo = 0, 00031 kYes = 1, 00032 kPending = 2 00033 }; 00034 00035 enum ERemoteServerType { 00036 kSTError = -1, // Some error occurred: server type undetermined 00037 kSTNone = 0, // Remote server type un-recognized 00038 kSTRootd = 1, // Remote server type: old rootd server 00039 kSTBaseXrootd = 2, // Remote server type: xrootd dynamic load balancer 00040 kSTDataXrootd = 3, // Remote server type: xrootd data server 00041 kSTMetaXrootd = 4 // Remote server type: xrootd meta manager 00042 }; 00043 00044 class XrdClientSid; 00045 class XrdSecProtocol; 00046 00047 class XrdClientPhyConnection: public XrdClientUnsolMsgSender { 00048 00049 private: 00050 time_t fLastUseTimestamp; 00051 enum ELoginState fLogged; // only 1 login/auth is needed for physical 00052 XrdSecProtocol *fSecProtocol; // authentication protocol 00053 00054 XrdClientInputBuffer 00055 fMsgQ; // The queue used to hold incoming messages 00056 00057 int fRequestTimeout; 00058 bool fMStreamsGoing; 00059 XrdSysRecMutex fRwMutex; // Lock before using the physical channel 00060 // (for reading and/or writing) 00061 00062 XrdSysRecMutex fMutex; 00063 XrdSysRecMutex fMultireadMutex; // Used to arbitrate between multiple 00064 // threads reading msgs from the same conn 00065 00066 XrdClientThread *fReaderthreadhandler[64]; // The thread which is going to pump 00067 // out the data from the socket 00068 00069 int fReaderthreadrunning; 00070 00071 XrdClientUrlInfo fServer; 00072 00073 XrdClientSock *fSocket; 00074 00075 UnsolRespProcResult HandleUnsolicited(XrdClientMessage *m); 00076 00077 XrdSysSemWait fReaderCV; 00078 00079 short fLogConnCnt; // Number of logical connections using this phyconn 00080 00081 XrdClientSid *fSidManager; 00082 00083 public: 00084 long fServerProto; // The server protocol 00085 ERemoteServerType fServerType; 00086 long fTTLsec; 00087 00088 XrdClientPhyConnection(XrdClientAbsUnsolMsgHandler *h, XrdClientSid *sid); 00089 ~XrdClientPhyConnection(); 00090 00091 XrdClientMessage *BuildMessage(bool IgnoreTimeouts, bool Enqueue); 00092 bool CheckAutoTerm(); 00093 00094 bool Connect(XrdClientUrlInfo RemoteHost, bool isUnix = 0); 00095 00096 //-------------------------------------------------------------------------- 00104 //-------------------------------------------------------------------------- 00105 bool Connect( XrdClientUrlInfo RemoteHost, bool isUnix , int fd ); 00106 00107 void CountLogConn(int d = 1); 00108 void Disconnect(); 00109 00110 ERemoteServerType 00111 DoHandShake(ServerInitHandShake &xbody, 00112 int substreamid = 0); 00113 00114 bool ExpiredTTL(); 00115 short GetLogConnCnt() const { return fLogConnCnt; } 00116 int GetReaderThreadsCnt() { XrdSysMutexHelper l(fMutex); return fReaderthreadrunning; } 00117 00118 long GetTTL() { return fTTLsec; } 00119 00120 XrdSecProtocol *GetSecProtocol() const { return fSecProtocol; } 00121 int GetSocket() { return fSocket ? fSocket->fSocket : -1; } 00122 00123 // Tells to the sock to rebuild the list of interesting selectors 00124 void ReinitFDTable() { if (fSocket) fSocket->ReinitFDTable(); } 00125 00126 int SaveSocket() { fTTLsec = 0; return fSocket ? (fSocket->SaveSocket()) : -1; } 00127 void SetInterrupt() { if (fSocket) fSocket->SetInterrupt(); } 00128 void SetSecProtocol(XrdSecProtocol *sp) { fSecProtocol = sp; } 00129 00130 void StartedReader(); 00131 00132 bool IsAddress(const XrdOucString &addr) { 00133 return ( (fServer.Host == addr) || 00134 (fServer.HostAddr == addr) ); 00135 } 00136 00137 ELoginState IsLogged(); 00138 00139 bool IsPort(int port) { return (fServer.Port == port); }; 00140 bool IsUser(const XrdOucString &usr) { return (fServer.User == usr); }; 00141 bool IsValid(); 00142 00143 00144 void LockChannel(); 00145 00146 // see XrdClientSock for the meaning of the parameters 00147 int ReadRaw(void *buffer, int BufferLength, int substreamid = -1, 00148 int *usedsubstreamid = 0); 00149 00150 XrdClientMessage *ReadMessage(int streamid); 00151 bool ReConnect(XrdClientUrlInfo RemoteHost); 00152 void SetLogged(ELoginState status) { fLogged = status; } 00153 inline void SetTTL(long ttl) { fTTLsec = ttl; } 00154 void StartReader(); 00155 void Touch(); 00156 void UnlockChannel(); 00157 int WriteRaw(const void *buffer, int BufferLength, int substreamid = 0); 00158 00159 int TryConnectParallelStream(int port, int windowsz, int sockid) { return ( fSocket ? fSocket->TryConnectParallelSock(port, windowsz, sockid) : -1); } 00160 int EstablishPendingParallelStream(int tmpid, int newid) { return ( fSocket ? fSocket->EstablishParallelSock(tmpid, newid) : -1); } 00161 void RemoveParallelStream(int substreamid) { if (fSocket) fSocket->RemoveParallelSock(substreamid); } 00162 // Tells if the attempt to establish the parallel streams is ongoing or was done 00163 // and mark it as ongoing or done 00164 bool TestAndSetMStreamsGoing(); 00165 00166 int GetSockIdHint(int reqsperstream) { return ( fSocket ? fSocket->GetSockIdHint(reqsperstream) : 0); } 00167 int GetSockIdCount() {return ( fSocket ? fSocket->GetSockIdCount() : 0); } 00168 void PauseSelectOnSubstream(int substreamid) { if (fSocket) fSocket->PauseSelectOnSubstream(substreamid); } 00169 void RestartSelectOnSubstream(int substreamid) { if (fSocket) fSocket->RestartSelectOnSubstream(substreamid); } 00170 00171 // To prohibit/re-enable a socket descriptor from being looked at by the reader threads 00172 virtual void BanSockDescr(int sockdescr, int sockid) { if (fSocket) fSocket->BanSockDescr(sockdescr, sockid); } 00173 virtual void UnBanSockDescr(int sockdescr) { if (fSocket) fSocket->UnBanSockDescr(sockdescr); } 00174 00175 void ReadLock() { fMultireadMutex.Lock(); } 00176 void ReadUnLock() { fMultireadMutex.UnLock(); } 00177 00178 int WipeStreamid(int streamid) { return fMsgQ.WipeStreamid(streamid); } 00179 }; 00180 00181 00182 00183 00184 // 00185 // Class implementing a trick to automatically unlock an XrdClientPhyConnection 00186 // 00187 class XrdClientPhyConnLocker { 00188 private: 00189 XrdClientPhyConnection *phyconn; 00190 00191 public: 00192 XrdClientPhyConnLocker(XrdClientPhyConnection *phyc) { 00193 // Constructor 00194 phyconn = phyc; 00195 phyconn->LockChannel(); 00196 } 00197 00198 ~XrdClientPhyConnLocker(){ 00199 // Destructor. 00200 phyconn->UnlockChannel(); 00201 } 00202 00203 }; 00204 00205 00206 #endif
1.7.5