Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "connection.h"
22 :
23 : #include "buffer.h"
24 : #include "connectionDescription.h"
25 : #include "connectionListener.h"
26 : #include "log.h"
27 : #include "pipeConnection.h"
28 : #include "socketConnection.h"
29 : #include "rspConnection.h"
30 :
31 : #ifdef _WIN32
32 : # include "namedPipeConnection.h"
33 : #endif
34 :
35 : #include <co/exception.h>
36 :
37 : #ifdef COLLAGE_USE_OFED
38 : # include "rdmaConnection.h"
39 : #endif
40 : #ifdef COLLAGE_USE_UDT
41 : # include "udtConnection.h"
42 : #endif
43 :
44 : #include <lunchbox/scopedMutex.h>
45 : #include <lunchbox/stdExt.h>
46 :
// Optional histogram instrumentation, disabled unless STATISTICS is defined.
// When enabled, ADD_STATISTIC( x ) increments the counter stored under key x
// in the file-level _histogram while a lunchbox::ScopedFastWrite guard is
// alive, and DUMP_STATISTIC logs every "key, count" pair through LBINFO while
// a lunchbox::ScopedFastRead guard is alive, with log flushing and headers
// disabled for the duration of the dump. When disabled, both macros expand to
// nothing, so call sites need no conditional compilation.
47 : //#define STATISTICS
48 : #ifdef STATISTICS
49 : typedef std::map< uint64_t, size_t > Histogram;
50 : typedef Histogram::const_iterator HistogramCIter;
51 : lunchbox::Lockable< Histogram, lunchbox::SpinLock > _histogram;
52 :
53 : # define ADD_STATISTIC( x ) \
54 : { \
55 : lunchbox::ScopedFastWrite mutex( _histogram ); \
56 : ++(*_histogram)[ x ]; \
57 : }
58 : # define DUMP_STATISTIC \
59 : { \
60 : lunchbox::ScopedFastRead mutex( _histogram ); \
61 : LBINFO << lunchbox::disableFlush << lunchbox::disableHeader; \
62 : for( HistogramCIter i=_histogram->begin(); i!=_histogram->end(); ++i) \
63 : LBINFO << i->first << ", " << i->second << std::endl; \
64 : LBINFO << lunchbox::enableHeader << lunchbox::enableFlush <<std::endl; \
65 : }
66 : #else
67 : # define ADD_STATISTIC( x )
68 : # define DUMP_STATISTIC
69 : #endif
70 :
71 : namespace co
72 : {
73 : namespace detail
74 : {
// Private implementation data of co::Connection: the connection state, its
// description, the lock serializing writes, the pending asynchronous read
// (target buffer plus requested byte count) and the registered state
// listeners. All members are public; co::Connection accesses them via _impl.
75 : class Connection
76 : {
77 : public:
78 : co::Connection::State state; //!< The connection state
79 : ConnectionDescriptionPtr description; //!< The connection parameters
80 :
81 : /** The lock used to protect concurrent write calls. */
82 : mutable lunchbox::Lock sendLock;
83 :
84 : BufferPtr buffer; //!< Current async read buffer
85 : uint64_t bytes; //!< Current read request size
86 :
87 : /** The listeners on state changes */
88 : ConnectionListeners listeners;
89 :
// Starts closed, with a newly allocated description whose type is set to
// CONNECTIONTYPE_NONE, and with no read request pending (bytes == 0).
90 414 : Connection()
91 : : state( co::Connection::STATE_CLOSED )
92 414 : , description( new ConnectionDescription )
93 828 : , bytes( 0 )
94 : {
95 414 : description->type = CONNECTIONTYPE_NONE;
96 414 : }
97 :
// Asserts that the connection has been closed and that no read buffer is
// still pending, then forces the state to STATE_CLOSED and resets the
// description pointer to 0.
98 406 : ~Connection()
99 406 : {
100 406 : LBASSERT( state == co::Connection::STATE_CLOSED );
101 406 : state = co::Connection::STATE_CLOSED;
102 406 : description = 0;
103 :
104 406 : LBASSERTINFO( !buffer,
105 : "Pending read operation during connection destruction" );
106 406 : }
107 :
// Calls notifyStateChanged( connection ) on each registered listener,
// walking the listener container from begin() to end().
108 1083 : void fireStateChanged( co::Connection* connection )
109 : {
110 3377 : for( ConnectionListeners::const_iterator i= listeners.begin();
111 2251 : i != listeners.end(); ++i )
112 : {
113 43 : (*i)->notifyStateChanged( connection );
114 : }
115 1082 : }
116 : };
117 : }
118 :
// Allocates the private implementation object and logs the address of the new
// connection at verbose level.
119 414 : Connection::Connection()
120 414 : : _impl( new detail::Connection )
121 : {
122 414 : LBVERB << "New Connection @" << (void*)this << std::endl;
123 414 : }
124 :
// Deletes the private implementation (whose destructor asserts that the
// connection is closed), logs the destruction at verbose level and runs
// DUMP_STATISTIC, which expands to nothing unless STATISTICS is defined.
125 812 : Connection::~Connection()
126 : {
127 406 : delete _impl;
128 406 : LBVERB << "Delete Connection @" << (void*)this << std::endl;
129 : DUMP_STATISTIC;
130 406 : }
131 :
// Compares two connections.
// Returns true when rhs is this very object. Otherwise a connection whose
// description type is not CONNECTIONTYPE_PIPE never compares equal; for a pipe
// connection the result is whether the pointer obtained from acceptSync()
// is the address of rhs.
// NOTE(review): acceptSync() is non-const and reached through a const_cast, so
// this comparison may not be side-effect free - confirm against acceptSync().
132 0 : bool Connection::operator == ( const Connection& rhs ) const
133 : {
134 0 : if( this == &rhs )
135 0 : return true;
136 0 : if( _impl->description->type != CONNECTIONTYPE_PIPE )
137 0 : return false;
138 0 : Connection* pipe = const_cast< Connection* >( this );
139 0 : return pipe->acceptSync().get() == &rhs;
140 : }
141 :
// Factory: instantiates the connection class matching description->type.
//
// TCPIP and SDP map to SocketConnection, PIPE to PipeConnection, RSP to
// RSPConnection; NAMEDPIPE, RDMA and UDT are only available when _WIN32,
// COLLAGE_USE_OFED and COLLAGE_USE_UDT are defined, respectively.
//
// @param description the parameters of the connection to create. It is
//        dereferenced without a null check, and its bandwidth field is
//        overwritten when it is 0 (see below).
// @return the new connection with the given description installed, or 0
//         (after logging a warning) if the type is not supported in this build.
142 101 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
143 : {
144 101 : ConnectionPtr connection;
145 101 : switch( description->type )
146 : {
147 : case CONNECTIONTYPE_TCPIP:
148 : case CONNECTIONTYPE_SDP:
149 94 : connection = new SocketConnection( description->type );
150 94 : break;
151 :
152 : case CONNECTIONTYPE_PIPE:
153 3 : connection = new PipeConnection;
154 3 : break;
155 :
156 : #ifdef _WIN32
157 : case CONNECTIONTYPE_NAMEDPIPE:
158 : connection = new NamedPipeConnection;
159 : break;
160 : #endif
161 :
162 : case CONNECTIONTYPE_RSP:
163 2 : connection = new RSPConnection;
164 2 : break;
165 :
166 : #ifdef COLLAGE_USE_OFED
167 : case CONNECTIONTYPE_RDMA:
168 : connection = new RDMAConnection;
169 : break;
170 : #endif
171 : #ifdef COLLAGE_USE_UDT
172 : case CONNECTIONTYPE_UDT:
173 : connection = new UDTConnection;
174 : break;
175 : #endif
176 :
177 : default:
178 6 : LBWARN << "Connection type " << description->type
179 6 : << " not supported" << std::endl;
180 2 : return 0;
181 : }
182 :
// A bandwidth of 0 means "unset": take the value from the description the
// freshly constructed connection already carries. This must happen before
// _setDescription(), which asserts that the bandwidth is greater than 0.
183 99 : if( description->bandwidth == 0 )
184 75 : description->bandwidth = connection->getDescription()->bandwidth;
185 :
186 99 : connection->_setDescription( description );
187 99 : return connection;
188 : }
189 :
// Returns the current connection state as stored in the implementation.
190 3237849 : Connection::State Connection::getState() const
191 : {
192 3237849 : return _impl->state;
193 : }
194 :
// Replaces the connection's description with the given one.
// Asserts that the new description is valid, that its type equals the type of
// the description currently installed, and that its bandwidth is greater
// than 0.
195 101 : void Connection::_setDescription( ConnectionDescriptionPtr description )
196 : {
197 101 : LBASSERT( description.isValid( ));
198 101 : LBASSERTINFO( _impl->description->type == description->type,
199 : "Wrong connection type in description" );
200 101 : _impl->description = description;
201 101 : LBASSERT( description->bandwidth > 0 );
202 101 : }
203 :
// Stores the new state and notifies all registered listeners.
// Does nothing - in particular fires no notification - when the state is
// unchanged.
204 1137 : void Connection::_setState( const State state )
205 : {
206 1137 : if( _impl->state == state )
207 1190 : return;
208 1083 : _impl->state = state;
209 1083 : _impl->fireStateChanged( this );
210 : }
211 :
// Sets the send lock, the same lock send() takes when called with
// isLocked == false.
// NOTE(review): presumably callers holding this lock pass isLocked = true to
// send() - confirm against the call sites.
212 183 : void Connection::lockSend() const
213 : {
214 183 : _impl->sendLock.set();
215 183 : }
216 :
// Unsets the send lock; the counterpart of lockSend().
217 183 : void Connection::unlockSend() const
218 : {
219 183 : _impl->sendLock.unset();
220 183 : }
221 :
// Appends listener to the list notified on state changes. No check for
// duplicates is made, and the listener list is modified without taking a lock
// in this function.
222 166 : void Connection::addListener( ConnectionListener* listener )
223 : {
224 166 : _impl->listeners.push_back( listener );
225 166 : }
226 :
// Removes the first occurrence of listener from the state-change listeners.
// Does nothing if the listener is not registered.
227 165 : void Connection::removeListener( ConnectionListener* listener )
228 : {
229 : ConnectionListeners::iterator i = find( _impl->listeners.begin(),
230 165 : _impl->listeners.end(), listener );
231 165 : if( i != _impl->listeners.end( ))
232 165 : _impl->listeners.erase( i );
233 165 : }
234 :
235 : //----------------------------------------------------------------------
236 : // read
237 : //----------------------------------------------------------------------
// Starts an asynchronous read of bytes bytes, appended to buffer.
//
// Asserts that no other read is pending, that buffer is non-null and that
// bytes is greater than 0 and below LB_BIT48 (a larger value is reported as an
// out-of-sync stream). The buffer and byte count are remembered for
// recvSync(), the buffer's capacity is reserved for its current size plus
// bytes, and readNB() is started on the memory just past the current data.
// The buffer's size itself is not changed here; recvSync() resizes it.
238 2586833 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
239 : {
240 2586833 : LBASSERT( !_impl->buffer );
241 2586833 : LBASSERT( _impl->bytes == 0 );
242 2586833 : LBASSERT( buffer );
243 2586833 : LBASSERT( bytes > 0 );
244 2586833 : LBASSERTINFO( bytes < LB_BIT48,
245 : "Out-of-sync network stream: read size " << bytes << "?" );
246 :
247 2586833 : _impl->buffer = buffer;
248 2586833 : _impl->bytes = bytes;
249 2586833 : buffer->reserve( buffer->getSize() + bytes );
250 2586833 : readNB( buffer->getData() + buffer->getSize(), bytes );
251 2586833 : }
252 :
// Completes the read started by recvNB().
//
// The pending buffer is handed to outBuffer and the pending-read state is
// cleared before anything else. The data is then collected with readSync();
// after a partial read a new readNB()/readSync() pair is issued for the
// remainder until all requested bytes have arrived.
//
// @param outBuffer receives the buffer passed to recvNB(); set to 0 when the
//        read is re-armed after a READ_TIMEOUT.
// @param block forwarded to the first readSync() call only; the follow-up
//        reads for the remainder always block.
// @return true when all bytes were read (the buffer size grows by the
//         requested amount), or when the first readSync() returned
//         READ_TIMEOUT and the pending read was restored. false when the
//         connection is not in STATE_CONNECTED, no read was pending, a read
//         error occurred (the buffer size grows by the bytes read so far), or
//         zero bytes were read before any data arrived.
253 2586721 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
254 : {
255 2586721 : LBASSERTINFO( _impl->buffer,
256 : "No pending receive on " << getDescription()->toString( ));
257 :
258 : // reset async IO data
259 2586721 : outBuffer = _impl->buffer;
260 2586721 : const uint64_t bytes = _impl->bytes;
261 2586721 : _impl->buffer = 0;
262 2586721 : _impl->bytes = 0;
263 :
264 2586721 : if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
265 33 : return false;
266 2586688 : LBASSERTINFO( bytes < LB_BIT48,
267 : "Out-of-sync network stream: read size " << bytes << "?" );
268 :
269 : // 'Iterators' for receive loop
270 2586688 : uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
271 2586688 : uint64_t bytesLeft = bytes;
272 2586688 : int64_t got = readSync( ptr, bytesLeft, block );
273 :
274 : // WAR: fluke notification: On Win32, we get occasionally a data
275 : // notification and then deadlock when reading from the connection. The
276 : // callee (Node::handleData) will flag the first read, the underlying
277 : // SocketConnection will not block and we will restore the AIO operation if
278 : // no data was present.
279 2586688 : if( got == READ_TIMEOUT )
280 : {
281 0 : _impl->buffer = outBuffer;
282 0 : _impl->bytes = bytes;
283 0 : outBuffer = 0;
284 0 : return true;
285 : }
286 :
287 : // From here on, receive loop until all data read or error
288 : while( true )
289 : {
290 2812597 : if( got < 0 ) // error
291 : {
// Keep whatever was received before the failure visible in the buffer.
292 39 : const uint64_t read = bytes - bytesLeft;
293 39 : outBuffer->resize( outBuffer->getSize() + read );
294 39 : if( bytes == bytesLeft )
295 39 : LBINFO << "Read on dead connection" << std::endl;
296 : else
297 0 : LBERROR << "Error during read after " << read << " bytes on "
298 0 : << _impl->description << std::endl;
299 39 : return false;
300 : }
301 2812558 : else if( got == 0 )
302 : {
303 : // ConnectionSet::select may report data on an 'empty' connection.
304 : // If we have nothing read so far, we have hit this case.
305 0 : if( bytes == bytesLeft )
306 0 : return false;
307 0 : LBVERB << "Zero bytes read" << std::endl;
308 : }
309 2812558 : if( bytesLeft > static_cast< uint64_t >( got )) // partial read
310 : {
311 225909 : ptr += got;
312 225909 : bytesLeft -= got;
313 :
// Re-arm the read for the remainder and wait for it, always blocking.
314 225909 : readNB( ptr, bytesLeft );
315 225909 : got = readSync( ptr, bytesLeft, true );
316 225909 : continue;
317 : }
318 :
319 : // read done
320 2586649 : LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
321 : got << " != " << bytesLeft );
322 :
323 2586649 : outBuffer->resize( outBuffer->getSize() + bytes );
324 : #ifndef NDEBUG
325 2586649 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
326 : {
// ptr was advanced by (bytes - bytesLeft) over the partial reads; step
// back to the first received byte before dumping the packet.
327 0 : ptr -= (bytes - bytesLeft); // rewind
328 0 : LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
329 : }
330 : #endif
331 2586649 : return true;
332 : }
333 :
334 : LBUNREACHABLE;
335 225909 : return true;
336 : }
337 :
// Clears the pending-read state (buffer and byte count) and returns the buffer
// that was pending, which is null if no read had been started.
338 214 : BufferPtr Connection::resetRecvData()
339 : {
340 214 : BufferPtr buffer = _impl->buffer;
341 214 : _impl->buffer = 0;
342 214 : _impl->bytes = 0;
343 214 : return buffer;
344 : }
345 :
346 : //----------------------------------------------------------------------
347 : // write
348 : //----------------------------------------------------------------------
// Sends bytes bytes starting at buffer, calling write() repeatedly until the
// whole buffer has been written.
//
// @param buffer start of the data to send.
// @param bytes number of bytes to send; asserted to be greater than 0. If the
//        assertion does not stop execution, a size of 0 returns true at once.
// @param isLocked when false, the send lock is passed to a scoped mutex that
//        lives for the whole transmission; when true a null lock pointer is
//        passed instead (presumably the caller already holds the lock via
//        lockSend() - confirm).
// @return true once everything was written; false if write() returned -1 or
//         threw a co::Exception, in which case the connection has been closed.
//
// NOTE(review): only -1 is treated as an error. Any other negative return
// value from write() would be subtracted from bytesLeft, i.e. increase it, and
// move ptr backwards - presumably write() never returns one; confirm. A write
// of zero bytes is logged and retried immediately.
349 2834983 : bool Connection::send( const void* buffer, const uint64_t bytes,
350 : const bool isLocked )
351 : {
352 : ADD_STATISTIC( bytes );
353 2834983 : LBASSERT( bytes > 0 );
354 2836156 : if( bytes == 0 )
355 0 : return true;
356 :
357 2836156 : const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
358 :
359 : // possible OPT: We need to lock here to guarantee an atomic transmission of
360 : // the buffer. Possible improvements are:
361 : // 1) Disassemble buffer into 'small enough' pieces and use a header to
362 : // reassemble correctly on the other side (aka reliable UDP)
363 : // 2) Introduce a send thread with a thread-safe task queue
364 2836156 : lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
365 :
366 : #ifndef NDEBUG
367 2837090 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
368 0 : LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
369 : #endif
370 :
371 2837090 : uint64_t bytesLeft = bytes;
372 8512233 : while( bytesLeft )
373 : {
374 : try
375 : {
376 2837172 : const int64_t wrote = this->write( ptr, bytesLeft );
377 2838053 : if( wrote == -1 ) // error
378 : {
379 0 : LBERROR << "Error during write after " << bytes - bytesLeft
380 0 : << " bytes, closing connection" << std::endl;
381 0 : close();
382 0 : return false;
383 : }
384 2838053 : else if( wrote == 0 )
385 0 : LBINFO << "Zero bytes write" << std::endl;
386 :
387 2838053 : bytesLeft -= wrote;
388 2838053 : ptr += wrote;
389 : }
390 : catch( const co::Exception& e )
391 : {
392 : LBERROR << e.what() << " after " << bytes - bytesLeft
393 : << " bytes, closing connection" << std::endl;
394 : close();
395 : return false;
396 : }
397 :
398 : }
399 2837971 : return true;
400 : }
401 :
// Returns true if the description's connection type compares greater than or
// equal to CONNECTIONTYPE_MULTICAST.
402 164 : bool Connection::isMulticast() const
403 : {
404 164 : return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
405 : }
406 :
// Returns the connection's description as a pointer to const.
407 15160 : ConstConnectionDescriptionPtr Connection::getDescription() const
408 : {
409 15160 : return _impl->description;
410 : }
411 :
// Returns the connection's description as a mutable pointer; non-const
// counterpart of getDescription().
412 502 : ConnectionDescriptionPtr Connection::_getDescription()
413 : {
414 502 : return _impl->description;
415 : }
416 :
// Writes a one-line summary of the connection to os: its class name, its
// address, the state as a lower-case word ("UNKNOWN" for a value matching none
// of the five listed states) and, if the description pointer is valid, the
// description's string form.
// @return os, to allow chaining.
417 77 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
418 : {
419 77 : const Connection::State state = connection.getState();
420 77 : ConstConnectionDescriptionPtr desc = connection.getDescription();
421 :
422 154 : os << lunchbox::className( connection ) << " " << (void*)&connection
423 77 : << " state " << ( state == Connection::STATE_CLOSED ? "closed" :
424 : state == Connection::STATE_CONNECTING ? "connecting" :
425 : state == Connection::STATE_CONNECTED ? "connected" :
426 : state == Connection::STATE_LISTENING ? "listening" :
427 : state == Connection::STATE_CLOSING ? "closing" :
428 154 : "UNKNOWN" );
429 77 : if( desc.isValid( ))
430 77 : os << " description " << desc->toString();
431 :
432 77 : return os;
433 : }
434 60 : }
|