Line data Source code
1 :
2 : /* Copyright (c) 2005-2013, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * 2010, Cedric Stalder <cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "node.h"
21 :
22 : #include "channel.h"
23 : #include "config.h"
24 : #include "global.h"
25 : #include "log.h"
26 : #include "nodeFactory.h"
27 : #include "pipe.h"
28 : #include "server.h"
29 : #include "window.h"
30 :
31 : #include <eq/client/event.h>
32 : #include <eq/client/error.h>
33 :
34 : #include <eq/fabric/commands.h>
35 : #include <eq/fabric/elementVisitor.h>
36 : #include <eq/fabric/paths.h>
37 :
38 : #include <co/barrier.h>
39 : #include <co/global.h>
40 : #include <co/objectICommand.h>
41 :
42 : #include <lunchbox/clock.h>
43 : #include <lunchbox/launcher.h>
44 : #include <lunchbox/os.h>
45 : #include <lunchbox/sleep.h>
46 :
47 : namespace eq
48 : {
49 : namespace server
50 : {
51 : typedef fabric::Node< Config, Node, Pipe, NodeVisitor > Super;
52 : typedef co::CommandFunc<Node> NodeFunc;
53 : namespace
54 : {
55 : #define S_MAKE_ATTR_STRING( attr ) ( std::string("EQ_NODE_") + #attr )
56 18 : std::string _sAttributeStrings[] = {
57 : S_MAKE_ATTR_STRING( SATTR_LAUNCH_COMMAND )
58 9 : };
59 18 : std::string _cAttributeStrings[] = {
60 : S_MAKE_ATTR_STRING( CATTR_LAUNCH_COMMAND_QUOTE )
61 9 : };
62 : }
63 :
64 438 : Node::Node( Config* parent )
65 : : Super( parent )
66 : , _active( 0 )
67 : , _finishedFrame( 0 )
68 : , _flushedFrame( 0 )
69 : , _state( STATE_STOPPED )
70 438 : , _bufferedTasks( new co::BufferConnection )
71 876 : , _lastDrawPipe( 0 )
72 : {
73 438 : const Global* global = Global::instance();
74 876 : for( int i=0; i < Node::SATTR_LAST; ++i )
75 : {
76 438 : const SAttribute attr = static_cast< SAttribute >( i );
77 438 : setSAttribute( attr, global->getNodeSAttribute( attr ));
78 : }
79 876 : for( int i=0; i < Node::CATTR_LAST; ++i )
80 : {
81 438 : const CAttribute attr = static_cast< CAttribute >( i );
82 438 : setCAttribute( attr, global->getNodeCAttribute( attr ));
83 : }
84 1752 : for( int i = 0; i < IATTR_LAST; ++i )
85 : {
86 1314 : const IAttribute attr = static_cast< IAttribute >( i );
87 1314 : setIAttribute( attr, global->getNodeIAttribute( attr ));
88 : }
89 438 : }
90 :
91 874 : Node::~Node()
92 : {
93 874 : }
94 :
95 15 : void Node::attach( const uint128_t& id, const uint32_t instanceID )
96 : {
97 15 : Super::attach( id, instanceID );
98 :
99 15 : co::CommandQueue* cmdQ = getCommandThreadQueue();
100 : registerCommand( fabric::CMD_OBJECT_SYNC,
101 15 : NodeFunc( this, &Node::_cmdSync ), cmdQ );
102 : registerCommand( fabric::CMD_NODE_CONFIG_INIT_REPLY,
103 15 : NodeFunc( this, &Node::_cmdConfigInitReply ), cmdQ );
104 : registerCommand( fabric::CMD_NODE_CONFIG_EXIT_REPLY,
105 15 : NodeFunc( this, &Node::_cmdConfigExitReply ), cmdQ );
106 : registerCommand( fabric::CMD_NODE_FRAME_FINISH_REPLY,
107 15 : NodeFunc( this, &Node::_cmdFrameFinishReply ), cmdQ );
108 15 : }
109 :
110 147 : ServerPtr Node::getServer()
111 : {
112 147 : return getConfig() ? getConfig()->getServer() : 0;
113 : }
114 :
115 0 : ConstServerPtr Node::getServer() const
116 : {
117 0 : return getConfig() ? getConfig()->getServer() : 0;
118 : }
119 :
120 109 : co::CommandQueue* Node::getMainThreadQueue()
121 : {
122 109 : return getConfig()->getMainThreadQueue();
123 : }
124 :
125 147 : co::CommandQueue* Node::getCommandThreadQueue()
126 : {
127 147 : return getConfig()->getCommandThreadQueue();
128 : }
129 :
130 0 : Channel* Node::getChannel( const ChannelPath& path )
131 : {
132 0 : const Pipes& pipes = getPipes();
133 0 : LBASSERT( pipes.size() > path.pipeIndex );
134 :
135 0 : if( pipes.size() <= path.pipeIndex )
136 0 : return 0;
137 :
138 0 : return pipes[ path.pipeIndex ]->getChannel( path );
139 : }
140 :
141 348 : void Node::addTasks( const uint32_t tasks )
142 : {
143 348 : setTasks( getTasks() | tasks );
144 348 : }
145 :
146 22 : void Node::activate()
147 : {
148 22 : ++_active;
149 22 : LBLOG( LOG_VIEW ) << "activate: " << _active << std::endl;
150 22 : }
151 :
152 22 : void Node::deactivate()
153 : {
154 22 : LBASSERT( _active != 0 );
155 22 : --_active;
156 22 : LBLOG( LOG_VIEW ) << "deactivate: " << _active << std::endl;
157 22 : };
158 :
159 438 : void Node::setSAttribute( const SAttribute attr, const std::string& value )
160 : {
161 438 : _sAttributes[attr] = value;
162 438 : }
163 :
164 239 : const std::string& Node::getSAttribute( const SAttribute attr ) const
165 : {
166 239 : return _sAttributes[attr];
167 : }
168 :
169 356 : const std::string& Node::getSAttributeString( const SAttribute attr )
170 : {
171 356 : return _sAttributeStrings[attr];
172 : }
173 :
174 438 : void Node::setCAttribute( const CAttribute attr, const char value )
175 : {
176 438 : _cAttributes[attr] = value;
177 438 : }
178 :
179 239 : char Node::getCAttribute( const CAttribute attr ) const
180 : {
181 239 : return _cAttributes[attr];
182 : }
183 :
184 356 : const std::string& Node::getCAttributeString( const CAttribute attr )
185 : {
186 356 : return _cAttributeStrings[attr];
187 : }
188 :
189 : //===========================================================================
190 : // Operations
191 : //===========================================================================
192 :
193 : //---------------------------------------------------------------------------
194 : // launch and connect
195 : //---------------------------------------------------------------------------
196 :
197 : namespace
198 : {
199 0 : static co::NodePtr _createNetNode( Node* node )
200 : {
201 0 : co::NodePtr netNode = new co::Node;
202 : const co::ConnectionDescriptions& descriptions =
203 0 : node->getConnectionDescriptions();
204 0 : for( co::ConnectionDescriptionsCIter i = descriptions.begin();
205 0 : i != descriptions.end(); ++i )
206 : {
207 0 : netNode->addConnectionDescription( new co::ConnectionDescription( **i));
208 : }
209 :
210 0 : return netNode;
211 : }
212 : }
213 :
214 8 : bool Node::connect()
215 : {
216 8 : LBASSERT( isActive( ));
217 :
218 8 : if( _node.isValid( ))
219 8 : return _node->isConnected();
220 :
221 0 : if( !isStopped( ))
222 : {
223 0 : LBASSERT( _state == STATE_FAILED );
224 0 : return true;
225 : }
226 :
227 0 : co::LocalNodePtr localNode = getLocalNode();
228 0 : LBASSERT( localNode.isValid( ));
229 :
230 0 : if( !_node )
231 : {
232 0 : LBASSERT( !isApplicationNode( ));
233 0 : _node = _createNetNode( this );
234 : }
235 : else
236 : {
237 0 : LBASSERT( isApplicationNode( ));
238 : }
239 :
240 0 : LBLOG( LOG_INIT ) << "Connecting node" << std::endl;
241 0 : if( !localNode->connect( _node ) && !launch( ))
242 : {
243 0 : LBWARN << "Connection to " << _node->getNodeID() << " failed"
244 0 : << std::endl;
245 0 : _state = STATE_FAILED;
246 0 : _node = 0;
247 0 : return false;
248 : }
249 :
250 0 : return true;
251 : }
252 :
253 0 : bool Node::launch()
254 : {
255 0 : for( co::ConnectionDescriptionsCIter i = _connectionDescriptions.begin();
256 0 : _host.empty() && i != _connectionDescriptions.end(); ++i )
257 : {
258 0 : _host = (*i)->getHostname();
259 0 : LBWARN << "No host specified, guessing " << _host << " from " << *i
260 0 : << std::endl;
261 : }
262 :
263 0 : if( _launch( _host ))
264 0 : return true;
265 :
266 : // Equalizer 1.0 style: try hostnames from connection descriptions
267 0 : for( co::ConnectionDescriptionsCIter i = _connectionDescriptions.begin();
268 0 : i != _connectionDescriptions.end(); ++i )
269 : {
270 0 : const std::string& hostname = (*i)->getHostname();
271 0 : if( hostname != _host && _launch( hostname ))
272 0 : return true;
273 : }
274 :
275 0 : sendError( ERROR_NODE_LAUNCH ) << _host;
276 0 : return false;
277 : }
278 :
279 8 : bool Node::syncLaunch( const lunchbox::Clock& clock )
280 : {
281 8 : LBASSERT( isActive( ));
282 :
283 8 : if( !_node )
284 0 : return false;
285 :
286 8 : if( _node->isConnected( ))
287 8 : return true;
288 :
289 0 : LBASSERT( !isApplicationNode( ));
290 0 : co::LocalNodePtr localNode = getLocalNode();
291 0 : LBASSERT( localNode.isValid( ));
292 :
293 0 : const int32_t timeOut = getIAttribute( IATTR_LAUNCH_TIMEOUT );
294 :
295 : while( true )
296 : {
297 0 : co::NodePtr node = localNode->getNode( _node->getNodeID( ));
298 0 : if( node && node->isConnected( ))
299 : {
300 0 : LBASSERT( _node->getRefCount() == 1 );
301 0 : _node = node; // Use co::Node already connected
302 0 : return true;
303 : }
304 :
305 0 : lunchbox::sleep( 100 /*ms*/ );
306 0 : if( timeOut != static_cast<int32_t>(LB_TIMEOUT_INDEFINITE) &&
307 0 : clock.getTime64() > timeOut )
308 : {
309 0 : LBASSERT( _node->getRefCount() == 1 );
310 0 : _node = 0;
311 0 : std::ostringstream data;
312 :
313 0 : for( co::ConnectionDescriptions::const_iterator i =
314 0 : _connectionDescriptions.begin();
315 0 : i != _connectionDescriptions.end(); ++i )
316 : {
317 0 : co::ConnectionDescriptionPtr desc = *i;
318 0 : data << desc->getHostname() << ' ';
319 0 : }
320 :
321 0 : sendError( ERROR_NODE_CONNECT ) << _host;
322 0 : _state = STATE_FAILED;
323 0 : return false;
324 : }
325 0 : }
326 : }
327 :
328 0 : bool Node::_launch( const std::string& hostname ) const
329 : {
330 0 : const std::string& command = getSAttribute( SATTR_LAUNCH_COMMAND );
331 0 : const size_t commandLen = command.size();
332 :
333 0 : bool commandFound = false;
334 0 : size_t lastPos = 0;
335 0 : std::string cmd;
336 :
337 0 : for( size_t percentPos = command.find( '%' );
338 : percentPos != std::string::npos;
339 0 : percentPos = command.find( '%', percentPos+1 ))
340 : {
341 0 : std::ostringstream replacement;
342 0 : switch( command[percentPos+1] )
343 : {
344 : case 'c':
345 : {
346 0 : replacement << _createRemoteCommand();
347 0 : commandFound = true;
348 0 : break;
349 : }
350 : case 'h':
351 : {
352 0 : if( hostname.empty( ))
353 0 : replacement << "127.0.0.1";
354 : else
355 0 : replacement << hostname;
356 0 : break;
357 : }
358 : case 'n':
359 0 : replacement << _node->getNodeID();
360 0 : break;
361 :
362 : case 'd':
363 0 : replacement << getConfig()->getWorkDir();
364 0 : break;
365 :
366 : case 'q':
367 0 : replacement << getCAttribute( CATTR_LAUNCH_COMMAND_QUOTE );
368 0 : break;
369 :
370 : default:
371 0 : LBWARN << "Unknown token " << command[percentPos+1]
372 0 : << std::endl;
373 0 : replacement << '%' << command[percentPos+1];
374 : }
375 :
376 0 : cmd += command.substr( lastPos, percentPos-lastPos );
377 0 : if( !replacement.str().empty( ))
378 0 : cmd += replacement.str();
379 :
380 0 : lastPos = percentPos+2;
381 0 : }
382 :
383 0 : cmd += command.substr( lastPos, commandLen-lastPos );
384 :
385 0 : if( !commandFound )
386 0 : cmd += " " + _createRemoteCommand();
387 :
388 0 : LBVERB << "Launch command: " << cmd << std::endl;
389 0 : if( lunchbox::Launcher::run( cmd ))
390 0 : return true;
391 :
392 0 : LBWARN << "Could not launch node using '" << cmd << "'" << std::endl;
393 0 : return false;
394 : }
395 :
396 0 : std::string Node::_createRemoteCommand() const
397 : {
398 0 : const Config* config = getConfig();
399 0 : std::string program = config->getRenderClient();
400 0 : if( program.empty( ))
401 : {
402 0 : LBWARN << "No render client name, auto-launch will fail" << std::endl;
403 0 : return std::string();
404 : }
405 :
406 : //----- environment
407 0 : std::ostringstream stringStream;
408 0 : const char quote = getCAttribute( CATTR_LAUNCH_COMMAND_QUOTE );
409 :
410 : #ifndef WIN32
411 : # ifdef Darwin
412 : const char libPath[] = "DYLD_LIBRARY_PATH";
413 : # else
414 0 : const char libPath[] = "LD_LIBRARY_PATH";
415 : # endif
416 :
417 0 : stringStream << "env "; // XXX
418 0 : char* env = getenv( libPath );
419 0 : if( env )
420 0 : stringStream << libPath << "=" << env << " ";
421 :
422 0 : for( int i=0; environ[i] != 0; ++i )
423 : {
424 0 : if( strlen( environ[i] ) > 2 &&
425 0 : ( strncmp( environ[i], "LB_", 3 ) == 0 ||
426 0 : strncmp( environ[i], "CO_", 3 ) == 0 ||
427 0 : strncmp( environ[i], "EQ_", 3 ) == 0 ))
428 : {
429 0 : stringStream << quote << environ[i] << quote << " ";
430 : }
431 : }
432 :
433 0 : stringStream << "LB_LOG_LEVEL=" <<lunchbox::Log::getLogLevelString() << " ";
434 0 : if( lunchbox::Log::topics != 0 )
435 0 : stringStream << "LB_LOG_TOPICS=" <<lunchbox::Log::topics << " ";
436 : #endif // WIN32
437 :
438 : //----- program + args
439 0 : const std::string& workDir = config->getWorkDir();
440 : #ifdef WIN32
441 : LBASSERT( program.length() > 2 );
442 : if( !( program[1] == ':' && (program[2] == '/' || program[2] == '\\' )) &&
443 : // !( drive letter and full path present )
444 : !( program[0] == '/' || program[0] == '\\' ))
445 : // !full path without drive letter
446 : {
447 : program = workDir + '/' + program; // add workDir to relative path
448 : }
449 : #else
450 0 : if( program[0] != '/' )
451 0 : program = workDir + '/' + program;
452 : #endif
453 :
454 0 : const std::string ownData = getServer()->serialize();
455 0 : const std::string remoteData = _node->serialize();
456 0 : std::string collageGlobals;
457 0 : co::Global::toString( collageGlobals );
458 :
459 : stringStream
460 0 : << quote << program << quote << " -- --eq-client " << quote
461 0 : << remoteData << workDir << CO_SEPARATOR << ownData << quote
462 0 : << " --co-globals " << quote << collageGlobals << quote;
463 :
464 0 : return stringStream.str();
465 : }
466 :
467 : //---------------------------------------------------------------------------
468 : // init
469 : //---------------------------------------------------------------------------
470 8 : void Node::configInit( const uint128_t& initID, const uint32_t frameNumber )
471 : {
472 8 : LBASSERT( _state == STATE_STOPPED );
473 8 : _state = STATE_INITIALIZING;
474 :
475 8 : const Config* config = getConfig();
476 8 : _flushedFrame = config->getFinishedFrame();
477 8 : _finishedFrame = config->getFinishedFrame();
478 8 : _frameIDs.clear();
479 :
480 8 : LBLOG( LOG_INIT ) << "Create node" << std::endl;
481 8 : getConfig()->send( _node, fabric::CMD_CONFIG_CREATE_NODE ) << getID();
482 :
483 8 : LBLOG( LOG_INIT ) << "Init node" << std::endl;
484 8 : send( fabric::CMD_NODE_CONFIG_INIT ) << initID << frameNumber;
485 8 : }
486 :
487 8 : bool Node::syncConfigInit()
488 : {
489 8 : LBASSERT( _state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
490 : _state == STATE_INIT_FAILED );
491 :
492 8 : _state.waitNE( STATE_INITIALIZING );
493 :
494 8 : if( _state == STATE_INIT_SUCCESS )
495 : {
496 8 : _state = STATE_RUNNING;
497 8 : return true;
498 : }
499 :
500 0 : LBWARN << "Node initialization failed" << std::endl;
501 0 : configExit();
502 0 : return false;
503 : }
504 :
505 : //---------------------------------------------------------------------------
506 : // exit
507 : //---------------------------------------------------------------------------
508 8 : void Node::configExit()
509 : {
510 8 : if( _state == STATE_EXITING )
511 8 : return;
512 :
513 8 : LBASSERT( _state == STATE_RUNNING || _state == STATE_INIT_FAILED );
514 8 : _state = STATE_EXITING;
515 :
516 8 : LBLOG( LOG_INIT ) << "Exit node" << std::endl;
517 8 : send( fabric::CMD_NODE_CONFIG_EXIT );
518 8 : flushSendBuffer();
519 : }
520 :
521 8 : bool Node::syncConfigExit()
522 : {
523 8 : LBASSERT( _state == STATE_EXITING || _state == STATE_EXIT_SUCCESS ||
524 : _state == STATE_EXIT_FAILED );
525 :
526 8 : _state.waitNE( STATE_EXITING );
527 8 : const bool success = ( _state == STATE_EXIT_SUCCESS );
528 8 : LBASSERT( success || _state == STATE_EXIT_FAILED );
529 :
530 8 : _state = isActive() ? STATE_FAILED : STATE_STOPPED;
531 8 : setTasks( fabric::TASK_NONE );
532 8 : _frameIDs.clear();
533 8 : _flushBarriers();
534 8 : return success;
535 : }
536 :
537 : //---------------------------------------------------------------------------
538 : // update
539 : //---------------------------------------------------------------------------
540 5 : void Node::update( const uint128_t& frameID, const uint32_t frameNumber )
541 : {
542 5 : if( !isRunning( ))
543 5 : return;
544 :
545 5 : LBVERB << "Start frame " << frameNumber << std::endl;
546 5 : LBASSERT( isActive( ));
547 :
548 5 : _frameIDs[ frameNumber ] = frameID;
549 :
550 5 : uint128_t configVersion = co::VERSION_INVALID;
551 5 : if( !isApplicationNode( )) // synced in Config::_cmdFrameStart
552 0 : configVersion = getConfig()->getVersion();
553 :
554 : send( fabric::CMD_NODE_FRAME_START )
555 5 : << getVersion() << configVersion << frameID << frameNumber;
556 5 : LBLOG( LOG_TASKS ) << "TASK node start frame " << std::endl;
557 :
558 5 : const Pipes& pipes = getPipes();
559 12 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
560 7 : (*i)->update( frameID, frameNumber );
561 :
562 5 : if( !_lastDrawPipe ) // no FrameDrawFinish sent
563 : {
564 0 : send( fabric::CMD_NODE_FRAME_DRAW_FINISH ) << frameID << frameNumber;
565 0 : LBLOG( LOG_TASKS ) << "TASK node draw finish " << getName() << " "
566 0 : << std::endl;
567 : }
568 5 : _lastDrawPipe = 0;
569 :
570 5 : send( fabric::CMD_NODE_FRAME_TASKS_FINISH ) << frameID << frameNumber;
571 5 : LBLOG( LOG_TASKS ) << "TASK node tasks finish " << std::endl;
572 :
573 5 : _finish( frameNumber );
574 5 : flushSendBuffer();
575 : }
576 :
577 5 : uint32_t Node::_getFinishLatency() const
578 : {
579 5 : switch( getIAttribute( Node::IATTR_THREAD_MODEL ))
580 : {
581 : case fabric::UNDEFINED:
582 : case fabric::DRAW_SYNC:
583 5 : if( getTasks() & fabric::TASK_DRAW )
584 : {
585 : // More than one frame latency doesn't make sense, since the
586 : // draw sync for frame+1 does not allow for more
587 5 : const Config* config = getConfig();
588 5 : const uint32_t latency = config->getLatency();
589 :
590 5 : return LB_MIN( latency, 1 );
591 : }
592 0 : break;
593 :
594 : case fabric::LOCAL_SYNC:
595 0 : if( getTasks() != fabric::TASK_NONE )
596 : // local sync enforces no latency
597 0 : return 0;
598 0 : break;
599 :
600 : case fabric::ASYNC:
601 0 : break;
602 : default:
603 0 : LBUNIMPLEMENTED;
604 : }
605 :
606 0 : const Config* config = getConfig();
607 0 : return config->getLatency();
608 : }
609 :
610 5 : void Node::_finish( const uint32_t currentFrame )
611 : {
612 5 : const Pipes& pipes = getPipes();
613 5 : for( Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i )
614 : {
615 5 : const Pipe* pipe = *i;
616 5 : if( pipe->getIAttribute( Pipe::IATTR_HINT_THREAD ) && pipe->isRunning())
617 : {
618 5 : const uint32_t latency = _getFinishLatency();
619 5 : if( currentFrame > latency )
620 2 : flushFrames( currentFrame - latency );
621 10 : return;
622 : }
623 : }
624 :
625 : // else only non-threaded pipes, all local tasks are done, send finish now.
626 0 : flushFrames( currentFrame );
627 : }
628 :
629 5 : void Node::flushFrames( const uint32_t frameNumber )
630 : {
631 5 : LBLOG( LOG_TASKS ) << "Flush frames including " << frameNumber << std::endl;
632 :
633 15 : while( _flushedFrame < frameNumber )
634 : {
635 5 : ++_flushedFrame;
636 5 : _sendFrameFinish( _flushedFrame );
637 : }
638 :
639 5 : flushSendBuffer();
640 5 : }
641 :
642 5 : void Node::_sendFrameFinish( const uint32_t frameNumber )
643 : {
644 5 : FrameIDHash::iterator i = _frameIDs.find( frameNumber );
645 5 : if( i == _frameIDs.end( ))
646 5 : return; // finish already send
647 :
648 5 : send( fabric::CMD_NODE_FRAME_FINISH ) << i->second << frameNumber;
649 5 : _frameIDs.erase( i );
650 5 : LBLOG( LOG_TASKS ) << "TASK node finish frame " << frameNumber << std::endl;
651 : }
652 :
653 : //---------------------------------------------------------------------------
654 : // Barrier cache
655 : //---------------------------------------------------------------------------
656 0 : co::Barrier* Node::getBarrier()
657 : {
658 0 : if( _barriers.empty( ))
659 : {
660 : co::Barrier* barrier = new co::Barrier( getServer(),
661 0 : _node->getNodeID( ));
662 0 : barrier->setAutoObsolete( getConfig()->getLatency() + 1 );
663 0 : return barrier;
664 : }
665 : // else
666 :
667 0 : co::Barrier* barrier = _barriers.back();
668 0 : _barriers.pop_back();
669 0 : barrier->setHeight( 0 );
670 0 : return barrier;
671 : }
672 :
673 0 : void Node::changeLatency( const uint32_t latency )
674 : {
675 0 : for( co::Barriers::const_iterator i = _barriers.begin();
676 0 : i != _barriers.end(); ++ i )
677 : {
678 0 : co::Barrier* barrier = *i;
679 0 : barrier->setAutoObsolete( latency + 1 );
680 : }
681 0 : }
682 :
683 0 : void Node::releaseBarrier( co::Barrier* barrier )
684 : {
685 0 : _barriers.push_back( barrier );
686 0 : }
687 :
688 8 : void Node::_flushBarriers()
689 : {
690 8 : for( co::BarriersCIter i =_barriers.begin(); i != _barriers.end(); ++i )
691 0 : delete *i;
692 8 : _barriers.clear();
693 8 : }
694 :
695 0 : bool Node::removeConnectionDescription( co::ConnectionDescriptionPtr cd )
696 : {
697 : // Don't use std::find, RefPtr::operator== compares pointers, not values.
698 0 : for(co::ConnectionDescriptions::iterator i=_connectionDescriptions.begin();
699 0 : i != _connectionDescriptions.end(); ++i )
700 : {
701 0 : if( *cd != **i )
702 0 : continue;
703 :
704 0 : _connectionDescriptions.erase( i );
705 0 : return true;
706 : }
707 0 : return false;
708 : }
709 :
710 31 : co::ObjectOCommand Node::send( const uint32_t cmd )
711 : {
712 31 : return send( cmd, getID( ));
713 : }
714 :
715 276 : co::ObjectOCommand Node::send( const uint32_t cmd, const uint128_t& id )
716 : {
717 : return co::ObjectOCommand( co::Connections( 1, _bufferedTasks ), cmd,
718 276 : co::COMMANDTYPE_OBJECT, id, CO_INSTANCE_ALL );
719 : }
720 :
721 0 : EventOCommand Node::sendError( const uint32_t error )
722 : {
723 0 : return getConfig()->sendError( Event::NODE_ERROR, getID(), error );
724 : }
725 :
726 50 : void Node::flushSendBuffer()
727 : {
728 50 : _bufferedTasks->sendBuffer( _node->getConnection( ));
729 50 : }
730 :
731 : //===========================================================================
732 : // command handling
733 : //===========================================================================
734 8 : bool Node::_cmdConfigInitReply( co::ICommand& cmd )
735 : {
736 8 : co::ObjectICommand command( cmd );
737 8 : LBVERB << "handle configInit reply " << command << std::endl;
738 8 : LBASSERT( _state == STATE_INITIALIZING );
739 8 : _state = command.read< uint64_t >() ? STATE_INIT_SUCCESS : STATE_INIT_FAILED;
740 :
741 8 : return true;
742 : }
743 :
744 8 : bool Node::_cmdConfigExitReply( co::ICommand& cmd )
745 : {
746 8 : co::ObjectICommand command( cmd );
747 8 : LBVERB << "handle configExit reply " << command << std::endl;
748 8 : LBASSERT( _state == STATE_EXITING );
749 :
750 8 : _state = command.read< bool >() ? STATE_EXIT_SUCCESS : STATE_EXIT_FAILED;
751 8 : return true;
752 : }
753 :
754 5 : bool Node::_cmdFrameFinishReply( co::ICommand& cmd )
755 : {
756 5 : co::ObjectICommand command( cmd );
757 5 : LBVERB << "handle frame finish reply " << command << std::endl;
758 :
759 5 : const uint32_t frameNumber = command.read< uint32_t >();
760 :
761 5 : _finishedFrame = frameNumber;
762 5 : getConfig()->notifyNodeFrameFinished( frameNumber );
763 :
764 5 : return true;
765 : }
766 :
767 239 : void Node::output( std::ostream& os ) const
768 : {
769 239 : if( !_host.empty( ))
770 129 : os << "host \"" << _host << '"' << std::endl;
771 :
772 239 : const co::ConnectionDescriptions& descriptions = _connectionDescriptions;
773 1230 : for( co::ConnectionDescriptions::const_iterator i = descriptions.begin();
774 820 : i != descriptions.end(); ++i )
775 : {
776 171 : co::ConnectionDescriptionPtr desc = *i;
777 171 : os << *desc;
778 171 : }
779 :
780 239 : bool attrPrinted = false;
781 :
782 956 : for( Node::SAttribute i = static_cast<Node::SAttribute>( 0 );
783 478 : i < Node::SATTR_LAST;
784 : i = static_cast<Node::SAttribute>( static_cast<uint32_t>( i )+1))
785 : {
786 239 : const std::string& value = getSAttribute( i );
787 239 : if( value == Global::instance()->getNodeSAttribute( i ))
788 239 : continue;
789 :
790 0 : if( !attrPrinted )
791 : {
792 0 : os << std::endl << "attributes" << std::endl;
793 0 : os << "{" << std::endl << lunchbox::indent;
794 0 : attrPrinted = true;
795 : }
796 :
797 : os << ( i==Node::SATTR_LAUNCH_COMMAND ? "launch_command " :
798 0 : "ERROR" )
799 0 : << "\"" << value << "\"" << std::endl;
800 : }
801 :
802 956 : for( Node::CAttribute i = static_cast<Node::CAttribute>( 0 );
803 478 : i < Node::CATTR_LAST;
804 : i = static_cast<Node::CAttribute>( static_cast<uint32_t>( i )+1))
805 : {
806 239 : const char value = getCAttribute( i );
807 239 : if( value == Global::instance()->getNodeCAttribute( i ))
808 239 : continue;
809 :
810 0 : if( !attrPrinted )
811 : {
812 0 : os << std::endl << "attributes" << std::endl;
813 0 : os << "{" << std::endl << lunchbox::indent;
814 0 : attrPrinted = true;
815 : }
816 :
817 : os << ( i==Node::CATTR_LAUNCH_COMMAND_QUOTE ? "launch_command_quote " :
818 0 : "ERROR" )
819 0 : << "'" << value << "'" << std::endl;
820 : }
821 :
822 1912 : for( Node::IAttribute i = static_cast< Node::IAttribute>( 0 );
823 956 : i < Node::IATTR_LAST;
824 : i = static_cast< Node::IAttribute >( static_cast<uint32_t>( i )+1))
825 : {
826 717 : const int value = getIAttribute( i );
827 717 : if( value == Global::instance()->getNodeIAttribute( i ))
828 717 : continue;
829 :
830 0 : if( !attrPrinted )
831 : {
832 0 : os << std::endl << "attributes" << std::endl;
833 0 : os << "{" << std::endl << lunchbox::indent;
834 0 : attrPrinted = true;
835 : }
836 :
837 : os << ( i== Node::IATTR_LAUNCH_TIMEOUT ? "launch_timeout " :
838 : i== Node::IATTR_THREAD_MODEL ? "thread_model " :
839 : i== Node::IATTR_HINT_AFFINITY ? "hint_affinity " :
840 0 : "ERROR" )
841 0 : << static_cast< fabric::IAttribute >( value ) << std::endl;
842 : }
843 :
844 239 : if( attrPrinted )
845 0 : os << lunchbox::exdent << "}" << std::endl << std::endl;
846 239 : }
847 :
848 : }
849 : }
850 :
851 : #include "../fabric/node.ipp"
852 : template class eq::fabric::Node< eq::server::Config, eq::server::Node,
853 : eq::server::Pipe, eq::server::NodeVisitor >;
854 : /** @cond IGNORE */
855 : template std::ostream& eq::fabric::operator << ( std::ostream&,
856 27 : const eq::server::Super& );
857 : /** @endcond */
|