Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "connectionSet.h"
22 :
23 : #include "connection.h"
24 : #include "connectionListener.h"
25 : #include "eventConnection.h"
26 :
27 : #include <lunchbox/algorithm.h>
28 : #include <lunchbox/buffer.h>
29 : #include <lunchbox/os.h>
30 : #include <lunchbox/scopedMutex.h>
31 : #include <lunchbox/thread.h>
32 :
33 : #include <algorithm>
34 : #include <errno.h>
35 :
36 : #ifdef _WIN32
37 : #include <lunchbox/monitor.h>
38 : #define SELECT_TIMEOUT WAIT_TIMEOUT
39 : #define SELECT_ERROR WAIT_FAILED
40 : #define MAX_CONNECTIONS (MAXIMUM_WAIT_OBJECTS - 1)
41 : #else
42 : #include <poll.h>
43 : #define SELECT_TIMEOUT 0
44 : #define SELECT_ERROR -1
45 : #define MAX_CONNECTIONS LB_100KB // Arbitrary
46 : #endif
47 :
48 : namespace co
49 : {
50 : namespace
51 : {
52 : #ifdef _WIN32
53 : /** Handles connections exceeding MAXIMUM_WAIT_OBJECTS */
54 : class Thread : public lunchbox::Thread
55 : {
56 : public:
57 : Thread(ConnectionSet* parent)
58 : : notifier(CreateEvent(0, false, false, 0))
59 : , event(ConnectionSet::EVENT_NONE)
60 : , _parent(parent)
61 : {
62 : }
63 :
64 : virtual ~Thread() {}
65 : ConnectionSet set;
66 : HANDLE notifier;
67 :
68 : lunchbox::Monitor<ConnectionSet::Event> event;
69 :
70 : protected:
71 : virtual void run()
72 : {
73 : while (!set.isEmpty())
74 : {
75 : event = set.select();
76 : if (event != ConnectionSet::EVENT_INTERRUPT &&
77 : event != ConnectionSet::EVENT_NONE)
78 : {
79 : SetEvent(notifier);
80 : event.waitEQ(ConnectionSet::EVENT_NONE);
81 : }
82 : }
83 : }
84 :
85 : private:
86 : ConnectionSet* const _parent;
87 : };
88 :
89 : typedef std::vector<Thread*> Threads;
90 : typedef Threads::const_iterator ThreadsCIter;
91 : typedef Threads::iterator ThreadsIter;
92 :
93 : union Result {
94 : Connection* connection;
95 : Thread* thread;
96 : };
97 : #else
98 : union Result {
99 : Connection* connection;
100 : };
101 : #endif // _WIN32
102 : }
103 :
104 : namespace detail
105 : {
106 : class ConnectionSet : public ConnectionListener
107 : {
108 : public:
109 : /** Mutex protecting changes to the set. */
110 : std::mutex lock;
111 :
112 : /** The connections of this set */
113 : Connections allConnections;
114 :
115 : // Note: std::vector had to much overhead here
116 : #ifdef _WIN32
117 : lunchbox::Buffer<HANDLE> fdSet;
118 : #else
119 : lunchbox::Buffer<pollfd> fdSetCopy; // 'const' set
120 : lunchbox::Buffer<pollfd> fdSet; // copy of _fdSetCopy used to poll
121 : #endif
122 : lunchbox::Buffer<Result> fdSetResult;
123 :
124 : /** The connection to reset a running select, see constructor. */
125 : lunchbox::RefPtr<EventConnection> selfConnection;
126 :
127 : #ifdef _WIN32
128 : /** The connections to handle */
129 : Connections connections;
130 :
131 : /** Threads used to handle more than MAXIMUM_WAIT_OBJECTS connections */
132 : Threads threads;
133 :
134 : /** Result thread. */
135 : Thread* thread;
136 : #endif
137 :
138 : // result values
139 : ConnectionPtr connection;
140 : int error;
141 :
142 : /** FD sets need rebuild. */
143 : bool dirty;
144 :
145 54 : ConnectionSet()
146 108 : : selfConnection(new EventConnection)
147 : #ifdef _WIN32
148 : , thread(0)
149 : #endif
150 : , error(0)
151 108 : , dirty(true)
152 : {
153 : // Whenever another threads modifies the connection list while the
154 : // connection set is waiting in a select, the select is interrupted
155 : // using this connection.
156 54 : LBCHECK(selfConnection->connect());
157 53 : }
158 :
159 104 : ~ConnectionSet()
160 104 : {
161 52 : connection = 0;
162 52 : selfConnection->close();
163 52 : selfConnection = 0;
164 104 : }
165 :
166 432 : void setDirty()
167 : {
168 432 : if (dirty)
169 245 : return;
170 :
171 187 : LBVERB << "FD set modified, restarting select" << std::endl;
172 187 : dirty = true;
173 187 : interrupt();
174 : }
175 :
176 371 : void interrupt() { selfConnection->set(); }
177 : private:
178 43 : virtual void notifyStateChanged(co::Connection*) { setDirty(); }
179 : };
180 : }
181 :
182 54 : ConnectionSet::ConnectionSet()
183 54 : : _impl(new detail::ConnectionSet)
184 : {
185 54 : }
186 104 : ConnectionSet::~ConnectionSet()
187 : {
188 52 : _clear();
189 52 : delete _impl;
190 52 : }
191 :
192 47 : size_t ConnectionSet::getSize() const
193 : {
194 94 : lunchbox::ScopedWrite mutex(_impl->lock);
195 94 : return _impl->allConnections.size();
196 : }
197 :
198 51 : bool ConnectionSet::isEmpty() const
199 : {
200 102 : lunchbox::ScopedWrite mutex(_impl->lock);
201 102 : return _impl->allConnections.empty();
202 : }
203 :
204 95 : const Connections& ConnectionSet::getConnections() const
205 : {
206 95 : return _impl->allConnections;
207 : }
208 :
209 0 : int ConnectionSet::getError() const
210 : {
211 0 : return _impl->error;
212 : }
213 :
214 306862 : ConnectionPtr ConnectionSet::getConnection()
215 : {
216 306862 : return _impl->connection;
217 : }
218 :
219 389 : void ConnectionSet::setDirty()
220 : {
221 389 : _impl->setDirty();
222 389 : }
223 :
224 184 : void ConnectionSet::interrupt()
225 : {
226 184 : _impl->interrupt();
227 184 : }
228 :
229 169 : void ConnectionSet::addConnection(ConnectionPtr connection)
230 : {
231 169 : LBASSERT(connection->isConnected() || connection->isListening());
232 :
233 : {
234 338 : lunchbox::ScopedWrite mutex(_impl->lock);
235 169 : _impl->allConnections.push_back(connection);
236 :
237 : #ifdef _WIN32
238 : LBASSERT(_impl->allConnections.size() <
239 : MAX_CONNECTIONS * MAX_CONNECTIONS);
240 : if (_impl->connections.size() < MAX_CONNECTIONS - _impl->threads.size())
241 : {
242 : // can handle it ourself
243 : _impl->connections.push_back(connection);
244 : connection->addListener(_impl);
245 : }
246 : else
247 : {
248 : // add to existing thread
249 : for (ThreadsCIter i = _impl->threads.begin();
250 : i != _impl->threads.end(); ++i)
251 : {
252 : Thread* thread = *i;
253 : if (thread->set._impl->connections.size() > MAX_CONNECTIONS)
254 : continue;
255 :
256 : thread->set.addConnection(connection);
257 : return;
258 : }
259 :
260 : // add to new thread
261 : Thread* thread = new Thread(this);
262 : thread->set.addConnection(connection);
263 : thread->set.addConnection(_impl->connections.back());
264 : _impl->connections.pop_back();
265 :
266 : _impl->threads.push_back(thread);
267 : thread->start();
268 : }
269 : #else
270 169 : connection->addListener(_impl);
271 :
272 169 : LBASSERT(_impl->allConnections.size() < MAX_CONNECTIONS);
273 : #endif // _WIN32
274 : }
275 :
276 169 : setDirty();
277 169 : }
278 :
279 227 : bool ConnectionSet::removeConnection(ConnectionPtr connection)
280 : {
281 : {
282 395 : lunchbox::ScopedWrite mutex(_impl->lock);
283 227 : ConnectionsIter i = lunchbox::find(_impl->allConnections, connection);
284 227 : if (i == _impl->allConnections.end())
285 59 : return false;
286 :
287 168 : if (_impl->connection == connection)
288 81 : _impl->connection = 0;
289 :
290 : #ifdef _WIN32
291 : ConnectionsIter j = lunchbox::find(_impl->connections, connection);
292 : if (j == _impl->connections.end())
293 : {
294 : Threads::iterator k = _impl->threads.begin();
295 : for (; k != _impl->threads.end(); ++k)
296 : {
297 : Thread* thread = *k;
298 : if (thread->set.removeConnection(connection))
299 : {
300 : if (!thread->set.isEmpty())
301 : return true;
302 :
303 : if (thread == _impl->thread)
304 : _impl->thread = 0;
305 :
306 : thread->event = EVENT_NONE;
307 : thread->join();
308 : delete thread;
309 : break;
310 : }
311 : }
312 :
313 : LBASSERT(k != _impl->threads.end());
314 : _impl->threads.erase(k);
315 : }
316 : else
317 : {
318 : connection->removeListener(_impl);
319 : _impl->connections.erase(j);
320 : }
321 : #else
322 168 : connection->removeListener(_impl);
323 : #endif
324 :
325 168 : _impl->allConnections.erase(i);
326 : }
327 :
328 168 : setDirty();
329 168 : return true;
330 : }
331 :
332 52 : void ConnectionSet::_clear()
333 : {
334 52 : _impl->connection = 0;
335 :
336 : #ifdef _WIN32
337 : for (ThreadsIter i = _impl->threads.begin(); i != _impl->threads.end(); ++i)
338 : {
339 : Thread* thread = *i;
340 : thread->set._clear();
341 : thread->event = EVENT_NONE;
342 : thread->join();
343 : delete thread;
344 : }
345 : _impl->threads.clear();
346 :
347 : Connections& connections = _impl->connections;
348 : #else
349 52 : Connections& connections = _impl->allConnections;
350 : #endif
351 52 : for (ConnectionsIter i = connections.begin(); i != connections.end(); ++i)
352 0 : (*i)->removeListener(_impl);
353 :
354 52 : _impl->allConnections.clear();
355 : #ifdef _WIN32
356 : _impl->connections.clear();
357 : #endif
358 52 : setDirty();
359 52 : _impl->fdSet.clear();
360 52 : _impl->fdSetResult.clear();
361 52 : }
362 :
363 307020 : ConnectionSet::Event ConnectionSet::select(const uint32_t timeout)
364 : {
365 614039 : LB_TS_SCOPED(_selectThread);
366 :
367 : while (true)
368 : {
369 307020 : _impl->connection = 0;
370 307020 : _impl->error = 0;
371 : #ifdef _WIN32
372 : if (_impl->thread)
373 : {
374 : _impl->thread->event = EVENT_NONE; // unblock previous thread
375 : _impl->thread = 0;
376 : }
377 : #else
378 307020 : if (!_impl->dirty) // #38: check results from previous poll()
379 : {
380 306832 : const Event event = _getSelectResult(0);
381 306832 : if (event != EVENT_NONE)
382 211246 : return event;
383 : }
384 : #endif
385 :
386 95774 : if (!_setupFDSet())
387 33 : return EVENT_INVALID_HANDLE;
388 :
389 : // poll for a result
390 : #ifdef _WIN32
391 : LBASSERT(LB_TIMEOUT_INDEFINITE == INFINITE);
392 : const DWORD ret = WaitForMultipleObjectsEx(_impl->fdSet.getSize(),
393 : _impl->fdSet.getData(),
394 : FALSE, timeout, TRUE);
395 : #else
396 : const int pollTimeout =
397 95741 : timeout == LB_TIMEOUT_INDEFINITE ? -1 : int(timeout);
398 : const int ret =
399 95741 : ::poll(_impl->fdSet.getData(), _impl->fdSet.getSize(), pollTimeout);
400 : #endif
401 95740 : switch (ret)
402 : {
403 : case SELECT_TIMEOUT:
404 0 : return EVENT_TIMEOUT;
405 :
406 : case SELECT_ERROR:
407 : #ifdef _WIN32
408 : if (!_impl->thread)
409 : _impl->error = GetLastError();
410 :
411 : if (_impl->error == WSA_INVALID_HANDLE)
412 : {
413 : _impl->dirty = true;
414 : break;
415 : }
416 : #else
417 0 : if (errno == EINTR) // Interrupted system call (gdb) - ignore
418 0 : break;
419 :
420 0 : _impl->error = errno;
421 : #endif
422 :
423 0 : LBERROR << "Error during select: " << lunchbox::sysError
424 0 : << std::endl;
425 0 : return EVENT_SELECT_ERROR;
426 :
427 : default: // SUCCESS
428 : {
429 95740 : const Event event = _getSelectResult(ret);
430 95740 : if (event == EVENT_NONE)
431 0 : break;
432 95740 : return event;
433 : }
434 : }
435 0 : }
436 : }
437 :
438 402571 : ConnectionSet::Event ConnectionSet::_getSelectResult(const uint32_t index)
439 : {
440 402571 : const Event event = _parseSelect(index);
441 402572 : if (_impl->connection == _impl->selfConnection.get())
442 : {
443 289 : _impl->connection = 0;
444 289 : _impl->selfConnection->reset();
445 289 : return EVENT_INTERRUPT;
446 : }
447 402283 : if (event == EVENT_DATA && _impl->connection->isListening())
448 33 : return EVENT_CONNECT;
449 :
450 402250 : return event;
451 : }
452 :
453 : #ifdef _WIN32
454 : ConnectionSet::Event ConnectionSet::_parseSelect(const uint32_t index)
455 : {
456 : const uint32_t i = index - WAIT_OBJECT_0;
457 : LBASSERT(i < MAXIMUM_WAIT_OBJECTS);
458 :
459 : // Bug: WaitForMultipleObjects returns occasionally 16 with fdSet size 2,
460 : // when used by the RSPConnection
461 : // WAR: Catch this and ignore the result, this seems to have no side-effects
462 : if (i >= _impl->fdSetResult.getSize())
463 : return EVENT_NONE;
464 :
465 : if (i > _impl->connections.size())
466 : {
467 : _impl->thread = _impl->fdSetResult[i].thread;
468 : LBASSERT(_impl->thread->event != EVENT_NONE);
469 : LBASSERT(_impl->fdSet[i] == _impl->thread->notifier);
470 :
471 : ResetEvent(_impl->thread->notifier);
472 : _impl->connection = _impl->thread->set.getConnection();
473 : _impl->error = _impl->thread->set.getError();
474 : return _impl->thread->event.get();
475 : }
476 : // else locally handled connection
477 :
478 : _impl->connection = _impl->fdSetResult[i].connection;
479 : LBASSERT(_impl->fdSet[i] == _impl->connection->getNotifier() ||
480 : _impl->connection->isClosed());
481 : return EVENT_DATA;
482 : }
483 : #else // _WIN32
484 402572 : ConnectionSet::Event ConnectionSet::_parseSelect(const uint32_t)
485 : {
486 2455768 : for (size_t i = 0; i < _impl->fdSet.getSize(); ++i)
487 : {
488 2360181 : pollfd& pollFD = _impl->fdSet[i];
489 2360182 : if (pollFD.revents == 0)
490 2053196 : continue;
491 :
492 306986 : const int pollEvents = pollFD.revents;
493 306986 : pollFD.revents = 0;
494 306986 : LBASSERT(pollFD.fd > 0);
495 :
496 306986 : _impl->connection = _impl->fdSetResult[i].connection;
497 306986 : LBASSERT(_impl->connection);
498 :
499 306986 : LBVERB << "Got event on connection @" << (void*)_impl->connection.get()
500 306986 : << std::endl;
501 :
502 306986 : if (pollEvents & POLLERR)
503 : {
504 0 : LBINFO << "Error during poll: " << lunchbox::sysError << " on "
505 0 : << _impl->connection << std::endl;
506 0 : errno = 0;
507 0 : return EVENT_ERROR;
508 : }
509 :
510 : // disconnect event or disconnected connection
511 306986 : if ((pollEvents & POLLHUP) || (pollEvents & POLLNVAL))
512 0 : return EVENT_DISCONNECT;
513 :
514 : // Note: Intuitively I would handle the read before HUP to
515 : // read remaining data of the connection, but at least on
516 : // OS X both events happen simultaneously and no more data
517 : // can be read.
518 306986 : if (pollEvents & POLLIN || pollEvents & POLLPRI)
519 306986 : return EVENT_DATA;
520 :
521 0 : LBERROR << "Unhandled poll event(s): " << pollEvents << std::endl;
522 0 : ::abort();
523 : }
524 95586 : return EVENT_NONE;
525 : }
526 : #endif // else not _WIN32
527 :
528 95774 : bool ConnectionSet::_setupFDSet()
529 : {
530 95774 : if (!_impl->dirty)
531 : {
532 : #ifndef _WIN32
533 : // TODO: verify that poll() really modifies _fdSet, and remove the copy
534 : // if it doesn't. The man page seems to hint that poll changes fds.
535 95586 : _impl->fdSet = _impl->fdSetCopy;
536 : #endif
537 95586 : return true;
538 : }
539 :
540 188 : _impl->dirty = false;
541 188 : _impl->fdSet.setSize(0);
542 188 : _impl->fdSetResult.setSize(0);
543 :
544 : #ifdef _WIN32
545 : // add self connection
546 : HANDLE readHandle = _impl->selfConnection->getNotifier();
547 : LBASSERT(readHandle);
548 : _impl->fdSet.append(readHandle);
549 :
550 : Result res;
551 : res.connection = _impl->selfConnection.get();
552 : _impl->fdSetResult.append(res);
553 :
554 : // add regular connections
555 : _impl->lock.set();
556 : for (ConnectionsCIter i = _impl->connections.begin();
557 : i != _impl->connections.end(); ++i)
558 : {
559 : ConnectionPtr connection = *i;
560 : readHandle = connection->getNotifier();
561 :
562 : if (!readHandle)
563 : {
564 : LBDEBUG << "Cannot select connection " << connection
565 : << ", connection does not provide a read handle"
566 : << std::endl;
567 : _impl->connection = connection;
568 : _impl->lock.unset();
569 : return false;
570 : }
571 :
572 : _impl->fdSet.append(readHandle);
573 :
574 : Result result;
575 : result.connection = connection.get();
576 : _impl->fdSetResult.append(result);
577 : }
578 :
579 : for (ThreadsCIter i = _impl->threads.begin(); i != _impl->threads.end();
580 : ++i)
581 : {
582 : Thread* thread = *i;
583 : readHandle = thread->notifier;
584 : LBASSERT(readHandle);
585 : _impl->fdSet.append(readHandle);
586 :
587 : Result result;
588 : result.thread = thread;
589 : _impl->fdSetResult.append(result);
590 : }
591 : _impl->lock.unset();
592 : #else // _WIN32
593 : pollfd fd;
594 188 : fd.events = POLLIN;
595 188 : fd.revents = 0;
596 :
597 : // add self 'connection'
598 188 : fd.fd = _impl->selfConnection->getNotifier();
599 188 : LBASSERT(fd.fd > 0);
600 188 : _impl->fdSet.append(fd);
601 :
602 : Result result;
603 188 : result.connection = _impl->selfConnection.get();
604 188 : _impl->fdSetResult.append(result);
605 :
606 : // add regular connections
607 187 : _impl->lock.lock();
608 814 : for (ConnectionPtr connection : _impl->allConnections)
609 : {
610 659 : fd.fd = connection->getNotifier();
611 :
612 659 : if (fd.fd <= 0)
613 : {
614 132 : LBDEBUG << "Cannot select connection " << connection
615 99 : << ", connection " << lunchbox::className(connection.get())
616 132 : << " doesn't have a file descriptor" << std::endl;
617 33 : _impl->connection = connection;
618 33 : _impl->lock.unlock();
619 33 : return false;
620 : }
621 :
622 1252 : LBVERB << "Listening on " << lunchbox::className(connection.get())
623 1878 : << std::endl;
624 :
625 626 : _impl->fdSet.append(fd);
626 :
627 626 : result.connection = connection.get();
628 626 : _impl->fdSetResult.append(result);
629 : }
630 155 : _impl->lock.unlock();
631 155 : _impl->fdSetCopy = _impl->fdSet;
632 : #endif
633 :
634 155 : return true;
635 : }
636 :
637 0 : std::ostream& operator<<(std::ostream& os, const ConnectionSet& set)
638 : {
639 0 : const Connections& connections = set.getConnections();
640 :
641 0 : os << "connection set " << (void*)&set << ", " << connections.size()
642 0 : << " connections";
643 :
644 0 : for (ConnectionsCIter i = connections.begin(); i != connections.end(); ++i)
645 0 : os << std::endl << " " << (*i).get();
646 :
647 0 : return os;
648 : }
649 :
650 0 : std::ostream& operator<<(std::ostream& os, const ConnectionSet::Event event)
651 : {
652 0 : if (event >= ConnectionSet::EVENT_ALL)
653 0 : return os << "unknown (" << unsigned(event) << ')';
654 :
655 : return os
656 : << (event == ConnectionSet::EVENT_NONE
657 : ? "none"
658 : : event == ConnectionSet::EVENT_CONNECT
659 0 : ? "connect"
660 : : event == ConnectionSet::EVENT_DISCONNECT
661 0 : ? "disconnect"
662 : : event == ConnectionSet::EVENT_DATA
663 0 : ? "data"
664 : : event == ConnectionSet::EVENT_TIMEOUT
665 0 : ? "timeout"
666 : : event == ConnectionSet::
667 : EVENT_INTERRUPT
668 0 : ? "interrupted"
669 : : event == ConnectionSet::
670 : EVENT_ERROR
671 0 : ? "error"
672 : : event ==
673 : ConnectionSet::
674 : EVENT_SELECT_ERROR
675 0 : ? "select error"
676 : : event ==
677 : ConnectionSet::
678 : EVENT_INVALID_HANDLE
679 0 : ? "invalid "
680 : "handle"
681 0 : : "ERROR");
682 : }
683 63 : }
|