Line data Source code
1 :
2 : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * Cedric Stalder<cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "node.h"
21 :
22 : #include "client.h"
23 : #include "config.h"
24 : #include "error.h"
25 : #include "exception.h"
26 : #include "frameData.h"
27 : #include "global.h"
28 : #include "log.h"
29 : #include "nodeFactory.h"
30 : #include "nodeStatistics.h"
31 : #include "pipe.h"
32 : #include "server.h"
33 :
34 : #include <eq/fabric/commands.h>
35 : #include <eq/fabric/elementVisitor.h>
36 : #include <eq/fabric/frameData.h>
37 : #include <eq/fabric/task.h>
38 :
39 : #include <co/barrier.h>
40 : #include <co/connection.h>
41 : #include <co/global.h>
42 : #include <co/objectICommand.h>
43 : #include <lunchbox/scopedMutex.h>
44 :
45 : namespace eq
46 : {
47 : namespace
48 : {
49 : typedef stde::hash_map< uint128_t, co::Barrier* > BarrierHash;
50 : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
51 : typedef FrameDataHash::const_iterator FrameDataHashCIter;
52 : typedef FrameDataHash::iterator FrameDataHashIter;
53 :
54 : enum State
55 : {
56 : STATE_STOPPED,
57 : STATE_INITIALIZING,
58 : STATE_INIT_FAILED,
59 : STATE_RUNNING,
60 : STATE_FAILED
61 : };
62 : }
63 :
64 : namespace detail
65 : {
66 : class TransmitThread : public lunchbox::Thread
67 : {
68 : public:
69 31 : TransmitThread()
70 31 : : _queue( co::Global::getCommandQueueLimit( ))
71 31 : {}
72 1 : virtual ~TransmitThread() {}
73 :
74 33 : co::CommandQueue& getQueue() { return _queue; }
75 :
76 : protected:
77 1 : bool init() override { setName( "Xmit" ); return true; }
78 : void run() override;
79 :
80 : private:
81 : co::CommandQueue _queue;
82 : };
83 :
84 1 : class Node
85 : {
86 : public:
87 31 : Node()
88 : : state( STATE_STOPPED )
89 : , finishedFrame( 0 )
90 31 : , unlockedFrame( 0 )
91 31 : {}
92 :
93 : /** The configInit/configExit state. */
94 : lunchbox::Monitor< State > state;
95 :
96 : /** The number of the last started frame. */
97 : lunchbox::Monitor< uint32_t > currentFrame;
98 :
99 : /** The number of the last finished frame. */
100 : uint32_t finishedFrame;
101 :
102 : /** The number of the last locally released frame. */
103 : uint32_t unlockedFrame;
104 :
105 : /** All barriers mapped by the node. */
106 : lunchbox::Lockable< BarrierHash > barriers;
107 :
108 : /** All frame datas used by the node during rendering. */
109 : lunchbox::Lockable< FrameDataHash > frameDatas;
110 :
111 : TransmitThread transmitter;
112 : };
113 :
114 : }
115 :
116 : /** @cond IGNORE */
117 : typedef co::CommandFunc<Node> NodeFunc;
118 : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
119 : /** @endcond */
120 :
121 31 : Node::Node( Config* parent )
122 : : Super( parent )
123 31 : , _impl( new detail::Node )
124 : {
125 31 : }
126 :
127 3 : Node::~Node()
128 : {
129 1 : LBASSERT( getPipes().empty( ));
130 1 : delete _impl;
131 2 : }
132 :
133 1 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
134 : {
135 1 : Super::attach( id, instanceID );
136 :
137 1 : co::CommandQueue* queue = getMainThreadQueue();
138 1 : co::CommandQueue* commandQ = getCommandThreadQueue();
139 1 : co::CommandQueue* transmitQ = getTransmitterQueue();
140 :
141 : registerCommand( fabric::CMD_NODE_CREATE_PIPE,
142 1 : NodeFunc( this, &Node::_cmdCreatePipe ), queue );
143 : registerCommand( fabric::CMD_NODE_DESTROY_PIPE,
144 1 : NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
145 : registerCommand( fabric::CMD_NODE_CONFIG_INIT,
146 1 : NodeFunc( this, &Node::_cmdConfigInit ), queue );
147 : registerCommand( fabric::CMD_NODE_SET_AFFINITY,
148 1 : NodeFunc( this, &Node::_cmdSetAffinity), transmitQ );
149 : registerCommand( fabric::CMD_NODE_CONFIG_EXIT,
150 1 : NodeFunc( this, &Node::_cmdConfigExit ), queue );
151 : registerCommand( fabric::CMD_NODE_FRAME_START,
152 1 : NodeFunc( this, &Node::_cmdFrameStart ), queue );
153 : registerCommand( fabric::CMD_NODE_FRAME_FINISH,
154 1 : NodeFunc( this, &Node::_cmdFrameFinish ), queue );
155 : registerCommand( fabric::CMD_NODE_FRAME_DRAW_FINISH,
156 1 : NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
157 : registerCommand( fabric::CMD_NODE_FRAME_TASKS_FINISH,
158 1 : NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
159 : registerCommand( fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
160 1 : NodeFunc( this, &Node::_cmdFrameDataTransmit ), commandQ );
161 : registerCommand( fabric::CMD_NODE_FRAMEDATA_READY,
162 1 : NodeFunc( this, &Node::_cmdFrameDataReady ), commandQ );
163 1 : }
164 :
165 1 : void Node::setDirty( const uint64_t bits )
166 : {
167 : // jump over fabric setDirty to avoid dirty'ing config node list
168 : // nodes are individually synced in frame finish
169 1 : Object::setDirty( bits );
170 1 : }
171 :
172 3 : ClientPtr Node::getClient()
173 : {
174 3 : Config* config = getConfig();
175 3 : LBASSERT( config );
176 3 : return ( config ? config->getClient() : 0 );
177 : }
178 :
179 5 : ServerPtr Node::getServer()
180 : {
181 5 : Config* config = getConfig();
182 5 : LBASSERT( config );
183 5 : return ( config ? config->getServer() : 0 );
184 : }
185 :
186 1 : co::CommandQueue* Node::getMainThreadQueue()
187 : {
188 1 : return getConfig()->getMainThreadQueue();
189 : }
190 :
191 1 : co::CommandQueue* Node::getCommandThreadQueue()
192 : {
193 1 : return getConfig()->getCommandThreadQueue();
194 : }
195 :
196 33 : co::CommandQueue* Node::getTransmitterQueue()
197 : {
198 33 : return &_impl->transmitter.getQueue();
199 : }
200 :
201 0 : uint32_t Node::getCurrentFrame() const
202 : {
203 0 : return _impl->currentFrame.get();
204 : }
205 :
206 0 : uint32_t Node::getFinishedFrame() const
207 : {
208 0 : return _impl->finishedFrame;
209 : }
210 :
211 0 : co::Barrier* Node::getBarrier( const co::ObjectVersion& barrier )
212 : {
213 0 : lunchbox::ScopedMutex<> mutex( _impl->barriers );
214 0 : co::Barrier* netBarrier = _impl->barriers.data[ barrier.identifier ];
215 :
216 0 : if( netBarrier )
217 0 : netBarrier->sync( barrier.version );
218 : else
219 : {
220 0 : ClientPtr client = getClient();
221 :
222 0 : netBarrier = new co::Barrier( client, barrier );
223 0 : if( !netBarrier->isGood( ))
224 : {
225 0 : LBCHECK( netBarrier->isGood( ));
226 0 : LBWARN << "Could not map swap barrier" << std::endl;
227 0 : delete netBarrier;
228 0 : return 0;
229 : }
230 0 : _impl->barriers.data[ barrier.identifier ] = netBarrier;
231 : }
232 :
233 0 : return netBarrier;
234 : }
235 :
236 0 : FrameDataPtr Node::getFrameData( const co::ObjectVersion& frameDataVersion )
237 : {
238 0 : lunchbox::ScopedWrite mutex( _impl->frameDatas );
239 0 : FrameDataPtr data = _impl->frameDatas.data[ frameDataVersion.identifier ];
240 :
241 0 : if( !data )
242 : {
243 0 : data = new FrameData;
244 0 : data->setID( frameDataVersion.identifier );
245 0 : _impl->frameDatas.data[ frameDataVersion.identifier ] = data;
246 : }
247 :
248 0 : LBASSERT( frameDataVersion.version.high() == 0 );
249 0 : data->setVersion( frameDataVersion.version.low( ));
250 0 : return data;
251 : }
252 :
253 0 : void Node::releaseFrameData( FrameDataPtr data )
254 : {
255 0 : lunchbox::ScopedWrite mutex( _impl->frameDatas );
256 0 : FrameDataHashIter i = _impl->frameDatas->find( data->getID( ));
257 0 : LBASSERT( i != _impl->frameDatas->end( ));
258 0 : if( i == _impl->frameDatas->end( ))
259 0 : return;
260 :
261 0 : _impl->frameDatas->erase( i );
262 : }
263 :
264 1 : void Node::waitInitialized() const
265 : {
266 1 : _impl->state.waitGE( STATE_INIT_FAILED );
267 1 : }
268 :
269 1 : bool Node::isRunning() const
270 : {
271 1 : return _impl->state == STATE_RUNNING;
272 : }
273 :
274 1 : bool Node::isStopped() const
275 : {
276 1 : return _impl->state == STATE_STOPPED;
277 : }
278 :
279 1 : bool Node::configInit( const uint128_t& )
280 : {
281 1 : WindowSystem::configInit( this );
282 1 : return true;
283 : }
284 :
285 1 : bool Node::configExit()
286 : {
287 1 : WindowSystem::configExit( this );
288 1 : return true;
289 : }
290 :
291 1 : void Node::_setAffinity()
292 : {
293 1 : const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
294 1 : switch( affinity )
295 : {
296 : case OFF:
297 0 : break;
298 :
299 : case AUTO:
300 : // TODO
301 1 : LBVERB << "No automatic thread placement for node threads "
302 1 : << std::endl;
303 1 : break;
304 :
305 : default:
306 0 : co::LocalNodePtr node = getLocalNode();
307 0 : send( node, fabric::CMD_NODE_SET_AFFINITY ) << affinity;
308 :
309 0 : node->setAffinity( affinity );
310 0 : break;
311 : }
312 1 : }
313 :
314 0 : void Node::waitFrameStarted( const uint32_t frameNumber ) const
315 : {
316 0 : _impl->currentFrame.waitGE( frameNumber );
317 0 : }
318 :
319 0 : void Node::startFrame( const uint32_t frameNumber )
320 : {
321 0 : _impl->currentFrame = frameNumber;
322 0 : }
323 :
324 0 : void Node::frameFinish( const uint128_t&, const uint32_t frameNumber )
325 : {
326 0 : releaseFrame( frameNumber );
327 0 : }
328 :
329 0 : void Node::_finishFrame( const uint32_t frameNumber ) const
330 : {
331 0 : const Pipes& pipes = getPipes();
332 0 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
333 : {
334 0 : Pipe* pipe = *i;
335 0 : LBASSERT( pipe->isThreaded() || pipe->getFinishedFrame()>=frameNumber );
336 :
337 0 : pipe->waitFrameLocal( frameNumber );
338 0 : pipe->waitFrameFinished( frameNumber );
339 : }
340 0 : }
341 :
342 0 : void Node::_frameFinish( const uint128_t& frameID,
343 : const uint32_t frameNumber )
344 : {
345 0 : frameFinish( frameID, frameNumber );
346 0 : LBLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber
347 0 : << std::endl;
348 :
349 0 : if( _impl->unlockedFrame < frameNumber )
350 : {
351 0 : LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
352 0 : << std::endl;
353 0 : releaseFrameLocal( frameNumber );
354 : }
355 :
356 0 : if( _impl->finishedFrame < frameNumber )
357 : {
358 0 : LBWARN << "Finished frame was not released, enforcing unlock"
359 0 : << std::endl;
360 0 : releaseFrame( frameNumber );
361 : }
362 0 : }
363 :
364 0 : void Node::releaseFrame( const uint32_t frameNumber )
365 : {
366 0 : LBASSERTINFO( _impl->currentFrame >= frameNumber,
367 : "current " << _impl->currentFrame << " release " <<
368 : frameNumber );
369 :
370 0 : if( _impl->finishedFrame >= frameNumber )
371 0 : return;
372 0 : _impl->finishedFrame = frameNumber;
373 :
374 0 : Config* config = getConfig();
375 0 : ServerPtr server = config->getServer();
376 0 : co::NodePtr node = server.get();
377 0 : send( node, fabric::CMD_NODE_FRAME_FINISH_REPLY ) << frameNumber;
378 : }
379 :
380 0 : void Node::releaseFrameLocal( const uint32_t frameNumber )
381 : {
382 0 : LBASSERT( _impl->unlockedFrame <= frameNumber );
383 0 : _impl->unlockedFrame = frameNumber;
384 :
385 0 : Config* config = getConfig();
386 0 : LBASSERT( config->getNodes().size() == 1 );
387 0 : LBASSERT( config->getNodes()[0] == this );
388 0 : config->releaseFrameLocal( frameNumber );
389 :
390 0 : LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _impl->unlockedFrame
391 0 : << std::endl;
392 0 : }
393 :
394 0 : void Node::frameStart( const uint128_t&, const uint32_t frameNumber )
395 : {
396 0 : startFrame( frameNumber ); // unlock pipe threads
397 :
398 0 : switch( getIAttribute( IATTR_THREAD_MODEL ))
399 : {
400 : case ASYNC:
401 : // Don't wait for pipes to release frame locally, sync not needed
402 0 : releaseFrameLocal( frameNumber );
403 0 : break;
404 :
405 : case DRAW_SYNC: // Sync and release in frameDrawFinish
406 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
407 0 : break;
408 :
409 : default:
410 0 : LBUNIMPLEMENTED;
411 : }
412 0 : }
413 :
414 0 : void Node::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
415 : {
416 0 : switch( getIAttribute( IATTR_THREAD_MODEL ))
417 : {
418 : case ASYNC: // No sync, release in frameStart
419 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
420 0 : break;
421 :
422 : case DRAW_SYNC:
423 : {
424 0 : const Pipes& pipes = getPipes();
425 0 : for( Pipes::const_iterator i = pipes.begin();
426 0 : i != pipes.end(); ++i )
427 : {
428 0 : const Pipe* pipe = *i;
429 0 : if( pipe->getTasks() & fabric::TASK_DRAW )
430 0 : pipe->waitFrameLocal( frameNumber );
431 : }
432 :
433 0 : releaseFrameLocal( frameNumber );
434 0 : break;
435 : }
436 : default:
437 0 : LBUNIMPLEMENTED;
438 : }
439 0 : }
440 :
441 0 : void Node::frameTasksFinish( const uint128_t&, const uint32_t frameNumber )
442 : {
443 0 : switch( getIAttribute( IATTR_THREAD_MODEL ))
444 : {
445 : case ASYNC: // No sync, release in frameStart
446 : case DRAW_SYNC: // Sync and release in frameDrawFinish
447 0 : break;
448 :
449 : case LOCAL_SYNC:
450 : {
451 0 : const Pipes& pipes = getPipes();
452 0 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
453 : {
454 0 : const Pipe* pipe = *i;
455 0 : if( pipe->getTasks() != fabric::TASK_NONE )
456 0 : pipe->waitFrameLocal( frameNumber );
457 : }
458 :
459 0 : releaseFrameLocal( frameNumber );
460 0 : break;
461 : }
462 : default:
463 0 : LBUNIMPLEMENTED;
464 : }
465 0 : }
466 :
467 :
468 0 : EventOCommand Node::sendError( const uint32_t error )
469 : {
470 0 : return getConfig()->sendError( Event::NODE_ERROR, Error( error, getID( )));
471 : }
472 :
473 0 : bool Node::processEvent( const Event& event )
474 : {
475 0 : ConfigEvent configEvent( event );
476 0 : getConfig()->sendEvent( configEvent );
477 0 : return true;
478 : }
479 :
480 1 : void Node::_flushObjects()
481 : {
482 1 : ClientPtr client = getClient();
483 : {
484 1 : lunchbox::ScopedMutex<> mutex( _impl->barriers );
485 3 : for( BarrierHash::const_iterator i = _impl->barriers->begin();
486 2 : i != _impl->barriers->end(); ++i )
487 : {
488 0 : delete i->second;
489 : }
490 1 : _impl->barriers->clear();
491 : }
492 :
493 2 : lunchbox::ScopedMutex<> mutex( _impl->frameDatas );
494 3 : for( FrameDataHashCIter i = _impl->frameDatas->begin();
495 2 : i != _impl->frameDatas->end(); ++i )
496 : {
497 0 : FrameDataPtr frameData = i->second;
498 0 : frameData->resetPlugins();
499 0 : client->unmapObject( frameData.get( ));
500 0 : }
501 2 : _impl->frameDatas->clear();
502 1 : }
503 :
504 1 : void detail::TransmitThread::run()
505 : {
506 : while( true )
507 : {
508 1 : co::ICommand command = _queue.pop();
509 1 : if( !command.isValid( ))
510 2 : return; // exit thread
511 :
512 0 : LBCHECK( command( ));
513 0 : }
514 : }
515 :
516 30 : void Node::dirtyClientExit()
517 : {
518 30 : const Pipes& pipes = getPipes();
519 120 : for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
520 : {
521 90 : Pipe* pipe = *i;
522 90 : pipe->cancelThread();
523 : }
524 30 : getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
525 30 : _impl->transmitter.join();
526 30 : }
527 :
528 : //---------------------------------------------------------------------------
529 : // command handlers
530 : //---------------------------------------------------------------------------
531 1 : bool Node::_cmdCreatePipe( co::ICommand& cmd )
532 : {
533 1 : LB_TS_THREAD( _nodeThread );
534 1 : LBASSERT( _impl->state >= STATE_INIT_FAILED );
535 :
536 1 : co::ObjectICommand command( cmd );
537 1 : const uint128_t& pipeID = command.read< uint128_t >();
538 1 : const bool threaded = command.read< bool >();
539 :
540 1 : LBLOG( LOG_INIT ) << "Create pipe " << command << " id " << pipeID
541 1 : << std::endl;
542 :
543 1 : Pipe* pipe = Global::getNodeFactory()->createPipe( this );
544 1 : if( threaded )
545 1 : pipe->startThread();
546 :
547 1 : Config* config = getConfig();
548 1 : LBCHECK( config->mapObject( pipe, pipeID ));
549 1 : pipe->notifyMapped();
550 :
551 1 : return true;
552 : }
553 :
554 1 : bool Node::_cmdDestroyPipe( co::ICommand& cmd )
555 : {
556 1 : co::ObjectICommand command( cmd );
557 :
558 1 : LB_TS_THREAD( _nodeThread );
559 1 : LBLOG( LOG_INIT ) << "Destroy pipe " << command << std::endl;
560 :
561 1 : Pipe* pipe = findPipe( command.read< uint128_t >( ));
562 1 : LBASSERT( pipe );
563 1 : pipe->exitThread();
564 :
565 1 : const bool stopped = pipe->isStopped();
566 :
567 1 : Config* config = getConfig();
568 1 : config->unmapObject( pipe );
569 1 : pipe->send( getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY ) << stopped;
570 1 : Global::getNodeFactory()->releasePipe( pipe );
571 :
572 1 : return true;
573 : }
574 :
575 1 : bool Node::_cmdConfigInit( co::ICommand& cmd )
576 : {
577 1 : co::ObjectICommand command( cmd );
578 :
579 1 : LB_TS_THREAD( _nodeThread );
580 1 : LBLOG( LOG_INIT ) << "Init node " << command << std::endl;
581 :
582 1 : _impl->state = STATE_INITIALIZING;
583 :
584 1 : const uint128_t& initID = command.read< uint128_t >();
585 1 : const uint32_t frameNumber = command.read< uint32_t >();
586 :
587 1 : _impl->currentFrame = frameNumber;
588 1 : _impl->unlockedFrame = frameNumber;
589 1 : _impl->finishedFrame = frameNumber;
590 1 : _setAffinity();
591 :
592 1 : _impl->transmitter.start();
593 1 : const uint64_t result = configInit( initID );
594 :
595 1 : if( getIAttribute( IATTR_THREAD_MODEL ) == eq::UNDEFINED )
596 1 : setIAttribute( IATTR_THREAD_MODEL, eq::DRAW_SYNC );
597 :
598 1 : _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
599 :
600 1 : commit();
601 : send( command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY )
602 1 : << result;
603 1 : return true;
604 : }
605 :
606 1 : bool Node::_cmdConfigExit( co::ICommand& cmd )
607 : {
608 1 : co::ObjectICommand command( cmd );
609 :
610 1 : LB_TS_THREAD( _nodeThread );
611 1 : LBLOG( LOG_INIT ) << "Node exit " << command << std::endl;
612 :
613 1 : const Pipes& pipes = getPipes();
614 1 : for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
615 : {
616 0 : Pipe* pipe = *i;
617 0 : pipe->waitExited();
618 : }
619 :
620 1 : _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
621 1 : getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
622 1 : _impl->transmitter.join();
623 1 : _flushObjects();
624 :
625 1 : getConfig()->send( getLocalNode(),
626 2 : fabric::CMD_CONFIG_DESTROY_NODE ) << getID();
627 1 : return true;
628 : }
629 :
630 0 : bool Node::_cmdFrameStart( co::ICommand& cmd )
631 : {
632 0 : LB_TS_THREAD( _nodeThread );
633 :
634 0 : co::ObjectICommand command( cmd );
635 0 : const uint128_t& version = command.read< uint128_t >();
636 0 : const uint128_t& configVersion = command.read< uint128_t >();
637 0 : const uint128_t& frameID = command.read< uint128_t >();
638 0 : const uint32_t frameNumber = command.read< uint32_t >();
639 :
640 0 : LBVERB << "handle node frame start " << command << " frame " << frameNumber
641 0 : << " id " << frameID << std::endl;
642 :
643 0 : LBASSERT( _impl->currentFrame == frameNumber-1 );
644 :
645 0 : LBLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber
646 0 : << std::endl;
647 :
648 0 : Config* config = getConfig();
649 :
650 0 : if( configVersion != co::VERSION_INVALID )
651 0 : config->sync( configVersion );
652 0 : sync( version );
653 :
654 0 : config->_frameStart();
655 0 : frameStart( frameID, frameNumber );
656 :
657 0 : LBASSERTINFO( _impl->currentFrame >= frameNumber,
658 : "Node::frameStart() did not start frame " << frameNumber );
659 0 : return true;
660 : }
661 :
662 0 : bool Node::_cmdFrameFinish( co::ICommand& cmd )
663 : {
664 0 : LB_TS_THREAD( _nodeThread );
665 :
666 0 : co::ObjectICommand command( cmd );
667 0 : const uint128_t& frameID = command.read< uint128_t >();
668 0 : const uint32_t frameNumber = command.read< uint32_t >();
669 :
670 0 : LBLOG( LOG_TASKS ) << "TASK frame finish " << getName() << " " << command
671 0 : << " frame " << frameNumber << " id " << frameID
672 0 : << std::endl;
673 :
674 0 : _finishFrame( frameNumber );
675 0 : _frameFinish( frameID, frameNumber );
676 :
677 0 : const uint128_t version = commit();
678 0 : if( version != co::VERSION_NONE )
679 0 : send( command.getNode(), fabric::CMD_OBJECT_SYNC );
680 0 : return true;
681 : }
682 :
683 0 : bool Node::_cmdFrameDrawFinish( co::ICommand& cmd )
684 : {
685 0 : co::ObjectICommand command( cmd );
686 0 : const uint128_t& frameID = command.read< uint128_t >();
687 0 : const uint32_t frameNumber = command.read< uint32_t >();
688 :
689 0 : LBLOG( LOG_TASKS ) << "TASK draw finish " << getName() << " " << command
690 0 : << " frame " << frameNumber << " id " << frameID
691 0 : << std::endl;
692 :
693 0 : frameDrawFinish( frameID, frameNumber );
694 0 : return true;
695 : }
696 :
697 0 : bool Node::_cmdFrameTasksFinish( co::ICommand& cmd )
698 : {
699 0 : co::ObjectICommand command( cmd );
700 0 : const uint128_t& frameID = command.read< uint128_t >();
701 0 : const uint32_t frameNumber = command.read< uint32_t >();
702 :
703 0 : LBLOG( LOG_TASKS ) << "TASK tasks finish " << getName() << " " << command
704 0 : << std::endl;
705 :
706 0 : frameTasksFinish( frameID, frameNumber );
707 0 : return true;
708 : }
709 :
710 0 : bool Node::_cmdFrameDataTransmit( co::ICommand& cmd )
711 : {
712 0 : co::ObjectICommand command( cmd );
713 :
714 : const co::ObjectVersion& frameDataVersion =
715 0 : command.read< co::ObjectVersion >();
716 0 : const PixelViewport& pvp = command.read< PixelViewport >();
717 0 : const Zoom& zoom = command.read< Zoom >();
718 0 : const RenderContext& context = command.read< RenderContext >();
719 0 : const uint32_t buffers = command.read< uint32_t >();
720 0 : const uint32_t frameNumber = command.read< uint32_t >();
721 0 : const bool useAlpha = command.read< bool >();
722 : const uint8_t* data = reinterpret_cast< const uint8_t* >(
723 0 : command.getRemainingBuffer( command.getRemainingBufferSize( )));
724 :
725 0 : LBLOG( LOG_ASSEMBLY )
726 0 : << "received image data for " << frameDataVersion << ", buffers "
727 0 : << buffers << " pvp " << pvp << std::endl;
728 :
729 0 : LBASSERT( pvp.isValid( ));
730 :
731 0 : FrameDataPtr frameData = getFrameData( frameDataVersion );
732 0 : LBASSERT( !frameData->isReady() );
733 :
734 : NodeStatistics event( Statistic::NODE_FRAME_DECOMPRESS, this,
735 0 : frameNumber );
736 :
737 : // Note on the const_cast: since the PixelData structure stores non-const
738 : // pointers, we have to go non-const at some point, even though we do not
739 : // modify the data.
740 0 : LBCHECK( frameData->addImage( frameDataVersion, pvp, zoom, context, buffers,
741 : useAlpha, const_cast< uint8_t* >( data )));
742 0 : return true;
743 : }
744 :
745 0 : bool Node::_cmdFrameDataReady( co::ICommand& cmd )
746 : {
747 0 : co::ObjectICommand command( cmd );
748 :
749 : const co::ObjectVersion& frameDataVersion =
750 0 : command.read< co::ObjectVersion >();
751 0 : fabric::FrameData data;
752 0 : data.deserialize( command );
753 :
754 0 : LBLOG( LOG_ASSEMBLY ) << "received ready for " << frameDataVersion
755 0 : << std::endl;
756 0 : FrameDataPtr frameData = getFrameData( frameDataVersion );
757 0 : LBASSERT( frameData );
758 0 : LBASSERT( !frameData->isReady() );
759 0 : frameData->setReady( frameDataVersion, data );
760 0 : LBASSERT( frameData->isReady() );
761 0 : return true;
762 : }
763 :
764 0 : bool Node::_cmdSetAffinity( co::ICommand& cmd )
765 : {
766 0 : co::ObjectICommand command( cmd );
767 :
768 0 : lunchbox::Thread::setAffinity( command.read< int32_t >( ));
769 0 : return true;
770 : }
771 : }
772 :
773 : #include <eq/fabric/node.ipp>
774 : template class eq::fabric::Node< eq::Config, eq::Node, eq::Pipe,
775 : eq::NodeVisitor >;
776 :
777 : /** @cond IGNORE */
778 : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
779 42 : const eq::Super& );
780 : /** @endcond */
|