Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "connectionSet.h"
22 :
23 : #include "connection.h"
24 : #include "connectionListener.h"
25 : #include "eventConnection.h"
26 :
27 : #include <lunchbox/algorithm.h>
28 : #include <lunchbox/buffer.h>
29 : #include <lunchbox/os.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/thread.h>
32 :
33 : #include <algorithm>
34 : #include <errno.h>
35 :
36 : #ifdef _WIN32
37 : # include <lunchbox/monitor.h>
38 : # define SELECT_TIMEOUT WAIT_TIMEOUT
39 : # define SELECT_ERROR WAIT_FAILED
40 : # define MAX_CONNECTIONS (MAXIMUM_WAIT_OBJECTS - 1)
41 : #else
42 : # include <poll.h>
43 : # define SELECT_TIMEOUT 0
44 : # define SELECT_ERROR -1
45 : # define MAX_CONNECTIONS LB_100KB // Arbitrary
46 : #endif
47 :
48 : namespace co
49 : {
50 : namespace
51 : {
52 :
53 : #ifdef _WIN32
54 : /** Handles connections exceeding MAXIMUM_WAIT_OBJECTS */
55 : class Thread : public lunchbox::Thread
56 : {
57 : public:
58 : Thread( ConnectionSet* parent )
59 : : notifier( CreateEvent( 0, false, false, 0 ))
60 : , event( ConnectionSet::EVENT_NONE )
61 : , _parent( parent )
62 : {}
63 :
64 : virtual ~Thread()
65 : {}
66 :
67 : ConnectionSet set;
68 : HANDLE notifier;
69 :
70 : lunchbox::Monitor< ConnectionSet::Event > event;
71 :
72 : protected:
73 : virtual void run()
74 : {
75 : while ( !set.isEmpty( ))
76 : {
77 : event = set.select();
78 : if( event != ConnectionSet::EVENT_INTERRUPT &&
79 : event != ConnectionSet::EVENT_NONE )
80 : {
81 : SetEvent( notifier );
82 : event.waitEQ( ConnectionSet::EVENT_NONE );
83 : }
84 : }
85 : }
86 :
87 : private:
88 : ConnectionSet* const _parent;
89 : };
90 :
91 : typedef std::vector< Thread* > Threads;
92 : typedef Threads::const_iterator ThreadsCIter;
93 : typedef Threads::iterator ThreadsIter;
94 :
95 : union Result
96 : {
97 : Connection* connection;
98 : Thread* thread;
99 : };
100 : #else
101 : union Result
102 : {
103 : Connection* connection;
104 : };
105 : #endif // _WIN32
106 :
107 : }
108 :
109 : namespace detail
110 : {
111 : class ConnectionSet : public ConnectionListener
112 : {
113 : public:
114 : /** Mutex protecting changes to the set. */
115 : lunchbox::Lock lock;
116 :
117 : /** The connections of this set */
118 : Connections allConnections;
119 :
120 : // Note: std::vector had to much overhead here
121 : #ifdef _WIN32
122 : lunchbox::Buffer< HANDLE > fdSet;
123 : #else
124 : lunchbox::Buffer< pollfd > fdSetCopy; // 'const' set
125 : lunchbox::Buffer< pollfd > fdSet; // copy of _fdSetCopy used to poll
126 : #endif
127 : lunchbox::Buffer< Result > fdSetResult;
128 :
129 : /** The connection to reset a running select, see constructor. */
130 : lunchbox::RefPtr< EventConnection > selfConnection;
131 :
132 : #ifdef _WIN32
133 : /** The connections to handle */
134 : Connections connections;
135 :
136 : /** Threads used to handle more than MAXIMUM_WAIT_OBJECTS connections */
137 : Threads threads;
138 :
139 : /** Result thread. */
140 : Thread* thread;
141 : #endif
142 :
143 : // result values
144 : ConnectionPtr connection;
145 : int error;
146 :
147 : /** FD sets need rebuild. */
148 : bool dirty;
149 :
150 56 : ConnectionSet()
151 112 : : selfConnection( new EventConnection )
152 : #ifdef _WIN32
153 : , thread( 0 )
154 : #endif
155 : , error( 0 )
156 112 : , dirty( true )
157 : {
158 : // Whenever another threads modifies the connection list while the
159 : // connection set is waiting in a select, the select is interrupted
160 : // using this connection.
161 56 : LBCHECK( selfConnection->connect( ));
162 55 : }
163 :
164 108 : ~ConnectionSet()
165 108 : {
166 54 : connection = 0;
167 54 : selfConnection->close();
168 54 : selfConnection = 0;
169 108 : }
170 :
171 447 : void setDirty()
172 : {
173 447 : if( dirty )
174 253 : return;
175 :
176 194 : LBVERB << "FD set modified, restarting select" << std::endl;
177 194 : dirty = true;
178 194 : interrupt();
179 : }
180 :
181 390 : void interrupt() { selfConnection->set(); }
182 :
183 : private:
184 44 : virtual void notifyStateChanged( co::Connection* ) { setDirty(); }
185 : };
186 : }
187 :
188 56 : ConnectionSet::ConnectionSet()
189 56 : : _impl( new detail::ConnectionSet )
190 56 : {}
191 108 : ConnectionSet::~ConnectionSet()
192 : {
193 54 : _clear();
194 54 : delete _impl;
195 54 : }
196 :
197 49 : size_t ConnectionSet::getSize() const
198 : {
199 98 : lunchbox::ScopedWrite mutex( _impl->lock );
200 98 : return _impl->allConnections.size();
201 : }
202 :
203 53 : bool ConnectionSet::isEmpty() const
204 : {
205 106 : lunchbox::ScopedWrite mutex( _impl->lock );
206 106 : return _impl->allConnections.empty();
207 : }
208 :
209 99 : const Connections& ConnectionSet::getConnections() const
210 : {
211 99 : return _impl->allConnections;
212 : }
213 :
214 0 : int ConnectionSet::getError() const
215 : {
216 0 : return _impl->error;
217 : }
218 :
219 306943 : ConnectionPtr ConnectionSet::getConnection()
220 : {
221 306943 : return _impl->connection;
222 : }
223 :
224 403 : void ConnectionSet::setDirty()
225 : {
226 403 : _impl->setDirty();
227 403 : }
228 :
229 196 : void ConnectionSet::interrupt()
230 : {
231 196 : _impl->interrupt();
232 196 : }
233 :
234 175 : void ConnectionSet::addConnection( ConnectionPtr connection )
235 : {
236 175 : LBASSERT( connection->isConnected() || connection->isListening( ));
237 :
238 : {
239 350 : lunchbox::ScopedWrite mutex( _impl->lock );
240 175 : _impl->allConnections.push_back( connection );
241 :
242 : #ifdef _WIN32
243 : LBASSERT( _impl->allConnections.size() <
244 : MAX_CONNECTIONS * MAX_CONNECTIONS );
245 : if( _impl->connections.size() < MAX_CONNECTIONS - _impl->threads.size())
246 : {
247 : // can handle it ourself
248 : _impl->connections.push_back( connection );
249 : connection->addListener( _impl );
250 : }
251 : else
252 : {
253 : // add to existing thread
254 : for( ThreadsCIter i = _impl->threads.begin();
255 : i != _impl->threads.end(); ++i )
256 : {
257 : Thread* thread = *i;
258 : if( thread->set._impl->connections.size() > MAX_CONNECTIONS )
259 : continue;
260 :
261 : thread->set.addConnection( connection );
262 : return;
263 : }
264 :
265 : // add to new thread
266 : Thread* thread = new Thread( this );
267 : thread->set.addConnection( connection );
268 : thread->set.addConnection( _impl->connections.back( ));
269 : _impl->connections.pop_back();
270 :
271 : _impl->threads.push_back( thread );
272 : thread->start();
273 : }
274 : #else
275 175 : connection->addListener( _impl );
276 :
277 175 : LBASSERT( _impl->allConnections.size() < MAX_CONNECTIONS );
278 : #endif // _WIN32
279 : }
280 :
281 175 : setDirty();
282 175 : }
283 :
284 234 : bool ConnectionSet::removeConnection( ConnectionPtr connection )
285 : {
286 : {
287 408 : lunchbox::ScopedWrite mutex( _impl->lock );
288 234 : ConnectionsIter i = lunchbox::find( _impl->allConnections, connection );
289 234 : if( i == _impl->allConnections.end( ))
290 60 : return false;
291 :
292 174 : if( _impl->connection == connection )
293 84 : _impl->connection = 0;
294 :
295 : #ifdef _WIN32
296 : ConnectionsIter j = lunchbox::find( _impl->connections, connection );
297 : if( j == _impl->connections.end( ))
298 : {
299 : Threads::iterator k = _impl->threads.begin();
300 : for( ; k != _impl->threads.end(); ++k )
301 : {
302 : Thread* thread = *k;
303 : if( thread->set.removeConnection( connection ))
304 : {
305 : if( !thread->set.isEmpty( ))
306 : return true;
307 :
308 : if( thread == _impl->thread )
309 : _impl->thread = 0;
310 :
311 : thread->event = EVENT_NONE;
312 : thread->join();
313 : delete thread;
314 : break;
315 : }
316 : }
317 :
318 : LBASSERT( k != _impl->threads.end( ));
319 : _impl->threads.erase( k );
320 : }
321 : else
322 : {
323 : connection->removeListener( _impl );
324 : _impl->connections.erase( j );
325 : }
326 : #else
327 174 : connection->removeListener( _impl );
328 : #endif
329 :
330 174 : _impl->allConnections.erase( i );
331 : }
332 :
333 174 : setDirty();
334 174 : return true;
335 : }
336 :
337 54 : void ConnectionSet::_clear()
338 : {
339 54 : _impl->connection = 0;
340 :
341 : #ifdef _WIN32
342 : for( ThreadsIter i =_impl->threads.begin(); i != _impl->threads.end(); ++i )
343 : {
344 : Thread* thread = *i;
345 : thread->set._clear();
346 : thread->event = EVENT_NONE;
347 : thread->join();
348 : delete thread;
349 : }
350 : _impl->threads.clear();
351 :
352 : Connections& connections = _impl->connections;
353 : #else
354 54 : Connections& connections = _impl->allConnections;
355 : #endif
356 54 : for( ConnectionsIter i = connections.begin(); i != connections.end(); ++i )
357 0 : (*i)->removeListener( _impl );
358 :
359 54 : _impl->allConnections.clear();
360 : #ifdef _WIN32
361 : _impl->connections.clear();
362 : #endif
363 54 : setDirty();
364 54 : _impl->fdSet.clear();
365 54 : _impl->fdSetResult.clear();
366 54 : }
367 :
368 307112 : ConnectionSet::Event ConnectionSet::select( const uint32_t timeout )
369 : {
370 614223 : LB_TS_SCOPED( _selectThread );
371 :
372 : while( true )
373 : {
374 307112 : _impl->connection = 0;
375 307112 : _impl->error = 0;
376 : #ifdef _WIN32
377 : if( _impl->thread )
378 : {
379 : _impl->thread->event = EVENT_NONE; // unblock previous thread
380 : _impl->thread = 0;
381 : }
382 : #else
383 307112 : if( !_impl->dirty ) // #38: check results from previous poll()
384 : {
385 306917 : const Event event = _getSelectResult( 0 );
386 306917 : if( event != EVENT_NONE )
387 211256 : return event;
388 : }
389 : #endif
390 :
391 95856 : if( !_setupFDSet( ))
392 34 : return EVENT_INVALID_HANDLE;
393 :
394 : // poll for a result
395 : #ifdef _WIN32
396 : LBASSERT( LB_TIMEOUT_INDEFINITE == INFINITE );
397 : const DWORD ret = WaitForMultipleObjectsEx( _impl->fdSet.getSize(),
398 : _impl->fdSet.getData(),
399 : FALSE, timeout, TRUE );
400 : #else
401 95822 : const int pollTimeout = timeout == LB_TIMEOUT_INDEFINITE ?
402 95822 : -1 : int( timeout );
403 95822 : const int ret = ::poll( _impl->fdSet.getData(), _impl->fdSet.getSize(),
404 95822 : pollTimeout );
405 : #endif
406 95821 : switch( ret )
407 : {
408 : case SELECT_TIMEOUT:
409 0 : return EVENT_TIMEOUT;
410 :
411 : case SELECT_ERROR:
412 : #ifdef _WIN32
413 : if( !_impl->thread )
414 : _impl->error = GetLastError();
415 :
416 : if( _impl->error == WSA_INVALID_HANDLE )
417 : {
418 : _impl->dirty = true;
419 : break;
420 : }
421 : #else
422 0 : if( errno == EINTR ) // Interrupted system call (gdb) - ignore
423 0 : break;
424 :
425 0 : _impl->error = errno;
426 : #endif
427 :
428 0 : LBERROR << "Error during select: " << lunchbox::sysError
429 0 : << std::endl;
430 0 : return EVENT_SELECT_ERROR;
431 :
432 : default: // SUCCESS
433 : {
434 95821 : const Event event = _getSelectResult( ret );
435 95821 : if( event == EVENT_NONE )
436 0 : break;
437 95821 : return event;
438 : }
439 : }
440 0 : }
441 : }
442 :
443 :
444 402738 : ConnectionSet::Event ConnectionSet::_getSelectResult( const uint32_t index )
445 : {
446 402738 : const Event event = _parseSelect( index );
447 402738 : if( _impl->connection == _impl->selfConnection.get( ))
448 : {
449 304 : _impl->connection = 0;
450 304 : _impl->selfConnection->reset();
451 304 : return EVENT_INTERRUPT;
452 : }
453 402433 : if( event == EVENT_DATA && _impl->connection->isListening( ))
454 34 : return EVENT_CONNECT;
455 :
456 402399 : return event;
457 : }
458 :
459 : #ifdef _WIN32
460 : ConnectionSet::Event ConnectionSet::_parseSelect( const uint32_t index )
461 : {
462 : const uint32_t i = index - WAIT_OBJECT_0;
463 : LBASSERT( i < MAXIMUM_WAIT_OBJECTS );
464 :
465 : // Bug: WaitForMultipleObjects returns occasionally 16 with fdSet size 2,
466 : // when used by the RSPConnection
467 : // WAR: Catch this and ignore the result, this seems to have no side-effects
468 : if( i >= _impl->fdSetResult.getSize( ))
469 : return EVENT_NONE;
470 :
471 : if( i > _impl->connections.size( ))
472 : {
473 : _impl->thread = _impl->fdSetResult[i].thread;
474 : LBASSERT( _impl->thread->event != EVENT_NONE );
475 : LBASSERT( _impl->fdSet[ i ] == _impl->thread->notifier );
476 :
477 : ResetEvent( _impl->thread->notifier );
478 : _impl->connection = _impl->thread->set.getConnection();
479 : _impl->error = _impl->thread->set.getError();
480 : return _impl->thread->event.get();
481 : }
482 : // else locally handled connection
483 :
484 : _impl->connection = _impl->fdSetResult[i].connection;
485 : LBASSERT( _impl->fdSet[i] == _impl->connection->getNotifier() ||
486 : _impl->connection->isClosed( ));
487 : return EVENT_DATA;
488 : }
489 : #else // _WIN32
490 402738 : ConnectionSet::Event ConnectionSet::_parseSelect( const uint32_t )
491 : {
492 2456404 : for( size_t i = 0; i < _impl->fdSet.getSize(); ++i )
493 : {
494 2360743 : pollfd& pollFD = _impl->fdSet[i];
495 2360743 : if( pollFD.revents == 0 )
496 2053666 : continue;
497 :
498 307077 : const int pollEvents = pollFD.revents;
499 307077 : pollFD.revents = 0;
500 307077 : LBASSERT( pollFD.fd > 0 );
501 :
502 307077 : _impl->connection = _impl->fdSetResult[i].connection;
503 307077 : LBASSERT( _impl->connection );
504 :
505 307077 : LBVERB << "Got event on connection @" << (void*)_impl->connection.get()
506 307077 : << std::endl;
507 :
508 307077 : if( pollEvents & POLLERR )
509 : {
510 0 : LBINFO << "Error during poll(): " << lunchbox::sysError
511 0 : << std::endl;
512 0 : return EVENT_ERROR;
513 : }
514 :
515 : // disconnect event or disconnected connection
516 307077 : if( (pollEvents & POLLHUP) || (pollEvents & POLLNVAL) )
517 0 : return EVENT_DISCONNECT;
518 :
519 : // Note: Intuitively I would handle the read before HUP to
520 : // read remaining data of the connection, but at least on
521 : // OS X both events happen simultaneously and no more data
522 : // can be read.
523 307077 : if( pollEvents & POLLIN || pollEvents & POLLPRI )
524 307077 : return EVENT_DATA;
525 :
526 0 : LBERROR << "Unhandled poll event(s): " << pollEvents << std::endl;
527 0 : ::abort();
528 : }
529 95661 : return EVENT_NONE;
530 : }
531 : #endif // else not _WIN32
532 :
533 95856 : bool ConnectionSet::_setupFDSet()
534 : {
535 95856 : if( !_impl->dirty )
536 : {
537 : #ifndef _WIN32
538 : // TODO: verify that poll() really modifies _fdSet, and remove the copy
539 : // if it doesn't. The man page seems to hint that poll changes fds.
540 95661 : _impl->fdSet = _impl->fdSetCopy;
541 : #endif
542 95661 : return true;
543 : }
544 :
545 195 : _impl->dirty = false;
546 195 : _impl->fdSet.setSize( 0 );
547 195 : _impl->fdSetResult.setSize( 0 );
548 :
549 : #ifdef _WIN32
550 : // add self connection
551 : HANDLE readHandle = _impl->selfConnection->getNotifier();
552 : LBASSERT( readHandle );
553 : _impl->fdSet.append( readHandle );
554 :
555 : Result res;
556 : res.connection = _impl->selfConnection.get();
557 : _impl->fdSetResult.append( res );
558 :
559 : // add regular connections
560 : _impl->lock.set();
561 : for( ConnectionsCIter i = _impl->connections.begin();
562 : i != _impl->connections.end(); ++i )
563 : {
564 : ConnectionPtr connection = *i;
565 : readHandle = connection->getNotifier();
566 :
567 : if( !readHandle )
568 : {
569 : LBINFO << "Cannot select connection " << connection
570 : << ", connection does not provide a read handle" << std::endl;
571 : _impl->connection = connection;
572 : _impl->lock.unset();
573 : return false;
574 : }
575 :
576 : _impl->fdSet.append( readHandle );
577 :
578 : Result result;
579 : result.connection = connection.get();
580 : _impl->fdSetResult.append( result );
581 : }
582 :
583 : for( ThreadsCIter i=_impl->threads.begin(); i != _impl->threads.end(); ++i )
584 : {
585 : Thread* thread = *i;
586 : readHandle = thread->notifier;
587 : LBASSERT( readHandle );
588 : _impl->fdSet.append( readHandle );
589 :
590 : Result result;
591 : result.thread = thread;
592 : _impl->fdSetResult.append( result );
593 : }
594 : _impl->lock.unset();
595 : #else // _WIN32
596 : pollfd fd;
597 195 : fd.events = POLLIN;
598 195 : fd.revents = 0;
599 :
600 : // add self 'connection'
601 195 : fd.fd = _impl->selfConnection->getNotifier();
602 195 : LBASSERT( fd.fd > 0 );
603 195 : _impl->fdSet.append( fd );
604 :
605 : Result result;
606 195 : result.connection = _impl->selfConnection.get();
607 195 : _impl->fdSetResult.append( result );
608 :
609 : // add regular connections
610 194 : _impl->lock.set();
611 837 : for( ConnectionPtr connection : _impl->allConnections )
612 : {
613 676 : fd.fd = connection->getNotifier();
614 :
615 675 : if( fd.fd <= 0 )
616 : {
617 136 : LBINFO << "Cannot select connection " << connection
618 102 : << ", connection " << lunchbox::className( connection.get( ))
619 136 : << " doesn't have a file descriptor" << std::endl;
620 34 : _impl->connection = connection;
621 34 : _impl->lock.unset();
622 34 : return false;
623 : }
624 :
625 1282 : LBVERB << "Listening on " << lunchbox::className( connection.get( ))
626 1922 : << std::endl;
627 :
628 641 : _impl->fdSet.append( fd );
629 :
630 642 : result.connection = connection.get();
631 642 : _impl->fdSetResult.append( result );
632 : }
633 161 : _impl->lock.unset();
634 161 : _impl->fdSetCopy = _impl->fdSet;
635 : #endif
636 :
637 161 : return true;
638 : }
639 :
640 0 : std::ostream& operator << ( std::ostream& os, const ConnectionSet& set )
641 : {
642 0 : const Connections& connections = set.getConnections();
643 :
644 0 : os << "connection set " << (void*)&set << ", " << connections.size()
645 0 : << " connections";
646 :
647 0 : for( ConnectionsCIter i = connections.begin(); i != connections.end(); ++i )
648 0 : os << std::endl << " " << (*i).get();
649 :
650 0 : return os;
651 : }
652 :
653 0 : std::ostream& operator << ( std::ostream& os, const ConnectionSet::Event event )
654 : {
655 0 : if( event >= ConnectionSet::EVENT_ALL )
656 0 : return os << "unknown (" << unsigned( event ) << ')';
657 :
658 : return os << ( event == ConnectionSet::EVENT_NONE ? "none" :
659 0 : event == ConnectionSet::EVENT_CONNECT ? "connect" :
660 0 : event == ConnectionSet::EVENT_DISCONNECT ? "disconnect" :
661 0 : event == ConnectionSet::EVENT_DATA ? "data" :
662 0 : event == ConnectionSet::EVENT_TIMEOUT ? "timeout" :
663 0 : event == ConnectionSet::EVENT_INTERRUPT ? "interrupted" :
664 0 : event == ConnectionSet::EVENT_ERROR ? "error" :
665 0 : event == ConnectionSet::EVENT_SELECT_ERROR ? "select error" :
666 0 : event == ConnectionSet::EVENT_INVALID_HANDLE ?
667 0 : "invalid handle" : "ERROR" );
668 : }
669 :
670 66 : }
|