xrootd
XrdMonDecSink.hh
Go to the documentation of this file.
1 /*****************************************************************************/
2 /* */
3 /* XrdMonDecSink.hh */
4 /* */
5 /* (c) 2004 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* All Rights Reserved */
7 /* Produced by Jacek Becla for Stanford University under contract */
8 /* DE-AC02-76SF00515 with the Department of Energy */
9 /*****************************************************************************/
10 
11 // $Id$
12 
13 #ifndef XRDMONDECSINK_HH
14 #define XRDMONDECSINK_HH
15 
21 #include "XrdSys/XrdSysPthread.hh"
22 #include <algorithm>
23 #include <fstream>
24 #include <map>
25 #include <vector>
26 using std::fstream;
27 using std::map;
28 using std::pair;
29 using std::vector;
30 
32 public:
33  XrdMonDecSink(const char* baseDir,
34  const char* rtLogDir,
35  int rtBufSize,
36  bool saveTraces,
37  int maxTraceLogSize);
39 
40  void init(dictid_t min, dictid_t max, const string& senderHP);
41  sequen_t lastSeq() const { return _lastSeq; }
42  void registerXrdRestart(kXR_int32 stod, senderid_t senderId);
43 
44  void setLastSeq(sequen_t seq) { _lastSeq = seq; }
45 
46  void addDictId(dictid_t xrdId,
47  const char* theString,
48  int len,
49  senderid_t senderId);
50  void addStageInfo(dictid_t xrdId,
51  const char* theString,
52  int len,
53  senderid_t senderId);
54  void addUserId(dictid_t xrdId,
55  const char* theString,
56  int len,
57  senderid_t senderId);
58  void add(dictid_t xrdId,
59  XrdMonDecTraceInfo& trace,
60  senderid_t senderId);
61  void addUserDisconnect(dictid_t xrdId,
62  kXR_int32 sec,
63  kXR_int32 timestamp,
64  senderid_t senderId);
65  void openFile(dictid_t dictId,
66  kXR_int32 timestamp,
67  senderid_t senderId,
68  kXR_int64 fSize);
69  void closeFile(dictid_t dictId,
70  kXR_int64 bytesR,
71  kXR_int64 bytesW,
72  kXR_int32 timestamp,
73  senderid_t senderId);
74  void flushHistoryData();
75  void flushRealTimeData() { if ( 0 != _rtLogger ) _rtLogger->flush(); }
76 
77  void reset(senderid_t senderId);
78 
79 private:
80  typedef map<dictid_t, XrdMonDecDictInfo*> dmap_t;
81  typedef map<dictid_t, XrdMonDecUserInfo*> umap_t;
82  typedef map<dictid_t, XrdMonDecDictInfo*>::iterator dmapitr_t;
83  typedef map<dictid_t, XrdMonDecUserInfo*>::iterator umapitr_t;
84 
85  void initRT(const char* rtLogDir, int rtBufSize);
86  void addVersion();
87 
88  void loadUniqueIdsAndSeq();
89  vector<XrdMonDecDictInfo*> loadActiveDictInfo();
90  void flushClosedDicts();
91  void flushUserCache();
92  void flushTCache();
93  void checkpoint();
94  void openTraceFile(fstream& f);
95  void write2TraceFile(fstream& f, const char* buf, int len);
96  void registerLostPacket(dictid_t id, const char* descr);
97  void reportLostPackets();
98 
99  void flushOneDMap(dmap_t* m, int& curLen, const int BUFSIZE,
100  string& buf, fstream& fD);
101  void flushOneUMap(umap_t* m, int& curLen, const int BUFSIZE,
102  string& buf, fstream& fD);
103 
104  void resetDMap(senderid_t senderId);
105  void resetUMap(senderid_t senderId);
106 
107 private:
108  // this defines how frequently version information will be
109  // added to the log file (every ...how many entries in the log file)
110  static const kXR_unt16 VER_FREQ;
111 
113 
114 
115  vector< dmap_t* > _dCache;
116  vector< umap_t* > _uCache;
117 
118  // The mutexes guard access to dCache, uCache respectively.
119  // _dCache and _uCache can be accessed from different threads
120  // (periodic data flushing inside dedicated thread)
123 
125 
127  typedef vector<XrdMonDecTraceInfo> TraceVector;
130  kXR_unt16 _traceLogNumber; // trace.000.ascii, 001, and so on...
132 
133  map<dictid_t, long> _lost; //lost dictIds -> number of lost traces
134 
136  dictid_t _uniqueDictId; // dictId in mySQL, unique for given xrootd host
137  dictid_t _uniqueUserId; // userId in mySQL, unique for given xrootd host
138 
139  string _path; // <basePath>/<date>_seqId_
140  string _jnlPath; // <basePath>/jnl
141  string _dictPath; // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_dict.ascii
142  string _userPath; // <basePath>/<YYYYMMDD_HH:MM:SS.MMM_user.ascii
143  string _rtFlagPath; // <rtLogDir>/rtRunning.flag
144  string _rtMaxIdsPath;// <rtLogDir>/rtMax.jnl
145  string _xrdRestartLog;// <basePath>/xrdRestarts.ascii
146 };
147 
148 #endif /* XRDMONDECSINK_HH */