Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "connection.h"
22 :
23 : #include "buffer.h"
24 : #include "connectionDescription.h"
25 : #include "connectionListener.h"
26 : #include "log.h"
27 : #include "pipeConnection.h"
28 : #include "socketConnection.h"
29 : #include "rspConnection.h"
30 :
31 : #ifdef _WIN32
32 : # include "namedPipeConnection.h"
33 : #endif
34 :
35 : #include <co/exception.h>
36 :
37 : #ifdef COLLAGE_USE_OFED
38 : # include "rdmaConnection.h"
39 : #endif
40 : #ifdef COLLAGE_USE_UDT
41 : # include "udtConnection.h"
42 : #endif
43 :
44 : #include <lunchbox/scopedMutex.h>
45 : #include <lunchbox/stdExt.h>
46 :
47 : #define STATISTICS
48 : namespace co
49 : {
50 : namespace detail
51 : {
52 : class Connection
53 : {
54 : public:
55 : co::Connection::State state; //!< The connection state
56 : ConnectionDescriptionPtr description; //!< The connection parameters
57 :
58 : /** The lock used to protect concurrent write calls. */
59 : mutable lunchbox::Lock sendLock;
60 :
61 : BufferPtr buffer; //!< Current async read buffer
62 : uint64_t bytes; //!< Current read request size
63 :
64 : /** The listeners on state changes */
65 : ConnectionListeners listeners;
66 :
67 : uint64_t outBytes; //!< Statistic: written bytes
68 : uint64_t inBytes; //!< Statistic: read bytes
69 :
70 429 : Connection()
71 429 : : state( co::Connection::STATE_CLOSED )
72 429 : , description( new ConnectionDescription )
73 : , bytes( 0 )
74 : , outBytes( 0 )
75 858 : , inBytes( 0 )
76 : {
77 429 : description->type = CONNECTIONTYPE_NONE;
78 429 : }
79 :
80 421 : ~Connection()
81 421 : {
82 421 : LBASSERT( state == co::Connection::STATE_CLOSED );
83 421 : state = co::Connection::STATE_CLOSED;
84 421 : description = 0;
85 :
86 421 : LBASSERTINFO( !buffer,
87 : "Pending read operation during connection destruction" );
88 421 : }
89 :
90 1127 : void fireStateChanged( co::Connection* connection )
91 : {
92 3509 : for( ConnectionListeners::const_iterator i= listeners.begin();
93 2338 : i != listeners.end(); ++i )
94 : {
95 44 : (*i)->notifyStateChanged( connection );
96 : }
97 1125 : }
98 : };
99 : }
100 :
101 428 : Connection::Connection()
102 428 : : _impl( new detail::Connection )
103 : {
104 429 : LBVERB << "New Connection @" << (void*)this << std::endl;
105 429 : }
106 :
107 842 : Connection::~Connection()
108 : {
109 421 : LBVERB << "Delete Connection @" << (void*)this << std::endl;
110 : #ifdef STATISTICS
111 1263 : LBDEBUG << *this << ": " << (_impl->outBytes >> 20) << " MB out, "
112 1263 : << (_impl->inBytes >> 20) << " MB in" << std::endl;
113 : #endif
114 421 : delete _impl;
115 421 : }
116 :
117 0 : bool Connection::operator == ( const Connection& rhs ) const
118 : {
119 0 : if( this == &rhs )
120 0 : return true;
121 0 : if( _impl->description->type != CONNECTIONTYPE_PIPE )
122 0 : return false;
123 0 : Connection* pipe = const_cast< Connection* >( this );
124 0 : return pipe->acceptSync().get() == &rhs;
125 : }
126 :
127 106 : ConnectionPtr Connection::create( ConnectionDescriptionPtr description )
128 : {
129 212 : ConnectionPtr connection;
130 106 : switch( description->type )
131 : {
132 : case CONNECTIONTYPE_TCPIP:
133 98 : connection = new SocketConnection;
134 98 : break;
135 :
136 : case CONNECTIONTYPE_PIPE:
137 3 : connection = new PipeConnection;
138 3 : break;
139 :
140 : #ifdef _WIN32
141 : case CONNECTIONTYPE_NAMEDPIPE:
142 : connection = new NamedPipeConnection;
143 : break;
144 : #endif
145 :
146 : case CONNECTIONTYPE_RSP:
147 0 : connection = new RSPConnection;
148 0 : break;
149 :
150 : #ifdef COLLAGE_USE_OFED
151 : case CONNECTIONTYPE_RDMA:
152 4 : connection = new RDMAConnection;
153 4 : break;
154 : #endif
155 : #ifdef COLLAGE_USE_UDT
156 : case CONNECTIONTYPE_UDT:
157 : connection = new UDTConnection;
158 : break;
159 : #endif
160 :
161 : default:
162 3 : LBWARN << "Connection type " << description->type
163 3 : << " not supported" << std::endl;
164 1 : return 0;
165 : }
166 :
167 105 : if( description->bandwidth == 0 )
168 78 : description->bandwidth = connection->getDescription()->bandwidth;
169 :
170 105 : connection->_setDescription( description );
171 105 : return connection;
172 : }
173 :
174 3244745 : Connection::State Connection::getState() const
175 : {
176 3244745 : return _impl->state;
177 : }
178 :
179 105 : void Connection::_setDescription( ConnectionDescriptionPtr description )
180 : {
181 105 : LBASSERT( description.isValid( ));
182 105 : LBASSERTINFO( _impl->description->type == description->type,
183 : "Wrong connection type in description" );
184 105 : _impl->description = description;
185 105 : LBASSERT( description->bandwidth > 0 );
186 105 : }
187 :
188 1181 : void Connection::_setState( const State state )
189 : {
190 1181 : if( _impl->state == state )
191 54 : return;
192 1127 : _impl->state = state;
193 1127 : _impl->fireStateChanged( this );
194 : }
195 :
196 183 : void Connection::lockSend() const
197 : {
198 183 : _impl->sendLock.set();
199 183 : }
200 :
201 183 : void Connection::unlockSend() const
202 : {
203 183 : _impl->sendLock.unset();
204 183 : }
205 :
206 175 : void Connection::addListener( ConnectionListener* listener )
207 : {
208 175 : _impl->listeners.push_back( listener );
209 175 : }
210 :
211 174 : void Connection::removeListener( ConnectionListener* listener )
212 : {
213 174 : ConnectionListeners::iterator i = find( _impl->listeners.begin(),
214 348 : _impl->listeners.end(), listener );
215 174 : if( i != _impl->listeners.end( ))
216 174 : _impl->listeners.erase( i );
217 174 : }
218 :
219 : //----------------------------------------------------------------------
220 : // read
221 : //----------------------------------------------------------------------
222 2225616 : void Connection::recvNB( BufferPtr buffer, const uint64_t bytes )
223 : {
224 2225616 : LBASSERT( !_impl->buffer );
225 2225616 : LBASSERT( _impl->bytes == 0 );
226 2225616 : LBASSERT( buffer );
227 2225616 : LBASSERT( bytes > 0 );
228 2225616 : LBASSERTINFO( bytes < LB_BIT48,
229 : "Out-of-sync network stream: read size " << bytes << "?" );
230 :
231 2225616 : _impl->buffer = buffer;
232 2225616 : _impl->bytes = bytes;
233 2225616 : buffer->reserve( buffer->getSize() + bytes );
234 2225616 : readNB( buffer->getData() + buffer->getSize(), bytes );
235 2225616 : }
236 :
237 2225498 : bool Connection::recvSync( BufferPtr& outBuffer, const bool block )
238 : {
239 2225498 : LBASSERTINFO( _impl->buffer,
240 : "No pending receive on " << getDescription()->toString( ));
241 :
242 : // reset async IO data
243 2225497 : outBuffer = _impl->buffer;
244 2225498 : const uint64_t bytes = _impl->bytes;
245 2225498 : _impl->buffer = 0;
246 2225498 : _impl->bytes = 0;
247 :
248 2225498 : if( _impl->state != STATE_CONNECTED || !outBuffer || bytes == 0 )
249 34 : return false;
250 2225464 : LBASSERTINFO( bytes < LB_BIT48,
251 : "Out-of-sync network stream: read size " << bytes << "?" );
252 : #ifdef STATISTICS
253 2225464 : _impl->inBytes += bytes;
254 : #endif
255 :
256 : // 'Iterators' for receive loop
257 2225464 : uint8_t* ptr = outBuffer->getData() + outBuffer->getSize();
258 2225464 : uint64_t bytesLeft = bytes;
259 2225464 : int64_t got = readSync( ptr, bytesLeft, block );
260 :
261 : // WAR: fluke notification: On Win32, we get occasionally a data
262 : // notification and then deadlock when reading from the connection. The
263 : // callee (Node::handleData) will flag the first read, the underlying
264 : // SocketConnection will not block and we will restore the AIO operation if
265 : // no data was present.
266 2225464 : if( got == READ_TIMEOUT )
267 : {
268 0 : _impl->buffer = outBuffer;
269 0 : _impl->bytes = bytes;
270 0 : outBuffer = 0;
271 0 : return true;
272 : }
273 :
274 : // From here on, receive loop until all data read or error
275 : while( true )
276 : {
277 2512302 : if( got < 0 ) // error
278 : {
279 40 : const uint64_t read = bytes - bytesLeft;
280 40 : outBuffer->resize( outBuffer->getSize() + read );
281 40 : if( bytes == bytesLeft )
282 40 : LBDEBUG << "Read on dead connection" << std::endl;
283 : else
284 0 : LBERROR << "Error during read after " << read << " bytes on "
285 0 : << _impl->description << std::endl;
286 40 : return false;
287 : }
288 2512262 : else if( got == 0 )
289 : {
290 : // ConnectionSet::select may report data on an 'empty' connection.
291 : // If we have nothing read so far, we have hit this case.
292 0 : if( bytes == bytesLeft )
293 0 : return false;
294 0 : LBVERB << "Zero bytes read" << std::endl;
295 : }
296 2512262 : if( bytesLeft > static_cast< uint64_t >( got )) // partial read
297 : {
298 286838 : ptr += got;
299 286838 : bytesLeft -= got;
300 :
301 286838 : readNB( ptr, bytesLeft );
302 286838 : got = readSync( ptr, bytesLeft, true );
303 286838 : continue;
304 : }
305 :
306 : // read done
307 2225424 : LBASSERTINFO( static_cast< uint64_t >( got ) == bytesLeft,
308 : got << " != " << bytesLeft );
309 :
310 2225424 : outBuffer->resize( outBuffer->getSize() + bytes );
311 : #ifndef NDEBUG
312 2225424 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
313 : {
314 0 : ptr -= (bytes - bytesLeft); // rewind
315 0 : LBINFO << "recv:" << lunchbox::format( ptr, bytes ) << std::endl;
316 : }
317 : #endif
318 2225424 : return true;
319 286838 : }
320 :
321 : LBUNREACHABLE;
322 : return true;
323 : }
324 :
325 224 : BufferPtr Connection::resetRecvData()
326 : {
327 224 : BufferPtr buffer = _impl->buffer;
328 224 : _impl->buffer = 0;
329 224 : _impl->bytes = 0;
330 224 : return buffer;
331 : }
332 :
333 : //----------------------------------------------------------------------
334 : // write
335 : //----------------------------------------------------------------------
336 2472874 : bool Connection::send( const void* buffer, const uint64_t bytes,
337 : const bool isLocked )
338 : {
339 : #ifdef STATISTICS
340 2472874 : _impl->outBytes += bytes;
341 : #endif
342 2472874 : LBASSERT( bytes > 0 );
343 2468799 : if( bytes == 0 )
344 0 : return true;
345 :
346 2468799 : const uint8_t* ptr = static_cast< const uint8_t* >( buffer );
347 :
348 : // possible OPT: We need to lock here to guarantee an atomic transmission of
349 : // the buffer. Possible improvements are:
350 : // 1) Disassemble buffer into 'small enough' pieces and use a header to
351 : // reassemble correctly on the other side (aka reliable UDP)
352 : // 2) Introduce a send thread with a thread-safe task queue
353 4939100 : lunchbox::ScopedMutex<> mutex( isLocked ? 0 : &_impl->sendLock );
354 :
355 : #ifndef NDEBUG
356 2476679 : if( bytes <= 1024 && ( lunchbox::Log::topics & LOG_PACKETS ))
357 0 : LBINFO << "send:" << lunchbox::format( ptr, bytes ) << std::endl;
358 : #endif
359 :
360 2476679 : uint64_t bytesLeft = bytes;
361 7460911 : while( bytesLeft )
362 : {
363 : try
364 : {
365 2498494 : const int64_t wrote = this->write( ptr, bytesLeft );
366 2492116 : if( wrote == -1 ) // error
367 : {
368 0 : LBERROR << "Error during write after " << bytes - bytesLeft
369 0 : << " bytes, closing connection" << std::endl;
370 0 : close();
371 0 : return false;
372 : }
373 2492116 : else if( wrote == 0 )
374 0 : LBINFO << "Zero bytes write" << std::endl;
375 :
376 2492116 : bytesLeft -= wrote;
377 2492116 : ptr += wrote;
378 : }
379 0 : catch( const co::Exception& e )
380 : {
381 0 : LBERROR << e.what() << " after " << bytes - bytesLeft
382 0 : << " bytes, closing connection" << std::endl;
383 0 : close();
384 0 : return false;
385 : }
386 :
387 : }
388 2470301 : return true;
389 : }
390 :
391 166 : bool Connection::isMulticast() const
392 : {
393 166 : return getDescription()->type >= CONNECTIONTYPE_MULTICAST;
394 : }
395 :
396 1101 : ConstConnectionDescriptionPtr Connection::getDescription() const
397 : {
398 1101 : return _impl->description;
399 : }
400 :
401 528 : ConnectionDescriptionPtr Connection::_getDescription()
402 : {
403 528 : return _impl->description;
404 : }
405 :
406 502 : std::ostream& operator << ( std::ostream& os, const Connection& connection )
407 : {
408 502 : const Connection::State state = connection.getState();
409 1004 : ConstConnectionDescriptionPtr desc = connection.getDescription();
410 :
411 1004 : os << lunchbox::className( connection ) << " " << (void*)&connection
412 : << " state " << ( state == Connection::STATE_CLOSED ? "closed" :
413 0 : state == Connection::STATE_CONNECTING ? "connecting" :
414 0 : state == Connection::STATE_CONNECTED ? "connected" :
415 0 : state == Connection::STATE_LISTENING ? "listening" :
416 : state == Connection::STATE_CLOSING ? "closing" :
417 1004 : "UNKNOWN" );
418 502 : if( desc.isValid( ))
419 502 : os << " description " << desc->toString();
420 :
421 1004 : return os;
422 : }
423 66 : }
|