xrootd
XrdClStream.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2012 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // XRootD is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Lesser General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // XRootD is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU General Public License for more details.
14 //
15 // You should have received a copy of the GNU Lesser General Public License
16 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
17 //------------------------------------------------------------------------------
18 
19 #ifndef __XRD_CL_STREAM_HH__
20 #define __XRD_CL_STREAM_HH__
21 
22 #include "XrdCl/XrdClPoller.hh"
23 #include "XrdCl/XrdClStatus.hh"
24 #include "XrdCl/XrdClURL.hh"
27 #include "XrdCl/XrdClJobManager.hh"
28 #include "XrdCl/XrdClInQueue.hh"
29 
30 #include "XrdSys/XrdSysPthread.hh"
31 #include <list>
32 #include <vector>
33 #include <netinet/in.h>
34 
35 namespace XrdCl
36 {
37  class Message;  // forward declarations: the visible part of this header refers to these types only through pointers
38  class Channel;
39  class TransportHandler;
40  class TaskManager;
41  struct SubStreamData;  // element type (by pointer) of the SubStreamList typedef in Stream
42 
43  //----------------------------------------------------------------------------
    // Stream: constructed from a URL and a stream number. Declares message
    // sending (Send), setters for collaborating objects, and a set of
    // per-sub-stream callbacks (OnIncoming, OnReadyToWrite, OnConnect, ...).
    // NOTE(review): this listing skips some original line numbers, so a few
    // declarations and their doc comments are not visible below.
45  //----------------------------------------------------------------------------
46  class Stream
47  {
48  public:
49  //------------------------------------------------------------------------
    // Stream status values.
    // NOTE(review): listing lines 52 and 54 are not visible here -
    // presumably the enum header and a zero-valued enumerator; confirm
    // against the real header.
51  //------------------------------------------------------------------------
53  {
55  Connected = 1,
56  Connecting = 2,
57  Error = 3
58  };
59 
60  //------------------------------------------------------------------------
    // Constructor: takes the URL (by pointer) and the stream number.
62  //------------------------------------------------------------------------
63  Stream( const URL *url, uint16_t streamNum );
64 
65  //------------------------------------------------------------------------
    // Destructor (defined outside this header)
67  //------------------------------------------------------------------------
68  ~Stream();
69 
70  //------------------------------------------------------------------------
    // NOTE(review): the declaration that sat on listing line 73 is not
    // visible in this listing.
72  //------------------------------------------------------------------------
74 
75  //------------------------------------------------------------------------
    // Send a message. Takes the message, an outgoing-message handler, a
    // 'stateful' flag and an expiration time; returns a Status.
    // NOTE(review): ownership of msg/handler is not visible here - confirm.
77  //------------------------------------------------------------------------
78  Status Send( Message *msg,
79  OutgoingMsgHandler *handler,
80  bool stateful,
81  time_t expires );
82 
83  //------------------------------------------------------------------------
    // Set the transport handler (stores the pointer in pTransport)
85  //------------------------------------------------------------------------
86  void SetTransport( TransportHandler *transport )
87  {
88  pTransport = transport;
89  }
90 
91  //------------------------------------------------------------------------
    // Set the poller (stores the pointer in pPoller)
93  //------------------------------------------------------------------------
94  void SetPoller( Poller *poller )
95  {
96  pPoller = poller;
97  }
98 
99  //------------------------------------------------------------------------
     // Set the incoming queue. The previously held queuing job is deleted
     // and replaced by a new QueueIncMsgJob bound to the new queue.
101  //------------------------------------------------------------------------
102  void SetIncomingQueue( InQueue *incomingQueue )
103  {
104  pIncomingQueue = incomingQueue;
105  delete pQueueIncMsgJob;
106  pQueueIncMsgJob = new QueueIncMsgJob( incomingQueue );
107  }
108 
109  //------------------------------------------------------------------------
     // Set the channel data (stores the pointer in pChannelData)
111  //------------------------------------------------------------------------
112  void SetChannelData( AnyObject *channelData )
113  {
114  pChannelData = channelData;
115  }
116 
117  //------------------------------------------------------------------------
     // Set the task manager (stores the pointer in pTaskManager)
119  //------------------------------------------------------------------------
120  void SetTaskManager( TaskManager *taskManager )
121  {
122  pTaskManager = taskManager;
123  }
124 
125  //------------------------------------------------------------------------
     // Set the job manager (stores the pointer in pJobManager)
127  //------------------------------------------------------------------------
128  void SetJobManager( JobManager *jobManager )
129  {
130  pJobManager = jobManager;
131  }
132 
133  //------------------------------------------------------------------------
     // Enable the link for the given path; returns a Status.
     // NOTE(review): 'path' is taken by non-const reference, so the callee
     // may modify it - confirm against the definition.
137  //------------------------------------------------------------------------
138  Status EnableLink( PathID &path );
139 
140  //------------------------------------------------------------------------
     // Disconnect the stream; 'force' defaults to false
142  //------------------------------------------------------------------------
143  void Disconnect( bool force = false );
144 
145  //------------------------------------------------------------------------
     // Tick: receives the current time as 'now'.
     // NOTE(review): presumably driven by a periodic clock/timeout event -
     // confirm against the caller.
148  //------------------------------------------------------------------------
149  void Tick( time_t now );
150 
151  //------------------------------------------------------------------------
     // Return the URL pointer held in pUrl
153  //------------------------------------------------------------------------
154  const URL *GetURL() const
155  {
156  return pUrl;
157  }
158 
159  //------------------------------------------------------------------------
     // Return the stream number held in pStreamNum
161  //------------------------------------------------------------------------
162  uint16_t GetStreamNumber() const
163  {
164  return pStreamNum;
165  }
166 
167  //------------------------------------------------------------------------
     // Force connection (declared here, defined elsewhere)
169  //------------------------------------------------------------------------
170  void ForceConnect();
171 
172  //------------------------------------------------------------------------
     // Return a const reference to the stream name held in pStreamName
174  //------------------------------------------------------------------------
175  const std::string &GetName() const
176  {
177  return pStreamName;
178  }
179 
180  //------------------------------------------------------------------------
     // Callback for an incoming message on the given sub-stream.
     // NOTE(review): bytesReceived is presumably the size read from the
     // socket for this message - confirm.
182  //------------------------------------------------------------------------
183  void OnIncoming( uint16_t subStream,
184  Message *msg,
185  uint32_t bytesReceived );
186 
187  //------------------------------------------------------------------------
188  // Call when one of the sockets is ready to accept a new message
     // Returns a (message, outgoing handler) pair for the given sub-stream.
189  //------------------------------------------------------------------------
190  std::pair<Message *, OutgoingMsgHandler *>
191  OnReadyToWrite( uint16_t subStream );
192 
193  //------------------------------------------------------------------------
194  // Call when a message is written to the socket
195  //------------------------------------------------------------------------
196  void OnMessageSent( uint16_t subStream,
197  Message *msg,
198  uint32_t bytesSent );
199 
200  //------------------------------------------------------------------------
     // Callback for a connect event on the given sub-stream
202  //------------------------------------------------------------------------
203  void OnConnect( uint16_t subStream );
204 
205  //------------------------------------------------------------------------
     // Callback for a connection error on the given sub-stream
207  //------------------------------------------------------------------------
208  void OnConnectError( uint16_t subStream, Status status );
209 
210  //------------------------------------------------------------------------
     // Callback for an error on the given sub-stream
212  //------------------------------------------------------------------------
213  void OnError( uint16_t subStream, Status status );
214 
215  //------------------------------------------------------------------------
     // Callback for a read timeout on the given sub-stream
217  //------------------------------------------------------------------------
218  void OnReadTimeout( uint16_t subStream );
219 
220  //------------------------------------------------------------------------
     // Callback for a write timeout on the given sub-stream
222  //------------------------------------------------------------------------
223  void OnWriteTimeout( uint16_t subStream );
224 
225  //------------------------------------------------------------------------
     // Register a channel event handler
227  //------------------------------------------------------------------------
228  void RegisterEventHandler( ChannelEventHandler *handler );
229 
230  //------------------------------------------------------------------------
     // Remove a channel event handler
232  //------------------------------------------------------------------------
233  void RemoveEventHandler( ChannelEventHandler *handler );
234 
235  //------------------------------------------------------------------------
     // Install an incoming-message handler for msg on the given stream.
     // Returns a (handler, bool) pair.
     // NOTE(review): the meaning of the bool was documented on listing
     // lines 236-243, which are not visible here - confirm before relying
     // on it.
244  //------------------------------------------------------------------------
245  std::pair<IncomingMsgHandler *, bool>
246  InstallIncHandler( Message *msg, uint16_t stream );
247 
248  private:
249 
250  //------------------------------------------------------------------------
251  // Job queuing the incoming messages
     // Run() treats its argument as a Message* and adds it to the queue
     // given at construction.
252  //------------------------------------------------------------------------
253  class QueueIncMsgJob: public Job
254  {
255  public:
256  QueueIncMsgJob( InQueue *queue ): pQueue( queue ) {};
257  virtual ~QueueIncMsgJob() {};
258  virtual void Run( void *arg )
259  {
     // arg is cast unchecked; callers must pass a Message*
260  Message *msg = (Message *)arg;
261  pQueue->AddMessage( msg );
262  }
263  private:
     // NOTE(review): the pQueue member declaration (listing line 264) is
     // not visible in this listing.
265  };
266 
267  //------------------------------------------------------------------------
268  // Job handling the incoming messages
     // Run() treats its argument as a Message*, passes it to the handler's
     // Process() and then deletes the job itself, so instances must be
     // heap-allocated and must not be touched after Run() returns.
269  //------------------------------------------------------------------------
270  class HandleIncMsgJob: public Job
271  {
272  public:
273  HandleIncMsgJob( IncomingMsgHandler *handler ): pHandler( handler ) {};
274  virtual ~HandleIncMsgJob() {};
275  virtual void Run( void *arg )
276  {
     // arg is cast unchecked; callers must pass a Message*
277  Message *msg = (Message *)arg;
278  pHandler->Process( msg );
     // one-shot job: self-destructs once the message has been processed
279  delete this;
280  }
281  private:
     // NOTE(review): the pHandler member declaration (listing line 282) is
     // not visible in this listing.
283  };
284 
285  //------------------------------------------------------------------------
     // Handle a fatal error on the given sub-stream.
     // NOTE(review): the mutex helper is passed by reference, presumably
     // so the callee can release the caller's lock - confirm.
287  //------------------------------------------------------------------------
288  void OnFatalError( uint16_t subStream,
289  Status status,
290  XrdSysMutexHelper &lock );
291 
292  //------------------------------------------------------------------------
     // Monitoring hook for a disconnection, given the related status
294  //------------------------------------------------------------------------
295  void MonitorDisconnection( Status status );
296 
     // List of pointers to sub-stream data records
297  typedef std::vector<SubStreamData*> SubStreamList;
298 
299  //------------------------------------------------------------------------
300  // Data members
     // NOTE(review): listing lines 305-318 and 320 are not visible here;
     // members assigned by the setters above (pTransport, pPoller,
     // pIncomingQueue, ...) are presumably declared there.
301  //------------------------------------------------------------------------
302  const URL *pUrl;
303  uint16_t pStreamNum;
304  std::string pStreamName;
319  std::vector<sockaddr_in> pAddresses;
321  uint64_t pSessionId;
322 
323  //------------------------------------------------------------------------
324  // Jobs
     // NOTE(review): the declaration on listing line 326 is not visible -
     // presumably pQueueIncMsgJob, which SetIncomingQueue manages.
325  //------------------------------------------------------------------------
327 
328  //------------------------------------------------------------------------
329  // Monitoring info
     // NOTE(review): listing lines 331-332 are not visible here.
330  //------------------------------------------------------------------------
333  uint64_t pBytesSent;
334  uint64_t pBytesReceived;
335  };
336 }
337 
338 #endif // __XRD_CL_STREAM_HH__