ROOT  6.06/08
Reference Guide
TXSocket.cxx
Go to the documentation of this file.
1 // @(#)root/proofx:$Id$
2 // Author: Gerardo Ganis 12/12/2005
3 
4 /*************************************************************************
5  * Copyright (C) 1995-2005, Rene Brun and Fons Rademakers. *
6  * All rights reserved. *
7  * *
8  * For the licensing terms see $ROOTSYS/LICENSE. *
9  * For the list of contributors see $ROOTSYS/README/CREDITS. *
10  *************************************************************************/
11 
12 //////////////////////////////////////////////////////////////////////////
13 // //
14 // TXSocket //
15 // //
16 // High level handler of connections to xproofd. //
17 // //
18 //////////////////////////////////////////////////////////////////////////
19 
20 #include "MessageTypes.h"
21 #include "TEnv.h"
22 #include "TError.h"
23 #include "TException.h"
24 #include "TMonitor.h"
25 #include "TObjString.h"
26 #include "TProof.h"
27 #include "TSlave.h"
28 #include "TRegexp.h"
29 #include "TROOT.h"
30 #include "TUrl.h"
31 #include "TXHandler.h"
32 #include "TXSocket.h"
33 #include "XProofProtocol.h"
34 
35 #include "XrdProofConn.h"
36 
37 #include "XrdClient/XrdClientConnMgr.hh"
38 #include "XrdClient/XrdClientConst.hh"
39 #include "XrdClient/XrdClientEnv.hh"
40 #include "XrdClient/XrdClientLogConnection.hh"
41 #include "XrdClient/XrdClientMessage.hh"
42 
43 #ifndef WIN32
44 #include <sys/socket.h>
45 #else
46 #include <Winsock2.h>
47 #endif
48 
49 
50 #include "XpdSysError.h"
51 #include "XpdSysLogger.h"
52 
53 // ---- Tracing utils ----------------------------------------------------------
54 #include "XrdProofdTrace.h"
55 XrdOucTrace *XrdProofdTrace = 0;
57 static XrdSysError eDest(0, "Proofx");
58 
59 #ifdef WIN32
62 #endif
63 
64 //______________________________________________________________________________
65 
66 //---- error handling ----------------------------------------------------------
67 
68 ////////////////////////////////////////////////////////////////////////////////
69 /// Interface to ErrorHandler (protected).
70 
71 void TXSocket::DoError(int level, const char *location, const char *fmt, va_list va) const
72 {
73  ::ErrorHandler(level, Form("TXSocket::%s", location), fmt, va);
74 }
75 
76 //----- Ping handler -----------------------------------------------------------
77 ////////////////////////////////////////////////////////////////////////////////
78 
79 class TXSocketPingHandler : public TFileHandler {
81 public:
82  TXSocketPingHandler(TXSocket *s, Int_t fd)
83  : TFileHandler(fd, 1) { fSocket = s; }
84  Bool_t Notify();
85  Bool_t ReadNotify() { return Notify(); }
86 };
87 
88 ////////////////////////////////////////////////////////////////////////////////
89 /// Ping the socket
90 
91 Bool_t TXSocketPingHandler::Notify()
92 {
93  fSocket->Ping("ping handler");
94 
95  return kTRUE;
96 }
97 
98 // Env variables init flag
100 
101 // Static variables for input notification
102 TXSockPipe TXSocket::fgPipe; // Pipe for input monitoring
103 TString TXSocket::fgLoc = "undef"; // Location string
104 
105 // Static buffer manager
106 TMutex TXSocket::fgSMtx; // To protect spare list
107 std::list<TXSockBuf *> TXSocket::fgSQue; // list of spare buffers
108 Long64_t TXSockBuf::fgBuffMem = 0; // Total allocated memory
109 Long64_t TXSockBuf::fgMemMax = 10485760; // Max allowed allocated memory [10 MB]
110 
111 ////////////////////////////////////////////////////////////////////////////////
112 /// Constructor
113 /// Open the connection to a remote XrdProofd instance and start a PROOF
114 /// session.
115 /// The mode 'm' indicates the role of this connection:
116 /// 'a' Administrator; used by an XPD to contact the head XPD
117 /// 'i' Internal; used by a TXProofServ to call back its creator
118 /// (see XrdProofUnixConn)
119 /// 'C' PROOF manager: open connection only (do not start a session)
120 /// 'M' Client creating a top master
121 /// 'A' Client attaching to top master
122 /// 'm' Top master creating a submaster
123 /// 's' Master creating a slave
124 /// The buffer 'logbuf' is a null terminated string to be sent over at
125 /// login.
126 
127 TXSocket::TXSocket(const char *url, Char_t m, Int_t psid, Char_t capver,
128  const char *logbuf, Int_t loglevel, TXHandler *handler)
129  : TSocket(), fMode(m), fLogLevel(loglevel),
130  fBuffer(logbuf), fConn(0), fASem(0), fAsynProc(1),
132 {
133  fUrl = url;
134  // Enable tracing in the XrdProof client. if not done already
135  eDest.logger(&eLogger);
136  if (!XrdProofdTrace)
137  XrdProofdTrace = new XrdOucTrace(&eDest);
138 
139  // Init envs the first time
140  if (!fgInitDone)
141  InitEnvs();
142 
143  // Async queue related stuff
144  if (!(fAMtx = new TMutex(kTRUE))) {
145  Error("TXSocket", "problems initializing mutex for async queue");
146  return;
147  }
148  fAQue.clear();
149 
150  // Interrupts queue related stuff
151  if (!(fIMtx = new TMutex(kTRUE))) {
152  Error("TXSocket", "problems initializing mutex for interrupts");
153  return;
154  }
155  fILev = -1;
156  fIForward = kFALSE;
157 
158  // Init some variables
159  fByteLeft = 0;
160  fByteCur = 0;
161  fBufCur = 0;
162  fServType = kPROOFD; // for consistency
163  fTcpWindowSize = -1;
164  fRemoteProtocol = -1;
165  // By default forward directly to end-point
166  fSendOpt = (fMode == 'i') ? (kXPD_internal | kXPD_async) : kXPD_async;
167  fSessionID = (fMode == 'C') ? -1 : psid;
168  fSocket = -1;
169 
170  // This is used by external code to create a link between this object
171  // and another one
172  fReference = 0;
173 
174  // The global pipe
175  if (!fgPipe.IsValid()) {
176  Error("TXSocket", "internal pipe is invalid");
177  return;
178  }
179 
180  // Some initial values
181  TUrl u(url);
183  u.SetProtocol("proof", kTRUE);
184  fAddress.fPort = (u.GetPort() > 0) ? u.GetPort() : 1093;
185 
186  // Set the asynchronous handler
187  fHandler = handler;
188 
189  if (url) {
190 
191  // Create connection (for managers the type of the connection is the same
192  // as for top masters)
193  char md = (fMode !='A' && fMode !='C') ? fMode : 'M';
194  fConn = new XrdProofConn(url, md, psid, capver, this, fBuffer.Data());
195  if (!fConn || !(fConn->IsValid())) {
197  if (gDebug > 0)
198  Error("TXSocket", "fatal error occurred while opening a connection"
199  " to server [%s]: %s", url, fConn->GetLastErr());
200  return;
201  }
202 
203  // Fill some info
204  fUser = fConn->fUser.c_str();
205  fHost = fConn->fHost.c_str();
206  fPort = fConn->fPort;
207 
208  // Create new proofserv if not client manager or administrator or internal mode
209  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A'|| fMode == 'L') {
210  // We attach or create
211  if (!Create()) {
212  // Failure
213  Error("TXSocket", "create or attach failed (%s)",
214  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
215  Close();
216  return;
217  }
218  }
219 
220  // Fill some other info available if Create is successful
221  if (fMode == 'C') {
224  }
225 
226  // Also in the base class
227  fUrl = fConn->fUrl.GetUrl().c_str();
228  fAddress = gSystem->GetHostByName(fConn->fUrl.Host.c_str());
229  fAddress.fPort = fPort;
230 
231  // This is needed for the reader thread to signal an interrupt
232  fPid = gSystem->GetPid();
233  }
234 }
235 
236 ////////////////////////////////////////////////////////////////////////////////
237 /// Destructor
238 
240 {
241  // Disconnect from remote server (the connection manager is
242  // responsible of the underlying physical connection, so we do not
243  // force its closing)
244  Close();
245 
246  // Delete mutexes
247  SafeDelete(fAMtx);
248  SafeDelete(fIMtx);
249 }
250 
251 ////////////////////////////////////////////////////////////////////////////////
252 /// Set location string
253 
254 void TXSocket::SetLocation(const char *loc)
255 {
256  if (loc) {
257  fgLoc = loc;
258  fgPipe.SetLoc(loc);
259  } else {
260  fgLoc = "";
261  fgPipe.SetLoc("");
262  }
263 }
264 
265 ////////////////////////////////////////////////////////////////////////////////
266 /// Set session ID to 'id'. If id < 0, disable also the asynchronous handler.
267 
269 {
270  if (id < 0 && fConn)
271  fConn->SetAsync(0);
272  fSessionID = id;
273 }
274 
275 ////////////////////////////////////////////////////////////////////////////////
276 /// Disconnect a session. Use opt= "S" or "s" to
277 /// shutdown remote session.
278 /// Default is opt = "".
279 
281 {
282  // Make sure we are connected
283  if (!IsValid()) {
284  if (gDebug > 0)
285  Info("DisconnectSession","not connected: nothing to do");
286  return;
287  }
288 
289  Bool_t shutdown = opt && (strchr(opt,'S') || strchr(opt,'s'));
290  Bool_t all = opt && (strchr(opt,'A') || strchr(opt,'a'));
291 
292  if (id > -1 || all) {
293  // Prepare request
294  XPClientRequest Request;
295  memset(&Request, 0, sizeof(Request) );
296  fConn->SetSID(Request.header.streamid);
297  if (shutdown)
298  Request.proof.requestid = kXP_destroy;
299  else
300  Request.proof.requestid = kXP_detach;
301  Request.proof.sid = id;
302 
303  // Send request
304  XrdClientMessage *xrsp =
305  fConn->SendReq(&Request, (const void *)0, 0, "DisconnectSession");
306 
307  // Print error msg, if any
308  if (!xrsp && fConn->GetLastErr())
309  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
310 
311  // Cleanup
312  SafeDelete(xrsp);
313  }
314 }
315 
316 ////////////////////////////////////////////////////////////////////////////////
317 /// Close connection. Available options are (case insensitive)
318 /// 'P' force closing of the underlying physical connection
319 /// 'S' shutdown remote session, is any
320 /// A session ID can be given using #...# signature, e.g. "#1#".
321 /// Default is opt = "".
322 
324 {
325  Int_t to = gEnv->GetValue("XProof.AsynProcSemTimeout", 60);
326  if (fAsynProc.Wait(to*1000) != 0)
327  Warning("Close", "could not hold semaphore for async messages after %d sec: closing anyhow (may give error messages)", to);
328 
329  // Remove any reference in the global pipe and ready-sock queue
330  TXSocket::fgPipe.Flush(this);
331 
332  // Make sure we have a connection
333  if (!fConn) {
334  if (gDebug > 0)
335  Info("Close","no connection: nothing to do");
336  fAsynProc.Post();
337  return;
338  }
339 
340  // Disconnect the asynchronous requests handler
341  fConn->SetAsync(0);
342 
343  // If we are connected we disconnect
344  if (IsValid()) {
345 
346  // Parse options
347  TString o(opt);
348  Int_t sessID = fSessionID;
349  if (o.Index("#") != kNPOS) {
350  o.Remove(0,o.Index("#")+1);
351  if (o.Index("#") != kNPOS) {
352  o.Remove(o.Index("#"));
353  sessID = o.IsDigit() ? o.Atoi() : sessID;
354  }
355  }
356 
357  if (sessID > -1) {
358  // Warn the remote session, if any (after destroy the session is gone)
359  DisconnectSession(sessID, opt);
360  } else {
361  // We are the manager: close underlying connection
362  fConn->Close(opt);
363  }
364  }
365 
366  // Delete the connection module
367  SafeDelete(fConn);
368 
369  // Post semaphore
370  fAsynProc.Post();
371 }
372 
373 ////////////////////////////////////////////////////////////////////////////////
374 /// We are here if an unsolicited response comes from a logical conn
375 /// The response comes in the form of an XrdClientMessage *, that must NOT be
376 /// destroyed after processing. It is destroyed by the first sender.
377 /// Remember that we are in a separate thread, since unsolicited
378 /// responses are asynchronous by nature.
379 
380 UnsolRespProcResult TXSocket::ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *,
381  XrdClientMessage *m)
382 {
383  UnsolRespProcResult rc = kUNSOL_KEEP;
384 
385  // If we are closing we will not do anything
387  if (!semg.IsValid()) {
388  Error("ProcessUnsolicitedMsg", "%p: async semaphore taken by Close()! Should not be here!", this);
389  return kUNSOL_CONTINUE;
390  }
391 
392  if (!m) {
393  if (gDebug > 2)
394  Info("ProcessUnsolicitedMsg", "%p: got empty message: skipping", this);
395  // Some one is perhaps interested in empty messages
396  return kUNSOL_CONTINUE;
397  } else {
398  if (gDebug > 2)
399  Info("ProcessUnsolicitedMsg", "%p: got message with status: %d, len: %d bytes (ID: %d)",
400  this, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
401  }
402 
403  // Error notification
404  if (m->IsError()) {
405  if (m->GetStatusCode() != XrdClientMessage::kXrdMSC_timeout) {
406  if (gDebug > 0)
407  Info("ProcessUnsolicitedMsg","%p: got error from underlying connection", this);
408  XHandleErr_t herr = {1, 0};
409  if (!fHandler || fHandler->HandleError((const void *)&herr)) {
410  if (gDebug > 0)
411  Info("ProcessUnsolicitedMsg","%p: handler undefined or recovery failed", this);
412  // Avoid to contact the server any more
413  fSessionID = -1;
414  } else {
415  // Connection still usable: update usage timestamp
416  Touch();
417  }
418  } else {
419  // Time out
420  if (gDebug > 2)
421  Info("ProcessUnsolicitedMsg", "%p: underlying connection timed out", this);
422  }
423  // Propagate the message to other possible handlers
424  return kUNSOL_CONTINUE;
425  }
426 
427  // From now on make sure is for us (but only if not during setup, i.e. fConn == 0; otherwise
428  // we may miss some important server message)
429  if (fConn && !m->MatchStreamid(fConn->fStreamid)) {
430  if (gDebug > 1)
431  Info("ProcessUnsolicitedMsg", "%p: IDs do not match: {%d, %d}", this, fConn->fStreamid, m->HeaderSID());
432  return kUNSOL_CONTINUE;
433  }
434 
435  // Local processing ...
436  Int_t len = 0;
437  if ((len = m->DataLen()) < (int)sizeof(kXR_int32)) {
438  Error("ProcessUnsolicitedMsg", "empty or bad-formed message - disabling");
440  return rc;
441  }
442 
443  // Activity on the line: update usage timestamp
444  Touch();
445 
446  // The first 4 bytes contain the action code
447  kXR_int32 acod = 0;
448  memcpy(&acod, m->GetData(), sizeof(kXR_int32));
449  if (acod > 10000)
450  Info("ProcessUnsolicitedMsg", "%p: got acod %d (%x): message has status: %d, len: %d bytes (ID: %d)",
451  this, acod, acod, m->GetStatusCode(), m->DataLen(), m->HeaderSID());
452  //
453  // Update pointer to data
454  void *pdata = (void *)((char *)(m->GetData()) + sizeof(kXR_int32));
455  len -= sizeof(kXR_int32);
456  if (gDebug > 1)
457  Info("ProcessUnsolicitedMsg", "%p: got action: %d (%d bytes) (ID: %d)",
458  this, acod, len, m->HeaderSID());
459 
460  if (gDebug > 3)
462 
463  // Case by case
464  kXR_int32 ilev = -1;
465  const char *lab = 0;
466 
467  switch (acod) {
468  case kXPD_ping:
469  //
470  // Special interrupt
471  ilev = TProof::kPing;
472  lab = "kXPD_ping";
473  case kXPD_interrupt:
474  //
475  // Interrupt
476  lab = !lab ? "kXPD_interrupt" : lab;
477  { R__LOCKGUARD(fIMtx);
478  if (acod == kXPD_interrupt) {
479  memcpy(&ilev, pdata, sizeof(kXR_int32));
480  ilev = net2host(ilev);
481  // Update pointer to data
482  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
483  len -= sizeof(kXR_int32);
484  }
485  // The next 4 bytes contain the forwarding option
486  kXR_int32 ifw = 0;
487  if (len > 0) {
488  memcpy(&ifw, pdata, sizeof(kXR_int32));
489  ifw = net2host(ifw);
490  if (gDebug > 1)
491  Info("ProcessUnsolicitedMsg","%s: forwarding option: %d", lab, ifw);
492  }
493  //
494  // Save the interrupt
495  fILev = ilev;
496  fIForward = (ifw == 1) ? kTRUE : kFALSE;
497 
498  // Handle this input in this thread to avoid queuing on the
499  // main thread
500  XHandleIn_t hin = {acod, 0, 0, 0};
501  if (fHandler)
502  fHandler->HandleInput((const void *)&hin);
503  else
504  Error("ProcessUnsolicitedMsg","handler undefined");
505  }
506  break;
507  case kXPD_timer:
508  //
509  // Set shutdown timer
510  {
511  kXR_int32 opt = 1;
512  kXR_int32 delay = 0;
513  // The next 4 bytes contain the shutdown option
514  if (len > 0) {
515  memcpy(&opt, pdata, sizeof(kXR_int32));
516  opt = net2host(opt);
517  if (gDebug > 1)
518  Info("ProcessUnsolicitedMsg","kXPD_timer: found opt: %d", opt);
519  // Update pointer to data
520  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
521  len -= sizeof(kXR_int32);
522  }
523  // The next 4 bytes contain the delay
524  if (len > 0) {
525  memcpy(&delay, pdata, sizeof(kXR_int32));
526  delay = net2host(delay);
527  if (gDebug > 1)
528  Info("ProcessUnsolicitedMsg","kXPD_timer: found delay: %d", delay);
529  // Update pointer to data
530  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
531  len -= sizeof(kXR_int32);
532  }
533 
534  // Handle this input in this thread to avoid queuing on the
535  // main thread
536  XHandleIn_t hin = {acod, opt, delay, 0};
537  if (fHandler)
538  fHandler->HandleInput((const void *)&hin);
539  else
540  Error("ProcessUnsolicitedMsg","handler undefined");
541  }
542  break;
543  case kXPD_inflate:
544  //
545  // Set inflate factor
546  {
547  kXR_int32 inflate = 1000;
548  if (len > 0) {
549  memcpy(&inflate, pdata, sizeof(kXR_int32));
550  inflate = net2host(inflate);
551  if (gDebug > 1)
552  Info("ProcessUnsolicitedMsg","kXPD_inflate: factor: %d", inflate);
553  // Update pointer to data
554  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
555  len -= sizeof(kXR_int32);
556  }
557  // Handle this input in this thread to avoid queuing on the
558  // main thread
559  XHandleIn_t hin = {acod, inflate, 0, 0};
560  if (fHandler)
561  fHandler->HandleInput((const void *)&hin);
562  else
563  Error("ProcessUnsolicitedMsg","handler undefined");
564  }
565  break;
566  case kXPD_priority:
567  //
568  // Broadcast group priority
569  {
570  kXR_int32 priority = -1;
571  if (len > 0) {
572  memcpy(&priority, pdata, sizeof(kXR_int32));
573  priority = net2host(priority);
574  if (gDebug > 1)
575  Info("ProcessUnsolicitedMsg","kXPD_priority: priority: %d", priority);
576  // Update pointer to data
577  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
578  len -= sizeof(kXR_int32);
579  }
580  // Handle this input in this thread to avoid queuing on the
581  // main thread
582  XHandleIn_t hin = {acod, priority, 0, 0};
583  if (fHandler)
584  fHandler->HandleInput((const void *)&hin);
585  else
586  Error("ProcessUnsolicitedMsg","handler undefined");
587  }
588  break;
589  case kXPD_flush:
590  //
591  // Flush request
592  {
593  // Handle this input in this thread to avoid queuing on the
594  // main thread
595  XHandleIn_t hin = {acod, 0, 0, 0};
596  if (fHandler)
597  fHandler->HandleInput((const void *)&hin);
598  else
599  Error("ProcessUnsolicitedMsg","handler undefined");
600  }
601  break;
602  case kXPD_urgent:
603  //
604  // Set shutdown timer
605  {
606  // The next 4 bytes contain the urgent msg type
607  kXR_int32 type = -1;
608  if (len > 0) {
609  memcpy(&type, pdata, sizeof(kXR_int32));
610  type = net2host(type);
611  if (gDebug > 1)
612  Info("ProcessUnsolicitedMsg","kXPD_urgent: found type: %d", type);
613  // Update pointer to data
614  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
615  len -= sizeof(kXR_int32);
616  }
617  // The next 4 bytes contain the first info container
618  kXR_int32 int1 = -1;
619  if (len > 0) {
620  memcpy(&int1, pdata, sizeof(kXR_int32));
621  int1 = net2host(int1);
622  if (gDebug > 1)
623  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int1: %d", int1);
624  // Update pointer to data
625  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
626  len -= sizeof(kXR_int32);
627  }
628  // The next 4 bytes contain the second info container
629  kXR_int32 int2 = -1;
630  if (len > 0) {
631  memcpy(&int2, pdata, sizeof(kXR_int32));
632  int2 = net2host(int2);
633  if (gDebug > 1)
634  Info("ProcessUnsolicitedMsg","kXPD_urgent: found int2: %d", int2);
635  // Update pointer to data
636  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
637  len -= sizeof(kXR_int32);
638  }
639 
640  // Handle this input in this thread to avoid queuing on the
641  // main thread
642  XHandleIn_t hin = {acod, type, int1, int2};
643  if (fHandler)
644  fHandler->HandleInput((const void *)&hin);
645  else
646  Error("ProcessUnsolicitedMsg","handler undefined");
647  }
648  break;
649  case kXPD_msg:
650  //
651  // Data message
652  { R__LOCKGUARD(fAMtx);
653 
654  // Get a spare buffer
655  TXSockBuf *b = PopUpSpare(len);
656  if (!b) {
657  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
658  return rc;
659  }
660  memcpy(b->fBuf, pdata, len);
661  b->fLen = len;
662 
663  // Update counters
664  fBytesRecv += len;
665 
666  // Produce the message
667  fAQue.push_back(b);
668 
669  // Post the global pipe
670  fgPipe.Post(this);
671 
672  // Signal it and release the mutex
673  if (gDebug > 2)
674  Info("ProcessUnsolicitedMsg","%p: %s: posting semaphore: %p (%d bytes)",
675  this, GetTitle(), &fASem, len);
676  fASem.Post();
677  }
678 
679  break;
680  case kXPD_feedback:
681  Info("ProcessUnsolicitedMsg",
682  "kXPD_feedback treatment not yet implemented");
683  break;
684  case kXPD_srvmsg:
685  //
686  // Service message
687  {
688  // The next 4 bytes may contain a flag to control the way the message is displayed
689  kXR_int32 opt = 0;
690  memcpy(&opt, pdata, sizeof(kXR_int32));
691  opt = net2host(opt);
692  if (opt >= 0 && opt <= 4) {
693  // Update pointer to data
694  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
695  len -= sizeof(kXR_int32);
696  } else {
697  opt = 1;
698  }
699 
700  if (opt == 0) {
701  // One line
702  Printf("| %.*s", len, (char *)pdata);
703  } else if (opt == 2) {
704  // Raw displaying
705  Printf("%.*s", len, (char *)pdata);
706  } else if (opt == 3) {
707  // Incremental displaying
708  fprintf(stderr, "%.*s", len, (char *)pdata);
709  } else if (opt == 4) {
710  // Rewind
711  fprintf(stderr, "%.*s\r", len, (char *)pdata);
712  } else {
713  // A small header
714  Printf(" ");
715  Printf("| Message from server:");
716  Printf("| %.*s", len, (char *)pdata);
717  }
718  }
719  break;
720  case kXPD_errmsg:
721  //
722  // Error condition with message
723  Printf("\n\n");
724  Printf("| Error condition occured: message from server:");
725  Printf("| %.*s", len, (char *)pdata);
726  Printf("\n");
727  // Handle error
728  if (fHandler)
730  else
731  Error("ProcessUnsolicitedMsg","handler undefined");
732  break;
733  case kXPD_msgsid:
734  //
735  // Data message
736  { R__LOCKGUARD(fAMtx);
737 
738  // The next 4 bytes contain the sessiond id
739  kXR_int32 cid = 0;
740  memcpy(&cid, pdata, sizeof(kXR_int32));
741  cid = net2host(cid);
742 
743  if (gDebug > 1)
744  Info("ProcessUnsolicitedMsg","found cid: %d", cid);
745 
746  // Update pointer to data
747  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
748  len -= sizeof(kXR_int32);
749 
750  // Get a spare buffer
751  TXSockBuf *b = PopUpSpare(len);
752  if (!b) {
753  Error("ProcessUnsolicitedMsg","could allocate spare buffer");
754  return rc;
755  }
756  memcpy(b->fBuf, pdata, len);
757  b->fLen = len;
758 
759  // Set the sid
760  b->fCid = cid;
761 
762  // Update counters
763  fBytesRecv += len;
764 
765  // Produce the message
766  fAQue.push_back(b);
767 
768  // Post the global pipe
769  fgPipe.Post(this);
770 
771  // Signal it and release the mutex
772  if (gDebug > 2)
773  Info("ProcessUnsolicitedMsg","%p: cid: %d, posting semaphore: %p (%d bytes)",
774  this, cid, &fASem, len);
775  fASem.Post();
776  }
777 
778  break;
779  case kXPD_wrkmortem:
780  //
781  // A worker died
782  { TString what = TString::Format("%.*s", len, (char *)pdata);
783  if (what.BeginsWith("idle-timeout")) {
784  // Notify the idle timeout
786  } else {
787  Printf(" ");
788  Printf("| %s", what.Data());
789  // Handle error
790  if (fHandler)
792  else
793  Error("ProcessUnsolicitedMsg","handler undefined");
794  }
795  }
796  break;
797 
798  case kXPD_touch:
799  //
800  // Request for remote touch: post a message to do that
802  break;
803  case kXPD_resume:
804  //
805  // process the next query (in the TXProofServ)
807  break;
808  case kXPD_clusterinfo:
809  //
810  // Broadcast cluster information
811  {
812  kXR_int32 nsess = -1, nacti = -1, neffs = -1;
813  if (len > 0) {
814  // Total sessions
815  memcpy(&nsess, pdata, sizeof(kXR_int32));
816  nsess = net2host(nsess);
817  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
818  len -= sizeof(kXR_int32);
819  // Active sessions
820  memcpy(&nacti, pdata, sizeof(kXR_int32));
821  nacti = net2host(nacti);
822  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
823  len -= sizeof(kXR_int32);
824  // Effective sessions
825  memcpy(&neffs, pdata, sizeof(kXR_int32));
826  neffs = net2host(neffs);
827  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
828  len -= sizeof(kXR_int32);
829  }
830  if (gDebug > 1)
831  Info("ProcessUnsolicitedMsg","kXPD_clusterinfo: # sessions: %d,"
832  " # active: %d, # effective: %f", nsess, nacti, neffs/1000.);
833  // Handle this input in this thread to avoid queuing on the
834  // main thread
835  XHandleIn_t hin = {acod, nsess, nacti, neffs};
836  if (fHandler)
837  fHandler->HandleInput((const void *)&hin);
838  else
839  Error("ProcessUnsolicitedMsg","handler undefined");
840  }
841  break;
842  default:
843  Error("ProcessUnsolicitedMsg","%p: unknown action code: %d received from '%s' - disabling",
844  this, acod, GetTitle());
846  break;
847  }
848 
849  // We are done
850  return rc;
851 }
852 
853 ////////////////////////////////////////////////////////////////////////////////
854 /// Post a message of type 'type' into the read messages queue.
855 /// If 'msg' is defined it is also added as TString.
856 /// This is used, for example, with kPROOF_FATAL to force the main thread
857 /// to mark this socket as bad, avoiding race condition when a worker
858 /// dies while in processing state.
859 
860 void TXSocket::PostMsg(Int_t type, const char *msg)
861 {
862  // Create the message
863  TMessage m(type);
864 
865  // Add the string if any
866  if (msg && strlen(msg) > 0)
867  m << TString(msg);
868 
869  // Write length in first word of buffer
870  m.SetLength();
871 
872  // Get pointer to the message buffer
873  char *mbuf = m.Buffer();
874  Int_t mlen = m.Length();
875  if (m.CompBuffer()) {
876  mbuf = m.CompBuffer();
877  mlen = m.CompLength();
878  }
879 
880  //
881  // Data message
883 
884  // Get a spare buffer
885  TXSockBuf *b = PopUpSpare(mlen);
886  if (!b) {
887  Error("PostMsg", "could allocate spare buffer");
888  return;
889  }
890 
891  // Fill the pipe buffer
892  memcpy(b->fBuf, mbuf, mlen);
893  b->fLen = mlen;
894 
895  // Update counters
896  fBytesRecv += mlen;
897 
898  // Produce the message
899  fAQue.push_back(b);
900 
901  // Post the global pipe
902  fgPipe.Post(this);
903 
904  // Signal it and release the mutex
905  if (gDebug > 0)
906  Info("PostMsg", "%p: posting type %d to semaphore: %p (%d bytes)",
907  this, type, &fASem, mlen);
908  fASem.Post();
909 
910  // Done
911  return;
912 }
913 
914 ////////////////////////////////////////////////////////////////////////////////
915 /// Getter for logical connection ID
916 
918 {
919  return (fConn ? fConn->GetLogConnID() : -1);
920 }
921 
922 ////////////////////////////////////////////////////////////////////////////////
923 /// Getter for last error
924 
926 {
927  return (fConn ? fConn->GetOpenError() : -1);
928 }
929 
930 ////////////////////////////////////////////////////////////////////////////////
931 /// Getter for server type
932 
934 {
935  return (fConn ? fConn->GetServType() : -1);
936 }
937 
938 ////////////////////////////////////////////////////////////////////////////////
939 /// Getter for session ID
940 
942 {
943  return (fConn ? fConn->GetSessionID() : -1);
944 }
945 
946 ////////////////////////////////////////////////////////////////////////////////
947 /// Getter for validity status
948 
950 {
951  return (fConn ? (fConn->IsValid()) : kFALSE);
952 }
953 
954 ////////////////////////////////////////////////////////////////////////////////
955 /// Return kTRUE if the remote server is a 'proofd'
956 
958 {
960  return kTRUE;
961 
962  // Failure
963  return kFALSE;
964 }
965 
966 ////////////////////////////////////////////////////////////////////////////////
967 /// Get latest interrupt level and reset it; if the interrupt has to be
968 /// propagated to lower stages forward will be kTRUE after the call
969 
971 {
972  if (gDebug > 2)
973  Info("GetInterrupt","%p: waiting to lock mutex %p", this, fIMtx);
974 
976 
977  // Reset values
978  Int_t ilev = -1;
979  forward = kFALSE;
980 
981  // Check if filled
982  if (fILev == -1)
983  Error("GetInterrupt", "value is unset (%d) - protocol error",fILev);
984 
985  // Fill output
986  ilev = fILev;
987  forward = fIForward;
988 
989  // Reset values (we process it only once)
990  fILev = -1;
991  fIForward = kFALSE;
992 
993  // Return what we got
994  return ilev;
995 }
996 
997 ////////////////////////////////////////////////////////////////////////////////
998 /// Flush the asynchronous queue.
999 /// Typically called when a kHardInterrupt is received.
1000 /// Returns number of bytes in flushed buffers.
1001 
1003 {
1004  Int_t nf = 0;
1005  list<TXSockBuf *> splist;
1006  list<TXSockBuf *>::iterator i;
1007 
1008  { R__LOCKGUARD(fAMtx);
1009 
1010  // Must have something to flush
1011  if (fAQue.size() > 0) {
1012 
1013  // Save size for later semaphore cleanup
1014  Int_t sz = fAQue.size();
1015  // get the highest interrupt level
1016  for (i = fAQue.begin(); i != fAQue.end();) {
1017  if (*i) {
1018  splist.push_back(*i);
1019  nf += (*i)->fLen;
1020  i = fAQue.erase(i);
1021  }
1022  }
1023 
1024  // Reset the asynchronous queue
1025  while (sz--) {
1026  if (fASem.TryWait() == 1)
1027  Printf("Warning in TXSocket::Flush: semaphore counter already 0 (sz: %d)", sz);
1028  }
1029  fAQue.clear();
1030  }
1031  }
1032 
1033  // Move spares to the spare queue
1034  { R__LOCKGUARD(&fgSMtx);
1035  if (splist.size() > 0) {
1036  for (i = splist.begin(); i != splist.end();) {
1037  fgSQue.push_back(*i);
1038  i = splist.erase(i);
1039  }
1040  }
1041  }
1042 
1043  // We are done
1044  return nf;
1045 }
1046 
1047 ////////////////////////////////////////////////////////////////////////////////
1048 /// This method sends a request for creation of (or attachment to) a remote
1049 /// server application.
1050 
1052 {
1053  // Make sure we are connected
1054  if (!IsValid()) {
1055  if (gDebug > 0)
1056  Info("Create","not connected: nothing to do");
1057  return kFALSE;
1058  }
1059 
1060  Int_t retriesleft = gEnv->GetValue("XProof.CreationRetries", 4);
1061 
1062  while (retriesleft--) {
1063 
1064  XPClientRequest reqhdr;
1065 
1066  // We fill the header struct containing the request for login
1067  memset( &reqhdr, 0, sizeof(reqhdr));
1068  fConn->SetSID(reqhdr.header.streamid);
1069 
1070  // This will be a kXP_attach or kXP_create request
1071  if (fMode == 'A' || attach) {
1072  reqhdr.header.requestid = kXP_attach;
1073  reqhdr.proof.sid = fSessionID;
1074  } else {
1075  reqhdr.header.requestid = kXP_create;
1076  }
1077 
1078  // Send log level
1079  reqhdr.proof.int1 = fLogLevel;
1080 
1081  // Send also the chosen alias
1082  const void *buf = (const void *)(fBuffer.Data());
1083  reqhdr.header.dlen = fBuffer.Length();
1084  if (gDebug >= 2)
1085  Info("Create", "sending %d bytes to server", reqhdr.header.dlen);
1086 
1087  // We call SendReq, the function devoted to sending commands.
1088  if (gDebug > 1)
1089  Info("Create", "creating session of server %s", fUrl.Data());
1090 
1091  // server response header
1092  char *answData = 0;
1093  XrdClientMessage *xrsp = fConn->SendReq(&reqhdr, buf,
1094  &answData, "TXSocket::Create", 0);
1095  struct ServerResponseBody_Protocol *srvresp = (struct ServerResponseBody_Protocol *)answData;
1096 
1097  // If any, the URL the data pool entry point will be stored here
1098  fBuffer = "";
1099  if (xrsp) {
1100 
1101  //
1102  // Pointer to data
1103  void *pdata = (void *)(xrsp->GetData());
1104  Int_t len = xrsp->DataLen();
1105 
1106  if (len >= (Int_t)sizeof(kXR_int32)) {
1107  // The first 4 bytes contain the session ID
1108  kXR_int32 psid = 0;
1109  memcpy(&psid, pdata, sizeof(kXR_int32));
1110  fSessionID = net2host(psid);
1111  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1112  len -= sizeof(kXR_int32);
1113  } else {
1114  Error("Create","session ID is undefined!");
1115  fSessionID = -1;
1116  if (srvresp) free(srvresp);
1117  return kFALSE;
1118  }
1119 
1120  if (len >= (Int_t)sizeof(kXR_int16)) {
1121  // The second 2 bytes contain the remote PROOF protocol version
1122  kXR_int16 dver = 0;
1123  memcpy(&dver, pdata, sizeof(kXR_int16));
1124  fRemoteProtocol = net2host(dver);
1125  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1126  len -= sizeof(kXR_int16);
1127  } else {
1128  Warning("Create","protocol version of the remote PROOF undefined!");
1129  }
1130 
1131  if (fRemoteProtocol == 0) {
1132  // We are dealing with an older server: the PROOF protocol is on 4 bytes
1133  len += sizeof(kXR_int16);
1134  kXR_int32 dver = 0;
1135  memcpy(&dver, pdata, sizeof(kXR_int32));
1136  fRemoteProtocol = net2host(dver);
1137  pdata = (void *)((char *)pdata + sizeof(kXR_int32));
1138  len -= sizeof(kXR_int32);
1139  } else {
1140  if (len >= (Int_t)sizeof(kXR_int16)) {
1141  // The third 2 bytes contain the remote XrdProofdProtocol version
1142  kXR_int16 dver = 0;
1143  memcpy(&dver, pdata, sizeof(kXR_int16));
1144  fXrdProofdVersion = net2host(dver);
1145  pdata = (void *)((char *)pdata + sizeof(kXR_int16));
1146  len -= sizeof(kXR_int16);
1147  } else {
1148  Warning("Create","version of the remote XrdProofdProtocol undefined!");
1149  }
1150  }
1151 
1152  if (len > 0) {
1153  // From top masters, the url of the data pool
1154  char *url = new char[len+1];
1155  memcpy(url, pdata, len);
1156  url[len] = 0;
1157  fBuffer = url;
1158  delete[] url;
1159  }
1160 
1161  // Cleanup
1162  SafeDelete(xrsp);
1163  if (srvresp) free(srvresp);
1164 
1165  // Notify
1166  return kTRUE;
1167  } else {
1168  // Extract log file path, if any
1169  Ssiz_t ilog = kNPOS;
1170  if (retriesleft <= 0 && fConn->GetLastErr()) {
1171  fBuffer = fConn->GetLastErr();
1172  if ((ilog = fBuffer.Index("|log:")) != kNPOS) fBuffer.Remove(0, ilog);
1173  }
1174  // If not free resources now, just give up
1175  if (fConn->GetOpenError() == kXP_TooManySess) {
1176  // Avoid to contact the server any more
1177  fSessionID = -1;
1178  if (srvresp) free(srvresp);
1179  return kFALSE;
1180  } else {
1181  // Print error msg, if any
1182  if ((retriesleft <= 0 || gDebug > 0) && fConn->GetLastErr()) {
1183  TString emsg(fConn->GetLastErr());
1184  if ((ilog = emsg.Index("|log:")) != kNPOS) emsg.Remove(ilog);
1185  Printf("%s: %s", fHost.Data(), emsg.Data());
1186  }
1187  }
1188  }
1189 
1190  if (gDebug > 0)
1191  Info("Create", "creation/attachment attempt failed: %d attempts left", retriesleft);
1192  if (retriesleft <= 0)
1193  Error("Create", "%d creation/attachment attempts failed: no attempts left",
1194  gEnv->GetValue("XProof.CreationRetries", 4));
1195 
1196  if (srvresp) free(srvresp);
1197  } // Creation retries
1198 
1199  // The session is invalid: reset the sessionID to invalid state (it was our protocol
1200  // number during creation
1201  fSessionID = -1;
1202 
1203  // Notify failure
1204  Error("Create:",
1205  "problems creating or attaching to a remote server (%s)",
1206  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
1207  return kFALSE;
1208 }
1209 
1210 ////////////////////////////////////////////////////////////////////////////////
1211 /// Send a raw buffer of specified length.
1212 /// Use opt = kDontBlock to ask xproofd to push the message into the proofsrv.
1213 /// (by default is appended to a queue waiting for a request from proofsrv).
1214 /// Returns the number of bytes sent or -1 in case of error.
1215 
1217 {
1219 
1220  // Options and request ID
1221  fSendOpt = (opt == kDontBlock) ? (kXPD_async | fSendOpt)
1222  : (~kXPD_async & fSendOpt) ;
1223 
1224  // Prepare request
1225  XPClientRequest Request;
1226  memset( &Request, 0, sizeof(Request) );
1227  fConn->SetSID(Request.header.streamid);
1228  Request.sendrcv.requestid = kXP_sendmsg;
1229  Request.sendrcv.sid = fSessionID;
1230  Request.sendrcv.opt = fSendOpt;
1231  Request.sendrcv.cid = GetClientID();
1232  Request.sendrcv.dlen = length;
1233  if (gDebug >= 2)
1234  Info("SendRaw", "sending %d bytes to server", Request.sendrcv.dlen);
1235 
1236  // Send request
1237  XrdClientMessage *xrsp = fConn->SendReq(&Request, buffer, 0, "SendRaw");
1238 
1239  if (xrsp) {
1240  // Prepare return info
1241  Int_t nsent = length;
1242 
1243  // Update counters
1244  fBytesSent += length;
1245 
1246  // Cleanup
1247  SafeDelete(xrsp);
1248 
1249  // Success: update usage timestamp
1250  Touch();
1251 
1252  // ok
1253  return nsent;
1254  } else {
1255  // Print error message, if any
1256  if (fConn->GetLastErr())
1257  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1258  else
1259  Printf("%s: error occured but no message from server", fHost.Data());
1260  }
1261 
1262  // Failure notification (avoid using the handler: we may be exiting)
1263  Error("SendRaw", "%s: problems sending %d bytes to server",
1264  fHost.Data(), length);
1265  return -1;
1266 }
1267 
1268 ////////////////////////////////////////////////////////////////////////////////
1269 /// Ping functionality: contact the server to check its vitality.
1270 /// If external, the server waits for a reply from the server
1271 /// Returns kTRUE if OK or kFALSE in case of error.
1272 
1273 Bool_t TXSocket::Ping(const char *ord)
1274 {
1276 
1277  if (gDebug > 0)
1278  Info("Ping","%p: %s: sid: %d", this, ord ? ord : "int", fSessionID);
1279 
1280  // Make sure we are connected
1281  if (!IsValid()) {
1282  Error("Ping","not connected: nothing to do");
1283  return kFALSE;
1284  }
1285 
1286  // Options
1287  kXR_int32 options = (fMode == 'i') ? kXPD_internal : 0;
1288 
1289  // Prepare request
1290  XPClientRequest Request;
1291  memset( &Request, 0, sizeof(Request) );
1292  fConn->SetSID(Request.header.streamid);
1293  Request.sendrcv.requestid = kXP_ping;
1294  Request.sendrcv.sid = fSessionID;
1295  Request.sendrcv.opt = options;
1296  Request.sendrcv.dlen = 0;
1297 
1298  // Send request
1299  Bool_t res = kFALSE;
1300  if (fMode != 'i') {
1301  char *pans = 0;
1302  XrdClientMessage *xrsp =
1303  fConn->SendReq(&Request, (const void *)0, &pans, "Ping");
1304  kXR_int32 *pres = (kXR_int32 *) pans;
1305 
1306  // Get the result
1307  if (xrsp && xrsp->HeaderStatus() == kXR_ok) {
1308  *pres = net2host(*pres);
1309  res = (*pres == 1) ? kTRUE : kFALSE;
1310  // Success: update usage timestamp
1311  Touch();
1312  } else {
1313  // Print error msg, if any
1314  if (fConn->GetLastErr())
1315  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1316  }
1317 
1318  // Cleanup
1319  SafeDelete(xrsp);
1320  if (pans) free(pans);
1321 
1322  } else {
1323  if (XPD::clientMarshall(&Request) == 0) {
1324  XReqErrorType e = fConn->LowWrite(&Request, 0, 0);
1325  res = (e == kOK) ? kTRUE : kFALSE;
1326  } else {
1327  Error("Ping", "%p: int: problems marshalling request", this);
1328  }
1329  }
1330 
1331  // Failure notification (avoid using the handler: we may be exiting)
1332  if (!res) {
1333  Error("Ping", "%p: %s: problems sending ping to server", this, ord ? ord : "int");
1334  } else if (gDebug > 0) {
1335  Info("Ping","%p: %s: sid: %d OK", this, ord ? ord : "int", fSessionID);
1336  }
1337 
1338  return res;
1339 }
1340 
1341 ////////////////////////////////////////////////////////////////////////////////
1342 /// Remote touch functionality: contact the server to proof our vitality.
1343 /// No reply from server is expected.
1344 
1346 {
1348 
1349  if (gDebug > 0)
1350  Info("RemoteTouch","%p: sending touch request to %s", this, GetName());
1351 
1352  // Make sure we are connected
1353  if (!IsValid()) {
1354  Error("RemoteTouch","not connected: nothing to do");
1355  return;
1356  }
1357 
1358  // Prepare request
1359  XPClientRequest Request;
1360  memset( &Request, 0, sizeof(Request) );
1361  fConn->SetSID(Request.header.streamid);
1362  Request.sendrcv.requestid = kXP_touch;
1363  Request.sendrcv.sid = fSessionID;
1364  Request.sendrcv.opt = 0;
1365  Request.sendrcv.dlen = 0;
1366 
1367  // We need the right order
1368  if (XPD::clientMarshall(&Request) != 0) {
1369  Error("Touch", "%p: problems marshalling request ", this);
1370  return;
1371  }
1372  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1373  Error("Touch", "%p: problems sending touch request to server", this);
1374 
1375  // Done
1376  return;
1377 }
1378 
1379 ////////////////////////////////////////////////////////////////////////////////
1380 /// Interrupt the remote protocol instance. Used to propagate Ctrl-C.
1381 /// No reply from server is expected.
1382 
1384 {
1386 
1387  if (gDebug > 0)
1388  Info("CtrlC","%p: sending ctrl-c request to %s", this, GetName());
1389 
1390  // Make sure we are connected
1391  if (!IsValid()) {
1392  Error("CtrlC","not connected: nothing to do");
1393  return;
1394  }
1395 
1396  // Prepare request
1397  XPClientRequest Request;
1398  memset( &Request, 0, sizeof(Request) );
1399  fConn->SetSID(Request.header.streamid);
1400  Request.proof.requestid = kXP_ctrlc;
1401  Request.proof.sid = 0;
1402  Request.proof.dlen = 0;
1403 
1404  // We need the right order
1405  if (XPD::clientMarshall(&Request) != 0) {
1406  Error("CtrlC", "%p: problems marshalling request ", this);
1407  return;
1408  }
1409  if (fConn->LowWrite(&Request, 0, 0) != kOK)
1410  Error("CtrlC", "%p: problems sending ctrl-c request to server", this);
1411 
1412  // Done
1413  return;
1414 }
1415 
1416 ////////////////////////////////////////////////////////////////////////////////
1417 /// Wait and pick-up next buffer from the asynchronous queue
1418 
1420 {
1421  fBufCur = 0;
1422  fByteLeft = 0;
1423  fByteCur = 0;
1424  if (gDebug > 2)
1425  Info("PickUpReady", "%p: %s: going to sleep", this, GetTitle());
1426 
1427  // User can choose whether to wait forever or for a fixed amount of time
1428  if (!fDontTimeout) {
1429  static Int_t timeout = gEnv->GetValue("XProof.ReadTimeout", 300) * 1000;
1430  static Int_t dt = 2000;
1431  Int_t to = timeout;
1433  while (to && !IsInterrupt()) {
1434  SetAWait(kTRUE);
1435  if (fASem.Wait(dt) != 0) {
1436  to -= dt;
1437  if (to <= 0) {
1438  Error("PickUpReady","error waiting at semaphore");
1439  return -1;
1440  } else {
1441  if (gDebug > 0)
1442  Info("PickUpReady", "%p: %s: got timeout: retring (%d secs)",
1443  this, GetTitle(), to/1000);
1444  }
1445  } else
1446  break;
1447  SetAWait(kFALSE);
1448  }
1449  // We wait forever
1450  if (IsInterrupt()) {
1451  if (gDebug > 2)
1452  Info("PickUpReady","interrupted");
1454  SetAWait(kFALSE);
1455  return -1;
1456  }
1457  } else {
1458  // We wait forever
1459  SetAWait(kTRUE);
1460  if (fASem.Wait() != 0) {
1461  Error("PickUpReady","error waiting at semaphore");
1462  SetAWait(kFALSE);
1463  return -1;
1464  }
1465  SetAWait(kFALSE);
1466  }
1467  if (gDebug > 2)
1468  Info("PickUpReady", "%p: %s: waken up", this, GetTitle());
1469 
1471 
1472  // Get message, if any
1473  if (fAQue.size() <= 0) {
1474  Error("PickUpReady","queue is empty - protocol error ?");
1475  return -1;
1476  }
1477  if (!(fBufCur = fAQue.front())) {
1478  Error("PickUpReady","got invalid buffer - protocol error ?");
1479  return -1;
1480  }
1481  // Remove message from the queue
1482  fAQue.pop_front();
1483 
1484  // Set number of available bytes
1485  fByteLeft = fBufCur->fLen;
1486 
1487  if (gDebug > 2)
1488  Info("PickUpReady", "%p: %s: got message (%d bytes)",
1489  this, GetTitle(), (Int_t)(fBufCur ? fBufCur->fLen : 0));
1490 
1491  // Update counters
1492  fBytesRecv += fBufCur->fLen;
1493 
1494  // Set session ID
1495  if (fBufCur->fCid > -1 && fBufCur->fCid != GetClientID())
1497 
1498  // Clean entry in the underlying pipe
1499  fgPipe.Clean(this);
1500 
1501  // We are done
1502  return 0;
1503 }
1504 
1505 ////////////////////////////////////////////////////////////////////////////////
1506 /// Pop-up a buffer of at least size bytes from the spare list
1507 /// If none is found either one is reallocated or a new one
1508 /// created
1509 
1511 {
1512  TXSockBuf *buf = 0;
1513  static Int_t nBuf = 0;
1514 
1515 
1516  R__LOCKGUARD(&fgSMtx);
1517 
1518 
1519  Int_t maxsz = 0;
1520  if (fgSQue.size() > 0) {
1521  list<TXSockBuf *>::iterator i;
1522  for (i = fgSQue.begin(); i != fgSQue.end(); i++) {
1523  maxsz = ((*i)->fSiz > maxsz) ? (*i)->fSiz : maxsz;
1524  if ((*i) && (*i)->fSiz >= size) {
1525  buf = *i;
1526  if (gDebug > 2)
1527  Info("PopUpSpare","asked: %d, spare: %d/%d, REUSE buf %p, sz: %d",
1528  size, (int) fgSQue.size(), nBuf, buf, buf->fSiz);
1529  // Drop from this list
1530  fgSQue.erase(i);
1531  return buf;
1532  }
1533  }
1534  // All buffers are too small: enlarge the first one
1535  buf = fgSQue.front();
1536  buf->Resize(size);
1537  if (gDebug > 2)
1538  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, RESIZE buf %p, sz: %d",
1539  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1540  // Drop from this list
1541  fgSQue.pop_front();
1542  return buf;
1543  }
1544 
1545  // Create a new buffer
1546  buf = new TXSockBuf((char *)malloc(size), size);
1547  nBuf++;
1548 
1549  if (gDebug > 2)
1550  Info("PopUpSpare","asked: %d, spare: %d/%d, maxsz: %d, NEW buf %p, sz: %d",
1551  size, (int) fgSQue.size(), nBuf, maxsz, buf, buf->fSiz);
1552 
1553  // We are done
1554  return buf;
1555 }
1556 
1557 ////////////////////////////////////////////////////////////////////////////////
1558 /// Release read buffer giving back to the spare list
1559 
1561 {
1562  R__LOCKGUARD(&fgSMtx);
1563 
1564  if (gDebug > 2)
1565  Info("PushBackSpare","release buf %p, sz: %d (BuffMem: %lld)",
1567 
1569  fgSQue.push_back(fBufCur);
1570  } else {
1571  delete fBufCur;
1572  }
1573  fBufCur = 0;
1574  fByteCur = 0;
1575  fByteLeft = 0;
1576 }
1577 
1578 ////////////////////////////////////////////////////////////////////////////////
1579 /// Receive a raw buffer of specified length bytes.
1580 
1582 {
1583  // Inputs must make sense
1584  if (!buffer || (length <= 0))
1585  return -1;
1586 
1587  // Wait and pick-up a read buffer if we do not have one
1588  if (!fBufCur && (PickUpReady() != 0))
1589  return -1;
1590 
1591  // Use it
1592  if (fByteLeft >= length) {
1593  memcpy(buffer, fBufCur->fBuf + fByteCur, length);
1594  fByteCur += length;
1595  if ((fByteLeft -= length) <= 0)
1596  // All used: give back
1597  PushBackSpare();
1598  // Success: update usage timestamp
1599  Touch();
1600  return length;
1601  } else {
1602  // Copy the first part
1603  memcpy(buffer, fBufCur->fBuf + fByteCur, fByteLeft);
1604  Int_t at = fByteLeft;
1605  Int_t tobecopied = length - fByteLeft;
1606  PushBackSpare();
1607  while (tobecopied > 0) {
1608  // Pick-up next buffer (it may wait inside)
1609  if (PickUpReady() != 0)
1610  return -1;
1611  // Copy the fresh meat
1612  Int_t ncpy = (fByteLeft > tobecopied) ? tobecopied : fByteLeft;
1613  memcpy((void *)((Char_t *)buffer+at), fBufCur->fBuf, ncpy);
1614  fByteCur = ncpy;
1615  if ((fByteLeft -= ncpy) <= 0)
1616  // All used: give back
1617  PushBackSpare();
1618  // Recalculate
1619  tobecopied -= ncpy;
1620  at += ncpy;
1621  }
1622  }
1623 
1624  // Update counters
1625  fBytesRecv += length;
1626  fgBytesRecv += length;
1627 
1628  // Success: update usage timestamp
1629  Touch();
1630 
1631  return length;
1632 }
1633 
1634 ////////////////////////////////////////////////////////////////////////////////
1635 /// Send urgent message (interrupt) to remote server
1636 /// Returns 0 or -1 in case of error.
1637 
1639 {
1641 
1642  // Prepare request
1643  XPClientRequest Request;
1644  memset(&Request, 0, sizeof(Request) );
1645  fConn->SetSID(Request.header.streamid);
1646  if (type == (Int_t) TProof::kShutdownInterrupt)
1647  Request.interrupt.requestid = kXP_destroy;
1648  else
1649  Request.interrupt.requestid = kXP_interrupt;
1650  Request.interrupt.sid = fSessionID;
1651  Request.interrupt.type = type; // type of interrupt (see TProof::EUrgent)
1652  Request.interrupt.dlen = 0;
1653 
1654  // Send request
1655  XrdClientMessage *xrsp =
1656  fConn->SendReq(&Request, (const void *)0, 0, "SendInterrupt");
1657  if (xrsp) {
1658  // Success: update usage timestamp
1659  Touch();
1660  // Cleanup
1661  SafeDelete(xrsp);
1662  // ok
1663  return 0;
1664  } else {
1665  // Print error msg, if any
1666  if (fConn->GetLastErr())
1667  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1668  }
1669 
1670  // Failure notification (avoid using the handler: we may be exiting)
1671  Error("SendInterrupt", "problems sending interrupt to server");
1672  return -1;
1673 }
1674 
1675 ////////////////////////////////////////////////////////////////////////////////
1676 
1678 {
1680  fRDInterrupt = i;
1681  if (i && fConn) fConn->SetInterrupt();
1682  if (i && fAWait) fASem.Post();
1683 }
1684 
1685 ////////////////////////////////////////////////////////////////////////////////
1686 /// Send a TMessage object. Returns the number of bytes in the TMessage
1687 /// that were sent and -1 in case of error.
1688 
1690 {
1692 
1693  if (mess.IsReading()) {
1694  Error("Send", "cannot send a message used for reading");
1695  return -1;
1696  }
1697 
1698  // send streamer infos in case schema evolution is enabled in the TMessage
1699  SendStreamerInfos(mess);
1700 
1701  // send the process id's so TRefs work
1702  SendProcessIDs(mess);
1703 
1704  mess.SetLength(); //write length in first word of buffer
1705 
1706  if (GetCompressionLevel() > 0 && mess.GetCompressionLevel() == 0)
1707  const_cast<TMessage&>(mess).SetCompressionSettings(fCompress);
1708 
1709  if (mess.GetCompressionLevel() > 0)
1710  const_cast<TMessage&>(mess).Compress();
1711 
1712  char *mbuf = mess.Buffer();
1713  Int_t mlen = mess.Length();
1714  if (mess.CompBuffer()) {
1715  mbuf = mess.CompBuffer();
1716  mlen = mess.CompLength();
1717  }
1718 
1719  // Parse message type to choose sending options
1720  kXR_int32 fSendOptDefault = fSendOpt;
1721  switch (mess.What()) {
1722  case kPROOF_PROCESS:
1724  break;
1725  case kPROOF_PROGRESS:
1726  case kPROOF_FEEDBACK:
1728  break;
1729  case kPROOF_QUERYSUBMITTED:
1732  break;
1733  case kPROOF_STARTPROCESS:
1736  break;
1737  case kPROOF_STOPPROCESS:
1739  break;
1740  case kPROOF_SETIDLE:
1743  break;
1744  case kPROOF_LOGFILE:
1745  case kPROOF_LOGDONE:
1746  if (GetClientIDSize() <= 1)
1747  fSendOpt |= kXPD_logmsg;
1748  break;
1749  default:
1750  break;
1751  }
1752 
1753  if (gDebug > 2)
1754  Info("Send", "sending type %d (%d bytes) to '%s'", mess.What(), mlen, GetTitle());
1755 
1756  Int_t nsent = SendRaw(mbuf, mlen);
1757  fSendOpt = fSendOptDefault;
1758 
1759  if (nsent <= 0)
1760  return nsent;
1761 
1762  fBytesSent += nsent;
1763  fgBytesSent += nsent;
1764 
1765  return nsent - sizeof(UInt_t); //length - length header
1766 }
1767 
1768 ////////////////////////////////////////////////////////////////////////////////
1769 /// Receive a TMessage object. The user must delete the TMessage object.
1770 /// Returns length of message in bytes (can be 0 if other side of connection
1771 /// is closed) or -1 in case of error or -5 if pipe broken (connection invalid).
1772 /// In those case mess == 0.
1773 
1775 {
1777 
1778  if (!IsValid()) {
1779  mess = 0;
1780  return -5;
1781  }
1782 
1783 oncemore:
1784  Int_t n;
1785  UInt_t len;
1786  if ((n = RecvRaw(&len, sizeof(UInt_t))) <= 0) {
1787  mess = 0;
1788  return n;
1789  }
1790  len = net2host(len); //from network to host byte order
1791 
1792  char *buf = new char[len+sizeof(UInt_t)];
1793  if ((n = RecvRaw(buf+sizeof(UInt_t), len)) <= 0) {
1794  delete [] buf;
1795  mess = 0;
1796  return n;
1797  }
1798 
1799  fBytesRecv += n + sizeof(UInt_t);
1800  fgBytesRecv += n + sizeof(UInt_t);
1801 
1802  mess = new TMessage(buf, len+sizeof(UInt_t));
1803 
1804  // receive any streamer infos
1805  if (RecvStreamerInfos(mess))
1806  goto oncemore;
1807 
1808  // receive any process ids
1809  if (RecvProcessIDs(mess))
1810  goto oncemore;
1811 
1812  if (mess->What() & kMESS_ACK) {
1813  // Acknowledgement embedded: ignore ...
1814  mess->SetWhat(mess->What() & ~kMESS_ACK);
1815  }
1816 
1817  return n;
1818 }
1819 
1820 ////////////////////////////////////////////////////////////////////////////////
1821 /// Send message to intermediate coordinator.
1822 /// If any output is due, this is returned as an obj string to be
1823 /// deleted by the caller
1824 
1825 TObjString *TXSocket::SendCoordinator(Int_t kind, const char *msg, Int_t int2,
1826  Long64_t l64, Int_t int3, const char *)
1827 {
1828  TObjString *sout = 0;
1829 
1830  // We fill the header struct containing the request
1831  XPClientRequest reqhdr;
1832  const void *buf = 0;
1833  char *bout = 0;
1834  char **vout = 0;
1835  memset(&reqhdr, 0, sizeof(reqhdr));
1836  fConn->SetSID(reqhdr.header.streamid);
1837  reqhdr.header.requestid = kXP_admin;
1838  reqhdr.proof.int1 = kind;
1839  reqhdr.proof.int2 = int2;
1840  switch (kind) {
1841  case kQueryMssUrl:
1842  case kQueryROOTVersions:
1843  case kQuerySessions:
1844  case kQueryWorkers:
1845  reqhdr.proof.sid = 0;
1846  reqhdr.header.dlen = 0;
1847  vout = (char **)&bout;
1848  break;
1849  case kCleanupSessions:
1850  reqhdr.proof.int2 = (int2 == 1) ? (kXR_int32) kXPD_AnyServer
1851  : (kXR_int32) kXPD_TopMaster;
1852  reqhdr.proof.int3 = int2;
1853  reqhdr.proof.sid = fSessionID;
1854  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1855  buf = (msg) ? (const void *)msg : buf;
1856  break;
1857  case kCpFile:
1858  case kGetFile:
1859  case kPutFile:
1860  case kExec:
1861  reqhdr.proof.sid = fSessionID;
1862  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1863  buf = (msg) ? (const void *)msg : buf;
1864  vout = (char **)&bout;
1865  break;
1866  case kQueryLogPaths:
1867  vout = (char **)&bout;
1868  reqhdr.proof.int3 = int3;
1869  case kReleaseWorker:
1870  case kSendMsgToUser:
1871  case kGroupProperties:
1872  case kSessionTag:
1873  case kSessionAlias:
1874  reqhdr.proof.sid = fSessionID;
1875  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1876  buf = (msg) ? (const void *)msg : buf;
1877  break;
1878  case kROOTVersion:
1879  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1880  buf = (msg) ? (const void *)msg : buf;
1881  break;
1882  case kGetWorkers:
1883  reqhdr.proof.sid = fSessionID;
1884  reqhdr.header.dlen = (msg) ? strlen(msg) : 0;
1885  if (msg)
1886  buf = (const void *)msg;
1887  vout = (char **)&bout;
1888  break;
1889  case kReadBuffer:
1890  reqhdr.header.requestid = kXP_readbuf;
1891  reqhdr.readbuf.ofs = l64;
1892  reqhdr.readbuf.len = int2;
1893  if (int3 > 0 && fXrdProofdVersion < 1003) {
1894  Info("SendCoordinator", "kReadBuffer: old server (ver %d < 1003):"
1895  " grep functionality not supported", fXrdProofdVersion);
1896  return sout;
1897  }
1898  reqhdr.readbuf.int1 = int3;
1899  if (!msg || strlen(msg) <= 0) {
1900  Info("SendCoordinator", "kReadBuffer: file path undefined");
1901  return sout;
1902  }
1903  reqhdr.header.dlen = strlen(msg);
1904  buf = (const void *)msg;
1905  vout = (char **)&bout;
1906  break;
1907  default:
1908  Info("SendCoordinator", "unknown message kind: %d", kind);
1909  return sout;
1910  }
1911 
1912  // server response header
1913  Bool_t noterr = (gDebug > 0) ? kTRUE : kFALSE;
1914  XrdClientMessage *xrsp =
1915  fConn->SendReq(&reqhdr, buf, vout, "TXSocket::SendCoordinator", noterr);
1916 
1917  // If positive answer
1918  if (xrsp) {
1919  // Check if we need to create an output string
1920  if (bout && (xrsp->DataLen() > 0))
1921  sout = new TObjString(TString(bout,xrsp->DataLen()));
1922  if (bout)
1923  free(bout);
1924  // Success: update usage timestamp
1925  Touch();
1926  SafeDelete(xrsp);
1927  } else {
1928  // Print error msg, if any
1929  if (fConn->GetLastErr())
1930  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1931  }
1932 
1933  // Failure notification (avoid using the handler: we may be exiting)
1934  return sout;
1935 }
1936 
1937 ////////////////////////////////////////////////////////////////////////////////
1938 /// Send urgent message to counterpart; 'type' specifies the type of
1939 /// the message (see TXSocket::EUrgentMsgType), and 'int1', 'int2'
1940 /// two containers for additional information.
1941 
1943 {
1945 
1946  // Prepare request
1947  XPClientRequest Request;
1948  memset(&Request, 0, sizeof(Request) );
1949  fConn->SetSID(Request.header.streamid);
1950  Request.proof.requestid = kXP_urgent;
1951  Request.proof.sid = fSessionID;
1952  Request.proof.int1 = type; // type of urgent msg (see TXSocket::EUrgentMsgType)
1953  Request.proof.int2 = int1; // 4-byte container info 1
1954  Request.proof.int3 = int2; // 4-byte container info 2
1955  Request.proof.dlen = 0;
1956 
1957  // Send request
1958  XrdClientMessage *xrsp =
1959  fConn->SendReq(&Request, (const void *)0, 0, "SendUrgent");
1960  if (xrsp) {
1961  // Success: update usage timestamp
1962  Touch();
1963  // Cleanup
1964  SafeDelete(xrsp);
1965  } else {
1966  // Print error msg, if any
1967  if (fConn->GetLastErr())
1968  Printf("%s: %s", fHost.Data(), fConn->GetLastErr());
1969  }
1970 
1971  // Done
1972  return;
1973 }
1974 
1975 ////////////////////////////////////////////////////////////////////////////////
1976 
1978  return (fConn ? fConn->GetLowSocket() : -1);
1979 }
1980 
1981 ////////////////////////////////////////////////////////////////////////////////
1982 /// Init environment variables for XrdClient
1983 
1985 {
1986  // Set debug level
1987  Int_t deb = gEnv->GetValue("XProof.Debug", -1);
1988  EnvPutInt(NAME_DEBUG, deb);
1989  if (deb > 0) {
1990  XrdProofdTrace->What |= TRACE_REQ;
1991  if (deb > 1) {
1992  XrdProofdTrace->What |= TRACE_DBG;
1993  if (deb > 2)
1994  XrdProofdTrace->What |= TRACE_ALL;
1995  }
1996  }
1997  const char *cenv = 0;
1998 
1999  // List of domains where connection is allowed
2000  TString allowCO = gEnv->GetValue("XProof.ConnectDomainAllowRE", "");
2001  if (allowCO.Length() > 0)
2002  EnvPutString(NAME_CONNECTDOMAINALLOW_RE, allowCO.Data());
2003 
2004  // List of domains where connection is denied
2005  TString denyCO = gEnv->GetValue("XProof.ConnectDomainDenyRE", "");
2006  if (denyCO.Length() > 0)
2007  EnvPutString(NAME_CONNECTDOMAINDENY_RE, denyCO.Data());
2008 
2009  // Max number of retries on first connect and related timeout
2011  Int_t maxRetries = gEnv->GetValue("XProof.FirstConnectMaxCnt",5);
2012  EnvPutInt(NAME_FIRSTCONNECTMAXCNT, maxRetries);
2013  Int_t connTO = gEnv->GetValue("XProof.ConnectTimeout", 2);
2014  EnvPutInt(NAME_CONNECTTIMEOUT, connTO);
2015 
2016  // Reconnect Wait
2017  Int_t recoTO = gEnv->GetValue("XProof.ReconnectWait",
2018  DFLT_RECONNECTWAIT);
2019  if (recoTO == DFLT_RECONNECTWAIT) {
2020  // Check also the old variable name
2021  recoTO = gEnv->GetValue("XProof.ReconnectTimeout",
2022  DFLT_RECONNECTWAIT);
2023  }
2024  EnvPutInt(NAME_RECONNECTWAIT, recoTO);
2025 
2026  // Request Timeout
2027  Int_t requTO = gEnv->GetValue("XProof.RequestTimeout", 150);
2028  EnvPutInt(NAME_REQUESTTIMEOUT, requTO);
2029 
2030  // No automatic proofd backward-compatibility
2031  EnvPutInt(NAME_KEEPSOCKOPENIFNOTXRD, 0);
2032 
2033  // Dynamic forwarding (SOCKS4)
2034  TString socks4Host = gEnv->GetValue("XNet.SOCKS4Host","");
2035  Int_t socks4Port = gEnv->GetValue("XNet.SOCKS4Port",-1);
2036  if (socks4Port > 0) {
2037  if (socks4Host.IsNull())
2038  // Default
2039  socks4Host = "127.0.0.1";
2040  EnvPutString(NAME_SOCKS4HOST, socks4Host.Data());
2041  EnvPutInt(NAME_SOCKS4PORT, socks4Port);
2042  }
2043 
2044  // For password-based authentication
2045  TString autolog = gEnv->GetValue("XSec.Pwd.AutoLogin","1");
2046  if (autolog.Length() > 0 &&
2047  (!(cenv = gSystem->Getenv("XrdSecPWDAUTOLOG")) || strlen(cenv) <= 0))
2048  gSystem->Setenv("XrdSecPWDAUTOLOG",autolog.Data());
2049 
2050  // For password-based authentication
2051  TString netrc;
2052  netrc.Form("%s/.rootnetrc",gSystem->HomeDirectory());
2053  gSystem->Setenv("XrdSecNETRC", netrc.Data());
2054 
2055  TString alogfile = gEnv->GetValue("XSec.Pwd.ALogFile","");
2056  if (alogfile.Length() > 0)
2057  gSystem->Setenv("XrdSecPWDALOGFILE",alogfile.Data());
2058 
2059  TString verisrv = gEnv->GetValue("XSec.Pwd.VerifySrv","1");
2060  if (verisrv.Length() > 0 &&
2061  (!(cenv = gSystem->Getenv("XrdSecPWDVERIFYSRV")) || strlen(cenv) <= 0))
2062  gSystem->Setenv("XrdSecPWDVERIFYSRV",verisrv.Data());
2063 
2064  TString srvpuk = gEnv->GetValue("XSec.Pwd.ServerPuk","");
2065  if (srvpuk.Length() > 0)
2066  gSystem->Setenv("XrdSecPWDSRVPUK",srvpuk.Data());
2067 
2068  // For GSI authentication
2069  TString cadir = gEnv->GetValue("XSec.GSI.CAdir","");
2070  if (cadir.Length() > 0)
2071  gSystem->Setenv("XrdSecGSICADIR",cadir.Data());
2072 
2073  TString crldir = gEnv->GetValue("XSec.GSI.CRLdir","");
2074  if (crldir.Length() > 0)
2075  gSystem->Setenv("XrdSecGSICRLDIR",crldir.Data());
2076 
2077  TString crlext = gEnv->GetValue("XSec.GSI.CRLextension","");
2078  if (crlext.Length() > 0)
2079  gSystem->Setenv("XrdSecGSICRLEXT",crlext.Data());
2080 
2081  TString ucert = gEnv->GetValue("XSec.GSI.UserCert","");
2082  if (ucert.Length() > 0)
2083  gSystem->Setenv("XrdSecGSIUSERCERT",ucert.Data());
2084 
2085  TString ukey = gEnv->GetValue("XSec.GSI.UserKey","");
2086  if (ukey.Length() > 0)
2087  gSystem->Setenv("XrdSecGSIUSERKEY",ukey.Data());
2088 
2089  TString upxy = gEnv->GetValue("XSec.GSI.UserProxy","");
2090  if (upxy.Length() > 0)
2091  gSystem->Setenv("XrdSecGSIUSERPROXY",upxy.Data());
2092 
2093  TString valid = gEnv->GetValue("XSec.GSI.ProxyValid","");
2094  if (valid.Length() > 0)
2095  gSystem->Setenv("XrdSecGSIPROXYVALID",valid.Data());
2096 
2097  TString deplen = gEnv->GetValue("XSec.GSI.ProxyForward","0");
2098  if (deplen.Length() > 0 &&
2099  (!(cenv = gSystem->Getenv("XrdSecGSIPROXYDEPLEN")) || strlen(cenv) <= 0))
2100  gSystem->Setenv("XrdSecGSIPROXYDEPLEN",deplen.Data());
2101 
2102  TString pxybits = gEnv->GetValue("XSec.GSI.ProxyKeyBits","");
2103  if (pxybits.Length() > 0)
2104  gSystem->Setenv("XrdSecGSIPROXYKEYBITS",pxybits.Data());
2105 
2106  TString crlcheck = gEnv->GetValue("XSec.GSI.CheckCRL","1");
2107  if (crlcheck.Length() > 0 &&
2108  (!(cenv = gSystem->Getenv("XrdSecGSICRLCHECK")) || strlen(cenv) <= 0))
2109  gSystem->Setenv("XrdSecGSICRLCHECK",crlcheck.Data());
2110 
2111  TString delegpxy = gEnv->GetValue("XSec.GSI.DelegProxy","0");
2112  if (delegpxy.Length() > 0 &&
2113  (!(cenv = gSystem->Getenv("XrdSecGSIDELEGPROXY")) || strlen(cenv) <= 0))
2114  gSystem->Setenv("XrdSecGSIDELEGPROXY",delegpxy.Data());
2115 
2116  TString signpxy = gEnv->GetValue("XSec.GSI.SignProxy","1");
2117  if (signpxy.Length() > 0 &&
2118  (!(cenv = gSystem->Getenv("XrdSecGSISIGNPROXY")) || strlen(cenv) <= 0))
2119  gSystem->Setenv("XrdSecGSISIGNPROXY",signpxy.Data());
2120 
2121  // Print the tag, if required (only once)
2122  if (gEnv->GetValue("XNet.PrintTAG",0) == 1)
2123  ::Info("TXSocket","(C) 2005 CERN TXSocket (XPROOF client) %s",
2124  gROOT->GetVersion());
2125 
2126  // Only once
2127  fgInitDone = kTRUE;
2128 }
2129 
2130 ////////////////////////////////////////////////////////////////////////////////
2131 /// Try reconnection after failure
2132 
2134 {
2135  if (gDebug > 0) {
2136  Info("Reconnect", "%p (c:%p, v:%d): trying to reconnect to %s (logid: %d)",
2137  this, fConn, (fConn ? fConn->IsValid() : 0),
2138  fUrl.Data(), fConn->GetLogConnID());
2139  }
2140 
2141  Int_t tryreconnect = gEnv->GetValue("TXSocket.Reconnect", 0);
2142  if (tryreconnect == 0 || fXrdProofdVersion < 1005) {
2143  if (tryreconnect == 0)
2144  Info("Reconnect","%p: reconnection attempts explicitly disabled!", this);
2145  else
2146  Info("Reconnect","%p: server does not support reconnections (protocol: %d < 1005)",
2147  this, fXrdProofdVersion);
2148  return -1;
2149  }
2150 
2151  if (fConn) {
2152  if (gDebug > 0)
2153  Info("Reconnect", "%p: locking phyconn: %p", this, fConn->fPhyConn);
2154  fConn->ReConnect();
2155  if (fConn->IsValid()) {
2156  // Create new proofserv if not client manager or administrator or internal mode
2157  if (fMode == 'm' || fMode == 's' || fMode == 'M' || fMode == 'A') {
2158  // We attach or create
2159  if (!Create(kTRUE)) {
2160  // Failure
2161  Error("TXSocket", "create or attach failed (%s)",
2162  ((fConn->fLastErrMsg.length() > 0) ? fConn->fLastErrMsg.c_str() : "-"));
2163  Close();
2164  return -1;
2165  }
2166  }
2167  }
2168  }
2169 
2170  if (gDebug > 0) {
2171  if (fConn) {
2172  Info("Reconnect", "%p (c:%p): attempt %s (logid: %d)", this, fConn,
2173  (fConn->IsValid() ? "succeeded!" : "failed"),
2174  fConn->GetLogConnID() );
2175  } else {
2176  Info("Reconnect", "%p (c:0x0): attempt failed", this);
2177  }
2178  }
2179 
2180  // Done
2181  return ((fConn && fConn->IsValid()) ? 0 : -1);
2182 }
2183 
2184 ////////////////////////////////////////////////////////////////////////////////
2185 ///constructor
2186 
2188 {
2189  fBuf = fMem = bp;
2190  fSiz = fLen = sz;
2191  fOwn = own;
2192  fCid = -1;
2193  fgBuffMem += sz;
2194 }
2195 
2196 ////////////////////////////////////////////////////////////////////////////////
2197 ///destructor
2198 
2200 {
2201  if (fOwn && fMem) {
2202  free(fMem);
2203  fgBuffMem -= fSiz;
2204  }
2205 }
2206 
2207 ////////////////////////////////////////////////////////////////////////////////
2208 ///resize socket buffer
2209 
2211 {
2212  if (sz > fSiz) {
2213  if ((fMem = (Char_t *)realloc(fMem, sz))) {
2214  fgBuffMem += (sz - fSiz);
2215  fBuf = fMem;
2216  fSiz = sz;
2217  fLen = 0;
2218  }
2219  }
2220 }
2221 
2222 //_____________________________________________________________________________
2223 //
2224 // TXSockBuf static methods
2225 //
2226 
2227 ////////////////////////////////////////////////////////////////////////////////
2228 /// Return the currently allocated memory
2229 
2231 {
2232  return fgBuffMem;
2233 }
2234 
2235 ////////////////////////////////////////////////////////////////////////////////
2236 /// Return the max allocated memory allowed
2237 
2239 {
2240  return fgMemMax;
2241 }
2242 
2243 ////////////////////////////////////////////////////////////////////////////////
2244 /// Return the max allocated memory allowed
2245 
2247 {
2248  fgMemMax = memmax > 0 ? memmax : fgMemMax;
2249 }
2250 
2251 //_____________________________________________________________________________
2252 //
2253 // TXSockPipe
2254 //
2255 
2256 ////////////////////////////////////////////////////////////////////////////////
2257 /// Constructor
2258 
2259 TXSockPipe::TXSockPipe(const char *loc) : fMutex(kTRUE), fLoc(loc)
2260 {
2261  // Create the pipe
2262  if (pipe(fPipe) != 0) {
2263  Printf("TXSockPipe: problem initializing pipe for socket inputs");
2264  fPipe[0] = -1;
2265  fPipe[1] = -1;
2266  return;
2267  }
2268 }
2269 
2270 ////////////////////////////////////////////////////////////////////////////////
2271 /// Destructor
2272 
2274 {
2275  if (fPipe[0] >= 0) close(fPipe[0]);
2276  if (fPipe[1] >= 0) close(fPipe[1]);
2277 }
2278 
2279 
2280 ////////////////////////////////////////////////////////////////////////////////
2281 /// Write a byte to the global pipe to signal new availibility of
2282 /// new messages
2283 
2285 {
2286  if (!IsValid() || !s) return -1;
2287 
2288  // This must be an atomic action
2289  Int_t sz = 0;
2290  { R__LOCKGUARD(&fMutex);
2291  // Add this one
2292  fReadySock.Add(s);
2293 
2294  // Only one char
2295  Char_t c = 1;
2296  if (write(fPipe[1],(const void *)&c, sizeof(Char_t)) < 1) {
2297  Printf("TXSockPipe::Post: %s: can't notify pipe", fLoc.Data());
2298  return -1;
2299  }
2300  if (gDebug > 2) sz = fReadySock.GetSize();
2301  }
2302 
2303  if (gDebug > 2)
2304  Printf("TXSockPipe::Post: %s: %p: pipe posted (pending %d) (descriptor: %d)",
2305  fLoc.Data(), s, sz, fPipe[1]);
2306  // We are done
2307  return 0;
2308 }
2309 
2310 ////////////////////////////////////////////////////////////////////////////////
2311 /// Read a byte to the global pipe to synchronize message pickup
2312 
2314 {
2315  // Pipe must have been created
2316  if (!IsValid() || !s) return -1;
2317 
2318  // Only one char
2319  Int_t sz = 0;
2320  Char_t c = 0;
2321  { R__LOCKGUARD(&fMutex);
2322  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1) {
2323  Printf("TXSockPipe::Clean: %s: can't read from pipe", fLoc.Data());
2324  return -1;
2325  }
2326  // Remove this one
2327  fReadySock.Remove(s);
2328 
2329  if (gDebug > 2) sz = fReadySock.GetSize();
2330  }
2331 
2332  if (gDebug > 2)
2333  Printf("TXSockPipe::Clean: %s: %p: pipe cleaned (pending %d) (descriptor: %d)",
2334  fLoc.Data(), s, sz, fPipe[0]);
2335 
2336  // We are done
2337  return 0;
2338 }
2339 
2340 ////////////////////////////////////////////////////////////////////////////////
2341 /// Remove any reference to socket 's' from the global pipe and
2342 /// ready-socket queue
2343 
2345 {
2346  // Pipe must have been created
2347  if (!IsValid() || !s) return -1;
2348 
2349  TObject *o = 0;
2350  // This must be an atomic action
2351  { R__LOCKGUARD(&fMutex);
2352  o = fReadySock.FindObject(s);
2353 
2354  while (o) {
2355  // Remove from the list
2356  fReadySock.Remove(s);
2357  o = fReadySock.FindObject(s);
2358  // Remove one notification from the pipe
2359  Char_t c = 0;
2360  if (read(fPipe[0],(void *)&c, sizeof(Char_t)) < 1)
2361  Printf("TXSockPipe::Flush: %s: can't read from pipe", fLoc.Data());
2362  }
2363  }
2364  // Flush also the socket
2365  ((TXSocket *)s)->Flush();
2366 
2367  // Notify
2368  if (gDebug > 0)
2369  Printf("TXSockPipe::Flush: %s: %p: pipe flushed", fLoc.Data(), s);
2370 
2371  // We are done
2372  return 0;
2373 }
2374 
2375 ////////////////////////////////////////////////////////////////////////////////
2376 /// Dump content of the ready socket list
2377 
2379 {
2380  R__LOCKGUARD(&fMutex);
2381 
2382  TString buf = Form("%d |", fReadySock.GetSize());
2383  TIter nxs(&fReadySock);
2384  TObject *o = 0;
2385  while ((o = nxs()))
2386  buf += Form(" %p",o);
2387  Printf("TXSockPipe::DumpReadySock: %s: list content: %s", fLoc.Data(), buf.Data());
2388 }
2389 
2390 ////////////////////////////////////////////////////////////////////////////////
2391 /// Return last ready socket
2392 
2394 {
2395  R__LOCKGUARD(&fMutex);
2396 
2397  return (TXSocket *) fReadySock.Last();
2398 }
virtual const char * GetName() const
Returns name of object.
Definition: TNamed.h:51
Int_t GetOpenError() const
Getter for last error.
Definition: TXSocket.cxx:925
Bool_t IsReading() const
Definition: TBuffer.h:81
Definition: TMutex.h:37
double read(const std::string &file_name)
reading
Int_t fSocket
Definition: TSocket.h:100
Int_t fCid
Definition: TXSocket.h:260
#define XrdSysLogger
Definition: XpdSysLogger.h:8
#define kXPD_querynum
static void SetLocation(const char *loc="")
Set location string.
Definition: TXSocket.cxx:254
virtual int GetPid()
Get process id.
Definition: TSystem.cxx:711
Int_t fTcpWindowSize
Definition: TSocket.h:101
Bool_t RecvStreamerInfos(TMessage *mess)
Receive a message containing streamer infos.
Definition: TSocket.cxx:932
virtual void SetClientID(Int_t)
Definition: TXSocket.h:183
Int_t GetCompressionLevel() const
Definition: TSocket.h:211
virtual void Info(const char *method, const char *msgfmt,...) const
Issue info message.
Definition: TObject.cxx:892
TSemaphore fAsynProc
Definition: TXSocket.h:109
char * CompBuffer() const
Definition: TMessage.h:94
long long Long64_t
Definition: RtypesCore.h:69
Int_t SendRaw(const void *buf, Int_t len, ESendRecvOptions opt=kDontBlock)
Send a raw buffer of specified length.
Definition: TXSocket.cxx:1216
kXR_unt16 fStreamid
Definition: XrdProofConn.h:73
#define kXPD_TopMaster
TXSocket(const char *url, Char_t mode='M', Int_t psid=-1, Char_t ver=-1, const char *logbuf=0, Int_t loglevel=-1, TXHandler *handler=0)
Constructor Open the connection to a remote XrdProofd instance and start a PROOF session.
Definition: TXSocket.cxx:127
TSemaphore fASem
Definition: TXSocket.h:101
void SetLoc(const char *loc="")
Definition: TXSocket.h:296
int GetServType() const
Definition: XrdProofConn.h:143
void SetProtocol(const char *proto, Bool_t setDefaultPort=kFALSE)
Set protocol and, optionally, change the port accordingly.
Definition: TUrl.cxx:518
Int_t fPipe[2]
Definition: TXSocket.h:300
double write(int n, const std::string &file_name, const std::string &vector_type, int compress=0)
writing
Collectable string class.
Definition: TObjString.h:32
virtual void Close(Option_t *opt="")
Close connection.
Definition: TXSocket.cxx:323
int GetLogConnID() const
Definition: XrdProofConn.h:140
const char Option_t
Definition: RtypesCore.h:62
Int_t fLogLevel
Definition: TXSocket.h:92
Bool_t RecvProcessIDs(TMessage *mess)
Receive a message containing process ids.
Definition: TSocket.cxx:979
static Long64_t BuffMem()
Return the currently allocated memory.
Definition: TXSocket.cxx:2230
struct XPClientReadbufRequest readbuf
This class represents a WWW compatible URL.
Definition: TUrl.h:41
XrdProofConn * fConn
Definition: TXSocket.h:98
char fMode
Definition: TXSocket.h:85
Int_t TryWait()
If semaphore value is > 0 then decrement it and return 0.
Definition: TSemaphore.cxx:81
virtual UnsolRespProcResult ProcessUnsolicitedMsg(XrdClientUnsolMsgSender *s, XrdClientMessage *msg)
We are here if an unsolicited response comes from a logical conn The response comes in the form of an...
Definition: TXSocket.cxx:380
Int_t Wait(Int_t millisec=0)
If semaphore value is > 0 then decrement it and carry on.
Definition: TSemaphore.cxx:38
struct XPClientInterruptRequest interrupt
virtual TObject * Last() const
Return the last object in the list. Returns 0 when list is empty.
Definition: TList.cxx:580
static void SetRetryParam(int maxtry=5, int timewait=2)
Change values of the retry control parameters, numer of retries and wait time between attempts (in se...
~TXSockBuf()
destructor
Definition: TXSocket.cxx:2199
virtual const char * HomeDirectory(const char *userName=0)
Return the user&#39;s home directory.
Definition: TSystem.cxx:881
TMutex * fAMtx
Definition: TXSocket.h:102
XrdClientMessage * SendReq(XPClientRequest *req, const void *reqData, char **answData, const char *CmdName, bool notifyerr=1)
SendReq tries to send a single command for a number of times.
#define gROOT
Definition: TROOT.h:352
Ssiz_t Index(const char *pat, Ssiz_t i=0, ECaseCompare cmp=kExact) const
Definition: TString.h:582
XrdOucTrace * XrdProofdTrace
Definition: TXSocket.cxx:55
Basic string class.
Definition: TString.h:137
virtual Bool_t HandleError(const void *in=0)
Handler of asynchronous error events.
Definition: TXHandler.cxx:39
void SendStreamerInfos(const TMessage &mess)
Check if TStreamerInfo must be sent.
Definition: TSocket.cxx:650
int Int_t
Definition: RtypesCore.h:41
bool Bool_t
Definition: RtypesCore.h:59
virtual ~TXSocket()
Destructor.
Definition: TXSocket.cxx:239
const Bool_t kFALSE
Definition: Rtypes.h:92
TMutex * fIMtx
Definition: TXSocket.h:112
void SetInterrupt(Bool_t i=kTRUE)
Definition: TXSocket.cxx:1677
void SetSID(kXR_char *sid)
Set our stream id, to match against that one in the server&#39;s response.
TInetAddress fAddress
Definition: TSocket.h:90
#define kXPD_async
void SetSessionID(Int_t id)
Set session ID to &#39;id&#39;. If id < 0, disable also the asynchronous handler.
Definition: TXSocket.cxx:268
TXSocket * GetLastReady()
Return last ready socket.
Definition: TXSocket.cxx:2393
void RemoteTouch()
Remote touch functionality: contact the server to proof our vitality.
Definition: TXSocket.cxx:1345
Int_t GetServType() const
Getter for server type.
Definition: TXSocket.cxx:933
Int_t Post(TSocket *s)
Write a byte to the global pipe to signal new availibility of new messages.
Definition: TXSocket.cxx:2284
ESendRecvOptions
Definition: TSocket.h:65
virtual TObject * FindObject(const char *name) const
Find an object in this list using its name.
Definition: TList.cxx:496
static Long64_t fgBuffMem
Definition: TXSocket.h:273
Int_t Length() const
Definition: TBuffer.h:94
int GetOpenError() const
Definition: XrdProofConn.h:142
void SetInterrupt()
Interrupt the underlying socket.
Int_t Clean(TSocket *s)
Read a byte to the global pipe to synchronize message pickup.
Definition: TXSocket.cxx:2313
static Long64_t fgMemMax
Definition: TXSocket.h:274
struct ClientRequestHdr header
const char * GetHost() const
Definition: TUrl.h:76
#define SafeDelete(p)
Definition: RConfig.h:449
Bool_t fAWait
Definition: TXSocket.h:103
UShort_t net2host(UShort_t x)
Definition: Bytes.h:579
Int_t fPort
Definition: TXSocket.h:90
Int_t RecvRaw(void *buf, Int_t len, ESendRecvOptions opt=kDefault)
Receive a raw buffer of specified length bytes.
Definition: TXSocket.cxx:1581
static TString Format(const char *fmt,...)
Static method which formats a string using a printf style format descriptor and return a TString...
Definition: TString.cxx:2334
static void InitEnvs()
Init environment variables for XrdClient.
Definition: TXSocket.cxx:1984
Vc_ALWAYS_INLINE void free(T *p)
Frees memory that was allocated with Vc::malloc.
Definition: memory.h:94
#define kXPD_setidle
#define kXPD_process
TString fUrl
Definition: TSocket.h:102
virtual Bool_t Notify()
This method must be overridden to handle object notification.
Definition: TObject.cxx:550
const char *const kPROOF_WorkerIdleTO
Definition: TProof.h:158
char * Buffer() const
Definition: TBuffer.h:91
static std::list< TXSockBuf * > fgSQue
Definition: TXSocket.h:133
Int_t Send(const TMessage &mess)
Send a TMessage object.
Definition: TXSocket.cxx:1689
Int_t GetInterrupt(Bool_t &forward)
Get latest interrupt level and reset it; if the interrupt has to be propagated to lower stages forwar...
Definition: TXSocket.cxx:970
Int_t CompLength() const
Definition: TMessage.h:95
Bool_t fIForward
Definition: TXSocket.h:114
TXHandler * fHandler
Definition: TXSocket.h:96
const char * GetLastErr()
Definition: XrdProofConn.h:146
virtual const char * Getenv(const char *env)
Get environment variable.
Definition: TSystem.cxx:1626
TXSockBuf * PopUpSpare(Int_t sz)
Pop-up a buffer of at least size bytes from the spare list If none is found either one is reallocated...
Definition: TXSocket.cxx:1510
void DoError(int level, const char *location, const char *fmt, va_list va) const
Interface to ErrorHandler (protected).
Definition: TXSocket.cxx:71
Bool_t IsServProofd()
Return kTRUE if the remote server is a &#39;proofd&#39;.
Definition: TXSocket.cxx:957
XFontStruct * id
Definition: TGX11.cxx:108
#define kXPD_internal
TString fUser
Definition: TXSocket.h:88
virtual TInetAddress GetHostByName(const char *server)
Get Internet Protocol (IP) address of host.
Definition: TSystem.cxx:2232
int fRemoteProtocol
Definition: XrdProofConn.h:74
struct XPClientSendRcvRequest sendrcv
static const char * what
Definition: stlLoader.cc:6
Bool_t Create(Bool_t attach=kFALSE)
This method sends a request for creation of (or attachment to) a remote server application.
Definition: TXSocket.cxx:1051
Int_t fByteCur
Definition: TXSocket.h:106
Int_t Flush()
Flush the asynchronous queue.
Definition: TXSocket.cxx:1002
Bool_t IsInterrupt()
Definition: TXSocket.h:220
TString fHost
Definition: TXSocket.h:89
virtual Int_t GetClientID() const
Definition: TXSocket.h:172
TXSockBuf(Char_t *bp=0, Int_t sz=0, Bool_t own=1)
constructor
Definition: TXSocket.cxx:2187
virtual void Setenv(const char *name, const char *value)
Set environment variable.
Definition: TSystem.cxx:1610
virtual Bool_t HandleInput(const void *in=0)
Handler of asynchronous input events.
Definition: TXHandler.cxx:30
Double_t length(const TVector2 &v)
Definition: CsgOps.cxx:347
void SetAWait(Bool_t w=kTRUE)
Definition: TXSocket.h:223
R__EXTERN TSystem * gSystem
Definition: TSystem.h:549
virtual Int_t GetValue(const char *name, Int_t dflt)
Returns the integer value for a resource.
Definition: TEnv.cxx:480
virtual TObject * Remove(TObject *obj)
Remove object from the list.
Definition: TList.cxx:674
Int_t fXrdProofdVersion
Definition: TXSocket.h:124
struct XPClientProofRequest proof
void PostMsg(Int_t type, const char *msg=0)
Post a message of type &#39;type&#39; into the read messages queue.
Definition: TXSocket.cxx:860
Bool_t BeginsWith(const char *s, ECaseCompare cmp=kExact) const
Definition: TString.h:558
#define XrdSysError
Definition: XpdSysError.h:8
Int_t GetLowSocket() const
Definition: TXSocket.cxx:1977
void Form(const char *fmt,...)
Formats a string using a printf style format descriptor.
Definition: TString.cxx:2321
Bool_t Ping(const char *ord=0)
Ping functionality: contact the server to check its vitality.
Definition: TXSocket.cxx:1273
unsigned int UInt_t
Definition: RtypesCore.h:42
TMarker * m
Definition: textangle.C:8
UInt_t fBytesSent
Definition: TSocket.h:92
virtual void Error(const char *method, const char *msgfmt,...) const
Issue error message.
Definition: TObject.cxx:918
char * Form(const char *fmt,...)
Ssiz_t Length() const
Definition: TString.h:390
TString fBuffer
Definition: TXSocket.h:94
int clientMarshall(XPClientRequest *str)
This function applies the network byte order on those parts of the 16-bytes buffer, only if it is composed by some binary part Return 0 if OK, -1 in case the ID is unknown.
static TMutex fgSMtx
Definition: TXSocket.h:132
#define TRACE_REQ
std::list< TXSockBuf * > fAQue
Definition: TXSocket.h:104
#define TRACE_ALL
Int_t Post()
If any threads are blocked in Wait(), wake one of them up and increment the value of the semaphore...
Definition: TSemaphore.cxx:105
Int_t Flush(TSocket *s)
Remove any reference to socket &#39;s&#39; from the global pipe and ready-socket queue.
Definition: TXSocket.cxx:2344
static TString fgLoc
Definition: TXSocket.h:128
Char_t * fBuf
Definition: TXSocket.h:258
void Touch()
Definition: TSocket.h:187
TString fLoc
Definition: TXSocket.h:301
XrdOucString fUser
Definition: XrdProofConn.h:79
TList fReadySock
Definition: TXSocket.h:302
Int_t fSiz
Definition: TXSocket.h:256
#define kXPD_AnyServer
#define kXPD_logmsg
Bool_t IsValid() const
Definition: TXSocket.h:286
#define Printf
Definition: TGeoToOCC.h:18
XrdClientUrlInfo fUrl
Definition: XrdProofConn.h:102
Int_t fLen
Definition: TXSocket.h:257
virtual ~TXSockPipe()
Destructor.
Definition: TXSocket.cxx:2273
virtual Int_t Reconnect()
Try reconnection after failure.
Definition: TXSocket.cxx:2133
void DisconnectSession(Int_t id, Option_t *opt="")
Disconnect a session.
Definition: TXSocket.cxx:280
UInt_t What() const
Definition: TMessage.h:80
static XrdSysError eDest(0, "Proofx")
#define TRACE_DBG
TString & Remove(Ssiz_t pos)
Definition: TString.h:616
int Ssiz_t
Definition: RtypesCore.h:63
TXSockBuf * fBufCur
Definition: TXSocket.h:107
int GetLowSocket()
Return the socket descriptor of the underlying connection.
Short_t fSessionID
Definition: TXSocket.h:87
XrdOucString fHost
Definition: XrdProofConn.h:80
Bool_t IsValid() const
Definition: TXSocket.h:314
Int_t fCompress
Definition: TSocket.h:93
int type
Definition: TGX11.cxx:120
R__EXTERN TEnv * gEnv
Definition: TEnv.h:174
unsigned long long ULong64_t
Definition: RtypesCore.h:70
#define kXPD_fb_prog
static XrdSysLogger eLogger
Definition: TXSocket.cxx:56
virtual void Close(const char *opt="")
Close connection.
XrdClientPhyConnection * fPhyConn
Definition: XrdProofConn.h:93
#define R__LOCKGUARD(mutex)
Bool_t fDontTimeout
Definition: TXSocket.h:120
TXSockPipe(const char *loc="")
Constructor.
Definition: TXSocket.cxx:2259
XrdOucString fLastErrMsg
Definition: XrdProofConn.h:82
Int_t Recv(TMessage *&mess)
Receive a TMessage object.
Definition: TXSocket.cxx:1774
virtual void SetAsync(XrdClientAbsUnsolMsgHandler *uh, XrdProofConnSender_t=0, void *=0)
Set handler of unsolicited responses.
Int_t GetCompressionLevel() const
Definition: TMessage.h:112
kXR_int32 fSendOpt
Definition: TXSocket.h:86
#define kXPD_startprocess
void ReConnect()
Perform a reconnection attempt when a connection is not valid any more.
Int_t GetPort() const
Definition: TUrl.h:87
Bool_t fRDInterrupt
Definition: TXSocket.h:121
Bool_t IsNull() const
Definition: TString.h:387
EServiceType fServType
Definition: TSocket.h:99
void DumpReadySock()
Dump content of the ready socket list.
Definition: TXSocket.cxx:2378
TObject * fReference
Definition: TXSocket.h:95
Mother of all ROOT objects.
Definition: TObject.h:58
static ULong64_t fgBytesSent
Definition: TSocket.h:110
char Char_t
Definition: RtypesCore.h:29
void SetLength() const
Set the message length at the beginning of the message buffer.
Definition: TMessage.cxx:188
void PushBackSpare()
Release read buffer giving back to the spare list.
Definition: TXSocket.cxx:1560
Bool_t IsValid() const
Getter for validity status.
Definition: TXSocket.cxx:949
kXR_int32 fILev
Definition: TXSocket.h:113
void SendProcessIDs(const TMessage &mess)
Check if TProcessIDs must be sent.
Definition: TSocket.cxx:685
void ErrorHandler(int level, const char *location, const char *fmt, va_list va)
General error handler function. It calls the user set error handler.
Definition: TError.cxx:202
virtual void Add(TObject *obj)
Definition: TList.h:81
const Ssiz_t kNPOS
Definition: Rtypes.h:115
Int_t PickUpReady()
Wait and pick-up next buffer from the asynchronous queue.
Definition: TXSocket.cxx:1419
static Bool_t fgInitDone
Definition: TXSocket.h:129
TObjString * SendCoordinator(Int_t kind, const char *msg=0, Int_t int2=0, Long64_t l64=0, Int_t int3=0, const char *opt=0)
Send message to intermediate coordinator.
Definition: TXSocket.cxx:1825
void SendUrgent(Int_t type, Int_t int1, Int_t int2)
Send urgent message to counterpart; &#39;type&#39; specifies the type of the message (see TXSocket::EUrgentMs...
Definition: TXSocket.cxx:1942
static TXSockPipe fgPipe
Definition: TXSocket.h:127
static ULong64_t fgBytesRecv
Definition: TSocket.h:109
R__EXTERN Int_t gDebug
Definition: Rtypes.h:128
Int_t Atoi() const
Return integer value of string.
Definition: TString.cxx:1964
XReqErrorType LowWrite(XPClientRequest *, const void *, int)
Send request to server (NB: req is marshalled at this point, so we need also the plain reqDataLen) ...
short GetSessionID() const
Definition: XrdProofConn.h:144
Int_t fRemoteProtocol
Definition: TSocket.h:95
bool IsValid() const
Test validity of this connection.
Bool_t IsDigit() const
Returns true if all characters in string are digits (0-9) or white spaces, i.e.
Definition: TString.cxx:1806
static void ResetErrno()
Static function resetting system error number.
Definition: TSystem.cxx:280
virtual Int_t GetClientIDSize() const
Definition: TXSocket.h:173
static void SetMemMax(Long64_t memmax)
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2246
Int_t SendInterrupt(Int_t type)
Send urgent message (interrupt) to remote server Returns 0 or -1 in case of error.
Definition: TXSocket.cxx:1638
Int_t fPid
Definition: TXSocket.h:117
virtual Int_t GetSize() const
Definition: TCollection.h:95
const Bool_t kTRUE
Definition: Rtypes.h:91
Int_t fByteLeft
Definition: TXSocket.h:105
Vc_ALWAYS_INLINE_L T *Vc_ALWAYS_INLINE_R malloc(size_t n)
Allocates memory on the Heap with alignment and padding suitable for vectorized access.
Definition: memory.h:67
const Int_t n
Definition: legend1.C:16
void Resize(Int_t sz)
resize socket buffer
Definition: TXSocket.cxx:2210
void CtrlC()
Interrupt the remote protocol instance.
Definition: TXSocket.cxx:1383
TMutex fMutex
Definition: TXSocket.h:299
virtual void Warning(const char *method, const char *msgfmt,...) const
Issue warning message.
Definition: TObject.cxx:904
Int_t GetSessionID() const
Getter for session ID.
Definition: TXSocket.cxx:941
if(line.BeginsWith("/*"))
Definition: HLFactory.cxx:443
virtual const char * GetTitle() const
Returns title of object.
Definition: TNamed.h:52
Int_t GetLogConnID() const
Getter for logical connection ID.
Definition: TXSocket.cxx:917
UInt_t fBytesRecv
Definition: TSocket.h:91
static Long64_t GetMemMax()
Return the max allocated memory allowed.
Definition: TXSocket.cxx:2238
const char * Data() const
Definition: TString.h:349