Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2009-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "pipe.h"
21 :
22 : #include "channel.h"
23 : #include "client.h"
24 : #include "config.h"
25 : #include "exception.h"
26 : #include "frame.h"
27 : #include "frameData.h"
28 : #include "global.h"
29 : #include "log.h"
30 : #include "node.h"
31 : #include "nodeFactory.h"
32 : #include "pipeStatistics.h"
33 : #include "server.h"
34 : #include "view.h"
35 : #include "window.h"
36 :
37 : #include "messagePump.h"
38 : #include "systemPipe.h"
39 :
40 : #include "computeContext.h"
41 : #ifdef EQUALIZER_USE_CUDA
42 : # include "cudaContext.h"
43 : #endif
44 :
45 : #include <eq/fabric/commands.h>
46 : #include <eq/fabric/elementVisitor.h>
47 : #include <eq/fabric/leafVisitor.h>
48 : #include <eq/fabric/task.h>
49 :
50 : #include <co/global.h>
51 : #include <co/objectICommand.h>
52 : #include <co/queueSlave.h>
53 : #include <co/worker.h>
54 : #include <boost/lexical_cast.hpp>
55 : #include <sstream>
56 :
57 : #ifdef EQUALIZER_USE_HWLOC_GL
58 : # include <hwloc.h>
59 : # include <hwloc/gl.h>
60 : #endif
61 :
62 : namespace eq
63 : {
64 : /** @cond IGNORE */
65 : typedef fabric::Pipe< Node, Pipe, Window, PipeVisitor > Super;
66 : typedef co::CommandFunc<Pipe> PipeFunc;
67 : /** @endcond */
68 :
69 : namespace
70 : {
71 : enum State
72 : {
73 : STATE_MAPPED,
74 : STATE_INITIALIZING,
75 : STATE_RUNNING,
76 : STATE_STOPPING, // must come after running
77 : STATE_STOPPED, // must come after running
78 : STATE_FAILED
79 : };
80 :
81 : typedef stde::hash_map< uint128_t, Frame* > FrameHash;
82 : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
83 : typedef stde::hash_map< uint128_t, View* > ViewHash;
84 : typedef stde::hash_map< uint128_t, co::QueueSlave* > QueueHash;
85 : typedef FrameHash::const_iterator FrameHashCIter;
86 : typedef FrameDataHash::const_iterator FrameDataHashCIter;
87 : typedef ViewHash::const_iterator ViewHashCIter;
88 : typedef ViewHash::iterator ViewHashIter;
89 : typedef QueueHash::const_iterator QueueHashCIter;
90 : }
91 :
92 : namespace detail
93 : {
94 :
95 30 : class RenderThread : public eq::Worker
96 : {
97 : public:
98 15 : explicit RenderThread( eq::Pipe* pipe )
99 : : eq::Worker( co::Global::getCommandQueueLimit( ))
100 15 : , _pipe( pipe )
101 15 : {}
102 :
103 : protected:
104 15 : bool init() override
105 : {
106 30 : setName( std::string( "Draw" ) +
107 30 : boost::lexical_cast< std::string >( _pipe->getPath().pipeIndex ));
108 15 : return true;
109 : }
110 :
111 : void run() override;
112 383 : bool stopRunning() override { return !_pipe; }
113 :
114 : private:
115 : eq::Pipe* _pipe;
116 : friend class eq::Pipe;
117 : };
118 :
119 :
120 : /** Asynchronous, per-pipe readback thread. */
121 15 : class TransferThread : public co::Worker
122 : {
123 : public:
124 105 : explicit TransferThread( const uint32_t index )
125 : : co::Worker( co::Global::getCommandQueueLimit( ))
126 : , _index( index )
127 105 : , _stop( false )
128 105 : {}
129 :
130 2 : bool init() override
131 : {
132 2 : if( !co::Worker::init( ))
133 0 : return false;
134 4 : setName( std::string( "Tfer" ) +
135 2 : boost::lexical_cast< std::string >( _index ));
136 2 : return true;
137 : }
138 :
139 17 : bool stopRunning() override { return _stop; }
140 2 : void postStop() { _stop = true; }
141 :
142 : private:
143 : uint32_t _index;
144 : bool _stop; // thread will exit if this is true
145 : };
146 :
147 : class Pipe
148 : {
149 : public:
150 105 : explicit Pipe( const uint32_t index )
151 : : systemPipe( 0 )
152 : , state( STATE_STOPPED )
153 : , currentFrame( 0 )
154 : , frameTime( 0 )
155 : , thread( 0 )
156 : , transferThread( index )
157 105 : , computeContext( 0 )
158 105 : {}
159 :
160 15 : ~Pipe()
161 15 : {
162 15 : delete thread;
163 15 : thread = 0;
164 15 : }
165 :
166 : /** Window-system specific functions class */
167 : SystemPipe* systemPipe;
168 :
169 : /** The current window system. */
170 : WindowSystem windowSystem;
171 :
172 : /** The configInit/configExit state. */
173 : lunchbox::Monitor< State > state;
174 :
175 : /** The last started frame. */
176 : uint32_t currentFrame;
177 :
178 : /** The number of the last finished frame. */
179 : lunchbox::Monitor< uint32_t > finishedFrame;
180 :
181 : /** The number of the last locally unlocked frame. */
182 : lunchbox::Monitor<uint32_t> unlockedFrame;
183 :
184 : /** The running per-frame statistic clocks. */
185 : std::deque< int64_t > frameTimes;
186 : lunchbox::Lock frameTimeMutex;
187 :
188 : /** The base time for the currently active frame. */
189 : int64_t frameTime;
190 :
191 : /** All assembly frames used by the pipe during rendering. */
192 : FrameHash frames;
193 :
194 : /** All output frame datas used by the pipe during rendering. */
195 : FrameDataHash outputFrameDatas;
196 :
197 : /** All input frame datas used by the pipe during rendering. */
198 : FrameDataHash inputFrameDatas;
199 :
200 : /** All views used by the pipe's channels during rendering. */
201 : ViewHash views;
202 :
203 : /** All queues used by the pipe's channels during rendering. */
204 : QueueHash queues;
205 :
206 : /** The pipe thread. */
207 : RenderThread* thread;
208 :
209 : detail::TransferThread transferThread;
210 :
211 : /** GPU Computing context */
212 : ComputeContext *computeContext;
213 : };
214 :
215 15 : void RenderThread::run()
216 : {
217 15 : LB_TS_THREAD( _pipe->_pipeThread );
218 15 : LBINFO << "Entered pipe thread" << std::endl;
219 :
220 15 : eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
221 15 : pipe->_impl->state.waitEQ( STATE_MAPPED );
222 15 : pipe->_impl->windowSystem = pipe->selectWindowSystem();
223 15 : pipe->_setupCommandQueue();
224 15 : pipe->_setupAffinity();
225 :
226 15 : Worker::run();
227 :
228 15 : pipe->_exitCommandQueue();
229 15 : }
230 : }
231 :
232 105 : Pipe::Pipe( Node* parent )
233 : : Super( parent )
234 105 : , _impl( new detail::Pipe( getPath().pipeIndex ))
235 : {
236 105 : }
237 :
238 40 : Pipe::~Pipe()
239 : {
240 15 : LBASSERT( getWindows().empty( ));
241 15 : delete _impl;
242 25 : }
243 :
244 252 : Config* Pipe::getConfig()
245 : {
246 252 : Node* node = getNode();
247 252 : LBASSERT( node );
248 252 : return ( node ? node->getConfig() : 0);
249 : }
250 :
251 0 : const Config* Pipe::getConfig() const
252 : {
253 0 : const Node* node = getNode();
254 0 : LBASSERT( node );
255 0 : return ( node ? node->getConfig() : 0);
256 : }
257 :
258 52 : ClientPtr Pipe::getClient()
259 : {
260 52 : Node* node = getNode();
261 52 : LBASSERT( node );
262 52 : return ( node ? node->getClient() : 0);
263 : }
264 :
265 72 : ServerPtr Pipe::getServer()
266 : {
267 72 : Node* node = getNode();
268 72 : LBASSERT( node );
269 72 : return ( node ? node->getServer() : 0);
270 : }
271 :
272 15 : void Pipe::attach( const uint128_t& id, const uint32_t instanceID )
273 : {
274 15 : Super::attach( id, instanceID );
275 :
276 15 : co::CommandQueue* queue = getPipeThreadQueue();
277 15 : co::CommandQueue* transferQ = getTransferThreadQueue();
278 :
279 : registerCommand( fabric::CMD_PIPE_CONFIG_INIT,
280 15 : PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
281 : registerCommand( fabric::CMD_PIPE_CONFIG_EXIT,
282 15 : PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
283 : registerCommand( fabric::CMD_PIPE_CREATE_WINDOW,
284 15 : PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
285 : registerCommand( fabric::CMD_PIPE_DESTROY_WINDOW,
286 15 : PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
287 : registerCommand( fabric::CMD_PIPE_FRAME_START,
288 15 : PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
289 : registerCommand( fabric::CMD_PIPE_FRAME_FINISH,
290 15 : PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
291 : registerCommand( fabric::CMD_PIPE_FRAME_DRAW_FINISH,
292 15 : PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
293 : registerCommand( fabric::CMD_PIPE_FRAME_START_CLOCK,
294 15 : PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
295 : registerCommand( fabric::CMD_PIPE_EXIT_THREAD,
296 15 : PipeFunc( this, &Pipe::_cmdExitThread ), queue );
297 : registerCommand( fabric::CMD_PIPE_DETACH_VIEW,
298 15 : PipeFunc( this, &Pipe::_cmdDetachView ), queue );
299 : registerCommand( fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
300 : PipeFunc( this, &Pipe::_cmdExitTransferThread ),
301 15 : transferQ );
302 15 : }
303 :
304 1255 : void Pipe::setDirty( const uint64_t bits )
305 : {
306 : // jump over fabric setDirty to avoid dirty'ing node pipes list
307 : // pipes are individually synced in frame finish for thread-safety
308 1255 : Object::setDirty( bits );
309 1255 : }
310 :
311 15 : WindowSystem Pipe::selectWindowSystem() const
312 : {
313 : #ifdef AGL
314 : return WindowSystem( "AGL" ); // prefer over GLX
315 : #else
316 15 : return WindowSystem();
317 : #endif
318 : }
319 :
320 15 : void Pipe::_setupCommandQueue()
321 : {
322 45 : LBINFO << "Set up pipe message pump for " << _impl->windowSystem
323 45 : << std::endl;
324 :
325 15 : Config* config = getConfig();
326 15 : config->setupMessagePump( this );
327 :
328 15 : if( !_impl->thread ) // Non-threaded pipes have no pipe thread message pump
329 15 : return;
330 :
331 15 : CommandQueue* queue = _impl->thread->getWorkerQueue();
332 15 : LBASSERT( queue );
333 15 : LBASSERT( !queue->getMessagePump( ));
334 :
335 15 : Global::enterCarbon();
336 15 : MessagePump* pump = createMessagePump();
337 15 : if( pump )
338 15 : pump->dispatchAll(); // initializes _impl->receiverQueue
339 :
340 15 : queue->setMessagePump( pump );
341 15 : Global::leaveCarbon();
342 : }
343 :
344 15 : int32_t Pipe::_getAutoAffinity() const
345 : {
346 : #ifdef EQUALIZER_USE_HWLOC_GL
347 : uint32_t port = getPort();
348 : uint32_t device = getDevice();
349 :
350 : if( port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32 )
351 : return lunchbox::Thread::NONE;
352 :
353 : if( port == LB_UNDEFINED_UINT32 )
354 : port = 0;
355 : if( device == LB_UNDEFINED_UINT32 )
356 : device = 0;
357 :
358 : hwloc_topology_t topology;
359 : if( hwloc_topology_init( &topology ) < 0 )
360 : {
361 : LBINFO << "Automatic pipe thread placement failed: "
362 : << "hwloc_topology_init() failed" << std::endl;
363 : return lunchbox::Thread::NONE;
364 : }
365 :
366 : // Load I/O devices, bridges and their relevant info
367 : const unsigned long loading_flags = HWLOC_TOPOLOGY_FLAG_IO_BRIDGES |
368 : HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
369 : if( hwloc_topology_set_flags( topology, loading_flags ) < 0 )
370 : {
371 : LBINFO << "Automatic pipe thread placement failed: "
372 : << "hwloc_topology_set_flags() failed" << std::endl;
373 : hwloc_topology_destroy( topology );
374 : return lunchbox::Thread::NONE;
375 : }
376 :
377 : if( hwloc_topology_load( topology ) < 0 )
378 : {
379 : LBINFO << "Automatic pipe thread placement failed: "
380 : << "hwloc_topology_load() failed" << std::endl;
381 : hwloc_topology_destroy( topology );
382 : return lunchbox::Thread::NONE;
383 : }
384 :
385 : const hwloc_obj_t osdev =
386 : hwloc_gl_get_display_osdev_by_port_device( topology,
387 : int( port ), int( device ));
388 : if( !osdev )
389 : {
390 : LBINFO << "Automatic pipe thread placement failed: GPU not found"
391 : << std::endl;
392 : hwloc_topology_destroy( topology );
393 : return lunchbox::Thread::NONE;
394 : }
395 :
396 : const hwloc_obj_t pcidev = osdev->parent;
397 : const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj( topology, pcidev);
398 : const int numCpus =
399 : hwloc_get_nbobjs_inside_cpuset_by_type( topology, parent->cpuset,
400 : HWLOC_OBJ_SOCKET );
401 : if( numCpus != 1 )
402 : {
403 : LBINFO << "Automatic pipe thread placement failed: GPU attached to "
404 : << numCpus << " processors?" << std::endl;
405 : hwloc_topology_destroy( topology );
406 : return lunchbox::Thread::NONE;
407 : }
408 :
409 : const hwloc_obj_t cpuObj =
410 : hwloc_get_obj_inside_cpuset_by_type( topology, parent->cpuset,
411 : HWLOC_OBJ_SOCKET, 0 );
412 : if( cpuObj == 0 )
413 : {
414 : LBINFO << "Automatic pipe thread placement failed: "
415 : << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
416 : hwloc_topology_destroy( topology );
417 : return lunchbox::Thread::NONE;
418 : }
419 :
420 : const int cpuIndex = cpuObj->logical_index;
421 : hwloc_topology_destroy( topology );
422 : return cpuIndex + lunchbox::Thread::SOCKET;
423 : #else
424 45 : LBINFO << "Automatic thread placement not supported, no hwloc GL support"
425 45 : << std::endl;
426 : #endif
427 15 : return lunchbox::Thread::NONE;
428 : }
429 :
430 15 : void Pipe::_setupAffinity()
431 : {
432 15 : const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
433 15 : switch( affinity )
434 : {
435 : case AUTO:
436 15 : lunchbox::Thread::setAffinity( _getAutoAffinity( ));
437 15 : break;
438 :
439 : case OFF:
440 : default:
441 0 : lunchbox::Thread::setAffinity( affinity );
442 0 : break;
443 : }
444 15 : }
445 :
446 15 : void Pipe::_exitCommandQueue()
447 : {
448 : // Non-threaded pipes have no pipe thread message pump
449 15 : if( !_impl->thread )
450 15 : return;
451 :
452 15 : CommandQueue* queue = _impl->thread->getWorkerQueue();
453 15 : LBASSERT( queue );
454 :
455 15 : MessagePump* pump = queue->getMessagePump();
456 15 : queue->setMessagePump( 0 );
457 15 : delete pump;
458 : }
459 :
460 15 : MessagePump* Pipe::createMessagePump()
461 : {
462 15 : return _impl->windowSystem.createMessagePump();
463 : }
464 :
465 15 : MessagePump* Pipe::getMessagePump()
466 : {
467 15 : LB_TS_THREAD( _pipeThread );
468 15 : if( !_impl->thread )
469 0 : return 0;
470 :
471 15 : CommandQueue* queue = _impl->thread->getWorkerQueue();
472 15 : return queue->getMessagePump();
473 : }
474 :
475 45 : co::CommandQueue* Pipe::getPipeThreadQueue()
476 : {
477 45 : if( _impl->thread )
478 45 : return _impl->thread->getWorkerQueue();
479 :
480 0 : return getNode()->getMainThreadQueue();
481 : }
482 :
483 30 : co::CommandQueue* Pipe::getTransferThreadQueue()
484 : {
485 30 : return _impl->transferThread.getWorkerQueue();
486 : }
487 :
488 15 : co::CommandQueue* Pipe::getMainThreadQueue()
489 : {
490 15 : return getServer()->getMainThreadQueue();
491 : }
492 :
493 15 : co::CommandQueue* Pipe::getCommandThreadQueue()
494 : {
495 15 : return getServer()->getCommandThreadQueue();
496 : }
497 :
498 4 : Frame* Pipe::getFrame( const co::ObjectVersion& frameVersion, const Eye eye,
499 : const bool isOutput )
500 : {
501 4 : LB_TS_THREAD( _pipeThread );
502 4 : Frame* frame = _impl->frames[ frameVersion.identifier ];
503 :
504 4 : if( !frame )
505 : {
506 4 : ClientPtr client = getClient();
507 4 : frame = new Frame();
508 :
509 4 : LBCHECK( client->mapObject( frame, frameVersion ));
510 4 : _impl->frames[ frameVersion.identifier ] = frame;
511 : }
512 : else
513 0 : frame->sync( frameVersion.version );
514 :
515 4 : const co::ObjectVersion& dataVersion = frame->getDataVersion( eye );
516 4 : LBLOG( LOG_ASSEMBLY ) << "Use " << dataVersion << std::endl;
517 :
518 4 : FrameDataPtr frameData = getNode()->getFrameData( dataVersion );
519 4 : LBASSERT( frameData );
520 :
521 4 : if( isOutput )
522 : {
523 2 : if( !frameData->isAttached() )
524 : {
525 2 : ClientPtr client = getClient();
526 2 : LBCHECK( client->mapObject( frameData.get(), dataVersion ));
527 : }
528 0 : else if( frameData->getVersion() < dataVersion.version )
529 0 : frameData->sync( dataVersion.version );
530 :
531 2 : _impl->outputFrameDatas[ dataVersion.identifier ] = frameData;
532 : }
533 : else
534 2 : _impl->inputFrameDatas[ dataVersion.identifier ] = frameData;
535 :
536 4 : frame->setFrameData( frameData );
537 4 : return frame;
538 : }
539 :
540 15 : void Pipe::flushFrames( util::ObjectManager& om )
541 : {
542 15 : LB_TS_THREAD( _pipeThread );
543 15 : ClientPtr client = getClient();
544 19 : for( FrameHashCIter i = _impl->frames.begin(); i !=_impl->frames.end(); ++i)
545 : {
546 4 : Frame* frame = i->second;
547 4 : frame->setFrameData( 0 ); // datas are flushed below
548 4 : client->unmapObject( frame );
549 4 : delete frame;
550 : }
551 15 : _impl->frames.clear();
552 :
553 51 : for( FrameDataHashCIter i = _impl->inputFrameDatas.begin();
554 34 : i != _impl->inputFrameDatas.end(); ++i )
555 : {
556 2 : FrameDataPtr data = i->second;
557 2 : data->deleteGLObjects( om );
558 2 : }
559 15 : _impl->inputFrameDatas.clear();
560 :
561 51 : for( FrameDataHashCIter i = _impl->outputFrameDatas.begin();
562 34 : i != _impl->outputFrameDatas.end(); ++i )
563 : {
564 2 : FrameDataPtr data = i->second;
565 2 : data->resetPlugins();
566 2 : data->deleteGLObjects( om );
567 2 : client->unmapObject( data.get( ));
568 2 : getNode()->releaseFrameData( data );
569 2 : }
570 15 : _impl->outputFrameDatas.clear();
571 15 : }
572 :
573 0 : co::QueueSlave* Pipe::getQueue( const uint128_t& queueID )
574 : {
575 0 : LB_TS_THREAD( _pipeThread );
576 0 : if( queueID == 0 )
577 0 : return 0;
578 :
579 0 : co::QueueSlave* queue = _impl->queues[ queueID ];
580 0 : if( !queue )
581 : {
582 0 : queue = new co::QueueSlave;
583 0 : ClientPtr client = getClient();
584 0 : LBCHECK( client->mapObject( queue, queueID ));
585 :
586 0 : _impl->queues[ queueID ] = queue;
587 : }
588 :
589 0 : return queue;
590 : }
591 :
592 15 : void Pipe::_flushQueues()
593 : {
594 15 : LB_TS_THREAD( _pipeThread );
595 15 : ClientPtr client = getClient();
596 :
597 15 : for( QueueHashCIter i = _impl->queues.begin(); i !=_impl->queues.end(); ++i)
598 : {
599 0 : co::QueueSlave* queue = i->second;
600 0 : client->unmapObject( queue );
601 0 : delete queue;
602 : }
603 15 : _impl->queues.clear();
604 15 : }
605 :
606 1 : const View* Pipe::getView( const co::ObjectVersion& viewVersion ) const
607 : {
608 : // Yie-ha: we want to have a const-interface to get a view on the render
609 : // clients, but view mapping is by definition non-const.
610 1 : return const_cast< Pipe* >( this )->getView( viewVersion );
611 : }
612 :
613 1 : View* Pipe::getView( const co::ObjectVersion& viewVersion )
614 : {
615 1 : LB_TS_THREAD( _pipeThread );
616 1 : if( viewVersion.identifier == 0 )
617 0 : return 0;
618 :
619 1 : View* view = _impl->views[ viewVersion.identifier ];
620 1 : if( !view )
621 : {
622 1 : NodeFactory* nodeFactory = Global::getNodeFactory();
623 1 : view = nodeFactory->createView( 0 );
624 1 : LBASSERT( view );
625 1 : view->_pipe = this;
626 1 : ClientPtr client = getClient();
627 1 : LBCHECK( client->mapObject( view, viewVersion ));
628 :
629 1 : _impl->views[ viewVersion.identifier ] = view;
630 : }
631 :
632 1 : view->sync( viewVersion.version );
633 1 : return view;
634 : }
635 :
636 7 : void Pipe::_releaseViews()
637 : {
638 7 : LB_TS_THREAD( _pipeThread );
639 21 : for( bool changed = true; changed; )
640 : {
641 7 : changed = false;
642 16 : for( ViewHashIter i =_impl->views.begin(); i !=_impl->views.end(); ++i )
643 : {
644 1 : View* view = i->second;
645 1 : view->commit();
646 1 : if( view->getVersion() + 20 > view->getHeadVersion( ))
647 1 : continue;
648 :
649 : // release unused view to avoid memory leaks due to deltas piling up
650 0 : view->_pipe = 0;
651 :
652 0 : ClientPtr client = getClient();
653 0 : client->unmapObject( view );
654 0 : _impl->views.erase( i );
655 :
656 0 : NodeFactory* nodeFactory = Global::getNodeFactory();
657 0 : nodeFactory->releaseView( view );
658 :
659 0 : changed = true;
660 0 : break;
661 0 : }
662 : }
663 7 : }
664 :
665 15 : void Pipe::_flushViews()
666 : {
667 15 : LB_TS_THREAD( _pipeThread );
668 15 : NodeFactory* nodeFactory = Global::getNodeFactory();
669 15 : ClientPtr client = getClient();
670 :
671 16 : for( ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i )
672 : {
673 1 : View* view = i->second;
674 :
675 1 : client->unmapObject( view );
676 1 : view->_pipe = 0;
677 1 : nodeFactory->releaseView( view );
678 : }
679 15 : _impl->views.clear();
680 15 : }
681 :
682 15 : void Pipe::startThread()
683 : {
684 15 : _impl->thread = new detail::RenderThread( this );
685 15 : _impl->thread->start();
686 15 : }
687 :
688 15 : void Pipe::exitThread()
689 : {
690 15 : _stopTransferThread();
691 :
692 15 : if( !_impl->thread )
693 15 : return;
694 :
695 15 : send( getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD );
696 :
697 15 : _impl->thread->join();
698 15 : delete _impl->thread;
699 15 : _impl->thread = 0;
700 : }
701 :
702 90 : void Pipe::cancelThread()
703 : {
704 90 : _stopTransferThread();
705 :
706 90 : if( !_impl->thread )
707 180 : return;
708 :
709 : // local command dispatching
710 : co::ObjectOCommand( this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD,
711 0 : co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL );
712 : }
713 :
714 15 : void Pipe::waitExited() const
715 : {
716 15 : _impl->state.waitGE( STATE_STOPPED );
717 15 : }
718 :
719 31 : bool Pipe::isRunning() const
720 : {
721 31 : return (_impl->state == STATE_RUNNING);
722 : }
723 :
724 15 : bool Pipe::isStopped() const
725 : {
726 15 : return (_impl->state == STATE_STOPPED);
727 : }
728 :
729 15 : void Pipe::notifyMapped()
730 : {
731 15 : LBASSERT( _impl->state == STATE_STOPPED );
732 15 : _impl->state = STATE_MAPPED;
733 15 : }
734 :
735 : namespace
736 : {
737 7 : class WaitFinishedVisitor : public PipeVisitor
738 : {
739 : public:
740 7 : WaitFinishedVisitor( const uint32_t frame ) : _frame( frame ) {}
741 :
742 7 : virtual VisitorResult visit( Channel* channel )
743 : {
744 7 : channel->waitFrameFinished( _frame );
745 7 : return TRAVERSE_CONTINUE;
746 : }
747 :
748 : private:
749 : const uint32_t _frame;
750 : };
751 : }
752 :
753 7 : void Pipe::waitFrameFinished( const uint32_t frameNumber ) const
754 : {
755 7 : _impl->finishedFrame.waitGE( frameNumber );
756 7 : WaitFinishedVisitor waiter( frameNumber );
757 7 : accept( waiter );
758 7 : }
759 :
760 14 : void Pipe::waitFrameLocal( const uint32_t frameNumber ) const
761 : {
762 14 : _impl->unlockedFrame.waitGE( frameNumber );
763 14 : }
764 :
765 57 : uint32_t Pipe::getCurrentFrame() const
766 : {
767 57 : LB_TS_THREAD( _pipeThread );
768 57 : return _impl->currentFrame;
769 : }
770 :
771 0 : uint32_t Pipe::getFinishedFrame() const
772 : {
773 0 : return _impl->finishedFrame.get();
774 : }
775 :
776 32 : WindowSystem Pipe::getWindowSystem() const
777 : {
778 32 : return _impl->windowSystem;
779 : }
780 :
781 0 : EventOCommand Pipe::sendError( const uint32_t error )
782 : {
783 0 : return getConfig()->sendError( Event::PIPE_ERROR, getID(), error );
784 : }
785 :
786 2 : bool Pipe::processEvent( const Event& event )
787 : {
788 2 : ConfigEvent configEvent( event );
789 2 : getConfig()->sendEvent( configEvent );
790 2 : return true;
791 : }
792 :
793 : //---------------------------------------------------------------------------
794 : // pipe-thread methods
795 : //---------------------------------------------------------------------------
796 15 : bool Pipe::configInit( const uint128_t& initID )
797 : {
798 15 : LB_TS_THREAD( _pipeThread );
799 :
800 15 : LBASSERT( !_impl->systemPipe );
801 :
802 15 : if ( !configInitSystemPipe( initID ))
803 0 : return false;
804 :
805 : // -------------------------------------------------------------------------
806 15 : LBASSERT(!_impl->computeContext);
807 :
808 : // for now we only support CUDA
809 : #ifdef EQUALIZER_USE_CUDA
810 : if( getIAttribute( IATTR_HINT_CUDA_GL_INTEROP ) == eq::ON )
811 : {
812 : LBINFO << "Initializing CUDAContext" << std::endl;
813 : ComputeContext* computeCtx = new CUDAContext( this );
814 :
815 : if( !computeCtx->configInit() )
816 : {
817 : LBWARN << "GPU Computing context initialization failed "
818 : << std::endl;
819 : delete computeCtx;
820 : return false;
821 : }
822 : setComputeContext( computeCtx );
823 : }
824 : #endif
825 :
826 15 : return true;
827 : }
828 :
829 15 : bool Pipe::configInitSystemPipe( const uint128_t& )
830 : {
831 15 : SystemPipe* systemPipe = _impl->windowSystem.createPipe( this );
832 15 : LBASSERT( systemPipe );
833 :
834 15 : if( !systemPipe->configInit( ))
835 : {
836 0 : LBERROR << "System pipe context initialization failed" << std::endl;
837 0 : delete systemPipe;
838 0 : return false;
839 : }
840 :
841 15 : setSystemPipe( systemPipe );
842 15 : return true;
843 : }
844 :
845 15 : bool Pipe::configExit()
846 : {
847 15 : LB_TS_THREAD( _pipeThread );
848 :
849 15 : if( _impl->computeContext )
850 : {
851 0 : _impl->computeContext->configExit();
852 0 : delete _impl->computeContext;
853 0 : _impl->computeContext = 0;
854 : }
855 :
856 15 : if( _impl->systemPipe )
857 : {
858 15 : _impl->systemPipe->configExit( );
859 15 : delete _impl->systemPipe;
860 15 : _impl->systemPipe = 0;
861 : }
862 15 : return true;
863 : }
864 :
865 :
866 7 : void Pipe::frameStart( const uint128_t&, const uint32_t frameNumber )
867 : {
868 7 : LB_TS_THREAD( _pipeThread );
869 :
870 7 : const Node* node = getNode();
871 7 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
872 : {
873 : case ASYNC: // No sync, release immediately
874 0 : releaseFrameLocal( frameNumber );
875 0 : break;
876 :
877 : case DRAW_SYNC: // Sync, release in frameDrawFinish
878 : case LOCAL_SYNC: // Sync, release in frameFinish
879 7 : node->waitFrameStarted( frameNumber );
880 7 : break;
881 :
882 : default:
883 0 : LBUNIMPLEMENTED;
884 : }
885 :
886 7 : startFrame( frameNumber );
887 7 : }
888 :
889 7 : void Pipe::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
890 : {
891 7 : const Node* node = getNode();
892 7 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
893 : {
894 : case ASYNC: // released in frameStart
895 0 : break;
896 :
897 : case DRAW_SYNC: // release
898 7 : releaseFrameLocal( frameNumber );
899 7 : break;
900 :
901 : case LOCAL_SYNC: // release in frameFinish
902 0 : break;
903 :
904 : default:
905 0 : LBUNIMPLEMENTED;
906 : }
907 7 : }
908 :
909 7 : void Pipe::frameFinish( const uint128_t&, const uint32_t frameNumber )
910 : {
911 7 : const Node* node = getNode();
912 7 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
913 : {
914 : case ASYNC: // released in frameStart
915 0 : break;
916 :
917 : case DRAW_SYNC: // released in frameDrawFinish
918 7 : break;
919 :
920 : case LOCAL_SYNC: // release
921 0 : releaseFrameLocal( frameNumber );
922 0 : break;
923 :
924 : default:
925 0 : LBUNIMPLEMENTED;
926 : }
927 :
928 : // Global release
929 7 : releaseFrame( frameNumber );
930 7 : }
931 :
932 7 : void Pipe::startFrame( const uint32_t frameNumber )
933 : {
934 7 : LB_TS_THREAD( _pipeThread );
935 7 : _impl->currentFrame = frameNumber;
936 7 : LBLOG( LOG_TASKS ) << "---- Started Frame ---- "<< frameNumber << std::endl;
937 7 : }
938 :
939 7 : void Pipe::releaseFrame( const uint32_t frameNumber )
940 : {
941 7 : LB_TS_THREAD( _pipeThread );
942 7 : _impl->finishedFrame = frameNumber;
943 7 : LBLOG( LOG_TASKS ) << "---- Finished Frame --- "<< frameNumber << std::endl;
944 7 : }
945 :
946 7 : void Pipe::releaseFrameLocal( const uint32_t frameNumber )
947 : {
948 7 : LB_TS_THREAD( _pipeThread );
949 7 : LBASSERTINFO( _impl->unlockedFrame + 1 == frameNumber,
950 : _impl->unlockedFrame << ", " << frameNumber );
951 :
952 7 : _impl->unlockedFrame = frameNumber;
953 7 : LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- "
954 7 : << _impl->unlockedFrame.get() << std::endl;
955 7 : }
956 :
957 2 : bool Pipe::startTransferThread()
958 : {
959 2 : if( _impl->transferThread.isRunning( ))
960 0 : return true;
961 :
962 2 : return _impl->transferThread.start();
963 : }
964 :
965 15 : bool Pipe::hasTransferThread() const
966 : {
967 15 : return _impl->transferThread.isRunning();
968 : }
969 :
970 105 : void Pipe::_stopTransferThread()
971 : {
972 105 : if( _impl->transferThread.isStopped( ))
973 208 : return;
974 :
975 2 : send( getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD );
976 2 : _impl->transferThread.join();
977 : }
978 :
979 15 : void Pipe::setSystemPipe( SystemPipe* pipe )
980 : {
981 15 : _impl->systemPipe = pipe;
982 15 : }
983 :
984 17 : SystemPipe* Pipe::getSystemPipe()
985 : {
986 17 : return _impl->systemPipe;
987 : }
988 :
989 0 : const SystemPipe* Pipe::getSystemPipe() const
990 : {
991 0 : return _impl->systemPipe;
992 : }
993 :
994 0 : void Pipe::setComputeContext( ComputeContext* ctx )
995 : {
996 0 : _impl->computeContext = ctx;
997 0 : }
998 :
999 0 : const ComputeContext* Pipe::getComputeContext() const
1000 : {
1001 0 : return _impl->computeContext;
1002 : }
1003 :
1004 0 : ComputeContext* Pipe::getComputeContext()
1005 : {
1006 0 : return _impl->computeContext;
1007 : }
1008 :
1009 : //---------------------------------------------------------------------------
1010 : // command handlers
1011 : //---------------------------------------------------------------------------
1012 15 : bool Pipe::_cmdCreateWindow( co::ICommand& cmd )
1013 : {
1014 15 : co::ObjectICommand command( cmd );
1015 15 : const uint128_t& windowID = command.read< uint128_t >();
1016 :
1017 15 : LBLOG( LOG_INIT ) << "Create window " << command << " id " << windowID
1018 15 : << std::endl;
1019 :
1020 15 : Window* window = Global::getNodeFactory()->createWindow( this );
1021 15 : window->init(); // not in ctor, virtual method
1022 :
1023 15 : Config* config = getConfig();
1024 15 : LBCHECK( config->mapObject( window, windowID ));
1025 :
1026 15 : return true;
1027 : }
1028 :
1029 15 : bool Pipe::_cmdDestroyWindow( co::ICommand& cmd )
1030 : {
1031 15 : co::ObjectICommand command( cmd );
1032 :
1033 15 : LBLOG( LOG_INIT ) << "Destroy window " << command << std::endl;
1034 :
1035 15 : Window* window = _findWindow( command.read< uint128_t >( ));
1036 15 : LBASSERT( window );
1037 :
1038 : // re-set shared windows accordingly
1039 15 : Window* newSharedWindow = 0;
1040 15 : const Windows& windows = getWindows();
1041 30 : for( Windows::const_iterator i = windows.begin(); i != windows.end(); ++i )
1042 : {
1043 15 : Window* candidate = *i;
1044 :
1045 15 : if( candidate == window )
1046 15 : continue; // ignore
1047 :
1048 0 : if( candidate->getSharedContextWindow() == window )
1049 : {
1050 0 : if( newSharedWindow )
1051 0 : candidate->setSharedContextWindow( newSharedWindow );
1052 : else
1053 : {
1054 0 : newSharedWindow = candidate;
1055 0 : newSharedWindow->setSharedContextWindow( candidate );
1056 : }
1057 : }
1058 :
1059 0 : LBASSERT( candidate->getSharedContextWindow() != window );
1060 : }
1061 :
1062 15 : const bool stopped = window->isStopped();
1063 : window->send( getServer(),
1064 15 : fabric::CMD_WINDOW_CONFIG_EXIT_REPLY ) << stopped;
1065 :
1066 15 : Config* config = getConfig();
1067 15 : config->unmapObject( window );
1068 15 : Global::getNodeFactory()->releaseWindow( window );
1069 :
1070 15 : return true;
1071 : }
1072 :
1073 15 : bool Pipe::_cmdConfigInit( co::ICommand& cmd )
1074 : {
1075 15 : LB_TS_THREAD( _pipeThread );
1076 :
1077 15 : co::ObjectICommand command( cmd );
1078 15 : const uint128_t& initID = command.read< uint128_t >();
1079 15 : const uint32_t frameNumber = command.read< uint32_t >();
1080 :
1081 15 : LBLOG( LOG_INIT ) << "Init pipe " << command << " init id " << initID
1082 15 : << " frame " << frameNumber << std::endl;
1083 :
1084 15 : if( !isThreaded( ))
1085 : {
1086 0 : _impl->windowSystem = selectWindowSystem();
1087 0 : _setupCommandQueue();
1088 : }
1089 :
1090 15 : Node* node = getNode();
1091 15 : LBASSERT( node );
1092 15 : node->waitInitialized();
1093 :
1094 15 : bool result = false;
1095 15 : if( node->isRunning( ))
1096 : {
1097 15 : _impl->currentFrame = frameNumber;
1098 15 : _impl->finishedFrame = frameNumber;
1099 15 : _impl->unlockedFrame = frameNumber;
1100 15 : _impl->state = STATE_INITIALIZING;
1101 :
1102 15 : result = configInit( initID );
1103 :
1104 15 : if( result )
1105 15 : _impl->state = STATE_RUNNING;
1106 : }
1107 : else
1108 0 : sendError( ERROR_PIPE_NODE_NOTRUNNING );
1109 :
1110 15 : LBLOG( LOG_INIT ) << "TASK pipe config init reply result " << result
1111 15 : << std::endl;
1112 :
1113 15 : commit();
1114 : send( command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY )
1115 15 : << result;
1116 15 : return true;
1117 : }
1118 :
1119 15 : bool Pipe::_cmdConfigExit( co::ICommand& cmd )
1120 : {
1121 15 : co::ObjectICommand command( cmd );
1122 :
1123 15 : LB_TS_THREAD( _pipeThread );
1124 15 : LBLOG( LOG_INIT ) << "TASK pipe config exit " << command << std::endl;
1125 :
1126 15 : _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
1127 :
1128 : // send before node gets a chance to send its destroy command
1129 15 : getNode()->send( getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE ) << getID();
1130 :
1131 : // Flush views before exit since they are created after init
1132 : // - application may need initialized pipe to exit
1133 : // - configExit can't access views since all channels are gone already
1134 15 : _flushViews();
1135 15 : _flushQueues();
1136 15 : _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
1137 15 : return true;
1138 : }
1139 :
1140 15 : bool Pipe::_cmdExitThread( co::ICommand& )
1141 : {
1142 15 : LBASSERT( _impl->thread );
1143 15 : _impl->thread->_pipe = 0;
1144 15 : return true;
1145 : }
1146 :
1147 2 : bool Pipe::_cmdExitTransferThread( co::ICommand& )
1148 : {
1149 2 : _impl->transferThread.postStop();
1150 2 : return true;
1151 : }
1152 :
1153 7 : bool Pipe::_cmdFrameStartClock( co::ICommand& )
1154 : {
1155 7 : LBVERB << "start frame clock" << std::endl;
1156 7 : _impl->frameTimeMutex.set();
1157 7 : _impl->frameTimes.push_back( getConfig()->getTime( ));
1158 7 : _impl->frameTimeMutex.unset();
1159 7 : return true;
1160 : }
1161 :
1162 7 : bool Pipe::_cmdFrameStart( co::ICommand& cmd )
1163 : {
1164 7 : LB_TS_THREAD( _pipeThread );
1165 :
1166 7 : co::ObjectICommand command( cmd );
1167 7 : const uint128_t& version = command.read< uint128_t >();
1168 7 : const uint128_t& frameID = command.read< uint128_t >();
1169 7 : const uint32_t frameNumber = command.read< uint32_t >();
1170 :
1171 7 : LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
1172 7 : << " id " << frameID << std::endl;
1173 :
1174 7 : LBLOG( LOG_TASKS ) << "---- TASK start frame ---- frame " << frameNumber
1175 7 : << " id " << frameID << std::endl;
1176 7 : sync( version );
1177 7 : const int64_t lastFrameTime = _impl->frameTime;
1178 :
1179 7 : _impl->frameTimeMutex.set();
1180 7 : LBASSERT( !_impl->frameTimes.empty( ));
1181 :
1182 7 : _impl->frameTime = _impl->frameTimes.front();
1183 7 : _impl->frameTimes.pop_front();
1184 7 : _impl->frameTimeMutex.unset();
1185 :
1186 7 : if( lastFrameTime > 0 )
1187 : {
1188 2 : PipeStatistics waitEvent( Statistic::PIPE_IDLE, this );
1189 : waitEvent.event.data.statistic.idleTime =
1190 2 : _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime() :0;
1191 : waitEvent.event.data.statistic.totalTime =
1192 2 : LB_MAX( _impl->frameTime - lastFrameTime, 1 ); // avoid SIGFPE
1193 : }
1194 :
1195 7 : LBASSERTINFO( _impl->currentFrame + 1 == frameNumber,
1196 : "current " <<_impl->currentFrame << " start " << frameNumber);
1197 :
1198 7 : frameStart( frameID, frameNumber );
1199 7 : return true;
1200 : }
1201 :
1202 7 : bool Pipe::_cmdFrameFinish( co::ICommand& cmd )
1203 : {
1204 7 : LB_TS_THREAD( _pipeThread );
1205 :
1206 7 : co::ObjectICommand command( cmd );
1207 7 : const uint128_t& frameID = command.read< uint128_t >();
1208 7 : const uint32_t frameNumber = command.read< uint32_t >();
1209 :
1210 7 : LBLOG( LOG_TASKS ) << "---- TASK finish frame --- " << command << " frame "
1211 7 : << frameNumber << " id " << frameID << std::endl;
1212 :
1213 7 : LBASSERTINFO( _impl->currentFrame >= frameNumber,
1214 : "current " <<_impl->currentFrame << " finish " <<frameNumber);
1215 :
1216 7 : frameFinish( frameID, frameNumber );
1217 :
1218 7 : LBASSERTINFO( _impl->finishedFrame >= frameNumber,
1219 : "Pipe::frameFinish() did not release frame " << frameNumber );
1220 :
1221 7 : if( _impl->unlockedFrame < frameNumber )
1222 : {
1223 0 : LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
1224 0 : << std::endl << " unlocked " << _impl->unlockedFrame.get()
1225 0 : << " done " << frameNumber << std::endl;
1226 0 : releaseFrameLocal( frameNumber );
1227 : }
1228 :
1229 7 : if( _impl->finishedFrame < frameNumber )
1230 : {
1231 0 : LBWARN << "Finished frame was not released, enforcing unlock"
1232 0 : << std::endl;
1233 0 : releaseFrame( frameNumber );
1234 : }
1235 :
1236 7 : _releaseViews();
1237 :
1238 7 : const uint128_t version = commit();
1239 7 : if( version != co::VERSION_NONE )
1240 5 : send( command.getRemoteNode(), fabric::CMD_OBJECT_SYNC );
1241 7 : return true;
1242 : }
1243 :
1244 7 : bool Pipe::_cmdFrameDrawFinish( co::ICommand& cmd )
1245 : {
1246 7 : LB_TS_THREAD( _pipeThread );
1247 :
1248 7 : co::ObjectICommand command( cmd );
1249 7 : const uint128_t& frameID = command.read< uint128_t >();
1250 7 : const uint32_t frameNumber = command.read< uint32_t >();
1251 :
1252 7 : LBLOG( LOG_TASKS ) << "TASK draw finish " << getName()
1253 0 : << " frame " << frameNumber << " id " << frameID
1254 7 : << std::endl;
1255 :
1256 7 : frameDrawFinish( frameID, frameNumber );
1257 7 : return true;
1258 : }
1259 :
1260 0 : bool Pipe::_cmdDetachView( co::ICommand& cmd )
1261 : {
1262 0 : co::ObjectICommand command( cmd );
1263 :
1264 0 : LB_TS_THREAD( _pipeThread );
1265 :
1266 0 : ViewHash::iterator i = _impl->views.find( command.read< uint128_t >( ));
1267 0 : if( i != _impl->views.end( ))
1268 : {
1269 0 : View* view = i->second;
1270 0 : _impl->views.erase( i );
1271 :
1272 0 : NodeFactory* nodeFactory = Global::getNodeFactory();
1273 0 : nodeFactory->releaseView( view );
1274 : }
1275 0 : return true;
1276 : }
1277 :
1278 : }
1279 :
1280 : #include "../fabric/pipe.ipp"
1281 : template class eq::fabric::Pipe< eq::Node, eq::Pipe, eq::Window,
1282 : eq::PipeVisitor >;
1283 :
1284 : /** @cond IGNORE */
1285 : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
1286 36 : const eq::Super& );
1287 : /** @endcond */
|