LCOV - code coverage report
Current view: top level - eq/server - node.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 214 387 55.3 %
Date: 2017-12-16 05:07:20 Functions: 39 56 69.6 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2016, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "node.h"
      21             : 
      22             : #include "channel.h"
      23             : #include "config.h"
      24             : #include "global.h"
      25             : #include "log.h"
      26             : #include "nodeFactory.h"
      27             : #include "pipe.h"
      28             : #include "server.h"
      29             : #include "window.h"
      30             : 
      31             : #include <eq/fabric/commands.h>
      32             : #include <eq/fabric/elementVisitor.h>
      33             : #include <eq/fabric/event.h>
      34             : #include <eq/fabric/paths.h>
      35             : 
      36             : #include <co/barrier.h>
      37             : #include <co/global.h>
      38             : #include <co/objectICommand.h>
      39             : 
      40             : #include <lunchbox/clock.h>
      41             : #include <lunchbox/os.h>
      42             : #include <lunchbox/sleep.h>
      43             : 
      44             : #include <boost/filesystem/operations.hpp>
      45             : #include <boost/filesystem/path.hpp>
      46             : 
      47             : namespace eq
      48             : {
      49             : namespace server
      50             : {
      51             : typedef fabric::Node<Config, Node, Pipe, NodeVisitor> Super;
      52             : typedef co::CommandFunc<Node> NodeFunc;
      53             : namespace
      54             : {
      55             : #define S_MAKE_ATTR_STRING(attr) (std::string("EQ_NODE_") + #attr)
      56          40 : std::string _sAttributeStrings[] = {S_MAKE_ATTR_STRING(SATTR_LAUNCH_COMMAND)};
      57          40 : std::string _cAttributeStrings[] = {
      58          20 :     S_MAKE_ATTR_STRING(CATTR_LAUNCH_COMMAND_QUOTE)};
      59             : 
      60           0 : void _addEnv(std::ostringstream& stream, const char* key)
      61             : {
      62           0 :     char* env = getenv(key);
      63           0 :     if (env)
      64           0 :         stream << key << "=" << env << " ";
      65           0 : }
      66             : 
      67           0 : bool _containsPrefix(const std::string& str, const Strings& prefixes)
      68             : {
      69           0 :     for (const auto& prefix : {"LB_", "CO_", "EQ_", "DEFLECT_"})
      70           0 :         if (str.find(prefix) == 0)
      71           0 :             return true;
      72             : 
      73           0 :     for (const auto& prefix : prefixes)
      74           0 :         if (str.find(prefix) == 0)
      75           0 :             return true;
      76             : 
      77           0 :     return false;
      78             : }
      79             : }
      80             : 
      81         828 : Node::Node(Config* parent)
      82             :     : Super(parent)
      83             :     , _active(0)
      84             :     , _finishedFrame(0)
      85             :     , _flushedFrame(0)
      86             :     , _state(STATE_STOPPED)
      87         828 :     , _bufferedTasks(new co::BufferConnection)
      88        1656 :     , _lastDrawPipe(0)
      89             : {
      90         828 :     const Global* global = Global::instance();
      91        1656 :     for (int i = 0; i < Node::SATTR_LAST; ++i)
      92             :     {
      93         828 :         const SAttribute attr = static_cast<SAttribute>(i);
      94         828 :         setSAttribute(attr, global->getNodeSAttribute(attr));
      95             :     }
      96        1656 :     for (int i = 0; i < Node::CATTR_LAST; ++i)
      97             :     {
      98         828 :         const CAttribute attr = static_cast<CAttribute>(i);
      99         828 :         setCAttribute(attr, global->getNodeCAttribute(attr));
     100             :     }
     101        3312 :     for (int i = 0; i < IATTR_LAST; ++i)
     102             :     {
     103        2484 :         const IAttribute attr = static_cast<IAttribute>(i);
     104        2484 :         setIAttribute(attr, global->getNodeIAttribute(attr));
     105             :     }
     106         828 : }
     107             : 
     108        1652 : Node::~Node()
     109             : {
     110        1652 : }
     111             : 
     112           2 : void Node::attach(const uint128_t& id, const uint32_t instanceID)
     113             : {
     114           2 :     Super::attach(id, instanceID);
     115             : 
     116           2 :     co::CommandQueue* cmdQ = getCommandThreadQueue();
     117           4 :     registerCommand(fabric::CMD_OBJECT_SYNC, NodeFunc(this, &Node::_cmdSync),
     118           2 :                     cmdQ);
     119           2 :     registerCommand(fabric::CMD_NODE_CONFIG_INIT_REPLY,
     120           4 :                     NodeFunc(this, &Node::_cmdConfigInitReply), cmdQ);
     121           2 :     registerCommand(fabric::CMD_NODE_CONFIG_EXIT_REPLY,
     122           4 :                     NodeFunc(this, &Node::_cmdConfigExitReply), cmdQ);
     123           2 :     registerCommand(fabric::CMD_NODE_FRAME_FINISH_REPLY,
     124           4 :                     NodeFunc(this, &Node::_cmdFrameFinishReply), cmdQ);
     125           2 : }
     126             : 
     127          28 : ServerPtr Node::getServer()
     128             : {
     129          28 :     return getConfig() ? getConfig()->getServer() : 0;
     130             : }
     131             : 
     132           0 : ConstServerPtr Node::getServer() const
     133             : {
     134           0 :     return getConfig() ? getConfig()->getServer() : 0;
     135             : }
     136             : 
     137          22 : co::CommandQueue* Node::getMainThreadQueue()
     138             : {
     139          22 :     return getConfig()->getMainThreadQueue();
     140             : }
     141             : 
     142          28 : co::CommandQueue* Node::getCommandThreadQueue()
     143             : {
     144          28 :     return getConfig()->getCommandThreadQueue();
     145             : }
     146             : 
     147           0 : Channel* Node::getChannel(const ChannelPath& path)
     148             : {
     149           0 :     const Pipes& pipes = getPipes();
     150           0 :     LBASSERT(pipes.size() > path.pipeIndex);
     151             : 
     152           0 :     if (pipes.size() <= path.pipeIndex)
     153           0 :         return 0;
     154             : 
     155           0 :     return pipes[path.pipeIndex]->getChannel(path);
     156             : }
     157             : 
     158         126 : void Node::addTasks(const uint32_t tasks)
     159             : {
     160         126 :     setTasks(getTasks() | tasks);
     161         126 : }
     162             : 
     163           8 : void Node::activate()
     164             : {
     165           8 :     ++_active;
     166           8 :     LBLOG(LOG_VIEW) << "activate: " << _active << std::endl;
     167           8 : }
     168             : 
     169           8 : void Node::deactivate()
     170             : {
     171           8 :     LBASSERT(_active != 0);
     172           8 :     --_active;
     173           8 :     LBLOG(LOG_VIEW) << "deactivate: " << _active << std::endl;
     174           8 : };
     175             : 
     176         828 : void Node::setSAttribute(const SAttribute attr, const std::string& value)
     177             : {
     178         828 :     _sAttributes[attr] = value;
     179         828 : }
     180             : 
     181         416 : const std::string& Node::getSAttribute(const SAttribute attr) const
     182             : {
     183         416 :     return _sAttributes[attr];
     184             : }
     185             : 
     186         612 : const std::string& Node::getSAttributeString(const SAttribute attr)
     187             : {
     188         612 :     return _sAttributeStrings[attr];
     189             : }
     190             : 
     191         828 : void Node::setCAttribute(const CAttribute attr, const char value)
     192             : {
     193         828 :     _cAttributes[attr] = value;
     194         828 : }
     195             : 
     196         416 : char Node::getCAttribute(const CAttribute attr) const
     197             : {
     198         416 :     return _cAttributes[attr];
     199             : }
     200             : 
     201         612 : const std::string& Node::getCAttributeString(const CAttribute attr)
     202             : {
     203         612 :     return _cAttributeStrings[attr];
     204             : }
     205             : 
     206             : //===========================================================================
     207             : // Operations
     208             : //===========================================================================
     209             : 
     210             : //---------------------------------------------------------------------------
     211             : // launch and connect
     212             : //---------------------------------------------------------------------------
     213             : 
     214             : namespace
     215             : {
     216           0 : class NetNode : public co::Node
     217             : {
     218             : public:
     219           0 :     explicit NetNode(server::Node& node)
     220           0 :         : co::Node()
     221           0 :         , _node(node)
     222             :     {
     223           0 :     }
     224             : 
     225             : private:
     226             :     server::Node& _node;
     227             : 
     228           0 :     std::string getWorkDir() const override
     229             :     {
     230           0 :         return _node.getConfig()->getWorkDir();
     231             :     }
     232             : 
     233           0 :     std::string getLaunchQuote() const override
     234             :     {
     235           0 :         return std::string(1, _node.getCAttribute(
     236           0 :                                   server::Node::CATTR_LAUNCH_COMMAND_QUOTE));
     237             :     }
     238             : };
     239             : 
     240           0 : static co::NodePtr _createNetNode(Node* node)
     241             : {
     242           0 :     co::NodePtr netNode = new NetNode(*node);
     243             :     const co::ConnectionDescriptions& descriptions =
     244           0 :         node->getConnectionDescriptions();
     245           0 :     for (co::ConnectionDescriptionPtr desc : descriptions)
     246             :     {
     247           0 :         netNode->addConnectionDescription(new co::ConnectionDescription(*desc));
     248             : 
     249           0 :         if (node->getHost().empty())
     250             :         {
     251           0 :             node->setHost(desc->getHostname());
     252           0 :             LBWARN << "No host specified, guessing " << node->getHost()
     253           0 :                    << " from " << desc << std::endl;
     254             :         }
     255             :     }
     256             : 
     257           0 :     netNode->setHostname(node->getHost());
     258           0 :     return netNode;
     259             : }
     260             : }
     261             : 
     262           2 : bool Node::connect()
     263             : {
     264           2 :     LBASSERT(isActive());
     265             : 
     266           2 :     if (_node)
     267           2 :         return _node->isConnected();
     268             : 
     269           0 :     if (!isStopped())
     270             :     {
     271           0 :         LBASSERT(_state == STATE_FAILED);
     272           0 :         return true;
     273             :     }
     274             : 
     275           0 :     co::LocalNodePtr localNode = getLocalNode();
     276           0 :     LBASSERT(localNode);
     277             : 
     278           0 :     _node = _createNetNode(this);
     279             : 
     280           0 :     LBLOG(LOG_INIT) << "Connecting node" << std::endl;
     281           0 :     if (localNode->connect(_node) ||
     282           0 :         localNode->launch(_node, _createLaunchCommand()))
     283             :     {
     284           0 :         return true;
     285             :     }
     286             : 
     287           0 :     LBWARN << "Connection to " << _node->getNodeID() << " failed" << std::endl;
     288           0 :     sendError(fabric::ERROR_NODE_LAUNCH) << _host;
     289           0 :     _state = STATE_FAILED;
     290           0 :     _node = nullptr;
     291           0 :     return false;
     292             : }
     293             : 
     294           2 : bool Node::syncLaunch(const lunchbox::Clock& clock)
     295             : {
     296           2 :     LBASSERT(isActive());
     297             : 
     298           2 :     if (!_node)
     299           0 :         return false;
     300             : 
     301           2 :     if (_node->isConnected())
     302           2 :         return true;
     303             : 
     304           0 :     LBASSERT(!isApplicationNode());
     305             : 
     306           0 :     co::LocalNodePtr localNode = getLocalNode();
     307           0 :     const int64_t timeOut = getIAttribute(IATTR_LAUNCH_TIMEOUT);
     308           0 :     _node = localNode->syncLaunch(_node->getNodeID(),
     309             :                                   std::max(int64_t(0),
     310           0 :                                            timeOut - clock.getTime64()));
     311           0 :     if (_node)
     312           0 :         return true;
     313             : 
     314           0 :     sendError(fabric::ERROR_NODE_CONNECT) << _host;
     315           0 :     _state = STATE_FAILED;
     316           0 :     return false;
     317             : }
     318             : 
     319           0 : std::string Node::_createLaunchCommand() const
     320             : {
     321           0 :     const std::string& command = getSAttribute(SATTR_LAUNCH_COMMAND);
     322           0 :     const size_t commandPos = command.find("%c");
     323           0 :     if (commandPos == std::string::npos)
     324           0 :         return command + " " + _createRemoteCommand();
     325             : 
     326           0 :     return command.substr(0, commandPos) + _createRemoteCommand() +
     327           0 :            command.substr(commandPos + 2);
     328             : }
     329             : 
     330           0 : std::string Node::_createRemoteCommand() const
     331             : {
     332           0 :     const Config* config = getConfig();
     333           0 :     std::string program = config->getRenderClient();
     334           0 :     if (program.empty())
     335             :     {
     336           0 :         LBWARN << "No render client name, auto-launch will fail" << std::endl;
     337           0 :         return program;
     338             :     }
     339             : 
     340             :     //----- environment
     341           0 :     std::ostringstream os;
     342           0 :     const std::string& quote = _node->getLaunchQuote();
     343             : 
     344             : //----- program + args
     345             : #ifndef WIN32
     346             : #ifdef Darwin
     347             :     const char libPath[] = "DYLD_LIBRARY_PATH";
     348             : #else
     349           0 :     const char libPath[] = "LD_LIBRARY_PATH";
     350             : #endif
     351             : 
     352           0 :     os << "env ";
     353           0 :     _addEnv(os, libPath);
     354           0 :     _addEnv(os, "PATH");
     355           0 :     _addEnv(os, "PYTHONPATH");
     356             : #ifdef EQUALIZER_USE_DEFLECT
     357           0 :     _addEnv(os, "DEFLECT_HOST");
     358           0 :     _addEnv(os, "DEFLECT_ID");
     359             : #endif
     360             : 
     361           0 :     const Strings& prefixes = config->getRenderClientEnvPrefixes();
     362           0 :     for (int i = 0; environ[i] != 0; ++i)
     363             :     {
     364           0 :         const std::string var = environ[i];
     365           0 :         if (_containsPrefix(var, prefixes))
     366           0 :             os << quote << var << quote << " ";
     367             :     }
     368             : 
     369           0 :     os << "LB_LOG_LEVEL=" << lunchbox::Log::getLogLevelString() << " ";
     370           0 :     if (lunchbox::Log::topics != 0)
     371           0 :         os << "LB_LOG_TOPICS=" << lunchbox::Log::topics << " ";
     372             : #endif
     373             : 
     374             :     const boost::filesystem::path absolute =
     375           0 :         boost::filesystem::system_complete(boost::filesystem::path(program));
     376           0 :     program = absolute.string();
     377             : 
     378           0 :     std::string options;
     379           0 :     for (const std::string& arg : config->getRenderClientArgs())
     380           0 :         options += std::string(" ") + quote + arg + quote;
     381           0 :     return os.str() + quote + program + quote + options + " -- --eq-client %o ";
     382             : }
     383             : 
     384             : //---------------------------------------------------------------------------
     385             : // init
     386             : //---------------------------------------------------------------------------
     387           2 : void Node::configInit(const uint128_t& initID, const uint32_t frameNumber)
     388             : {
     389           2 :     LBASSERT(_state == STATE_STOPPED);
     390           2 :     _state = STATE_INITIALIZING;
     391             : 
     392           2 :     const Config* config = getConfig();
     393           2 :     _flushedFrame = config->getFinishedFrame();
     394           2 :     _finishedFrame = config->getFinishedFrame();
     395           2 :     _frameIDs.clear();
     396             : 
     397           2 :     LBLOG(LOG_INIT) << "Create node" << std::endl;
     398           2 :     getConfig()->send(_node, fabric::CMD_CONFIG_CREATE_NODE) << getID();
     399             : 
     400           2 :     LBLOG(LOG_INIT) << "Init node" << std::endl;
     401           2 :     send(fabric::CMD_NODE_CONFIG_INIT) << initID << frameNumber;
     402           2 : }
     403             : 
     404           2 : bool Node::syncConfigInit()
     405             : {
     406           2 :     LBASSERT(_state == STATE_INITIALIZING || _state == STATE_INIT_SUCCESS ||
     407             :              _state == STATE_INIT_FAILED);
     408             : 
     409           2 :     _state.waitNE(STATE_INITIALIZING);
     410             : 
     411           2 :     if (_state == STATE_INIT_SUCCESS)
     412             :     {
     413           2 :         _state = STATE_RUNNING;
     414           2 :         return true;
     415             :     }
     416             : 
     417           0 :     LBWARN << "Node initialization failed" << std::endl;
     418           0 :     configExit();
     419           0 :     return false;
     420             : }
     421             : 
     422             : //---------------------------------------------------------------------------
     423             : // exit
     424             : //---------------------------------------------------------------------------
     425           2 : void Node::configExit()
     426             : {
     427           2 :     if (_state == STATE_EXITING)
     428           0 :         return;
     429             : 
     430           2 :     LBASSERT(_state == STATE_RUNNING || _state == STATE_INIT_FAILED);
     431           2 :     _state = STATE_EXITING;
     432             : 
     433           2 :     LBLOG(LOG_INIT) << "Exit node" << std::endl;
     434           2 :     send(fabric::CMD_NODE_CONFIG_EXIT);
     435           2 :     flushSendBuffer();
     436             : }
     437             : 
     438           2 : bool Node::syncConfigExit()
     439             : {
     440           2 :     LBASSERT(_state == STATE_EXITING || _state == STATE_EXIT_SUCCESS ||
     441             :              _state == STATE_EXIT_FAILED);
     442             : 
     443           2 :     _state.waitNE(STATE_EXITING);
     444           2 :     const bool success = (_state == STATE_EXIT_SUCCESS);
     445           2 :     LBASSERT(success || _state == STATE_EXIT_FAILED);
     446             : 
     447           2 :     _state = isActive() ? STATE_FAILED : STATE_STOPPED;
     448           2 :     setTasks(fabric::TASK_NONE);
     449           2 :     _frameIDs.clear();
     450           2 :     _flushBarriers();
     451           2 :     return success;
     452             : }
     453             : 
     454             : //---------------------------------------------------------------------------
     455             : // update
     456             : //---------------------------------------------------------------------------
     457           2 : void Node::update(const uint128_t& frameID, const uint32_t frameNumber)
     458             : {
     459           2 :     if (!isRunning())
     460           0 :         return;
     461             : 
     462           2 :     LBVERB << "Start frame " << frameNumber << std::endl;
     463           2 :     LBASSERT(isActive());
     464             : 
     465           2 :     _frameIDs[frameNumber] = frameID;
     466             : 
     467           2 :     uint128_t configVersion = co::VERSION_INVALID;
     468           2 :     if (!isApplicationNode()) // synced in Config::_cmdFrameStart
     469           0 :         configVersion = getConfig()->getVersion();
     470             : 
     471           4 :     send(fabric::CMD_NODE_FRAME_START) << getVersion() << configVersion
     472           2 :                                        << frameID << frameNumber;
     473           2 :     LBLOG(LOG_TASKS) << "TASK node start frame " << std::endl;
     474             : 
     475           2 :     const Pipes& pipes = getPipes();
     476           6 :     for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     477           4 :         (*i)->update(frameID, frameNumber);
     478             : 
     479           2 :     if (!_lastDrawPipe) // no FrameDrawFinish sent
     480             :     {
     481           0 :         send(fabric::CMD_NODE_FRAME_DRAW_FINISH) << frameID << frameNumber;
     482           0 :         LBLOG(LOG_TASKS) << "TASK node draw finish " << getName() << " "
     483           0 :                          << std::endl;
     484             :     }
     485           2 :     _lastDrawPipe = 0;
     486             : 
     487           2 :     send(fabric::CMD_NODE_FRAME_TASKS_FINISH) << frameID << frameNumber;
     488           2 :     LBLOG(LOG_TASKS) << "TASK node tasks finish " << std::endl;
     489             : 
     490           2 :     _finish(frameNumber);
     491           2 :     flushSendBuffer();
     492             : }
     493             : 
     494           2 : uint32_t Node::_getFinishLatency() const
     495             : {
     496           2 :     switch (getIAttribute(Node::IATTR_THREAD_MODEL))
     497             :     {
     498             :     case fabric::UNDEFINED:
     499             :     case fabric::DRAW_SYNC:
     500           2 :         if (getTasks() & fabric::TASK_DRAW)
     501             :         {
     502             :             // More than one frame latency doesn't make sense, since the
     503             :             // draw sync for frame+1 does not allow for more
     504           2 :             const Config* config = getConfig();
     505           2 :             const uint32_t latency = config->getLatency();
     506             : 
     507           2 :             return LB_MIN(latency, 1);
     508             :         }
     509           0 :         break;
     510             : 
     511             :     case fabric::LOCAL_SYNC:
     512           0 :         if (getTasks() != fabric::TASK_NONE)
     513             :             // local sync enforces no latency
     514           0 :             return 0;
     515           0 :         break;
     516             : 
     517             :     case fabric::ASYNC:
     518           0 :         break;
     519             :     default:
     520           0 :         LBUNIMPLEMENTED;
     521             :     }
     522             : 
     523           0 :     const Config* config = getConfig();
     524           0 :     return config->getLatency();
     525             : }
     526             : 
     527           2 : void Node::_finish(const uint32_t currentFrame)
     528             : {
     529           2 :     const Pipes& pipes = getPipes();
     530           2 :     for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     531             :     {
     532           2 :         const Pipe* pipe = *i;
     533           2 :         if (pipe->getIAttribute(Pipe::IATTR_HINT_THREAD) && pipe->isRunning())
     534             :         {
     535           2 :             const uint32_t latency = _getFinishLatency();
     536           2 :             if (currentFrame > latency)
     537           0 :                 flushFrames(currentFrame - latency);
     538           4 :             return;
     539             :         }
     540             :     }
     541             : 
     542             :     // else only non-threaded pipes, all local tasks are done, send finish now.
     543           0 :     flushFrames(currentFrame);
     544             : }
     545             : 
     546           2 : void Node::flushFrames(const uint32_t frameNumber)
     547             : {
     548           2 :     LBLOG(LOG_TASKS) << "Flush frames including " << frameNumber << std::endl;
     549             : 
     550           6 :     while (_flushedFrame < frameNumber)
     551             :     {
     552           2 :         ++_flushedFrame;
     553           2 :         _sendFrameFinish(_flushedFrame);
     554             :     }
     555             : 
     556           2 :     flushSendBuffer();
     557           2 : }
     558             : 
     559           2 : void Node::_sendFrameFinish(const uint32_t frameNumber)
     560             : {
     561           2 :     FrameIDHash::iterator i = _frameIDs.find(frameNumber);
     562           2 :     if (i == _frameIDs.end())
     563           0 :         return; // finish already send
     564             : 
     565           2 :     send(fabric::CMD_NODE_FRAME_FINISH) << i->second << frameNumber;
     566           2 :     _frameIDs.erase(i);
     567           2 :     LBLOG(LOG_TASKS) << "TASK node finish frame " << frameNumber << std::endl;
     568             : }
     569             : 
     570             : //---------------------------------------------------------------------------
     571             : // Barrier cache
     572             : //---------------------------------------------------------------------------
     573           0 : co::Barrier* Node::getBarrier()
     574             : {
     575           0 :     if (_barriers.empty())
     576             :     {
     577           0 :         co::Barrier* barrier = new co::Barrier(getServer(), _node->getNodeID());
     578           0 :         barrier->setAutoObsolete(getConfig()->getLatency() + 1);
     579           0 :         return barrier;
     580             :     }
     581             :     // else
     582             : 
     583           0 :     co::Barrier* barrier = _barriers.back();
     584           0 :     _barriers.pop_back();
     585           0 :     barrier->setHeight(0);
     586           0 :     return barrier;
     587             : }
     588             : 
     589           0 : void Node::changeLatency(const uint32_t latency)
     590             : {
     591           0 :     for (co::Barriers::const_iterator i = _barriers.begin();
     592           0 :          i != _barriers.end(); ++i)
     593             :     {
     594           0 :         co::Barrier* barrier = *i;
     595           0 :         barrier->setAutoObsolete(latency + 1);
     596             :     }
     597           0 : }
     598             : 
     599           0 : void Node::releaseBarrier(co::Barrier* barrier)
     600             : {
     601           0 :     _barriers.push_back(barrier);
     602           0 : }
     603             : 
     604           2 : void Node::_flushBarriers()
     605             : {
     606           2 :     for (co::BarriersCIter i = _barriers.begin(); i != _barriers.end(); ++i)
     607           0 :         delete *i;
     608           2 :     _barriers.clear();
     609           2 : }
     610             : 
     611           0 : bool Node::removeConnectionDescription(co::ConnectionDescriptionPtr cd)
     612             : {
     613             :     // Don't use std::find, RefPtr::operator== compares pointers, not values.
     614           0 :     for (co::ConnectionDescriptions::iterator i =
     615           0 :              _connectionDescriptions.begin();
     616           0 :          i != _connectionDescriptions.end(); ++i)
     617             :     {
     618           0 :         if (*cd != **i)
     619           0 :             continue;
     620             : 
     621           0 :         _connectionDescriptions.erase(i);
     622           0 :         return true;
     623             :     }
     624           0 :     return false;
     625             : }
     626             : 
     627          10 : co::ObjectOCommand Node::send(const uint32_t cmd)
     628             : {
     629          10 :     return send(cmd, getID());
     630             : }
     631             : 
     632         110 : co::ObjectOCommand Node::send(const uint32_t cmd, const uint128_t& id)
     633             : {
     634         220 :     return co::ObjectOCommand(co::Connections(1, _bufferedTasks), cmd,
     635         220 :                               co::COMMANDTYPE_OBJECT, id, CO_INSTANCE_ALL);
     636             : }
     637             : 
     638           0 : EventOCommand Node::sendError(const uint32_t error)
     639             : {
     640           0 :     return getConfig()->sendError(EVENT_NODE_ERROR, Error(error, getID()));
     641             : }
     642             : 
     643          14 : void Node::flushSendBuffer()
     644             : {
     645          14 :     _bufferedTasks->sendBuffer(_node->getConnection());
     646          14 : }
     647             : 
     648             : //===========================================================================
     649             : // command handling
     650             : //===========================================================================
     651           2 : bool Node::_cmdConfigInitReply(co::ICommand& cmd)
     652             : {
     653           4 :     co::ObjectICommand command(cmd);
     654           2 :     LBVERB << "handle configInit reply " << command << std::endl;
     655           2 :     LBASSERT(_state == STATE_INITIALIZING);
     656           2 :     _state = command.read<uint64_t>() ? STATE_INIT_SUCCESS : STATE_INIT_FAILED;
     657             : 
     658           4 :     return true;
     659             : }
     660             : 
     661           2 : bool Node::_cmdConfigExitReply(co::ICommand& cmd)
     662             : {
     663           4 :     co::ObjectICommand command(cmd);
     664           2 :     LBVERB << "handle configExit reply " << command << std::endl;
     665           2 :     LBASSERT(_state == STATE_EXITING);
     666             : 
     667           2 :     _state = command.read<bool>() ? STATE_EXIT_SUCCESS : STATE_EXIT_FAILED;
     668           4 :     return true;
     669             : }
     670             : 
     671           2 : bool Node::_cmdFrameFinishReply(co::ICommand& cmd)
     672             : {
     673           4 :     co::ObjectICommand command(cmd);
     674           2 :     LBVERB << "handle frame finish reply " << command << std::endl;
     675             : 
     676           2 :     const uint32_t frameNumber = command.read<uint32_t>();
     677             : 
     678           2 :     _finishedFrame = frameNumber;
     679           2 :     getConfig()->notifyNodeFrameFinished(frameNumber);
     680             : 
     681           4 :     return true;
     682             : }
     683             : 
     684         416 : void Node::output(std::ostream& os) const
     685             : {
     686         416 :     if (!_host.empty())
     687         248 :         os << "host     \"" << _host << '"' << std::endl;
     688             : 
     689         416 :     const co::ConnectionDescriptions& descriptions = _connectionDescriptions;
     690        2130 :     for (co::ConnectionDescriptions::const_iterator i = descriptions.begin();
     691        1420 :          i != descriptions.end(); ++i)
     692             :     {
     693         588 :         co::ConnectionDescriptionPtr desc = *i;
     694         294 :         os << *desc;
     695             :     }
     696             : 
     697         416 :     bool attrPrinted = false;
     698             : 
     699         832 :     for (Node::SAttribute i = static_cast<Node::SAttribute>(0);
     700         832 :          i < Node::SATTR_LAST;
     701         416 :          i = static_cast<Node::SAttribute>(static_cast<uint32_t>(i) + 1))
     702             :     {
     703         416 :         const std::string& value = getSAttribute(i);
     704         416 :         if (value == Global::instance()->getNodeSAttribute(i))
     705         416 :             continue;
     706             : 
     707           0 :         if (!attrPrinted)
     708             :         {
     709           0 :             os << std::endl << "attributes" << std::endl;
     710           0 :             os << "{" << std::endl << lunchbox::indent;
     711           0 :             attrPrinted = true;
     712             :         }
     713             : 
     714             :         os << (i == Node::SATTR_LAUNCH_COMMAND ? "launch_command       "
     715             :                                                : "ERROR")
     716           0 :            << "\"" << value << "\"" << std::endl;
     717             :     }
     718             : 
     719         832 :     for (Node::CAttribute i = static_cast<Node::CAttribute>(0);
     720         832 :          i < Node::CATTR_LAST;
     721         416 :          i = static_cast<Node::CAttribute>(static_cast<uint32_t>(i) + 1))
     722             :     {
     723         416 :         const char value = getCAttribute(i);
     724         416 :         if (value == Global::instance()->getNodeCAttribute(i))
     725         416 :             continue;
     726             : 
     727           0 :         if (!attrPrinted)
     728             :         {
     729           0 :             os << std::endl << "attributes" << std::endl;
     730           0 :             os << "{" << std::endl << lunchbox::indent;
     731           0 :             attrPrinted = true;
     732             :         }
     733             : 
     734             :         os << (i == Node::CATTR_LAUNCH_COMMAND_QUOTE ? "launch_command_quote "
     735             :                                                      : "ERROR")
     736           0 :            << "'" << value << "'" << std::endl;
     737             :     }
     738             : 
     739        1664 :     for (Node::IAttribute i = static_cast<Node::IAttribute>(0);
     740        1664 :          i < Node::IATTR_LAST;
     741        1248 :          i = static_cast<Node::IAttribute>(static_cast<uint32_t>(i) + 1))
     742             :     {
     743        1248 :         const int value = getIAttribute(i);
     744        1248 :         if (value == Global::instance()->getNodeIAttribute(i))
     745        1248 :             continue;
     746             : 
     747           0 :         if (!attrPrinted)
     748             :         {
     749           0 :             os << std::endl << "attributes" << std::endl;
     750           0 :             os << "{" << std::endl << lunchbox::indent;
     751           0 :             attrPrinted = true;
     752             :         }
     753             : 
     754             :         os << (i == Node::IATTR_LAUNCH_TIMEOUT
     755             :                    ? "launch_timeout       "
     756             :                    : i == Node::IATTR_THREAD_MODEL
     757           0 :                          ? "thread_model         "
     758             :                          : i == Node::IATTR_HINT_AFFINITY
     759           0 :                                ? "hint_affinity        "
     760           0 :                                : "ERROR")
     761           0 :            << static_cast<fabric::IAttribute>(value) << std::endl;
     762             :     }
     763             : 
     764         416 :     if (attrPrinted)
     765           0 :         os << lunchbox::exdent << "}" << std::endl << std::endl;
     766         416 : }
     767             : }
     768             : }
     769             : 
     770             : #include "../fabric/node.ipp"
     771             : template class eq::fabric::Node<eq::server::Config, eq::server::Node,
     772             :                                 eq::server::Pipe, eq::server::NodeVisitor>;
     773             : /** @cond IGNORE */
     774             : template std::ostream& eq::fabric::operator<<(std::ostream&,
     775          60 :                                               const eq::server::Super&);
     776             : /** @endcond */

Generated by: LCOV version 1.11