Line data Source code
1 :
2 : /* Copyright (c) 2005-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * 2010, Cedric Stalder<cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "node.h"
21 :
22 : #include "client.h"
23 : #include "config.h"
24 : #include "error.h"
25 : #include "exception.h"
26 : #include "frameData.h"
27 : #include "global.h"
28 : #include "log.h"
29 : #include "nodeFactory.h"
30 : #include "nodeStatistics.h"
31 : #include "pipe.h"
32 : #include "server.h"
33 :
34 : #include <eq/fabric/commands.h>
35 : #include <eq/fabric/elementVisitor.h>
36 : #include <eq/fabric/task.h>
37 :
38 : #include <co/barrier.h>
39 : #include <co/connection.h>
40 : #include <co/global.h>
41 : #include <co/objectICommand.h>
42 : #include <lunchbox/scopedMutex.h>
43 :
44 : namespace eq
45 : {
46 : namespace
47 : {
48 : typedef stde::hash_map< uint128_t, co::Barrier* > BarrierHash;
49 : typedef stde::hash_map< uint128_t, FrameDataPtr > FrameDataHash;
50 : typedef FrameDataHash::const_iterator FrameDataHashCIter;
51 : typedef FrameDataHash::iterator FrameDataHashIter;
52 :
53 : enum State
54 : {
55 : STATE_STOPPED,
56 : STATE_INITIALIZING,
57 : STATE_INIT_FAILED,
58 : STATE_RUNNING,
59 : STATE_FAILED
60 : };
61 : }
62 :
63 : namespace detail
64 : {
65 : class TransmitThread : public lunchbox::Thread
66 : {
67 : public:
68 38 : TransmitThread()
69 38 : : _queue( co::Global::getCommandQueueLimit( ))
70 38 : {}
71 8 : virtual ~TransmitThread() {}
72 :
73 61 : co::CommandQueue& getQueue() { return _queue; }
74 :
75 : protected:
76 8 : bool init() override { setName( "Xmit" ); return true; }
77 : void run() override;
78 :
79 : private:
80 : co::CommandQueue _queue;
81 : };
82 :
83 8 : class Node
84 : {
85 : public:
86 38 : Node()
87 : : state( STATE_STOPPED )
88 : , finishedFrame( 0 )
89 38 : , unlockedFrame( 0 )
90 38 : {}
91 :
92 : /** The configInit/configExit state. */
93 : lunchbox::Monitor< State > state;
94 :
95 : /** The number of the last started frame. */
96 : lunchbox::Monitor< uint32_t > currentFrame;
97 :
98 : /** The number of the last finished frame. */
99 : uint32_t finishedFrame;
100 :
101 : /** The number of the last locally released frame. */
102 : uint32_t unlockedFrame;
103 :
104 : /** All barriers mapped by the node. */
105 : lunchbox::Lockable< BarrierHash > barriers;
106 :
107 : /** All frame datas used by the node during rendering. */
108 : lunchbox::Lockable< FrameDataHash > frameDatas;
109 :
110 : TransmitThread transmitter;
111 : };
112 :
113 : }
114 :
115 : /** @cond IGNORE */
116 : typedef co::CommandFunc<Node> NodeFunc;
117 : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
118 : /** @endcond */
119 :
120 38 : Node::Node( Config* parent )
121 : : Super( parent )
122 38 : , _impl( new detail::Node )
123 : {
124 38 : }
125 :
126 22 : Node::~Node()
127 : {
128 8 : LBASSERT( getPipes().empty( ));
129 8 : delete _impl;
130 14 : }
131 :
132 8 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
133 : {
134 8 : Super::attach( id, instanceID );
135 :
136 8 : co::CommandQueue* queue = getMainThreadQueue();
137 8 : co::CommandQueue* commandQ = getCommandThreadQueue();
138 8 : co::CommandQueue* transmitQ = getTransmitterQueue();
139 :
140 : registerCommand( fabric::CMD_NODE_CREATE_PIPE,
141 8 : NodeFunc( this, &Node::_cmdCreatePipe ), queue );
142 : registerCommand( fabric::CMD_NODE_DESTROY_PIPE,
143 8 : NodeFunc( this, &Node::_cmdDestroyPipe ), queue );
144 : registerCommand( fabric::CMD_NODE_CONFIG_INIT,
145 8 : NodeFunc( this, &Node::_cmdConfigInit ), queue );
146 : registerCommand( fabric::CMD_NODE_SET_AFFINITY,
147 8 : NodeFunc( this, &Node::_cmdSetAffinity), transmitQ );
148 : registerCommand( fabric::CMD_NODE_CONFIG_EXIT,
149 8 : NodeFunc( this, &Node::_cmdConfigExit ), queue );
150 : registerCommand( fabric::CMD_NODE_FRAME_START,
151 8 : NodeFunc( this, &Node::_cmdFrameStart ), queue );
152 : registerCommand( fabric::CMD_NODE_FRAME_FINISH,
153 8 : NodeFunc( this, &Node::_cmdFrameFinish ), queue );
154 : registerCommand( fabric::CMD_NODE_FRAME_DRAW_FINISH,
155 8 : NodeFunc( this, &Node::_cmdFrameDrawFinish ), queue );
156 : registerCommand( fabric::CMD_NODE_FRAME_TASKS_FINISH,
157 8 : NodeFunc( this, &Node::_cmdFrameTasksFinish ), queue );
158 : registerCommand( fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
159 8 : NodeFunc( this, &Node::_cmdFrameDataTransmit ), commandQ );
160 : registerCommand( fabric::CMD_NODE_FRAMEDATA_READY,
161 8 : NodeFunc( this, &Node::_cmdFrameDataReady ), commandQ );
162 8 : }
163 :
164 8 : void Node::setDirty( const uint64_t bits )
165 : {
166 : // jump over fabric setDirty to avoid dirty'ing config node list
167 : // nodes are individually synced in frame finish
168 8 : Object::setDirty( bits );
169 8 : }
170 :
171 60 : ClientPtr Node::getClient()
172 : {
173 60 : Config* config = getConfig();
174 60 : LBASSERT( config );
175 60 : return ( config ? config->getClient() : 0 );
176 : }
177 :
178 87 : ServerPtr Node::getServer()
179 : {
180 87 : Config* config = getConfig();
181 87 : LBASSERT( config );
182 87 : return ( config ? config->getServer() : 0 );
183 : }
184 :
185 8 : co::CommandQueue* Node::getMainThreadQueue()
186 : {
187 8 : return getConfig()->getMainThreadQueue();
188 : }
189 :
190 8 : co::CommandQueue* Node::getCommandThreadQueue()
191 : {
192 8 : return getConfig()->getCommandThreadQueue();
193 : }
194 :
195 61 : co::CommandQueue* Node::getTransmitterQueue()
196 : {
197 61 : return &_impl->transmitter.getQueue();
198 : }
199 :
200 0 : uint32_t Node::getCurrentFrame() const
201 : {
202 0 : return _impl->currentFrame.get();
203 : }
204 :
205 6 : uint32_t Node::getFinishedFrame() const
206 : {
207 6 : return _impl->finishedFrame;
208 : }
209 :
210 0 : co::Barrier* Node::getBarrier( const co::ObjectVersion& barrier )
211 : {
212 0 : lunchbox::ScopedMutex<> mutex( _impl->barriers );
213 0 : co::Barrier* netBarrier = _impl->barriers.data[ barrier.identifier ];
214 :
215 0 : if( netBarrier )
216 0 : netBarrier->sync( barrier.version );
217 : else
218 : {
219 0 : ClientPtr client = getClient();
220 :
221 0 : netBarrier = new co::Barrier( client, barrier );
222 0 : if( !netBarrier->isGood( ))
223 : {
224 0 : LBCHECK( netBarrier->isGood( ));
225 0 : LBWARN << "Could not map swap barrier" << std::endl;
226 0 : delete netBarrier;
227 0 : return 0;
228 : }
229 0 : _impl->barriers.data[ barrier.identifier ] = netBarrier;
230 : }
231 :
232 0 : return netBarrier;
233 : }
234 :
235 10 : FrameDataPtr Node::getFrameData( const co::ObjectVersion& frameDataVersion )
236 : {
237 10 : lunchbox::ScopedWrite mutex( _impl->frameDatas );
238 10 : FrameDataPtr data = _impl->frameDatas.data[ frameDataVersion.identifier ];
239 :
240 10 : if( !data )
241 : {
242 2 : data = new FrameData;
243 2 : data->setID( frameDataVersion.identifier );
244 2 : _impl->frameDatas.data[ frameDataVersion.identifier ] = data;
245 : }
246 :
247 10 : LBASSERT( frameDataVersion.version.high() == 0 );
248 10 : data->setVersion( frameDataVersion.version.low( ));
249 10 : return data;
250 : }
251 :
252 2 : void Node::releaseFrameData( FrameDataPtr data )
253 : {
254 2 : lunchbox::ScopedWrite mutex( _impl->frameDatas );
255 2 : FrameDataHashIter i = _impl->frameDatas->find( data->getID( ));
256 2 : LBASSERT( i != _impl->frameDatas->end( ));
257 2 : if( i == _impl->frameDatas->end( ))
258 2 : return;
259 :
260 2 : _impl->frameDatas->erase( i );
261 : }
262 :
263 15 : void Node::waitInitialized() const
264 : {
265 15 : _impl->state.waitGE( STATE_INIT_FAILED );
266 15 : }
267 :
268 15 : bool Node::isRunning() const
269 : {
270 15 : return _impl->state == STATE_RUNNING;
271 : }
272 :
273 8 : bool Node::isStopped() const
274 : {
275 8 : return _impl->state == STATE_STOPPED;
276 : }
277 :
278 8 : bool Node::configInit( const uint128_t& )
279 : {
280 8 : WindowSystem::configInit( this );
281 8 : return true;
282 : }
283 :
284 8 : bool Node::configExit()
285 : {
286 8 : WindowSystem::configExit( this );
287 8 : return true;
288 : }
289 :
290 8 : void Node::_setAffinity()
291 : {
292 8 : const int32_t affinity = getIAttribute( IATTR_HINT_AFFINITY );
293 8 : switch( affinity )
294 : {
295 : case OFF:
296 0 : break;
297 :
298 : case AUTO:
299 : // TODO
300 8 : LBVERB << "No automatic thread placement for node threads "
301 8 : << std::endl;
302 8 : break;
303 :
304 : default:
305 0 : co::LocalNodePtr node = getLocalNode();
306 0 : send( node, fabric::CMD_NODE_SET_AFFINITY ) << affinity;
307 :
308 0 : node->setAffinity( affinity );
309 0 : break;
310 : }
311 8 : }
312 :
313 7 : void Node::waitFrameStarted( const uint32_t frameNumber ) const
314 : {
315 7 : _impl->currentFrame.waitGE( frameNumber );
316 7 : }
317 :
318 5 : void Node::startFrame( const uint32_t frameNumber )
319 : {
320 5 : _impl->currentFrame = frameNumber;
321 5 : }
322 :
323 5 : void Node::frameFinish( const uint128_t&, const uint32_t frameNumber )
324 : {
325 5 : releaseFrame( frameNumber );
326 5 : }
327 :
328 5 : void Node::_finishFrame( const uint32_t frameNumber ) const
329 : {
330 5 : const Pipes& pipes = getPipes();
331 12 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
332 : {
333 7 : const Pipe* pipe = *i;
334 7 : LBASSERT( pipe->isThreaded() || pipe->getFinishedFrame()>=frameNumber );
335 :
336 7 : pipe->waitFrameLocal( frameNumber );
337 7 : pipe->waitFrameFinished( frameNumber );
338 : }
339 5 : }
340 :
341 5 : void Node::_frameFinish( const uint128_t& frameID,
342 : const uint32_t frameNumber )
343 : {
344 5 : frameFinish( frameID, frameNumber );
345 5 : LBLOG( LOG_TASKS ) << "---- Finished Frame --- " << frameNumber
346 5 : << std::endl;
347 :
348 5 : if( _impl->unlockedFrame < frameNumber )
349 : {
350 0 : LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
351 0 : << std::endl;
352 0 : releaseFrameLocal( frameNumber );
353 : }
354 :
355 5 : if( _impl->finishedFrame < frameNumber )
356 : {
357 0 : LBWARN << "Finished frame was not released, enforcing unlock"
358 0 : << std::endl;
359 0 : releaseFrame( frameNumber );
360 : }
361 5 : }
362 :
363 5 : void Node::releaseFrame( const uint32_t frameNumber )
364 : {
365 5 : LBASSERTINFO( _impl->currentFrame >= frameNumber,
366 : "current " << _impl->currentFrame << " release " <<
367 : frameNumber );
368 :
369 5 : if( _impl->finishedFrame >= frameNumber )
370 5 : return;
371 5 : _impl->finishedFrame = frameNumber;
372 :
373 5 : Config* config = getConfig();
374 5 : ServerPtr server = config->getServer();
375 10 : co::NodePtr node = server.get();
376 10 : send( node, fabric::CMD_NODE_FRAME_FINISH_REPLY ) << frameNumber;
377 : }
378 :
379 5 : void Node::releaseFrameLocal( const uint32_t frameNumber )
380 : {
381 5 : LBASSERT( _impl->unlockedFrame <= frameNumber );
382 5 : _impl->unlockedFrame = frameNumber;
383 :
384 5 : Config* config = getConfig();
385 5 : LBASSERT( config->getNodes().size() == 1 );
386 5 : LBASSERT( config->getNodes()[0] == this );
387 5 : config->releaseFrameLocal( frameNumber );
388 :
389 5 : LBLOG( LOG_TASKS ) << "---- Unlocked Frame --- " << _impl->unlockedFrame
390 5 : << std::endl;
391 5 : }
392 :
393 5 : void Node::frameStart( const uint128_t&, const uint32_t frameNumber )
394 : {
395 5 : startFrame( frameNumber ); // unlock pipe threads
396 :
397 5 : switch( getIAttribute( IATTR_THREAD_MODEL ))
398 : {
399 : case ASYNC:
400 : // Don't wait for pipes to release frame locally, sync not needed
401 0 : releaseFrameLocal( frameNumber );
402 0 : break;
403 :
404 : case DRAW_SYNC: // Sync and release in frameDrawFinish
405 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
406 5 : break;
407 :
408 : default:
409 0 : LBUNIMPLEMENTED;
410 : }
411 5 : }
412 :
413 5 : void Node::frameDrawFinish( const uint128_t&, const uint32_t frameNumber )
414 : {
415 5 : switch( getIAttribute( IATTR_THREAD_MODEL ))
416 : {
417 : case ASYNC: // No sync, release in frameStart
418 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
419 0 : break;
420 :
421 : case DRAW_SYNC:
422 : {
423 5 : const Pipes& pipes = getPipes();
424 36 : for( Pipes::const_iterator i = pipes.begin();
425 24 : i != pipes.end(); ++i )
426 : {
427 7 : const Pipe* pipe = *i;
428 7 : if( pipe->getTasks() & fabric::TASK_DRAW )
429 7 : pipe->waitFrameLocal( frameNumber );
430 : }
431 :
432 5 : releaseFrameLocal( frameNumber );
433 5 : break;
434 : }
435 : default:
436 0 : LBUNIMPLEMENTED;
437 : }
438 5 : }
439 :
440 5 : void Node::frameTasksFinish( const uint128_t&, const uint32_t frameNumber )
441 : {
442 5 : switch( getIAttribute( IATTR_THREAD_MODEL ))
443 : {
444 : case ASYNC: // No sync, release in frameStart
445 : case DRAW_SYNC: // Sync and release in frameDrawFinish
446 5 : break;
447 :
448 : case LOCAL_SYNC:
449 : {
450 0 : const Pipes& pipes = getPipes();
451 0 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
452 : {
453 0 : const Pipe* pipe = *i;
454 0 : if( pipe->getTasks() != fabric::TASK_NONE )
455 0 : pipe->waitFrameLocal( frameNumber );
456 : }
457 :
458 0 : releaseFrameLocal( frameNumber );
459 0 : break;
460 : }
461 : default:
462 0 : LBUNIMPLEMENTED;
463 : }
464 5 : }
465 :
466 :
467 0 : EventOCommand Node::sendError( const uint32_t error )
468 : {
469 0 : return getConfig()->sendError( Event::NODE_ERROR, getID(), error );
470 : }
471 :
472 0 : bool Node::processEvent( const Event& event )
473 : {
474 0 : ConfigEvent configEvent( event );
475 0 : getConfig()->sendEvent( configEvent );
476 0 : return true;
477 : }
478 :
479 8 : void Node::_flushObjects()
480 : {
481 8 : ClientPtr client = getClient();
482 : {
483 8 : lunchbox::ScopedMutex<> mutex( _impl->barriers );
484 24 : for( BarrierHash::const_iterator i = _impl->barriers->begin();
485 16 : i != _impl->barriers->end(); ++i )
486 : {
487 0 : delete i->second;
488 : }
489 8 : _impl->barriers->clear();
490 : }
491 :
492 16 : lunchbox::ScopedMutex<> mutex( _impl->frameDatas );
493 24 : for( FrameDataHashCIter i = _impl->frameDatas->begin();
494 16 : i != _impl->frameDatas->end(); ++i )
495 : {
496 0 : FrameDataPtr frameData = i->second;
497 0 : frameData->resetPlugins();
498 0 : client->unmapObject( frameData.get( ));
499 0 : }
500 16 : _impl->frameDatas->clear();
501 8 : }
502 :
503 10 : void detail::TransmitThread::run()
504 : {
505 : while( true )
506 : {
507 10 : co::ICommand command = _queue.pop();
508 10 : if( !command.isValid( ))
509 16 : return; // exit thread
510 :
511 2 : LBCHECK( command( ));
512 2 : }
513 : }
514 :
515 30 : void Node::dirtyClientExit()
516 : {
517 30 : const Pipes& pipes = getPipes();
518 120 : for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
519 : {
520 90 : Pipe* pipe = *i;
521 90 : pipe->cancelThread();
522 : }
523 30 : getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
524 30 : _impl->transmitter.join();
525 30 : }
526 :
527 : //---------------------------------------------------------------------------
528 : // command handlers
529 : //---------------------------------------------------------------------------
530 15 : bool Node::_cmdCreatePipe( co::ICommand& cmd )
531 : {
532 15 : LB_TS_THREAD( _nodeThread );
533 15 : LBASSERT( _impl->state >= STATE_INIT_FAILED );
534 :
535 15 : co::ObjectICommand command( cmd );
536 15 : const uint128_t& pipeID = command.read< uint128_t >();
537 15 : const bool threaded = command.read< bool >();
538 :
539 15 : LBLOG( LOG_INIT ) << "Create pipe " << command << " id " << pipeID
540 15 : << std::endl;
541 :
542 15 : Pipe* pipe = Global::getNodeFactory()->createPipe( this );
543 15 : if( threaded )
544 15 : pipe->startThread();
545 :
546 15 : Config* config = getConfig();
547 15 : LBCHECK( config->mapObject( pipe, pipeID ));
548 15 : pipe->notifyMapped();
549 :
550 15 : return true;
551 : }
552 :
553 15 : bool Node::_cmdDestroyPipe( co::ICommand& cmd )
554 : {
555 15 : co::ObjectICommand command( cmd );
556 :
557 15 : LB_TS_THREAD( _nodeThread );
558 15 : LBLOG( LOG_INIT ) << "Destroy pipe " << command << std::endl;
559 :
560 15 : Pipe* pipe = findPipe( command.read< uint128_t >( ));
561 15 : LBASSERT( pipe );
562 15 : pipe->exitThread();
563 :
564 15 : const bool stopped = pipe->isStopped();
565 :
566 15 : Config* config = getConfig();
567 15 : config->unmapObject( pipe );
568 15 : pipe->send( getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY ) << stopped;
569 15 : Global::getNodeFactory()->releasePipe( pipe );
570 :
571 15 : return true;
572 : }
573 :
574 8 : bool Node::_cmdConfigInit( co::ICommand& cmd )
575 : {
576 8 : co::ObjectICommand command( cmd );
577 :
578 8 : LB_TS_THREAD( _nodeThread );
579 8 : LBLOG( LOG_INIT ) << "Init node " << command << std::endl;
580 :
581 8 : _impl->state = STATE_INITIALIZING;
582 :
583 8 : const uint128_t& initID = command.read< uint128_t >();
584 8 : const uint32_t frameNumber = command.read< uint32_t >();
585 :
586 8 : _impl->currentFrame = frameNumber;
587 8 : _impl->unlockedFrame = frameNumber;
588 8 : _impl->finishedFrame = frameNumber;
589 8 : _setAffinity();
590 :
591 8 : _impl->transmitter.start();
592 8 : const uint64_t result = configInit( initID );
593 :
594 8 : if( getIAttribute( IATTR_THREAD_MODEL ) == eq::UNDEFINED )
595 8 : setIAttribute( IATTR_THREAD_MODEL, eq::DRAW_SYNC );
596 :
597 8 : _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
598 :
599 8 : commit();
600 : send( command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY )
601 8 : << result;
602 8 : return true;
603 : }
604 :
605 8 : bool Node::_cmdConfigExit( co::ICommand& cmd )
606 : {
607 8 : co::ObjectICommand command( cmd );
608 :
609 8 : LB_TS_THREAD( _nodeThread );
610 8 : LBLOG( LOG_INIT ) << "Node exit " << command << std::endl;
611 :
612 8 : const Pipes& pipes = getPipes();
613 23 : for( PipesCIter i = pipes.begin(); i != pipes.end(); ++i )
614 : {
615 15 : Pipe* pipe = *i;
616 15 : pipe->waitExited();
617 : }
618 :
619 8 : _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
620 8 : getTransmitterQueue()->push( co::ICommand( )); // wake up to exit
621 8 : _impl->transmitter.join();
622 8 : _flushObjects();
623 :
624 8 : getConfig()->send( getLocalNode(),
625 16 : fabric::CMD_CONFIG_DESTROY_NODE ) << getID();
626 8 : return true;
627 : }
628 :
629 5 : bool Node::_cmdFrameStart( co::ICommand& cmd )
630 : {
631 5 : LB_TS_THREAD( _nodeThread );
632 :
633 5 : co::ObjectICommand command( cmd );
634 5 : const uint128_t& version = command.read< uint128_t >();
635 5 : const uint128_t& configVersion = command.read< uint128_t >();
636 5 : const uint128_t& frameID = command.read< uint128_t >();
637 5 : const uint32_t frameNumber = command.read< uint32_t >();
638 :
639 5 : LBVERB << "handle node frame start " << command << " frame " << frameNumber
640 5 : << " id " << frameID << std::endl;
641 :
642 5 : LBASSERT( _impl->currentFrame == frameNumber-1 );
643 :
644 5 : LBLOG( LOG_TASKS ) << "----- Begin Frame ----- " << frameNumber
645 5 : << std::endl;
646 :
647 5 : Config* config = getConfig();
648 :
649 5 : if( configVersion != co::VERSION_INVALID )
650 0 : config->sync( configVersion );
651 5 : sync( version );
652 :
653 5 : config->_frameStart();
654 5 : frameStart( frameID, frameNumber );
655 :
656 5 : LBASSERTINFO( _impl->currentFrame >= frameNumber,
657 : "Node::frameStart() did not start frame " << frameNumber );
658 5 : return true;
659 : }
660 :
661 5 : bool Node::_cmdFrameFinish( co::ICommand& cmd )
662 : {
663 5 : LB_TS_THREAD( _nodeThread );
664 :
665 5 : co::ObjectICommand command( cmd );
666 5 : const uint128_t& frameID = command.read< uint128_t >();
667 5 : const uint32_t frameNumber = command.read< uint32_t >();
668 :
669 5 : LBLOG( LOG_TASKS ) << "TASK frame finish " << getName() << " " << command
670 0 : << " frame " << frameNumber << " id " << frameID
671 5 : << std::endl;
672 :
673 5 : _finishFrame( frameNumber );
674 5 : _frameFinish( frameID, frameNumber );
675 :
676 5 : const uint128_t version = commit();
677 5 : if( version != co::VERSION_NONE )
678 0 : send( command.getNode(), fabric::CMD_OBJECT_SYNC );
679 5 : return true;
680 : }
681 :
682 5 : bool Node::_cmdFrameDrawFinish( co::ICommand& cmd )
683 : {
684 5 : co::ObjectICommand command( cmd );
685 5 : const uint128_t& frameID = command.read< uint128_t >();
686 5 : const uint32_t frameNumber = command.read< uint32_t >();
687 :
688 5 : LBLOG( LOG_TASKS ) << "TASK draw finish " << getName() << " " << command
689 0 : << " frame " << frameNumber << " id " << frameID
690 5 : << std::endl;
691 :
692 5 : frameDrawFinish( frameID, frameNumber );
693 5 : return true;
694 : }
695 :
696 5 : bool Node::_cmdFrameTasksFinish( co::ICommand& cmd )
697 : {
698 5 : co::ObjectICommand command( cmd );
699 5 : const uint128_t& frameID = command.read< uint128_t >();
700 5 : const uint32_t frameNumber = command.read< uint32_t >();
701 :
702 5 : LBLOG( LOG_TASKS ) << "TASK tasks finish " << getName() << " " << command
703 5 : << std::endl;
704 :
705 5 : frameTasksFinish( frameID, frameNumber );
706 5 : return true;
707 : }
708 :
709 0 : bool Node::_cmdFrameDataTransmit( co::ICommand& cmd )
710 : {
711 0 : co::ObjectICommand command( cmd );
712 :
713 : const co::ObjectVersion& frameDataVersion =
714 0 : command.read< co::ObjectVersion >();
715 0 : const PixelViewport& pvp = command.read< PixelViewport >();
716 0 : const Zoom& zoom = command.read< Zoom >();
717 0 : const uint32_t buffers = command.read< uint32_t >();
718 0 : const uint32_t frameNumber = command.read< uint32_t >();
719 0 : const bool useAlpha = command.read< bool >();
720 : const uint8_t* data = reinterpret_cast< const uint8_t* >(
721 0 : command.getRemainingBuffer( command.getRemainingBufferSize( )));
722 :
723 0 : LBLOG( LOG_ASSEMBLY )
724 0 : << "received image data for " << frameDataVersion << ", buffers "
725 0 : << buffers << " pvp " << pvp << std::endl;
726 :
727 0 : LBASSERT( pvp.isValid( ));
728 :
729 0 : FrameDataPtr frameData = getFrameData( frameDataVersion );
730 0 : LBASSERT( !frameData->isReady() );
731 :
732 : NodeStatistics event( Statistic::NODE_FRAME_DECOMPRESS, this,
733 0 : frameNumber );
734 :
735 : // Note on the const_cast: since the PixelData structure stores non-const
736 : // pointers, we have to go non-const at some point, even though we do not
737 : // modify the data.
738 0 : LBCHECK( frameData->addImage( frameDataVersion, pvp, zoom, buffers,
739 : useAlpha, const_cast< uint8_t* >( data )));
740 0 : return true;
741 : }
742 :
743 0 : bool Node::_cmdFrameDataReady( co::ICommand& cmd )
744 : {
745 0 : co::ObjectICommand command( cmd );
746 :
747 : const co::ObjectVersion& frameDataVersion =
748 0 : command.read< co::ObjectVersion >();
749 0 : const FrameData::Data& data = command.read< FrameData::Data >();
750 :
751 0 : LBLOG( LOG_ASSEMBLY ) << "received ready for " << frameDataVersion
752 0 : << std::endl;
753 0 : FrameDataPtr frameData = getFrameData( frameDataVersion );
754 0 : LBASSERT( frameData );
755 0 : LBASSERT( !frameData->isReady() );
756 0 : frameData->setReady( frameDataVersion, data );
757 0 : LBASSERT( frameData->isReady() );
758 0 : return true;
759 : }
760 :
761 0 : bool Node::_cmdSetAffinity( co::ICommand& cmd )
762 : {
763 0 : co::ObjectICommand command( cmd );
764 :
765 0 : lunchbox::Thread::setAffinity( command.read< int32_t >( ));
766 0 : return true;
767 : }
768 : }
769 :
770 : #include "../fabric/node.ipp"
771 : template class eq::fabric::Node< eq::Config, eq::Node, eq::Pipe,
772 : eq::NodeVisitor >;
773 :
774 : /** @cond IGNORE */
775 : template EQFABRIC_API std::ostream& eq::fabric::operator << ( std::ostream&,
776 36 : const eq::Super& );
777 : /** @endcond */
|