Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
#include "connection.h"

#include "buffer.h"
#include "connectionDescription.h"
#include "connectionListener.h"
#include "log.h"
#include "pipeConnection.h"
#include "socketConnection.h"
#include "rspConnection.h"

#ifdef _WIN32
#  include "namedPipeConnection.h"
#endif

#include <co/exception.h>

#ifdef COLLAGE_USE_OFED
#  include "rdmaConnection.h"
#endif
#ifdef COLLAGE_USE_UDT
#  include "udtConnection.h"
#endif

#include <lunchbox/scopedMutex.h>
#include <lunchbox/stdExt.h>

#include <algorithm> // std::find
46 :
// Optional histogram of the sizes passed to ADD_STATISTIC. Uncomment the
// define below to enable it; when disabled both macros expand to an empty
// statement and do not evaluate their argument.
//#define STATISTICS
#ifdef STATISTICS
#  include <map>

namespace
{
typedef std::map< uint64_t, size_t > Histogram;
typedef Histogram::const_iterator HistogramCIter;

// File-local: kept in an unnamed namespace so the symbol is not exported.
lunchbox::Lockable< Histogram, lunchbox::SpinLock > _histogram;
}

// Both macros are wrapped in do/while so that they behave like a single
// statement, e.g. inside an unbraced if/else.

/** Increment the histogram bucket for the value x. */
#  define ADD_STATISTIC( x )                                                 \
    do                                                                       \
    {                                                                        \
        lunchbox::ScopedFastWrite mutex( _histogram );                       \
        ++(*_histogram)[ x ];                                                \
    }                                                                        \
    while( false )

/** Print all histogram buckets as "value, count" lines to LBINFO. */
#  define DUMP_STATISTIC                                                     \
    do                                                                       \
    {                                                                        \
        lunchbox::ScopedFastRead mutex( _histogram );                        \
        LBINFO << lunchbox::disableFlush << lunchbox::disableHeader;         \
        for( HistogramCIter i=_histogram->begin(); i!=_histogram->end(); ++i)\
            LBINFO << i->first << ", " << i->second << std::endl;            \
        LBINFO << lunchbox::enableHeader << lunchbox::enableFlush <<std::endl;\
    }                                                                        \
    while( false )
#else
#  define ADD_STATISTIC( x ) do {} while( false )
#  define DUMP_STATISTIC do {} while( false )
#endif
70 :
71 : namespace co
72 : {
73 : namespace detail
74 : {
75 : class Connection
76 : {
77 : public:
78 : co::Connection::State state; //!< The connection state
79 : ConnectionDescriptionPtr description; //!< The connection parameters
80 :
81 : /** The lock used to protect concurrent write calls. */
82 : mutable lunchbox::Lock sendLock;
83 :
84 : BufferPtr buffer; //!< Current async read buffer
85 : uint64_t bytes; //!< Current read request size
86 :
87 : /** The listeners on state changes */
88 : ConnectionListeners listeners;
89 :
90 429 : Connection()
91 : : state( co::Connection::STATE_CLOSED )
92 429 : , description( new ConnectionDescription )
93 858 : , bytes( 0 )
94 : {
95 429 : description->type = CONNECTIONTYPE_NONE;
96 429 : }
97 :
98 421 : ~Connection()
99 421 : {
100 421 : LBASSERT( state == co::Connection::STATE_CLOSED );
101 421 : state = co::Connection::STATE_CLOSED;
102 421 : description = 0;
103 :
104 421 : LBASSERTINFO( !buffer,
105 : "Pending read operation during connection destruction" );
106 421 : }
107 :
108 1127 : void fireStateChanged( co::Connection* connection )
109 : {
110 3513 : for( ConnectionListeners::const_iterator i= listeners.begin();
111 2342 : i != listeners.end(); ++i )
112 : {
113 44 : (*i)->notifyStateChanged( connection );
114 : }
115 1127 : }
116 : };
117 : }
118 :
119 429 : Connection::Connection()
120 429 : : _impl( new detail::Connection )
121 : {
122 429 : LBVERB << "New Connection @" << (void*)this << std::endl;
123 429 : }
124 :
125 842 : Connection::~Connection()
126 : {
127 421 : delete _impl;
128 421 : LBVERB << "Delete Connection @" << (void*)this << std::endl;
129 : DUMP_STATISTIC;
130 421 : }
131 :
132 0 : bool Connection::operator == ( const Connection& rhs ) const
133 : {
134 0 : if( this == &rhs )
135 0 : return true;
136 0 : if( _impl->description->type != CONNECTIONTYPE_PIPE )
137 0 : return false;
138 0 : Connection* pipe = const_cast< Connection* >( this );
139 0 : return pipe->acceptSync().get() == &rhs;
140 : }
141 :
142 106 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
143 : {
144 106 : ConnectionPtr connection;
145 106 : switch( description->type )
146 : {
147 : case CONNECTIONTYPE_TCPIP:
148 98 : connection = new SocketConnection;
149 98 : break;
150 :
151 : case CONNECTIONTYPE_PIPE:
152 3 : connection = new PipeConnection;
153 3 : break;
154 :
155 : #ifdef _WIN32
156 : case CONNECTIONTYPE_NAMEDPIPE:
157 : connection = new NamedPipeConnection;
158 : break;
159 : #endif
160 :
161 : case CONNECTIONTYPE_RSP:
162 0 : connection = new RSPConnection;
163 0 : break;
164 :
165 : #ifdef COLLAGE_USE_OFED
166 : case CONNECTIONTYPE_RDMA:
167 4 : connection = new RDMAConnection;
168 4 : break;
169 : #endif
170 : #ifdef COLLAGE_USE_UDT
171 : case CONNECTIONTYPE_UDT:
172 : connection = new UDTConnection;
173 : break;
174 : #endif
175 :
176 : default:
177 3 : LBWARN << "Connection type " << description->type
178 3 : << " not supported" << std::endl;
179 1 : return 0;
180 : }
181 :
182 105 : if( description->bandwidth == 0 )
183 78 : description->bandwidth = connection->getDescription()->bandwidth;
184 :
185 105 : connection->_setDescription( description );
186 105 : return connection;
187 : }
188 :
189 3256119 : Connection::State Connection::getState() const
190 : {
191 3256119 : return _impl->state;
192 : }
193 :
194 105 : void Connection::_setDescription( ConnectionDescriptionPtr description )
195 : {
196 105 : LBASSERT( description.isValid( ));
197 105 : LBASSERTINFO( _impl->description->type == description->type,
198 : "Wrong connection type in description" );
199 105 : _impl->description = description;
200 105 : LBASSERT( description->bandwidth > 0 );
201 105 : }
202 :
203 1181 : void Connection::_setState( const State state )
204 : {
205 1181 : if( _impl->state == state )
206 1235 : return;
207 1127 : _impl->state = state;
208 1127 : _impl->fireStateChanged( this );
209 : }
210 :
211 183 : void Connection::lockSend() const
212 : {
213 183 : _impl->sendLock.set();
214 183 : }
215 :
216 183 : void Connection::unlockSend() const
217 : {
218 183 : _impl->sendLock.unset();
219 183 : }
220 :
221 175 : void Connection::addListener( ConnectionListener* listener )
222 : {
223 175 : _impl->listeners.push_back( listener );
224 175 : }
225 :
226 174 : void Connection::removeListener( ConnectionListener* listener )
227 : {
228 : ConnectionListeners::iterator i = find( _impl->listeners.begin(),
229 174 : _impl->listeners.end(), listener );
230 174 : if( i != _impl->listeners.end( ))
231 174 : _impl->listeners.erase( i );
232 174 : }
233 :
234 : //----------------------------------------------------------------------
235 : // read
236 : //----------------------------------------------------------------------
237 2207507 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
238 : {
239 2207507 : LBASSERT( !_impl->buffer );
240 2207507 : LBASSERT( _impl->bytes == 0 );
241 2207507 : LBASSERT( buffer );
242 2207507 : LBASSERT( bytes > 0 );
243 2207507 : LBASSERTINFO( bytes < LB_BIT48,
244 : "Out-of-sync network stream: read size " << bytes << "?" );
245 :
246 2207507 : _impl->buffer = buffer;
247 2207507 : _impl->bytes = bytes;
248 2207507 : buffer->reserve( buffer->getSize() + bytes );
249 2207507 : readNB( buffer->getData() + buffer->getSize(), bytes );
250 2207506 : }
251 :
252 2207389 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
253 : {
254 2207389 : LBASSERTINFO( _impl->buffer,
255 : "No pending receive on " << getDescription()->toString( ));
256 :
257 : // reset async IO data
258 2207389 : outBuffer = _impl->buffer;
259 2207389 : const uint64_t bytes = _impl->bytes;
260 2207389 : _impl->buffer = 0;
261 2207389 : _impl->bytes = 0;
262 :
263 2207389 : if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
264 34 : return false;
265 2207355 : LBASSERTINFO( bytes < LB_BIT48,
266 : "Out-of-sync network stream: read size " << bytes << "?" );
267 :
268 : // 'Iterators' for receive loop
269 2207355 : uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
270 2207355 : uint64_t bytesLeft = bytes;
271 2207355 : int64_t got = readSync( ptr, bytesLeft, block );
272 :
273 : // WAR: fluke notification: On Win32, we get occasionally a data
274 : // notification and then deadlock when reading from the connection. The
275 : // callee (Node::handleData) will flag the first read, the underlying
276 : // SocketConnection will not block and we will restore the AIO operation if
277 : // no data was present.
278 2207355 : if( got == READ_TIMEOUT )
279 : {
280 0 : _impl->buffer = outBuffer;
281 0 : _impl->bytes = bytes;
282 0 : outBuffer = 0;
283 0 : return true;
284 : }
285 :
286 : // From here on, receive loop until all data read or error
287 : while( true )
288 : {
289 2501257 : if( got < 0 ) // error
290 : {
291 40 : const uint64_t read = bytes - bytesLeft;
292 40 : outBuffer->resize( outBuffer->getSize() + read );
293 40 : if( bytes == bytesLeft )
294 40 : LBDEBUG << "Read on dead connection" << std::endl;
295 : else
296 0 : LBERROR << "Error during read after " << read << " bytes on "
297 0 : << _impl->description << std::endl;
298 40 : return false;
299 : }
300 2501217 : else if( got == 0 )
301 : {
302 : // ConnectionSet::select may report data on an 'empty' connection.
303 : // If we have nothing read so far, we have hit this case.
304 0 : if( bytes == bytesLeft )
305 0 : return false;
306 0 : LBVERB << "Zero bytes read" << std::endl;
307 : }
308 2501217 : if( bytesLeft > static_cast< uint64_t >( got )) // partial read
309 : {
310 293902 : ptr += got;
311 293902 : bytesLeft -= got;
312 :
313 293902 : readNB( ptr, bytesLeft );
314 293902 : got = readSync( ptr, bytesLeft, true );
315 293902 : continue;
316 : }
317 :
318 : // read done
319 2207315 : LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
320 : got << " != " << bytesLeft );
321 :
322 2207315 : outBuffer->resize( outBuffer->getSize() + bytes );
323 : #ifndef NDEBUG
324 2207315 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
325 : {
326 0 : ptr -= (bytes - bytesLeft); // rewind
327 0 : LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
328 : }
329 : #endif
330 2207315 : return true;
331 : }
332 :
333 : LBUNREACHABLE;
334 293902 : return true;
335 : }
336 :
337 224 : BufferPtr Connection::resetRecvData()
338 : {
339 224 : BufferPtr buffer = _impl->buffer;
340 224 : _impl->buffer = 0;
341 224 : _impl->bytes = 0;
342 224 : return buffer;
343 : }
344 :
345 : //----------------------------------------------------------------------
346 : // write
347 : //----------------------------------------------------------------------
348 2455002 : bool Connection::send( const void* buffer, const uint64_t bytes,
349 : const bool isLocked )
350 : {
351 : ADD_STATISTIC( bytes );
352 2455002 : LBASSERT( bytes > 0 );
353 2451907 : if( bytes == 0 )
354 0 : return true;
355 :
356 2451907 : const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
357 :
358 : // possible OPT: We need to lock here to guarantee an atomic transmission of
359 : // the buffer. Possible improvements are:
360 : // 1) Disassemble buffer into 'small enough' pieces and use a header to
361 : // reassemble correctly on the other side (aka reliable UDP)
362 : // 2) Introduce a send thread with a thread-safe task queue
363 2451907 : lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
364 :
365 : #ifndef NDEBUG
366 2458719 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
367 0 : LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
368 : #endif
369 :
370 2458719 : uint64_t bytesLeft = bytes;
371 7390561 : while( bytesLeft )
372 : {
373 : try
374 : {
375 2479108 : const int64_t wrote = this->write( ptr, bytesLeft );
376 2473123 : if( wrote == -1 ) // error
377 : {
378 0 : LBERROR << "Error during write after " << bytes - bytesLeft
379 0 : << " bytes, closing connection" << std::endl;
380 0 : close();
381 0 : return false;
382 : }
383 2473123 : else if( wrote == 0 )
384 0 : LBINFO << "Zero bytes write" << std::endl;
385 :
386 2473123 : bytesLeft -= wrote;
387 2473123 : ptr += wrote;
388 : }
389 0 : catch( const co::Exception& e )
390 : {
391 0 : LBERROR << e.what() << " after " << bytes - bytesLeft
392 0 : << " bytes, closing connection" << std::endl;
393 0 : close();
394 0 : return false;
395 : }
396 :
397 : }
398 2452734 : return true;
399 : }
400 :
401 166 : bool Connection::isMulticast() const
402 : {
403 166 : return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
404 : }
405 :
406 681 : ConstConnectionDescriptionPtr Connection::getDescription() const
407 : {
408 681 : return _impl->description;
409 : }
410 :
411 527 : ConnectionDescriptionPtr Connection::_getDescription()
412 : {
413 527 : return _impl->description;
414 : }
415 :
416 81 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
417 : {
418 81 : const Connection::State state = connection.getState();
419 81 : ConstConnectionDescriptionPtr desc = connection.getDescription();
420 :
421 162 : os << lunchbox::className( connection ) << " " << (void*)&connection
422 81 : << " state " << ( state == Connection::STATE_CLOSED ? "closed" :
423 : state == Connection::STATE_CONNECTING ? "connecting" :
424 : state == Connection::STATE_CONNECTED ? "connected" :
425 : state == Connection::STATE_LISTENING ? "listening" :
426 : state == Connection::STATE_CLOSING ? "closing" :
427 162 : "UNKNOWN" );
428 81 : if( desc.isValid( ))
429 81 : os << " description " << desc->toString();
430 :
431 81 : return os;
432 : }
433 66 : }
|