Line data Source code
1 :
2 : /* Copyright (c) 2005-2015, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * Cedric Stalder <cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "pipe.h"
21 :
22 : // must be included before any header defining Bool
23 : #ifdef EQ_QT_USED
24 : # include "qt/window.h"
25 : # include <QThread>
26 : #endif
27 :
28 : #include "channel.h"
29 : #include "client.h"
30 : #include "config.h"
31 : #include "exception.h"
32 : #include "frame.h"
33 : #include "frameData.h"
34 : #include "global.h"
35 : #include "log.h"
36 : #include "node.h"
37 : #include "nodeFactory.h"
38 : #include "pipeStatistics.h"
39 : #include "server.h"
40 : #include "view.h"
41 : #include "window.h"
42 :
43 : #include "messagePump.h"
44 : #include "systemPipe.h"
45 :
46 : #include "computeContext.h"
47 : #ifdef EQUALIZER_USE_CUDA
48 : # include "cudaContext.h"
49 : #endif
50 :
51 : #include <eq/fabric/commands.h>
52 : #include <eq/fabric/elementVisitor.h>
53 : #include <eq/fabric/leafVisitor.h>
54 : #include <eq/fabric/task.h>
55 :
56 : #include <co/global.h>
57 : #include <co/objectICommand.h>
58 : #include <co/queueSlave.h>
59 : #include <co/worker.h>
60 : #include <boost/lexical_cast.hpp>
61 : #include <sstream>
62 :
63 : #ifdef EQUALIZER_USE_HWLOC_GL
64 : # include <hwloc.h>
65 : # include <hwloc/gl.h>
66 : #endif
67 :
68 : #ifdef EQUALIZER_USE_QT5WIDGETS
69 : # include <QGuiApplication>
70 : # include <QRegularExpression>
71 : #endif
72 :
73 : namespace eq
74 : {
75 : /** @cond IGNORE */
76 : typedef fabric::Pipe< Node, Pipe, Window, PipeVisitor > Super;
77 : typedef co::CommandFunc<Pipe> PipeFunc;
78 : /** @endcond */
79 :
80 : namespace
81 : {
82 : enum State
83 : {
84 : STATE_MAPPED,
85 : STATE_INITIALIZING,
86 : STATE_RUNNING,
87 : STATE_STOPPING, // must come after running
88 : STATE_STOPPED, // must come after running
89 : STATE_FAILED
90 : };
91 :
92 : typedef stde::hash_map< uint128_t, Frame* > FrameHash;
93 : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
94 : typedef stde::hash_map< uint128_t, View* > ViewHash;
95 : typedef stde::hash_map< uint128_t, co::QueueSlave* > QueueHash;
96 : typedef FrameHash::const_iterator FrameHashCIter;
97 : typedef FrameDataHash::const_iterator FrameDataHashCIter;
98 : typedef ViewHash::const_iterator ViewHashCIter;
99 : typedef ViewHash::iterator ViewHashIter;
100 : typedef QueueHash::const_iterator QueueHashCIter;
101 : }
102 :
103 : namespace detail
104 : {
105 :
106 2 : class RenderThread : public eq::Worker
107 : {
108 : public:
109 1 : explicit RenderThread( eq::Pipe* pipe )
110 : : eq::Worker( co::Global::getCommandQueueLimit( ))
111 1 : , _pipe( pipe )
112 1 : {}
113 :
114 : protected:
115 1 : bool init() override
116 : {
117 2 : setName( std::string( "Draw" ) +
118 2 : boost::lexical_cast< std::string >( _pipe->getPath().pipeIndex ));
119 1 : return true;
120 : }
121 :
122 : void run() override;
123 20 : bool stopRunning() override { return !_pipe; }
124 :
125 : private:
126 : eq::Pipe* _pipe;
127 : friend class eq::Pipe;
128 : };
129 :
130 :
131 : /** Asynchronous, per-pipe readback thread. */
132 1 : class TransferThread : public co::Worker
133 : {
134 : public:
135 91 : explicit TransferThread( const uint32_t index )
136 : : co::Worker( co::Global::getCommandQueueLimit( ))
137 : , _index( index )
138 : , _qThread( nullptr )
139 91 : , _stop( false )
140 91 : {}
141 :
142 0 : bool init() override
143 : {
144 0 : if( !co::Worker::init( ))
145 0 : return false;
146 0 : setName( std::string( "Tfer" ) +
147 0 : boost::lexical_cast< std::string >( _index ));
148 : #ifdef EQ_QT_USED
149 0 : _qThread = QThread::currentThread();
150 : #endif
151 0 : return true;
152 : }
153 :
154 0 : bool stopRunning() override { return _stop; }
155 0 : void postStop() { _stop = true; }
156 :
157 0 : QThread* getQThread() { return _qThread; }
158 :
159 : private:
160 : uint32_t _index;
161 : QThread* _qThread;
162 : bool _stop; // thread will exit if this is true
163 : };
164 :
165 : class Pipe
166 : {
167 : public:
168 91 : explicit Pipe( const uint32_t index )
169 : : systemPipe( 0 )
170 : #ifdef AGL
171 : , windowSystem( "AGL" )
172 : #elif GLX
173 : , windowSystem( "GLX" )
174 : #elif WGL
175 : , windowSystem( "WGL" )
176 : #elif EQ_QT_USED
177 : , windowSystem( "Qt" )
178 : #endif
179 : , state( STATE_STOPPED )
180 : , currentFrame( 0 )
181 : , frameTime( 0 )
182 : , thread( 0 )
183 : , transferThread( index )
184 91 : , computeContext( 0 )
185 91 : {}
186 :
187 1 : ~Pipe()
188 1 : {
189 1 : delete thread;
190 1 : thread = 0;
191 1 : }
192 :
193 : /** Window-system specific functions class */
194 : SystemPipe* systemPipe;
195 :
196 : /** The current window system. */
197 : WindowSystem windowSystem;
198 :
199 : /** The configInit/configExit state. */
200 : lunchbox::Monitor< State > state;
201 :
202 : /** The last started frame. */
203 : uint32_t currentFrame;
204 :
205 : /** The number of the last finished frame. */
206 : lunchbox::Monitor< uint32_t > finishedFrame;
207 :
208 : /** The number of the last locally unlocked frame. */
209 : lunchbox::Monitor< uint32_t > unlockedFrame;
210 :
211 : /** The running per-frame statistic clocks. */
212 : std::deque< int64_t > frameTimes;
213 : lunchbox::Lock frameTimeMutex;
214 :
215 : /** The base time for the currently active frame. */
216 : int64_t frameTime;
217 :
218 : /** All assembly frames used by the pipe during rendering. */
219 : FrameHash frames;
220 :
221 : /** All output frame datas used by the pipe during rendering. */
222 : FrameDataHash outputFrameDatas;
223 :
224 : /** All input frame datas used by the pipe during rendering. */
225 : FrameDataHash inputFrameDatas;
226 :
227 : /** All views used by the pipe's channels during rendering. */
228 : ViewHash views;
229 :
230 : /** All queues used by the pipe's channels during rendering. */
231 : QueueHash queues;
232 :
233 : /** The pipe thread. */
234 : RenderThread* thread;
235 :
236 : detail::TransferThread transferThread;
237 :
238 : /** GPU Computing context */
239 : ComputeContext *computeContext;
240 : };
241 :
242 1 : void RenderThread::run()
243 : {
244 1 : LB_TS_THREAD( _pipe->_pipeThread );
245 1 : LBDEBUG << "Entered pipe thread" << std::endl;
246 :
247 1 : eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
248 1 : pipe->_impl->state.waitEQ( STATE_MAPPED );
249 1 : pipe->_impl->windowSystem = pipe->selectWindowSystem();
250 1 : pipe->_setupCommandQueue();
251 1 : pipe->_setupAffinity();
252 :
253 1 : Worker::run();
254 :
255 1 : pipe->_exitCommandQueue();
256 1 : }
257 : }
258 :
259 91 : Pipe::Pipe( Node* parent )
260 : : Super( parent )
261 91 : , _impl( new detail::Pipe( getPath().pipeIndex ))
262 : {
263 91 : }
264 :
265 3 : Pipe::~Pipe()
266 : {
267 1 : LBASSERT( getWindows().empty( ));
268 1 : delete _impl;
269 2 : }
270 :
271 9 : Config* Pipe::getConfig()
272 : {
273 9 : Node* node = getNode();
274 9 : LBASSERT( node );
275 9 : return ( node ? node->getConfig() : 0);
276 : }
277 :
278 0 : const Config* Pipe::getConfig() const
279 : {
280 0 : const Node* node = getNode();
281 0 : LBASSERT( node );
282 0 : return ( node ? node->getConfig() : 0);
283 : }
284 :
285 2 : ClientPtr Pipe::getClient()
286 : {
287 2 : Node* node = getNode();
288 2 : LBASSERT( node );
289 2 : return ( node ? node->getClient() : 0);
290 : }
291 :
292 4 : ServerPtr Pipe::getServer()
293 : {
294 4 : Node* node = getNode();
295 4 : LBASSERT( node );
296 4 : return ( node ? node->getServer() : 0);
297 : }
298 :
299 1 : void Pipe::attach( const uint128_t& id, const uint32_t instanceID )
300 : {
301 1 : Super::attach( id, instanceID );
302 :
303 1 : co::CommandQueue* queue = getPipeThreadQueue();
304 1 : co::CommandQueue* transferQ = getTransferThreadQueue();
305 :
306 : registerCommand( fabric::CMD_PIPE_CONFIG_INIT,
307 1 : PipeFunc( this, &Pipe::_cmdConfigInit ), queue );
308 : registerCommand( fabric::CMD_PIPE_CONFIG_EXIT,
309 1 : PipeFunc( this, &Pipe::_cmdConfigExit ), queue );
310 : registerCommand( fabric::CMD_PIPE_CREATE_WINDOW,
311 1 : PipeFunc( this, &Pipe::_cmdCreateWindow ), queue );
312 : registerCommand( fabric::CMD_PIPE_DESTROY_WINDOW,
313 1 : PipeFunc( this, &Pipe::_cmdDestroyWindow ), queue );
314 : registerCommand( fabric::CMD_PIPE_FRAME_START,
315 1 : PipeFunc( this, &Pipe::_cmdFrameStart ), queue );
316 : registerCommand( fabric::CMD_PIPE_FRAME_FINISH,
317 1 : PipeFunc( this, &Pipe::_cmdFrameFinish ), queue );
318 : registerCommand( fabric::CMD_PIPE_FRAME_DRAW_FINISH,
319 1 : PipeFunc( this, &Pipe::_cmdFrameDrawFinish ), queue );
320 : registerCommand( fabric::CMD_PIPE_FRAME_START_CLOCK,
321 1 : PipeFunc( this, &Pipe::_cmdFrameStartClock ), 0 );
322 : registerCommand( fabric::CMD_PIPE_EXIT_THREAD,
323 1 : PipeFunc( this, &Pipe::_cmdExitThread ), queue );
324 : registerCommand( fabric::CMD_PIPE_DETACH_VIEW,
325 1 : PipeFunc( this, &Pipe::_cmdDetachView ), queue );
326 : registerCommand( fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
327 : PipeFunc( this, &Pipe::_cmdExitTransferThread ),
328 1 : transferQ );
329 1 : }
330 :
331 1084 : void Pipe::setDirty( const uint64_t bits )
332 : {
333 : // jump over fabric setDirty to avoid dirty'ing node pipes list
334 : // pipes are individually synced in frame finish for thread-safety
335 1084 : Object::setDirty( bits );
336 1084 : }
337 :
338 0 : bool Pipe::isWindowSystemAvailable( const std::string& name ) const
339 : {
340 0 : bool available = false;
341 0 : if( name != "Qt" )
342 : {
343 : #ifdef AGL
344 : available = name == "AGL";
345 : #elif GLX
346 0 : available = name == "GLX";
347 : #elif WGL
348 : available = name == "WGL";
349 : #endif
350 : }
351 :
352 : #ifdef EQ_QT_USED
353 0 : if( name != "Qt" )
354 0 : return available;
355 :
356 : // Qt can only use ports that refer to QScreens available to the
357 : // QApplication. In Windows and Mac all physical displays are
358 : // queriable using platform dependent functions (no idea about virtual
359 : // displays like Xming or XQuartz), so we will assume that if Equalizer
360 : // was built with Qt support, then all devices can be used by Qt.
361 : // (How to choose the right screen for a given device it's a different
362 : // story)
363 : # ifdef AGL
364 : available = true;
365 : # elif WGL
366 : available = true;
367 : # else
368 : // For X it's different. Qt can only use screens that are part of the
369 : // display server referred by the DISPLAY environmental variable (in
370 : // particular its value at the moment of the creation of the QApplication)
371 : # ifdef __APPLE__
372 : // In MAC this is simpler, because there can only be one display server.
373 : return true;
374 : # else
375 : // There's no way to infer the X server number from the Qt API except
376 : // QScreen names, but only under certain conditions. In regular desktop
377 : // usage QScreen::name gives a name related to what xrandr prints, so it's
378 : // not usable. The names seems to follow the X naming convention
379 : // (e.g. host:0.0) when XRANDR is not available (e.g. Xnest) or doesn't
380 : // provide information about where the displays are connected (e.g.
381 : // headless or Xvnc). These are corner cases so we cannot rely on QScreen.
382 : // Therefore, we will infer which port is Qt using directly from DISPLAY.
383 :
384 : // Is the port is undefined that means that the default server must
385 : // be used. This should match the Qt display server.
386 0 : if( getPort() == LB_UNDEFINED_UINT32 )
387 0 : return true;
388 :
389 : QGuiApplication* app =
390 0 : dynamic_cast< QGuiApplication* >( QCoreApplication::instance( ));
391 0 : if( !app || !app->primaryScreen() )
392 0 : return false; // Qt won't be able to access anything anyway
393 :
394 0 : QRegularExpression regex( "^[a-z]*\\:([0-9]+)(\\.[0-9]+)?$" );
395 : QRegularExpressionMatch match =
396 0 : regex.match( getenv( "DISPLAY" ) ? getenv( "DISPLAY" ) : "" );
397 0 : available = match.captured( 1 ) == QString::number( getPort(), 10 );
398 : # endif
399 : # endif
400 : #endif
401 :
402 0 : return available;
403 : }
404 :
405 1 : WindowSystem Pipe::selectWindowSystem() const
406 : {
407 : #ifdef AGL
408 : return WindowSystem( "AGL" );
409 : #elif GLX
410 1 : return WindowSystem( "GLX" );
411 : #elif WGL
412 : return WindowSystem( "WGL" );
413 : #elif EQ_QT_USED
414 : if( !isWindowSystemAvailable( "Qt" ))
415 : {
416 : // Throwing because there's no reasonable alternative.
417 : std::stringstream msg;
418 : msg << "Cannot choose windowing system for pipe at port " << getPort()
419 : << ". Qt was set as the default, but it cannot be used. In X"
420 : " based systems this means that the value of DISPLAY taken by"
421 : " Qt refers to a different display server." << std::endl;
422 : LBTHROW( std::runtime_error( msg.str( )));
423 : }
424 : return WindowSystem( "Qt" );
425 : #endif
426 : }
427 :
428 1 : void Pipe::_setupCommandQueue()
429 : {
430 3 : LBDEBUG << "Set up pipe message pump for " << _impl->windowSystem
431 3 : << std::endl;
432 :
433 1 : Config* config = getConfig();
434 1 : config->setupMessagePump( this );
435 :
436 1 : if( !_impl->thread ) // Non-threaded pipes have no pipe thread message pump
437 1 : return;
438 :
439 1 : CommandQueue* queue = _impl->thread->getWorkerQueue();
440 1 : LBASSERT( queue );
441 1 : LBASSERT( !queue->getMessagePump( ));
442 :
443 1 : Global::enterCarbon();
444 1 : MessagePump* pump = createMessagePump();
445 1 : if( pump )
446 1 : pump->dispatchAll(); // initializes _impl->receiverQueue
447 :
448 1 : queue->setMessagePump( pump );
449 1 : Global::leaveCarbon();
450 : }
451 :
452 1 : int32_t Pipe::_getAutoAffinity() const
453 : {
454 : #ifdef EQUALIZER_USE_HWLOC_GL
455 : uint32_t port = getPort();
456 : uint32_t device = getDevice();
457 :
458 : if( port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32 )
459 : return lunchbox::Thread::NONE;
460 :
461 : if( port == LB_UNDEFINED_UINT32 )
462 : port = 0;
463 : if( device == LB_UNDEFINED_UINT32 )
464 : device = 0;
465 :
466 : hwloc_topology_t topology;
467 : if( hwloc_topology_init( &topology ) < 0 )
468 : {
469 : LBINFO << "Automatic pipe thread placement failed: "
470 : << "hwloc_topology_init() failed" << std::endl;
471 : return lunchbox::Thread::NONE;
472 : }
473 :
474 : // Load I/O devices, bridges and their relevant info
475 : const unsigned long loading_flags = HWLOC_TOPOLOGY_FLAG_IO_BRIDGES |
476 : HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
477 : if( hwloc_topology_set_flags( topology, loading_flags ) < 0 )
478 : {
479 : LBINFO << "Automatic pipe thread placement failed: "
480 : << "hwloc_topology_set_flags() failed" << std::endl;
481 : hwloc_topology_destroy( topology );
482 : return lunchbox::Thread::NONE;
483 : }
484 :
485 : if( hwloc_topology_load( topology ) < 0 )
486 : {
487 : LBINFO << "Automatic pipe thread placement failed: "
488 : << "hwloc_topology_load() failed" << std::endl;
489 : hwloc_topology_destroy( topology );
490 : return lunchbox::Thread::NONE;
491 : }
492 :
493 : const hwloc_obj_t osdev =
494 : hwloc_gl_get_display_osdev_by_port_device( topology,
495 : int( port ), int( device ));
496 : if( !osdev )
497 : {
498 : LBINFO << "Automatic pipe thread placement failed: GPU not found"
499 : << std::endl;
500 : hwloc_topology_destroy( topology );
501 : return lunchbox::Thread::NONE;
502 : }
503 :
504 : const hwloc_obj_t pcidev = osdev->parent;
505 : const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj( topology, pcidev);
506 : const int numCpus =
507 : hwloc_get_nbobjs_inside_cpuset_by_type( topology, parent->cpuset,
508 : HWLOC_OBJ_SOCKET );
509 : if( numCpus != 1 )
510 : {
511 : LBINFO << "Automatic pipe thread placement failed: GPU attached to "
512 : << numCpus << " processors?" << std::endl;
513 : hwloc_topology_destroy( topology );
514 : return lunchbox::Thread::NONE;
515 : }
516 :
517 : const hwloc_obj_t cpuObj =
518 : hwloc_get_obj_inside_cpuset_by_type( topology, parent->cpuset,
519 : HWLOC_OBJ_SOCKET, 0 );
520 : if( cpuObj == 0 )
521 : {
522 : LBINFO << "Automatic pipe thread placement failed: "
523 : << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
524 : hwloc_topology_destroy( topology );
525 : return lunchbox::Thread::NONE;
526 : }
527 :
528 : const int cpuIndex = cpuObj->logical_index;
529 : hwloc_topology_destroy( topology );
530 : return cpuIndex + lunchbox::Thread::SOCKET;
531 : #else
532 3 : LBDEBUG << "Automatic thread placement not supported, no hwloc GL support"
533 3 : << std::endl;
534 : #endif
535 1 : return lunchbox::Thread::NONE;
536 : }
537 :
538 1 : void Pipe::_setupAffinity()
539 : {
540 1 : const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
541 1 : switch( affinity )
542 : {
543 : case AUTO:
544 1 : lunchbox::Thread::setAffinity( _getAutoAffinity( ));
545 1 : break;
546 :
547 : case OFF:
548 : default:
549 0 : lunchbox::Thread::setAffinity( affinity );
550 0 : break;
551 : }
552 1 : }
553 :
554 1 : void Pipe::_exitCommandQueue()
555 : {
556 : // Non-threaded pipes have no pipe thread message pump
557 1 : if( !_impl->thread )
558 1 : return;
559 :
560 1 : CommandQueue* queue = _impl->thread->getWorkerQueue();
561 1 : LBASSERT( queue );
562 :
563 1 : MessagePump* pump = queue->getMessagePump();
564 1 : queue->setMessagePump( 0 );
565 1 : delete pump;
566 : }
567 :
568 1 : MessagePump* Pipe::createMessagePump()
569 : {
570 1 : return _impl->windowSystem.createMessagePump();
571 : }
572 :
573 0 : MessagePump* Pipe::getMessagePump()
574 : {
575 0 : LB_TS_THREAD( _pipeThread );
576 0 : if( !_impl->thread )
577 0 : return 0;
578 :
579 0 : CommandQueue* queue = _impl->thread->getWorkerQueue();
580 0 : return queue->getMessagePump();
581 : }
582 :
583 3 : co::CommandQueue* Pipe::getPipeThreadQueue()
584 : {
585 3 : if( _impl->thread )
586 3 : return _impl->thread->getWorkerQueue();
587 :
588 0 : return getNode()->getMainThreadQueue();
589 : }
590 :
591 2 : co::CommandQueue* Pipe::getTransferThreadQueue()
592 : {
593 2 : return _impl->transferThread.getWorkerQueue();
594 : }
595 :
596 1 : co::CommandQueue* Pipe::getMainThreadQueue()
597 : {
598 1 : return getServer()->getMainThreadQueue();
599 : }
600 :
601 1 : co::CommandQueue* Pipe::getCommandThreadQueue()
602 : {
603 1 : return getServer()->getCommandThreadQueue();
604 : }
605 :
606 0 : Frame* Pipe::getFrame( const co::ObjectVersion& frameVersion, const Eye eye,
607 : const bool isOutput )
608 : {
609 0 : LB_TS_THREAD( _pipeThread );
610 0 : Frame* frame = _impl->frames[ frameVersion.identifier ];
611 :
612 0 : if( !frame )
613 : {
614 0 : ClientPtr client = getClient();
615 0 : frame = new Frame();
616 :
617 0 : LBCHECK( client->mapObject( frame, frameVersion ));
618 0 : _impl->frames[ frameVersion.identifier ] = frame;
619 : }
620 : else
621 0 : frame->sync( frameVersion.version );
622 :
623 0 : const co::ObjectVersion& dataVersion = frame->getDataVersion( eye );
624 0 : LBLOG( LOG_ASSEMBLY ) << "Use " << dataVersion << std::endl;
625 :
626 0 : FrameDataPtr frameData = getNode()->getFrameData( dataVersion );
627 0 : LBASSERT( frameData );
628 :
629 0 : if( isOutput )
630 : {
631 0 : if( !frameData->isAttached() )
632 : {
633 0 : ClientPtr client = getClient();
634 0 : LBCHECK( client->mapObject( frameData.get(), dataVersion ));
635 : }
636 0 : else if( frameData->getVersion() < dataVersion.version )
637 0 : frameData->sync( dataVersion.version );
638 :
639 0 : _impl->outputFrameDatas[ dataVersion.identifier ] = frameData;
640 : }
641 : else
642 0 : _impl->inputFrameDatas[ dataVersion.identifier ] = frameData;
643 :
644 0 : frame->setFrameData( frameData );
645 0 : return frame;
646 : }
647 :
648 0 : void Pipe::flushFrames( util::ObjectManager& om )
649 : {
650 0 : LB_TS_THREAD( _pipeThread );
651 0 : ClientPtr client = getClient();
652 0 : for( FrameHashCIter i = _impl->frames.begin(); i !=_impl->frames.end(); ++i)
653 : {
654 0 : Frame* frame = i->second;
655 0 : frame->setFrameData( 0 ); // datas are flushed below
656 0 : client->unmapObject( frame );
657 0 : delete frame;
658 : }
659 0 : _impl->frames.clear();
660 :
661 0 : for( FrameDataHashCIter i = _impl->inputFrameDatas.begin();
662 0 : i != _impl->inputFrameDatas.end(); ++i )
663 : {
664 0 : FrameDataPtr data = i->second;
665 0 : data->deleteGLObjects( om );
666 0 : }
667 0 : _impl->inputFrameDatas.clear();
668 :
669 0 : for( FrameDataHashCIter i = _impl->outputFrameDatas.begin();
670 0 : i != _impl->outputFrameDatas.end(); ++i )
671 : {
672 0 : FrameDataPtr data = i->second;
673 0 : data->resetPlugins();
674 0 : data->deleteGLObjects( om );
675 0 : client->unmapObject( data.get( ));
676 0 : getNode()->releaseFrameData( data );
677 0 : }
678 0 : _impl->outputFrameDatas.clear();
679 0 : }
680 :
681 0 : co::QueueSlave* Pipe::getQueue( const uint128_t& queueID )
682 : {
683 0 : LB_TS_THREAD( _pipeThread );
684 0 : if( queueID == 0 )
685 0 : return 0;
686 :
687 0 : co::QueueSlave* queue = _impl->queues[ queueID ];
688 0 : if( !queue )
689 : {
690 0 : queue = new co::QueueSlave;
691 0 : ClientPtr client = getClient();
692 0 : LBCHECK( client->mapObject( queue, queueID ));
693 :
694 0 : _impl->queues[ queueID ] = queue;
695 : }
696 :
697 0 : return queue;
698 : }
699 :
700 1 : void Pipe::_flushQueues()
701 : {
702 1 : LB_TS_THREAD( _pipeThread );
703 1 : ClientPtr client = getClient();
704 :
705 1 : for( QueueHashCIter i = _impl->queues.begin(); i !=_impl->queues.end(); ++i)
706 : {
707 0 : co::QueueSlave* queue = i->second;
708 0 : client->unmapObject( queue );
709 0 : delete queue;
710 : }
711 1 : _impl->queues.clear();
712 1 : }
713 :
714 0 : const View* Pipe::getView( const co::ObjectVersion& viewVersion ) const
715 : {
716 : // Yie-ha: we want to have a const-interface to get a view on the render
717 : // clients, but view mapping is by definition non-const.
718 0 : return const_cast< Pipe* >( this )->getView( viewVersion );
719 : }
720 :
721 0 : View* Pipe::getView( const co::ObjectVersion& viewVersion )
722 : {
723 0 : LB_TS_THREAD( _pipeThread );
724 0 : if( viewVersion.identifier == 0 )
725 0 : return 0;
726 :
727 0 : View* view = _impl->views[ viewVersion.identifier ];
728 0 : if( !view )
729 : {
730 0 : NodeFactory* nodeFactory = Global::getNodeFactory();
731 0 : view = nodeFactory->createView( 0 );
732 0 : LBASSERT( view );
733 0 : view->_pipe = this;
734 0 : ClientPtr client = getClient();
735 0 : LBCHECK( client->mapObject( view, viewVersion ));
736 :
737 0 : _impl->views[ viewVersion.identifier ] = view;
738 : }
739 :
740 0 : view->sync( viewVersion.version );
741 0 : return view;
742 : }
743 :
744 0 : void Pipe::_releaseViews()
745 : {
746 0 : LB_TS_THREAD( _pipeThread );
747 0 : for( bool changed = true; changed; )
748 : {
749 0 : changed = false;
750 0 : for( ViewHashIter i =_impl->views.begin(); i !=_impl->views.end(); ++i )
751 : {
752 0 : View* view = i->second;
753 0 : view->commit();
754 0 : if( view->getVersion() + 20 > view->getHeadVersion( ))
755 0 : continue;
756 :
757 : // release unused view to avoid memory leaks due to deltas piling up
758 0 : view->_pipe = 0;
759 :
760 0 : ClientPtr client = getClient();
761 0 : client->unmapObject( view );
762 0 : _impl->views.erase( i );
763 :
764 0 : NodeFactory* nodeFactory = Global::getNodeFactory();
765 0 : nodeFactory->releaseView( view );
766 :
767 0 : changed = true;
768 0 : break;
769 0 : }
770 : }
771 0 : }
772 :
773 1 : void Pipe::_flushViews()
774 : {
775 1 : LB_TS_THREAD( _pipeThread );
776 1 : NodeFactory* nodeFactory = Global::getNodeFactory();
777 1 : ClientPtr client = getClient();
778 :
779 1 : for( ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i )
780 : {
781 0 : View* view = i->second;
782 :
783 0 : client->unmapObject( view );
784 0 : view->_pipe = 0;
785 0 : nodeFactory->releaseView( view );
786 : }
787 1 : _impl->views.clear();
788 1 : }
789 :
790 1 : void Pipe::startThread()
791 : {
792 1 : _impl->thread = new detail::RenderThread( this );
793 1 : _impl->thread->start();
794 1 : }
795 :
796 1 : void Pipe::exitThread()
797 : {
798 1 : _stopTransferThread();
799 :
800 1 : if( !_impl->thread )
801 1 : return;
802 :
803 1 : send( getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD );
804 :
805 1 : _impl->thread->join();
806 1 : delete _impl->thread;
807 1 : _impl->thread = 0;
808 : }
809 :
810 90 : void Pipe::cancelThread()
811 : {
812 90 : _stopTransferThread();
813 :
814 90 : if( !_impl->thread )
815 180 : return;
816 :
817 : // local command dispatching
818 : co::ObjectOCommand( this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD,
819 0 : co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL );
820 : }
821 :
822 0 : void Pipe::waitExited() const
823 : {
824 0 : _impl->state.waitGE( STATE_STOPPED );
825 0 : }
826 :
827 1 : bool Pipe::isRunning() const
828 : {
829 1 : return (_impl->state == STATE_RUNNING);
830 : }
831 :
832 1 : bool Pipe::isStopped() const
833 : {
834 1 : return (_impl->state == STATE_STOPPED);
835 : }
836 :
837 1 : void Pipe::notifyMapped()
838 : {
839 1 : LBASSERT( _impl->state == STATE_STOPPED );
840 1 : _impl->state = STATE_MAPPED;
841 1 : }
842 :
843 : namespace
844 : {
845 0 : class WaitFinishedVisitor : public PipeVisitor
846 : {
847 : public:
848 0 : WaitFinishedVisitor( const uint32_t frame, MessagePump* pump )
849 0 : : _frame( frame ), _pump( pump ) {}
850 :
851 0 : virtual VisitorResult visit( Channel* channel )
852 : {
853 0 : while( !channel->waitFrameFinished( _frame, 100 ))
854 : {
855 : // process potential pending Qt slots
856 0 : if( _pump )
857 0 : _pump->dispatchAll();
858 : }
859 :
860 0 : return TRAVERSE_CONTINUE;
861 : }
862 :
863 : private:
864 : const uint32_t _frame;
865 : MessagePump* _pump;
866 : };
867 : }
868 :
869 0 : void Pipe::waitFrameFinished( const uint32_t frameNumber )
870 : {
871 0 : MessagePump* pump = getConfig()->getMessagePump();
872 0 : while( !_impl->finishedFrame.timedWaitGE( frameNumber, 100 ))
873 : {
874 : // process potential pending Qt slots
875 0 : if( pump )
876 0 : pump->dispatchAll();
877 : }
878 :
879 0 : WaitFinishedVisitor waiter( frameNumber, pump );
880 0 : accept( waiter );
881 0 : }
882 :
883 0 : void Pipe::waitFrameLocal( const uint32_t frameNumber ) const
884 : {
885 0 : _impl->unlockedFrame.waitGE( frameNumber );
886 0 : }
887 :
888 0 : uint32_t Pipe::getCurrentFrame() const
889 : {
890 0 : LB_TS_THREAD( _pipeThread );
891 0 : return _impl->currentFrame;
892 : }
893 :
894 0 : uint32_t Pipe::getFinishedFrame() const
895 : {
896 0 : return _impl->finishedFrame.get();
897 : }
898 :
899 1 : WindowSystem Pipe::getWindowSystem() const
900 : {
901 1 : return _impl->windowSystem;
902 : }
903 :
904 1 : EventOCommand Pipe::sendError( const uint32_t error )
905 : {
906 1 : return getConfig()->sendError( Event::PIPE_ERROR, Error( error, getID( )));
907 : }
908 :
909 0 : bool Pipe::processEvent( const Event& event )
910 : {
911 0 : ConfigEvent configEvent( event );
912 0 : getConfig()->sendEvent( configEvent );
913 0 : return true;
914 : }
915 :
916 : //---------------------------------------------------------------------------
917 : // pipe-thread methods
918 : //---------------------------------------------------------------------------
919 1 : bool Pipe::configInit( const uint128_t& initID )
920 : {
921 1 : LB_TS_THREAD( _pipeThread );
922 :
923 1 : LBASSERT( !_impl->systemPipe );
924 :
925 1 : if ( !configInitSystemPipe( initID ))
926 1 : return false;
927 :
928 : // -------------------------------------------------------------------------
929 0 : LBASSERT(!_impl->computeContext);
930 :
931 : // for now we only support CUDA
932 : #ifdef EQUALIZER_USE_CUDA
933 : if( getIAttribute( IATTR_HINT_CUDA_GL_INTEROP ) == eq::ON )
934 : {
935 : LBDEBUG << "Initializing CUDAContext" << std::endl;
936 : ComputeContext* computeCtx = new CUDAContext( this );
937 :
938 : if( !computeCtx->configInit() )
939 : {
940 : LBWARN << "GPU Computing context initialization failed "
941 : << std::endl;
942 : delete computeCtx;
943 : return false;
944 : }
945 : setComputeContext( computeCtx );
946 : }
947 : #endif
948 :
949 0 : return true;
950 : }
951 :
952 1 : bool Pipe::configInitSystemPipe( const uint128_t& )
953 : {
954 1 : SystemPipe* systemPipe = _impl->windowSystem.createPipe( this );
955 1 : LBASSERT( systemPipe );
956 :
957 1 : if( !systemPipe->configInit( ))
958 : {
959 1 : LBERROR << "System pipe context initialization failed" << std::endl;
960 1 : delete systemPipe;
961 1 : return false;
962 : }
963 :
964 0 : setSystemPipe( systemPipe );
965 0 : return true;
966 : }
967 :
968 1 : bool Pipe::configExit()
969 : {
970 1 : LB_TS_THREAD( _pipeThread );
971 :
972 1 : if( _impl->computeContext )
973 : {
974 0 : _impl->computeContext->configExit();
975 0 : delete _impl->computeContext;
976 0 : _impl->computeContext = 0;
977 : }
978 :
979 1 : if( _impl->systemPipe )
980 : {
981 0 : _impl->systemPipe->configExit( );
982 0 : delete _impl->systemPipe;
983 0 : _impl->systemPipe = 0;
984 : }
985 1 : return true;
986 : }
987 :
988 :
989 0 : void Pipe::frameStart( const uint128_t&, const uint32_t frameNumber )
990 : {
991 0 : LB_TS_THREAD( _pipeThread );
992 :
993 0 : const Node* node = getNode();
994 0 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
995 : {
996 : case ASYNC: // No sync, release immediately
997 0 : releaseFrameLocal( frameNumber );
998 0 : break;
999 :
1000 : case DRAW_SYNC: // Sync, release in frameDrawFinish
1001 : case LOCAL_SYNC: // Sync, release in frameFinish
1002 0 : node->waitFrameStarted( frameNumber );
1003 0 : break;
1004 :
1005 : default:
1006 0 : LBUNIMPLEMENTED;
1007 : }
1008 :
1009 0 : startFrame( frameNumber );
1010 0 : }
1011 :
1012 0 : void Pipe::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
1013 : {
1014 0 : const Node* node = getNode();
1015 0 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
1016 : {
1017 : case ASYNC: // released in frameStart
1018 0 : break;
1019 :
1020 : case DRAW_SYNC: // release
1021 0 : releaseFrameLocal( frameNumber );
1022 0 : break;
1023 :
1024 : case LOCAL_SYNC: // release in frameFinish
1025 0 : break;
1026 :
1027 : default:
1028 0 : LBUNIMPLEMENTED;
1029 : }
1030 0 : }
1031 :
1032 0 : void Pipe::frameFinish( const uint128_t&, const uint32_t frameNumber )
1033 : {
1034 0 : const Node* node = getNode();
1035 0 : switch( node->getIAttribute( Node::IATTR_THREAD_MODEL ))
1036 : {
1037 : case ASYNC: // released in frameStart
1038 0 : break;
1039 :
1040 : case DRAW_SYNC: // released in frameDrawFinish
1041 0 : break;
1042 :
1043 : case LOCAL_SYNC: // release
1044 0 : releaseFrameLocal( frameNumber );
1045 0 : break;
1046 :
1047 : default:
1048 0 : LBUNIMPLEMENTED;
1049 : }
1050 :
1051 : // Global release
1052 0 : releaseFrame( frameNumber );
1053 0 : }
1054 :
1055 0 : void Pipe::startFrame( const uint32_t frameNumber )
1056 : {
1057 0 : LB_TS_THREAD( _pipeThread );
1058 0 : _impl->currentFrame = frameNumber;
1059 0 : LBLOG( LOG_TASKS ) << "---- Started Frame ---- "<< frameNumber << std::endl;
1060 0 : }
1061 :
1062 0 : void Pipe::releaseFrame( const uint32_t frameNumber )
1063 : {
1064 0 : LB_TS_THREAD( _pipeThread );
1065 0 : _impl->finishedFrame = frameNumber;
1066 0 : LBLOG( LOG_TASKS ) << "---- Finished Frame --- "<< frameNumber << std::endl;
1067 0 : }
1068 :
1069 0 : void Pipe::releaseFrameLocal( const uint32_t frameNumber ) // Records frameNumber as the locally unlocked frame and logs it.
1070 : {
1071 0 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1072 0 : LBASSERTINFO( _impl->unlockedFrame + 1 == frameNumber, // frames are expected to be unlocked one at a time, in order
1073 : _impl->unlockedFrame << ", " << frameNumber );
1074 :
1075 0 : _impl->unlockedFrame = frameNumber; // NOTE(review): unlockedFrame exposes get(), so it is likely a monitor-style wrapper that other threads wait on - confirm
1076 0 : LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- "
1077 0 : << _impl->unlockedFrame.get() << std::endl;
1078 0 : }
1079 :
1080 0 : bool Pipe::startTransferThread() // Ensures the transfer thread is running; returns true if it already runs, else the result of start().
1081 : {
1082 0 : if( _impl->transferThread.isRunning( )) // already started: nothing to do
1083 0 : return true;
1084 :
1085 0 : return _impl->transferThread.start();
1086 : }
1087 :
1088 0 : QThread* Pipe::getTransferQThread() // Returns whatever the transfer thread reports as its QThread.
1089 : { // NOTE(review): this file includes <QThread> only when EQ_QT_USED is defined - confirm QThread is declared otherwise
1090 0 : return _impl->transferThread.getQThread();
1091 : }
1092 :
1093 1 : bool Pipe::hasTransferThread() const // Returns true while the transfer thread reports itself as running.
1094 : {
1095 1 : return _impl->transferThread.isRunning();
1096 : }
1097 :
1098 91 : void Pipe::_stopTransferThread() // Asks the transfer thread to exit and joins it; no-op if it is already stopped.
1099 : {
1100 91 : if( _impl->transferThread.isStopped( )) // nothing to stop
1101 182 : return;
1102 :
1103 0 : send( getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD ); // exit request goes through the command path; _cmdExitTransferThread() in this file calls postStop() - NOTE(review): handler registration is not visible here
1104 0 : _impl->transferThread.join(); // returns once the transfer thread has been joined
1105 : }
1106 :
1107 0 : void Pipe::setSystemPipe( SystemPipe* pipe ) // Stores the given SystemPipe pointer; nothing else is done with the old or new value here.
1108 : {
1109 0 : _impl->systemPipe = pipe;
1110 0 : }
1111 :
1112 0 : SystemPipe* Pipe::getSystemPipe() // Returns the stored SystemPipe pointer (mutable overload).
1113 : {
1114 0 : return _impl->systemPipe;
1115 : }
1116 :
1117 0 : const SystemPipe* Pipe::getSystemPipe() const // Returns the stored SystemPipe pointer (const overload).
1118 : {
1119 0 : return _impl->systemPipe;
1120 : }
1121 :
1122 0 : void Pipe::setComputeContext( ComputeContext* ctx ) // Stores the given ComputeContext pointer; nothing else is done with the old or new value here.
1123 : {
1124 0 : _impl->computeContext = ctx;
1125 0 : }
1126 :
1127 0 : const ComputeContext* Pipe::getComputeContext() const // Returns the stored ComputeContext pointer (const overload).
1128 : {
1129 0 : return _impl->computeContext;
1130 : }
1131 :
1132 0 : ComputeContext* Pipe::getComputeContext() // Returns the stored ComputeContext pointer (mutable overload).
1133 : {
1134 0 : return _impl->computeContext;
1135 : }
1136 :
1137 : //---------------------------------------------------------------------------
1138 : // command handlers
1139 : //---------------------------------------------------------------------------
1140 1 : bool Pipe::_cmdCreateWindow( co::ICommand& cmd ) // Command handler: creates a Window for this pipe and maps it to the ID carried by the command. Always returns true.
1141 : {
1142 1 : co::ObjectICommand command( cmd );
1143 1 : const uint128_t& windowID = command.read< uint128_t >(); // payload: identifier the new window is mapped to
1144 :
1145 1 : LBLOG( LOG_INIT ) << "Create window " << command << " id " << windowID
1146 1 : << std::endl;
1147 :
1148 1 : Window* window = Global::getNodeFactory()->createWindow( this ); // instantiated through the global node factory
1149 1 : window->init(); // not in ctor, virtual method
1150 :
1151 1 : Config* config = getConfig();
1152 1 : LBCHECK( config->mapObject( window, windowID )); // NOTE(review): the mapping call lives inside LBCHECK - confirm the macro still evaluates its argument in release builds
1153 :
1154 1 : return true;
1155 : }
1156 :
1157 1 : bool Pipe::_cmdDestroyWindow( co::ICommand& cmd ) // Command handler: re-targets context sharing away from the window, replies to the server, then unmaps and releases it. Always returns true.
1158 : {
1159 1 : co::ObjectICommand command( cmd );
1160 :
1161 1 : LBLOG( LOG_INIT ) << "Destroy window " << command << std::endl;
1162 :
1163 1 : Window* window = _findWindow( command.read< uint128_t >( )); // payload: ID of the window to destroy
1164 1 : LBASSERT( window ); // NOTE(review): only an assertion guards a failed lookup; window is dereferenced below regardless
1165 :
1166 : // re-set shared windows accordingly
1167 1 : Window* newSharedWindow = 0; // first remaining window found that shared with the dying one
1168 1 : const Windows& windows = getWindows();
1169 2 : for( Windows::const_iterator i = windows.begin(); i != windows.end(); ++i )
1170 : {
1171 1 : Window* candidate = *i;
1172 :
1173 1 : if( candidate == window )
1174 1 : continue; // ignore the window being destroyed
1175 :
1176 0 : if( candidate->getSharedContextWindow() == window )
1177 : {
1178 0 : if( newSharedWindow ) // a replacement was already chosen: share with it
1179 0 : candidate->setSharedContextWindow( newSharedWindow );
1180 : else
1181 : {
1182 0 : newSharedWindow = candidate; // first match becomes the replacement
1183 0 : newSharedWindow->setSharedContextWindow( candidate ); // it is set to share with itself
1184 : }
1185 : }
1186 :
1187 0 : LBASSERT( candidate->getSharedContextWindow() != window ); // no remaining window may still reference the dying one
1188 : }
1189 :
1190 1 : const bool stopped = window->isStopped(); // reply payload: whether the window reports itself as stopped
1191 : window->send( getServer(),
1192 1 : fabric::CMD_WINDOW_CONFIG_EXIT_REPLY ) << stopped;
1193 :
1194 1 : Config* config = getConfig();
1195 1 : config->unmapObject( window ); // unmap before handing the instance back to the factory
1196 1 : Global::getNodeFactory()->releaseWindow( window );
1197 :
1198 1 : return true;
1199 : }
1200 :
1201 1 : bool Pipe::_cmdConfigInit( co::ICommand& cmd ) // Command handler: initializes the pipe via configInit() and replies with the result. Always returns true.
1202 : {
1203 1 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1204 :
1205 1 : co::ObjectICommand command( cmd );
1206 1 : const uint128_t& initID = command.read< uint128_t >(); // payload 1: forwarded to configInit()
1207 1 : const uint32_t frameNumber = command.read< uint32_t >(); // payload 2: seeds all three frame counters below
1208 :
1209 1 : LBLOG( LOG_INIT ) << "Init pipe " << command << " init id " << initID
1210 1 : << " frame " << frameNumber << std::endl;
1211 :
1212 1 : if( !isThreaded( )) // non-threaded pipe: window system selection and command queue setup happen here; the threaded path is not visible in this section
1213 : {
1214 0 : _impl->windowSystem = selectWindowSystem();
1215 0 : _setupCommandQueue();
1216 : }
1217 :
1218 1 : Node* node = getNode();
1219 1 : LBASSERT( node );
1220 1 : node->waitInitialized(); // wait on the node before checking whether it is running
1221 :
1222 1 : bool result = false; // stays false when the node is not running
1223 1 : if( node->isRunning( ))
1224 : {
1225 1 : _impl->currentFrame = frameNumber;
1226 1 : _impl->finishedFrame = frameNumber;
1227 1 : _impl->unlockedFrame = frameNumber;
1228 1 : _impl->state = STATE_INITIALIZING;
1229 :
1230 1 : result = configInit( initID );
1231 :
1232 1 : if( result ) // on failure the state is left at STATE_INITIALIZING
1233 0 : _impl->state = STATE_RUNNING;
1234 : }
1235 : else
1236 0 : sendError( ERROR_PIPE_NODE_NOTRUNNING );
1237 :
1238 1 : LBLOG( LOG_INIT ) << "TASK pipe config init reply result " << result
1239 1 : << std::endl;
1240 :
1241 1 : commit(); // commit is issued before the reply is sent
1242 : send( command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY )
1243 1 : << result;
1244 1 : return true;
1245 : }
1246 :
1247 1 : bool Pipe::_cmdConfigExit( co::ICommand& cmd ) // Command handler: shuts the pipe down via configExit() and records the resulting state. Always returns true.
1248 : {
1249 1 : co::ObjectICommand command( cmd );
1250 :
1251 1 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1252 1 : LBLOG( LOG_INIT ) << "TASK pipe config exit " << command << std::endl;
1253 :
1254 1 : _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
1255 :
1256 : // send before node gets a chance to send its destroy command
1257 1 : getNode()->send( getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE ) << getID(); // sent through the node object to the local node, carrying this pipe's ID
1258 :
1259 : // Flush views before exit since they are created after init
1260 : // - application may need initialized pipe to exit
1261 : // - configExit can't access views since all channels are gone already
1262 1 : _flushViews();
1263 1 : _flushQueues();
1264 1 : _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED; // final state reflects whether configExit() succeeded
1265 1 : return true;
1266 : }
1267 :
1268 1 : bool Pipe::_cmdExitThread( co::ICommand& ) // Command handler: clears the pipe thread's back-pointer to this pipe; the command payload is unused. Always returns true.
1269 : {
1270 1 : LBASSERT( _impl->thread );
1271 1 : _impl->thread->_pipe = 0; // NOTE(review): presumably the thread's run loop treats a null _pipe as its exit condition - confirm in the thread implementation
1272 1 : return true;
1273 : }
1274 :
1275 0 : bool Pipe::_cmdExitTransferThread( co::ICommand& ) // Command handler: calls postStop() on the transfer thread; the command payload is unused. Always returns true.
1276 : {
1277 0 : _impl->transferThread.postStop(); // _stopTransferThread() sends the triggering command and then joins the thread
1278 0 : return true;
1279 : }
1280 :
1281 0 : bool Pipe::_cmdFrameStartClock( co::ICommand& ) // Command handler: appends the current config time to the frame-time queue; the payload is unused. Always returns true.
1282 : {
1283 0 : LBVERB << "start frame clock" << std::endl;
1284 0 : _impl->frameTimeMutex.set(); // frameTimes is guarded by frameTimeMutex; _cmdFrameStart() pops from the front under the same lock
1285 0 : _impl->frameTimes.push_back( getConfig()->getTime( ));
1286 0 : _impl->frameTimeMutex.unset(); // NOTE(review): manual set/unset - the lock would stay held if the push_back above threw
1287 0 : return true;
1288 : }
1289 :
1290 0 : bool Pipe::_cmdFrameStart( co::ICommand& cmd ) // Command handler: syncs to the given version, takes the next frame time, emits an idle statistic, then calls frameStart(). Always returns true.
1291 : {
1292 0 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1293 :
1294 0 : co::ObjectICommand command( cmd );
1295 0 : const uint128_t& version = command.read< uint128_t >(); // payload 1: version passed to sync()
1296 0 : const uint128_t& frameID = command.read< uint128_t >(); // payload 2: forwarded to frameStart()
1297 0 : const uint32_t frameNumber = command.read< uint32_t >(); // payload 3: expected to be currentFrame + 1 (asserted below)
1298 :
1299 0 : LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
1300 0 : << " id " << frameID << std::endl;
1301 :
1302 0 : LBLOG( LOG_TASKS ) << "---- TASK start frame ---- frame " << frameNumber
1303 0 : << " id " << frameID << std::endl;
1304 0 : sync( version );
1305 0 : const int64_t lastFrameTime = _impl->frameTime; // previous frame's time, captured before it is overwritten
1306 :
1307 0 : _impl->frameTimeMutex.set(); // guards frameTimes, which _cmdFrameStartClock() appends to
1308 0 : LBASSERT( !_impl->frameTimes.empty( )); // a start-clock command must have queued a time already
1309 :
1310 0 : _impl->frameTime = _impl->frameTimes.front(); // consume the oldest queued frame time
1311 0 : _impl->frameTimes.pop_front();
1312 0 : _impl->frameTimeMutex.unset();
1313 :
1314 0 : if( lastFrameTime > 0 ) // skipped while no positive previous frame time is recorded
1315 : {
1316 0 : PipeStatistics waitEvent( Statistic::PIPE_IDLE, this ); // NOTE(review): presumably reported when waitEvent goes out of scope - confirm in PipeStatistics
1317 : waitEvent.event.data.statistic.idleTime =
1318 0 : _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime() :0; // idle time is 0 when the pipe has no thread
1319 : waitEvent.event.data.statistic.totalTime =
1320 0 : LB_MAX( _impl->frameTime - lastFrameTime, 1 ); // avoid SIGFPE
1321 : }
1322 :
1323 0 : LBASSERTINFO( _impl->currentFrame + 1 == frameNumber,
1324 : "current " <<_impl->currentFrame << " start " << frameNumber);
1325 :
1326 0 : frameStart( frameID, frameNumber );
1327 0 : return true;
1328 : }
1329 :
1330 0 : bool Pipe::_cmdFrameFinish( co::ICommand& cmd ) // Command handler: runs frameFinish(), enforces any release it skipped, releases views and commits. Always returns true.
1331 : {
1332 0 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1333 :
1334 0 : co::ObjectICommand command( cmd );
1335 0 : const uint128_t& frameID = command.read< uint128_t >(); // payload 1: forwarded to frameFinish()
1336 0 : const uint32_t frameNumber = command.read< uint32_t >(); // payload 2: frame being finished
1337 :
1338 0 : LBLOG( LOG_TASKS ) << "---- TASK finish frame --- " << command << " frame "
1339 0 : << frameNumber << " id " << frameID << std::endl;
1340 :
1341 0 : LBASSERTINFO( _impl->currentFrame >= frameNumber, // only a frame that has been started can be finished
1342 : "current " <<_impl->currentFrame << " finish " <<frameNumber);
1343 :
1344 0 : frameFinish( frameID, frameNumber );
1345 :
1346 0 : LBASSERTINFO( _impl->finishedFrame >= frameNumber, // frameFinish() is expected to have called releaseFrame()
1347 : "Pipe::frameFinish() did not release frame " << frameNumber );
1348 :
1349 0 : if( _impl->unlockedFrame < frameNumber ) // frameFinish() did not unlock locally: warn and do it here
1350 : {
1351 0 : LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
1352 0 : << std::endl << " unlocked " << _impl->unlockedFrame.get()
1353 0 : << " done " << frameNumber << std::endl;
1354 0 : releaseFrameLocal( frameNumber ); // NOTE(review): releaseFrameLocal() asserts unlockedFrame + 1 == frameNumber, which fails if more than one frame is outstanding - confirm that cannot occur
1355 : }
1356 :
1357 0 : if( _impl->finishedFrame < frameNumber ) // NOTE(review): the assertion above checks the opposite condition, so this looks reachable only if that assertion is compiled out - confirm
1358 : {
1359 0 : LBWARN << "Finished frame was not released, enforcing unlock"
1360 0 : << std::endl;
1361 0 : releaseFrame( frameNumber );
1362 : }
1363 :
1364 0 : _releaseViews();
1365 :
1366 0 : const uint128_t version = commit();
1367 0 : if( version != co::VERSION_NONE ) // notify the sender only when commit() returned a version
1368 0 : send( command.getRemoteNode(), fabric::CMD_OBJECT_SYNC );
1369 0 : return true;
1370 : }
1371 :
1372 0 : bool Pipe::_cmdFrameDrawFinish( co::ICommand& cmd ) // Command handler: decodes frame ID and number and forwards them to frameDrawFinish(). Always returns true.
1373 : {
1374 0 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1375 :
1376 0 : co::ObjectICommand command( cmd );
1377 0 : const uint128_t& frameID = command.read< uint128_t >(); // payload 1
1378 0 : const uint32_t frameNumber = command.read< uint32_t >(); // payload 2
1379 :
1380 0 : LBLOG( LOG_TASKS ) << "TASK draw finish " << getName()
1381 0 : << " frame " << frameNumber << " id " << frameID
1382 0 : << std::endl;
1383 :
1384 0 : frameDrawFinish( frameID, frameNumber );
1385 0 : return true;
1386 : }
1387 :
1388 0 : bool Pipe::_cmdDetachView( co::ICommand& cmd ) // Command handler: removes the view with the given ID from this pipe's view table and releases it. Always returns true.
1389 : {
1390 0 : co::ObjectICommand command( cmd );
1391 :
1392 0 : LB_TS_THREAD( _pipeThread ); // NOTE(review): presumably a thread-affinity check for the pipe thread - confirm against Lunchbox
1393 :
1394 0 : ViewHash::iterator i = _impl->views.find( command.read< uint128_t >( )); // payload: ID of the view to detach
1395 0 : if( i != _impl->views.end( )) // an unknown ID is silently ignored
1396 : {
1397 0 : View* view = i->second; // copy the pointer out before the iterator is invalidated by erase
1398 0 : _impl->views.erase( i );
1399 :
1400 0 : NodeFactory* nodeFactory = Global::getNodeFactory();
1401 0 : nodeFactory->releaseView( view ); // hand the instance back to the node factory
1402 : }
1403 0 : return true;
1404 : }
1405 :
1406 : }
1407 :
1408 : #include <eq/fabric/pipe.ipp> // NOTE(review): presumably the fabric::Pipe template definitions needed by the explicit instantiation below - confirm
1409 : template class eq::fabric::Pipe< eq::Node, eq::Pipe, eq::Window,
1410 : eq::PipeVisitor >; // explicit instantiation of the fabric pipe template for the eq:: client-side types
1411 :
1412 : /** @cond IGNORE */
1413 : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
1414 42 : const eq::Super& ); // explicit instantiation of the stream output operator for eq::Super
1415 : /** @endcond */
|