|
xrootd
|
00001 00002 // // 00003 // XrdClientConn // 00004 // // 00005 // Author: Fabrizio Furano (INFN Padova, 2004) // 00006 // Adapted from TXNetFile (root.cern.ch) originally done by // 00007 // Alvise Dorigo, Fabrizio Furano // 00008 // INFN Padova, 2003 // 00009 // // 00010 // High level handler of connections to xrootd. // 00011 // // 00013 00014 #ifndef XRD_CONN_H 00015 #define XRD_CONN_H 00016 00017 00018 #include "XrdClient/XrdClientConst.hh" 00019 00020 #include "time.h" 00021 #include "XrdClient/XrdClientConnMgr.hh" 00022 #include "XrdClient/XrdClientMessage.hh" 00023 #include "XrdClient/XrdClientUrlInfo.hh" 00024 #include "XrdClient/XrdClientReadCache.hh" 00025 #include "XrdOuc/XrdOucHash.hh" 00026 #include "XrdSys/XrdSysPthread.hh" 00027 00028 #define ConnectionManager XrdClientConn::GetConnectionMgr() 00029 00030 class XrdClientAbs; 00031 class XrdSecProtocol; 00032 00033 class XrdClientConn { 00034 00035 public: 00036 00037 enum ESrvErrorHandlerRetval { 00038 kSEHRReturnMsgToCaller = 0, 00039 kSEHRBreakLoop = 1, 00040 kSEHRContinue = 2, 00041 kSEHRReturnNoMsgToCaller = 3, 00042 kSEHRRedirLimitReached = 4 00043 }; 00044 enum EThreeStateReadHandler { 00045 kTSRHReturnMex = 0, 00046 kTSRHReturnNullMex = 1, 00047 kTSRHContinue = 2 00048 }; 00049 00050 // To keep info about an open session 00051 struct SessionIDInfo { 00052 char id[16]; 00053 }; 00054 00055 int fLastDataBytesRecv; 00056 int fLastDataBytesSent; 00057 XErrorCode fOpenError; 00058 00059 XrdOucString fRedirOpaque; // Opaque info returned by the server when 00060 00061 // redirecting. To be used in the next opens 00062 XrdClientConn(); 00063 virtual ~XrdClientConn(); 00064 00065 inline bool CacheWillFit(long long bytes) { 00066 if (!fMainReadCache) 00067 return FALSE; 00068 return fMainReadCache->WillFit(bytes); 00069 } 00070 00071 bool CheckHostDomain(XrdOucString hostToCheck); 00072 short Connect(XrdClientUrlInfo Host2Conn, 00073 XrdClientAbsUnsolMsgHandler *unsolhandler); 00074 void Disconnect(bool ForcePhysicalDisc); 00075 virtual bool GetAccessToSrv(); 00076 XReqErrorType GoBackToRedirector(); 00077 00078 XrdOucString GetClientHostDomain() { return fgClientHostDomain; } 00079 00080 00081 static XrdClientPhyConnection *GetPhyConn(int LogConnID); 00082 00083 00084 // --------- Cache related stuff 00085 00086 long GetDataFromCache(const void *buffer, 00087 long long begin_offs, 00088 long long end_offs, 00089 bool PerfCalc, 00090 XrdClientIntvList &missingblks, 00091 long &outstandingblks ); 00092 00093 bool SubmitDataToCache(XrdClientMessage *xmsg, 00094 long long begin_offs, 00095 long long end_offs); 00096 00097 bool SubmitRawDataToCache(const void *buffer, 00098 long long begin_offs, 00099 long long end_offs); 00100 00101 void SubmitPlaceholderToCache(long long begin_offs, 00102 long long end_offs) { 00103 if (fMainReadCache) 00104 fMainReadCache->PutPlaceholder(begin_offs, end_offs); 00105 } 00106 00107 00108 void RemoveAllDataFromCache(bool keepwriteblocks=true) { 00109 if (fMainReadCache) 00110 fMainReadCache->RemoveItems(keepwriteblocks); 00111 } 00112 00113 void RemoveDataFromCache(long long begin_offs, 00114 long long end_offs, bool remove_overlapped = false) { 00115 if (fMainReadCache) 00116 fMainReadCache->RemoveItems(begin_offs, end_offs, remove_overlapped); 00117 } 00118 00119 void RemovePlaceholdersFromCache() { 00120 if (fMainReadCache) 00121 fMainReadCache->RemovePlaceholders(); 00122 } 00123 00124 void PrintCache() { 00125 if (fMainReadCache) 00126 fMainReadCache->PrintCache(); 00127 } 00128 00129 00130 bool GetCacheInfo( 00131 // The actual cache size 00132 int &size, 00133 00134 // The number of bytes submitted since the beginning 00135 long long &bytessubmitted, 00136 00137 // The number of bytes found in the cache (estimate) 00138 long long &byteshit, 00139 00140 // The number of reads which did not find their data 00141 // (estimate) 00142 long long &misscount, 00143 00144 // miss/totalreads ratio (estimate) 00145 float &missrate, 00146 00147 // number of read requests towards the cache 00148 long long &readreqcnt, 00149 00150 // ratio between bytes found / bytes submitted 00151 float &bytesusefulness 00152 ) { 00153 if (!fMainReadCache) return false; 00154 00155 fMainReadCache->GetInfo(size, 00156 bytessubmitted, 00157 byteshit, 00158 misscount, 00159 missrate, 00160 readreqcnt, 00161 bytesusefulness); 00162 return true; 00163 } 00164 00165 00166 void SetCacheSize(int CacheSize) { 00167 if (!fMainReadCache && CacheSize) 00168 fMainReadCache = new XrdClientReadCache(); 00169 00170 if (fMainReadCache) 00171 fMainReadCache->SetSize(CacheSize); 00172 } 00173 00174 void SetCacheRmPolicy(int RmPolicy) { 00175 if (fMainReadCache) 00176 fMainReadCache->SetBlkRemovalPolicy(RmPolicy); 00177 } 00178 00179 void UnPinCacheBlk(long long begin_offs, long long end_offs) { 00180 fMainReadCache->UnPinCacheBlk(begin_offs, end_offs); 00181 // Also use this to signal the possibility to proceed for a hard checkpoint 00182 fWriteWaitAck->Broadcast(); 00183 } 00184 00185 00186 // ------------------- 00187 00188 00189 int GetLogConnID() const { return fLogConnID; } 00190 00191 ERemoteServerType GetServerType() const { return fServerType; } 00192 00193 kXR_unt16 GetStreamID() const { return fPrimaryStreamid; } 00194 00195 inline XrdClientUrlInfo *GetLBSUrl() { return fLBSUrl; } 00196 inline XrdClientUrlInfo *GetMetaUrl() { return fMetaUrl; } 00197 inline XrdClientUrlInfo GetCurrentUrl() { return fUrl; } 00198 inline XrdClientUrlInfo GetRedirUrl() { return fREQUrl; } 00199 00200 XErrorCode GetOpenError() const { return fOpenError; } 00201 virtual XReqErrorType GoToAnotherServer(XrdClientUrlInfo &newdest); 00202 virtual XReqErrorType GoToMetaManager(); 00203 bool IsConnected() const { return fConnected; } 00204 bool IsPhyConnConnected(); 00205 00206 struct ServerResponseHeader 00207 LastServerResp; 00208 00209 struct ServerResponseBody_Error 00210 LastServerError; 00211 00212 void ClearLastServerError() { 00213 memset(&LastServerError, 0, sizeof(LastServerError)); 00214 LastServerError.errnum = kXR_noErrorYet; 00215 } 00216 00217 UnsolRespProcResult ProcessAsynResp(XrdClientMessage *unsolmsg); 00218 00219 virtual bool SendGenCommand(ClientRequest *req, 00220 const void *reqMoreData, 00221 void **answMoreDataAllocated, 00222 void *answMoreData, bool HasToAlloc, 00223 char *CmdName, int substreamid = 0); 00224 00225 int GetOpenSockFD() const { return fOpenSockFD; } 00226 00227 void SetClientHostDomain(const char *src) { fgClientHostDomain = src; } 00228 void SetConnected(bool conn) { fConnected = conn; } 00229 00230 void SetOpenError(XErrorCode err) { fOpenError = err; } 00231 00232 // Gets a parallel stream id to use to set the return path for a re 00233 int GetParallelStreamToUse(int reqsperstream); 00234 int GetParallelStreamCount(); // Returns the total number of connected streams 00235 00236 void SetRedirHandler(XrdClientAbs *rh) { fRedirHandler = rh; } 00237 00238 void SetRequestedDestHost(char *newh, kXR_int32 port) { 00239 fREQUrl = fUrl; 00240 fREQUrl.Host = newh; 00241 fREQUrl.Port = port; 00242 fREQUrl.SetAddrFromHost(); 00243 } 00244 00245 // Puts this instance in pause state for wsec seconds. 00246 // A value <= 0 revokes immediately the pause state 00247 void SetREQPauseState(kXR_int32 wsec) { 00248 // Lock mutex 00249 fREQWait->Lock(); 00250 00251 if (wsec > 0) 00252 fREQWaitTimeLimit = time(0) + wsec; 00253 else { 00254 fREQWaitTimeLimit = 0; 00255 fREQWait->Broadcast(); 00256 } 00257 00258 // UnLock mutex 00259 fREQWait->UnLock(); 00260 } 00261 00262 // Puts this instance in connect-pause state for wsec seconds. 00263 // Any future connection attempt will not happen before wsec 00264 // and the first one will be towards the given host 00265 void SetREQDelayedConnectState(kXR_int32 wsec) { 00266 // Lock mutex 00267 fREQConnectWait->Lock(); 00268 00269 if (wsec > 0) 00270 fREQConnectWaitTimeLimit = time(0) + wsec; 00271 else { 00272 fREQConnectWaitTimeLimit = 0; 00273 fREQConnectWait->Broadcast(); 00274 } 00275 00276 // UnLock mutex 00277 fREQConnectWait->UnLock(); 00278 } 00279 00280 void SetSID(kXR_char *sid); 00281 inline void SetUrl(XrdClientUrlInfo thisUrl) { fUrl = thisUrl; } 00282 00283 // Sends the request to the server, through logconn with ID LogConnID 00284 // The request is sent with a streamid 'child' of the current one, then marked as pending 00285 // Its answer will be caught asynchronously 00286 XReqErrorType WriteToServer_Async(ClientRequest *req, 00287 const void* reqMoreData, 00288 int substreamid = 0); 00289 00290 static XrdClientConnectionMgr *GetConnectionMgr() 00291 { return fgConnectionMgr;} //Instance of the conn manager 00292 00293 static void DelSessionIDRepo() {fSessionIDRMutex.Lock(); 00294 fSessionIDRepo.Purge(); 00295 fSessionIDRMutex.UnLock(); 00296 } 00297 00298 void GetSessionID(SessionIDInfo &sess) {sess = mySessionID;} 00299 00300 long GetServerProtocol() { return fServerProto; } 00301 00302 short GetMaxRedirCnt() const { return fMaxGlobalRedirCnt; } 00303 void SetMaxRedirCnt(short mx) {fMaxGlobalRedirCnt = mx; } 00304 short GetRedirCnt() const { return fGlobalRedirCnt; } 00305 00306 bool DoWriteSoftCheckPoint(); 00307 bool DoWriteHardCheckPoint(); 00308 void UnPinCacheBlk(); 00309 00310 00311 // To give a max number of seconds for an operation to complete, no matter what happens inside 00312 // e.g. redirections, sleeps, failed connection attempts etc. 00313 void SetOpTimeLimit(int delta_secs); 00314 bool IsOpTimeLimitElapsed(time_t timenow); 00315 00316 00317 protected: 00318 void SetLogConnID(int cid) { fLogConnID = cid; } 00319 void SetStreamID(kXR_unt16 sid) { fPrimaryStreamid = sid; } 00320 00321 00322 00323 // The handler which first tried to connect somewhere 00324 XrdClientAbsUnsolMsgHandler *fUnsolMsgHandler; 00325 00326 XrdClientUrlInfo fUrl; // The current URL 00327 XrdClientUrlInfo *fLBSUrl; // Needed to save the load balancer url 00328 XrdClientUrlInfo fREQUrl; // For explicitly requested redirs 00329 00330 short fGlobalRedirCnt; // Number of redirections 00331 00332 private: 00333 00334 static XrdOucString fgClientHostDomain; // Save the client's domain name 00335 bool fConnected; 00336 bool fGettingAccessToSrv; // To avoid recursion in desperate situations 00337 time_t fGlobalRedirLastUpdateTimestamp; // Timestamp of last redirection 00338 00339 int fLogConnID; // Logical connection ID used 00340 kXR_unt16 fPrimaryStreamid; // Streamid used for normal communication 00341 // NB it's a copy of the one contained in 00342 // the logconn 00343 00344 short fMaxGlobalRedirCnt; 00345 XrdClientReadCache *fMainReadCache; 00346 00347 // The time limit for a transaction 00348 time_t fOpTimeLimit; 00349 00350 XrdClientAbs *fRedirHandler; // Pointer to a class inheriting from 00351 // XrdClientAbs providing methods 00352 // to handle a redir at higher level 00353 00354 XrdOucString fRedirInternalToken; // Token returned by the server when 00355 // redirecting. To be used in the next logins 00356 00357 XrdSysCondVar *fREQWaitResp; // For explicitly requested delayed async responses 00358 ServerResponseBody_Attn_asynresp * 00359 fREQWaitRespData; // For explicitly requested delayed async responses 00360 00361 time_t fREQWaitTimeLimit; // For explicitly requested pause state 00362 XrdSysCondVar *fREQWait; // For explicitly requested pause state 00363 time_t fREQConnectWaitTimeLimit; // For explicitly requested delayed reconnect 00364 XrdSysCondVar *fREQConnectWait; // For explicitly requested delayed reconnect 00365 00366 long fServerProto; // The server protocol 00367 ERemoteServerType fServerType; // Server type as returned by doHandShake() 00368 SessionIDInfo mySessionID; // Login session ID 00369 00370 00371 static XrdSysMutex fSessionIDRMutex; // Mutex for the Repo 00372 static XrdOucHash<SessionIDInfo> 00373 fSessionIDRepo; // The repository of session IDs, shared. 00374 // Association between 00375 // <hostname>:<port>.<user> and a SessionIDInfo struct 00376 00377 int fOpenSockFD; // Descriptor of the underlying socket 00378 static XrdClientConnectionMgr *fgConnectionMgr; //Instance of the Connection Manager 00379 00380 XrdSysCondVar *fWriteWaitAck; 00381 XrdClientVector<ClientRequest> fWriteReqsToRetry; // To store the write reqs to retry in case of a disconnection 00382 00383 bool CheckErrorStatus(XrdClientMessage *, short &, char *); 00384 void CheckPort(int &port); 00385 void CheckREQPauseState(); 00386 void CheckREQConnectWaitState(); 00387 bool CheckResp(struct ServerResponseHeader *resp, const char *method); 00388 XrdClientMessage *ClientServerCmd(ClientRequest *req, 00389 const void *reqMoreData, 00390 void **answMoreDataAllocated, 00391 void *answMoreData, 00392 bool HasToAlloc, 00393 int substreamid = 0); 00394 XrdSecProtocol *DoAuthentication(char *plist, int plsiz); 00395 00396 ERemoteServerType DoHandShake(short log); 00397 00398 bool DoLogin(); 00399 bool DomainMatcher(XrdOucString dom, XrdOucString domlist); 00400 00401 XrdOucString GetDomainToMatch(XrdOucString hostname); 00402 00403 ESrvErrorHandlerRetval HandleServerError(XReqErrorType &, XrdClientMessage *, 00404 ClientRequest *); 00405 bool MatchStreamid(struct ServerResponseHeader *ServerResponse); 00406 00407 // Sends a close request, without waiting for an answer 00408 // useful (?) to be sent just before closing a badly working stream 00409 bool PanicClose(); 00410 00411 XrdOucString ParseDomainFromHostname(XrdOucString hostname); 00412 00413 XrdClientMessage *ReadPartialAnswer(XReqErrorType &, size_t &, 00414 ClientRequest *, bool, void**, 00415 EThreeStateReadHandler &); 00416 00417 // void ClearSessionID(); 00418 00419 XReqErrorType WriteToServer(ClientRequest *req, 00420 const void* reqMoreData, 00421 short LogConnID, 00422 int substreamid = 0); 00423 00424 bool WaitResp(int secsmax); 00425 00426 XrdClientUrlInfo *fMetaUrl; // Meta manager url 00427 bool fLBSIsMeta; // Is current redirector a meta manager? 00428 00429 }; 00430 00431 00432 00433 #endif
1.7.5