LCOV - code coverage report
Current view: top level - eq - node.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 289 400 72.2 %
Date: 2017-12-16 05:07:20 Functions: 46 58 79.3 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder<cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "node.h"
      21             : 
      22             : #include "client.h"
      23             : #include "config.h"
      24             : #include "error.h"
      25             : #include "exception.h"
      26             : #include "frameData.h"
      27             : #include "global.h"
      28             : #include "log.h"
      29             : #include "nodeFactory.h"
      30             : #include "nodeStatistics.h"
      31             : #include "pipe.h"
      32             : #include "server.h"
      33             : 
      34             : #include <eq/fabric/axisEvent.h>
      35             : #include <eq/fabric/buttonEvent.h>
      36             : #include <eq/fabric/commands.h>
      37             : #include <eq/fabric/elementVisitor.h>
      38             : #include <eq/fabric/frameData.h>
      39             : #include <eq/fabric/task.h>
      40             : 
      41             : #include <co/barrier.h>
      42             : #include <co/connection.h>
      43             : #include <co/global.h>
      44             : #include <co/objectICommand.h>
      45             : #include <lunchbox/scopedMutex.h>
      46             : 
      47             : namespace eq
      48             : {
      49             : namespace
      50             : {
      51             : typedef std::unordered_map<uint128_t, co::Barrier*> BarrierHash;
      52             : typedef std::unordered_map<uint128_t, FrameDataPtr> FrameDataHash;
      53             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      54             : typedef FrameDataHash::iterator FrameDataHashIter;
      55             : 
      56             : enum State
      57             : {
      58             :     STATE_STOPPED,
      59             :     STATE_INITIALIZING,
      60             :     STATE_INIT_FAILED,
      61             :     STATE_RUNNING,
      62             :     STATE_FAILED
      63             : };
      64             : }
      65             : 
      66             : namespace detail
      67             : {
      68             : class TransmitThread : public lunchbox::Thread
      69             : {
      70             : public:
      71           1 :     TransmitThread()
      72           1 :         : _queue(co::Global::getCommandQueueLimit())
      73             :     {
      74           1 :     }
      75           1 :     virtual ~TransmitThread() {}
      76           4 :     co::CommandQueue& getQueue() { return _queue; }
      77             : protected:
      78           1 :     bool init() override
      79             :     {
      80           1 :         setName("Xmit");
      81           1 :         return true;
      82             :     }
      83             :     void run() override;
      84             : 
      85             : private:
      86             :     co::CommandQueue _queue;
      87             : };
      88             : 
      89           1 : class Node
      90             : {
      91             : public:
      92           1 :     Node()
      93           1 :         : state(STATE_STOPPED)
      94             :         , finishedFrame(0)
      95           1 :         , unlockedFrame(0)
      96             :     {
      97           1 :     }
      98             : 
      99             :     /** The configInit/configExit state. */
     100             :     lunchbox::Monitor<State> state;
     101             : 
     102             :     /** The number of the last started frame. */
     103             :     lunchbox::Monitor<uint32_t> currentFrame;
     104             : 
     105             :     /** The number of the last finished frame. */
     106             :     uint32_t finishedFrame;
     107             : 
     108             :     /** The number of the last locally released frame. */
     109             :     uint32_t unlockedFrame;
     110             : 
     111             :     /** All barriers mapped by the node. */
     112             :     lunchbox::Lockable<BarrierHash> barriers;
     113             : 
     114             :     /** All frame datas used by the node during rendering. */
     115             :     lunchbox::Lockable<FrameDataHash> frameDatas;
     116             : 
     117             :     TransmitThread transmitter;
     118             : };
     119             : }
     120             : 
     121             : /** @cond IGNORE */
     122             : typedef co::CommandFunc<Node> NodeFunc;
     123             : typedef fabric::Node<Config, Node, Pipe, NodeVisitor> Super;
     124             : /** @endcond */
     125             : 
     126           1 : Node::Node(Config* parent)
     127             :     : Super(parent)
     128           1 :     , _impl(new detail::Node)
     129             : {
     130           1 : }
     131             : 
     132           2 : Node::~Node()
     133             : {
     134           1 :     LBASSERT(getPipes().empty());
     135           1 :     delete _impl;
     136           1 : }
     137             : 
     138           1 : void Node::attach(const uint128_t& id, const uint32_t instanceID)
     139             : {
     140           1 :     Super::attach(id, instanceID);
     141             : 
     142           1 :     co::CommandQueue* queue = getMainThreadQueue();
     143           1 :     co::CommandQueue* commandQ = getCommandThreadQueue();
     144           1 :     co::CommandQueue* transmitQ = getTransmitterQueue();
     145             : 
     146           1 :     registerCommand(fabric::CMD_NODE_CREATE_PIPE,
     147           2 :                     NodeFunc(this, &Node::_cmdCreatePipe), queue);
     148           1 :     registerCommand(fabric::CMD_NODE_DESTROY_PIPE,
     149           2 :                     NodeFunc(this, &Node::_cmdDestroyPipe), queue);
     150           1 :     registerCommand(fabric::CMD_NODE_CONFIG_INIT,
     151           2 :                     NodeFunc(this, &Node::_cmdConfigInit), queue);
     152           1 :     registerCommand(fabric::CMD_NODE_SET_AFFINITY,
     153           2 :                     NodeFunc(this, &Node::_cmdSetAffinity), transmitQ);
     154           1 :     registerCommand(fabric::CMD_NODE_CONFIG_EXIT,
     155           2 :                     NodeFunc(this, &Node::_cmdConfigExit), queue);
     156           1 :     registerCommand(fabric::CMD_NODE_FRAME_START,
     157           2 :                     NodeFunc(this, &Node::_cmdFrameStart), queue);
     158           1 :     registerCommand(fabric::CMD_NODE_FRAME_FINISH,
     159           2 :                     NodeFunc(this, &Node::_cmdFrameFinish), queue);
     160           1 :     registerCommand(fabric::CMD_NODE_FRAME_DRAW_FINISH,
     161           2 :                     NodeFunc(this, &Node::_cmdFrameDrawFinish), queue);
     162           1 :     registerCommand(fabric::CMD_NODE_FRAME_TASKS_FINISH,
     163           2 :                     NodeFunc(this, &Node::_cmdFrameTasksFinish), queue);
     164           1 :     registerCommand(fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
     165           2 :                     NodeFunc(this, &Node::_cmdFrameDataTransmit), commandQ);
     166           1 :     registerCommand(fabric::CMD_NODE_FRAMEDATA_READY,
     167           2 :                     NodeFunc(this, &Node::_cmdFrameDataReady), commandQ);
     168           1 : }
     169             : 
     170           1 : void Node::setDirty(const uint64_t bits)
     171             : {
     172             :     // jump over fabric setDirty to avoid dirty'ing config node list
     173             :     // nodes are individually synced in frame finish
     174           1 :     Object::setDirty(bits);
     175           1 : }
     176             : 
     177          11 : ClientPtr Node::getClient()
     178             : {
     179          11 :     Config* config = getConfig();
     180          11 :     LBASSERT(config);
     181          11 :     return (config ? config->getClient() : 0);
     182             : }
     183             : 
     184          13 : ServerPtr Node::getServer()
     185             : {
     186          13 :     Config* config = getConfig();
     187          13 :     LBASSERT(config);
     188          13 :     return (config ? config->getServer() : 0);
     189             : }
     190             : 
     191           1 : co::CommandQueue* Node::getMainThreadQueue()
     192             : {
     193           1 :     return getConfig()->getMainThreadQueue();
     194             : }
     195             : 
     196           1 : co::CommandQueue* Node::getCommandThreadQueue()
     197             : {
     198           1 :     return getConfig()->getCommandThreadQueue();
     199             : }
     200             : 
     201           4 : co::CommandQueue* Node::getTransmitterQueue()
     202             : {
     203           4 :     return &_impl->transmitter.getQueue();
     204             : }
     205             : 
     206           0 : uint32_t Node::getCurrentFrame() const
     207             : {
     208           0 :     return _impl->currentFrame.get();
     209             : }
     210             : 
     211           1 : uint32_t Node::getFinishedFrame() const
     212             : {
     213           1 :     return _impl->finishedFrame;
     214             : }
     215             : 
     216           0 : co::Barrier* Node::getBarrier(const co::ObjectVersion& barrier)
     217             : {
     218           0 :     lunchbox::ScopedWrite mutex(_impl->barriers);
     219           0 :     co::Barrier* netBarrier = _impl->barriers.data[barrier.identifier];
     220             : 
     221           0 :     if (netBarrier)
     222           0 :         netBarrier->sync(barrier.version);
     223             :     else
     224             :     {
     225           0 :         ClientPtr client = getClient();
     226             : 
     227           0 :         netBarrier = new co::Barrier(client, barrier);
     228           0 :         if (!netBarrier->isGood())
     229             :         {
     230           0 :             LBCHECK(netBarrier->isGood());
     231           0 :             LBWARN << "Could not map swap barrier" << std::endl;
     232           0 :             delete netBarrier;
     233           0 :             return 0;
     234             :         }
     235           0 :         _impl->barriers.data[barrier.identifier] = netBarrier;
     236             :     }
     237             : 
     238           0 :     return netBarrier;
     239             : }
     240             : 
     241           5 : FrameDataPtr Node::getFrameData(const co::ObjectVersion& frameDataVersion)
     242             : {
     243          10 :     lunchbox::ScopedWrite mutex(_impl->frameDatas);
     244           5 :     FrameDataPtr data = _impl->frameDatas.data[frameDataVersion.identifier];
     245             : 
     246           5 :     if (!data)
     247             :     {
     248           1 :         data = new FrameData;
     249           1 :         data->setID(frameDataVersion.identifier);
     250           1 :         _impl->frameDatas.data[frameDataVersion.identifier] = data;
     251             :     }
     252             : 
     253           5 :     LBASSERT(frameDataVersion.version.high() == 0);
     254           5 :     data->setVersion(frameDataVersion.version.low());
     255          10 :     return data;
     256             : }
     257             : 
     258           1 : void Node::releaseFrameData(FrameDataPtr data)
     259             : {
     260           2 :     lunchbox::ScopedWrite mutex(_impl->frameDatas);
     261           1 :     FrameDataHashIter i = _impl->frameDatas->find(data->getID());
     262           1 :     LBASSERT(i != _impl->frameDatas->end());
     263           1 :     if (i == _impl->frameDatas->end())
     264           0 :         return;
     265             : 
     266           1 :     _impl->frameDatas->erase(i);
     267             : }
     268             : 
     269           2 : void Node::waitInitialized() const
     270             : {
     271           2 :     _impl->state.waitGE(STATE_INIT_FAILED);
     272           2 : }
     273             : 
     274           2 : bool Node::isRunning() const
     275             : {
     276           2 :     return _impl->state == STATE_RUNNING;
     277             : }
     278             : 
     279           1 : bool Node::isStopped() const
     280             : {
     281           1 :     return _impl->state == STATE_STOPPED;
     282             : }
     283             : 
     284           1 : bool Node::configInit(const uint128_t&)
     285             : {
     286           1 :     WindowSystem::configInit(this);
     287           1 :     return true;
     288             : }
     289             : 
     290           1 : bool Node::configExit()
     291             : {
     292           1 :     WindowSystem::configExit(this);
     293           1 :     return true;
     294             : }
     295             : 
     296           1 : void Node::_setAffinity()
     297             : {
     298           1 :     const int32_t affinity = getIAttribute(IATTR_HINT_AFFINITY);
     299           1 :     switch (affinity)
     300             :     {
     301             :     case OFF:
     302           0 :         break;
     303             : 
     304             :     case AUTO:
     305             :         // TODO
     306           1 :         LBVERB << "No automatic thread placement for node threads "
     307           1 :                << std::endl;
     308           1 :         break;
     309             : 
     310             :     default:
     311           0 :         co::LocalNodePtr node = getLocalNode();
     312           0 :         send(node, fabric::CMD_NODE_SET_AFFINITY) << affinity;
     313             : 
     314           0 :         node->setAffinity(affinity);
     315           0 :         break;
     316             :     }
     317           1 : }
     318             : 
     319           2 : void Node::waitFrameStarted(const uint32_t frameNumber) const
     320             : {
     321           2 :     _impl->currentFrame.waitGE(frameNumber);
     322           2 : }
     323             : 
     324           1 : void Node::startFrame(const uint32_t frameNumber)
     325             : {
     326           1 :     _impl->currentFrame = frameNumber;
     327           1 : }
     328             : 
     329           1 : void Node::frameFinish(const uint128_t&, const uint32_t frameNumber)
     330             : {
     331           1 :     releaseFrame(frameNumber);
     332           1 : }
     333             : 
     334           1 : void Node::_finishFrame(const uint32_t frameNumber) const
     335             : {
     336           1 :     const Pipes& pipes = getPipes();
     337           3 :     for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     338             :     {
     339           2 :         Pipe* pipe = *i;
     340           2 :         LBASSERT(pipe->isThreaded() || pipe->getFinishedFrame() >= frameNumber);
     341             : 
     342           2 :         pipe->waitFrameLocal(frameNumber);
     343           2 :         pipe->waitFrameFinished(frameNumber);
     344             :     }
     345           1 : }
     346             : 
     347           1 : void Node::_frameFinish(const uint128_t& frameID, const uint32_t frameNumber)
     348             : {
     349           1 :     frameFinish(frameID, frameNumber);
     350           1 :     LBLOG(LOG_TASKS) << "---- Finished Frame --- " << frameNumber << std::endl;
     351             : 
     352           1 :     if (_impl->unlockedFrame < frameNumber)
     353             :     {
     354           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
     355           0 :                << std::endl;
     356           0 :         releaseFrameLocal(frameNumber);
     357             :     }
     358             : 
     359           1 :     if (_impl->finishedFrame < frameNumber)
     360             :     {
     361           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
     362           0 :                << std::endl;
     363           0 :         releaseFrame(frameNumber);
     364             :     }
     365           1 : }
     366             : 
     367           1 : void Node::releaseFrame(const uint32_t frameNumber)
     368             : {
     369           1 :     LBASSERTINFO(_impl->currentFrame >= frameNumber,
     370             :                  "current " << _impl->currentFrame << " release "
     371             :                             << frameNumber);
     372             : 
     373           1 :     if (_impl->finishedFrame >= frameNumber)
     374           0 :         return;
     375           1 :     _impl->finishedFrame = frameNumber;
     376             : 
     377           1 :     Config* config = getConfig();
     378           2 :     ServerPtr server = config->getServer();
     379           2 :     co::NodePtr node = server.get();
     380           1 :     send(node, fabric::CMD_NODE_FRAME_FINISH_REPLY) << frameNumber;
     381             : }
     382             : 
     383           1 : void Node::releaseFrameLocal(const uint32_t frameNumber)
     384             : {
     385           1 :     LBASSERT(_impl->unlockedFrame <= frameNumber);
     386           1 :     _impl->unlockedFrame = frameNumber;
     387             : 
     388           1 :     Config* config = getConfig();
     389           1 :     LBASSERT(config->getNodes().size() == 1);
     390           1 :     LBASSERT(config->getNodes()[0] == this);
     391           1 :     config->releaseFrameLocal(frameNumber);
     392             : 
     393           1 :     LBLOG(LOG_TASKS) << "---- Unlocked Frame --- " << _impl->unlockedFrame
     394           1 :                      << std::endl;
     395           1 : }
     396             : 
     397           1 : void Node::frameStart(const uint128_t&, const uint32_t frameNumber)
     398             : {
     399           1 :     startFrame(frameNumber); // unlock pipe threads
     400             : 
     401           1 :     switch (getIAttribute(IATTR_THREAD_MODEL))
     402             :     {
     403             :     case ASYNC:
     404             :         // Don't wait for pipes to release frame locally, sync not needed
     405           0 :         releaseFrameLocal(frameNumber);
     406           0 :         break;
     407             : 
     408             :     case DRAW_SYNC:  // Sync and release in frameDrawFinish
     409             :     case LOCAL_SYNC: // Sync and release in frameTasksFinish
     410           1 :         break;
     411             : 
     412             :     default:
     413           0 :         LBUNIMPLEMENTED;
     414             :     }
     415           1 : }
     416             : 
     417           1 : void Node::frameDrawFinish(const uint128_t&, const uint32_t frameNumber)
     418             : {
     419           1 :     switch (getIAttribute(IATTR_THREAD_MODEL))
     420             :     {
     421             :     case ASYNC:      // No sync, release in frameStart
     422             :     case LOCAL_SYNC: // Sync and release in frameTasksFinish
     423           0 :         break;
     424             : 
     425             :     case DRAW_SYNC:
     426             :     {
     427           1 :         const Pipes& pipes = getPipes();
     428           3 :         for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     429             :         {
     430           2 :             const Pipe* pipe = *i;
     431           2 :             if (pipe->getTasks() & fabric::TASK_DRAW)
     432           2 :                 pipe->waitFrameLocal(frameNumber);
     433             :         }
     434             : 
     435           1 :         releaseFrameLocal(frameNumber);
     436           1 :         break;
     437             :     }
     438             :     default:
     439           0 :         LBUNIMPLEMENTED;
     440             :     }
     441           1 : }
     442             : 
     443           1 : void Node::frameTasksFinish(const uint128_t&, const uint32_t frameNumber)
     444             : {
     445           1 :     switch (getIAttribute(IATTR_THREAD_MODEL))
     446             :     {
     447             :     case ASYNC:     // No sync, release in frameStart
     448             :     case DRAW_SYNC: // Sync and release in frameDrawFinish
     449           1 :         break;
     450             : 
     451             :     case LOCAL_SYNC:
     452             :     {
     453           0 :         const Pipes& pipes = getPipes();
     454           0 :         for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
     455             :         {
     456           0 :             const Pipe* pipe = *i;
     457           0 :             if (pipe->getTasks() != fabric::TASK_NONE)
     458           0 :                 pipe->waitFrameLocal(frameNumber);
     459             :         }
     460             : 
     461           0 :         releaseFrameLocal(frameNumber);
     462           0 :         break;
     463             :     }
     464             :     default:
     465           0 :         LBUNIMPLEMENTED;
     466             :     }
     467           1 : }
     468             : 
     469           0 : EventOCommand Node::sendError(const uint32_t error)
     470             : {
     471           0 :     return getConfig()->sendError(EVENT_NODE_ERROR, Error(error, getID()));
     472             : }
     473             : 
     474           0 : bool Node::processEvent(AxisEvent& event)
     475             : {
     476           0 :     Config* config = getConfig();
     477           0 :     updateEvent(event, config->getTime());
     478           0 :     config->sendEvent(EVENT_MAGELLAN_AXIS) << event;
     479           0 :     return true;
     480             : }
     481             : 
     482           0 : bool Node::processEvent(ButtonEvent& event)
     483             : {
     484           0 :     Config* config = getConfig();
     485           0 :     updateEvent(event, config->getTime());
     486           0 :     config->sendEvent(EVENT_MAGELLAN_BUTTON) << event;
     487           0 :     return true;
     488             : }
     489             : 
     490           0 : bool Node::processEvent(Statistic& event)
     491             : {
     492           0 :     Config* config = getConfig();
     493           0 :     updateEvent(event, config->getTime());
     494           0 :     config->sendEvent(EVENT_STATISTIC) << event;
     495           0 :     return true;
     496             : }
     497             : 
     498           1 : void Node::_flushObjects()
     499             : {
     500           2 :     ClientPtr client = getClient();
     501             :     {
     502           2 :         lunchbox::ScopedWrite mutex(_impl->barriers);
     503           3 :         for (BarrierHash::const_iterator i = _impl->barriers->begin();
     504           2 :              i != _impl->barriers->end(); ++i)
     505             :         {
     506           0 :             delete i->second;
     507             :         }
     508           1 :         _impl->barriers->clear();
     509             :     }
     510             : 
     511           2 :     lunchbox::ScopedWrite mutex(_impl->frameDatas);
     512           3 :     for (FrameDataHashCIter i = _impl->frameDatas->begin();
     513           2 :          i != _impl->frameDatas->end(); ++i)
     514             :     {
     515           0 :         FrameDataPtr frameData = i->second;
     516           0 :         frameData->resetPlugins();
     517           0 :         client->unmapObject(frameData.get());
     518             :     }
     519           1 :     _impl->frameDatas->clear();
     520           1 : }
     521             : 
     522           2 : void detail::TransmitThread::run()
     523             : {
     524             :     while (true)
     525             :     {
     526           3 :         co::ICommand command = _queue.pop();
     527           2 :         if (!command.isValid())
     528           2 :             return; // exit thread
     529             : 
     530           1 :         LBCHECK(command());
     531           1 :     }
     532             : }
     533             : 
     534           0 : void Node::dirtyClientExit()
     535             : {
     536           0 :     const Pipes& pipes = getPipes();
     537           0 :     for (PipesCIter i = pipes.begin(); i != pipes.end(); ++i)
     538             :     {
     539           0 :         Pipe* pipe = *i;
     540           0 :         pipe->cancelThread();
     541             :     }
     542           0 :     getTransmitterQueue()->push(co::ICommand()); // wake up to exit
     543           0 :     _impl->transmitter.join();
     544           0 : }
     545             : 
     546             : //---------------------------------------------------------------------------
     547             : // command handlers
     548             : //---------------------------------------------------------------------------
     549           2 : bool Node::_cmdCreatePipe(co::ICommand& cmd)
     550             : {
     551           2 :     LB_TS_THREAD(_nodeThread);
     552           2 :     LBASSERT(_impl->state >= STATE_INIT_FAILED);
     553             : 
     554           4 :     co::ObjectICommand command(cmd);
     555           2 :     const uint128_t& pipeID = command.read<uint128_t>();
     556           2 :     const bool threaded = command.read<bool>();
     557             : 
     558           2 :     LBLOG(LOG_INIT) << "Create pipe " << command << " id " << pipeID
     559           2 :                     << std::endl;
     560             : 
     561           2 :     Pipe* pipe = Global::getNodeFactory()->createPipe(this);
     562           2 :     if (threaded)
     563           2 :         pipe->startThread();
     564             : 
     565           2 :     Config* config = getConfig();
     566           2 :     LBCHECK(config->mapObject(pipe, pipeID));
     567           2 :     pipe->notifyMapped();
     568             : 
     569           4 :     return true;
     570             : }
     571             : 
     572           2 : bool Node::_cmdDestroyPipe(co::ICommand& cmd)
     573             : {
     574           4 :     co::ObjectICommand command(cmd);
     575             : 
     576           2 :     LB_TS_THREAD(_nodeThread);
     577           2 :     LBLOG(LOG_INIT) << "Destroy pipe " << command << std::endl;
     578             : 
     579           2 :     Pipe* pipe = findPipe(command.read<uint128_t>());
     580           2 :     LBASSERT(pipe);
     581           2 :     pipe->exitThread();
     582             : 
     583           2 :     const bool stopped = pipe->isStopped();
     584             : 
     585           2 :     Config* config = getConfig();
     586           2 :     config->unmapObject(pipe);
     587           2 :     pipe->send(getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY) << stopped;
     588           2 :     Global::getNodeFactory()->releasePipe(pipe);
     589             : 
     590           4 :     return true;
     591             : }
     592             : 
     593           1 : bool Node::_cmdConfigInit(co::ICommand& cmd)
     594             : {
     595           2 :     co::ObjectICommand command(cmd);
     596             : 
     597           1 :     LB_TS_THREAD(_nodeThread);
     598           1 :     LBLOG(LOG_INIT) << "Init node " << command << std::endl;
     599             : 
     600           1 :     _impl->state = STATE_INITIALIZING;
     601             : 
     602           1 :     const uint128_t& initID = command.read<uint128_t>();
     603           1 :     const uint32_t frameNumber = command.read<uint32_t>();
     604             : 
     605           1 :     _impl->currentFrame = frameNumber;
     606           1 :     _impl->unlockedFrame = frameNumber;
     607           1 :     _impl->finishedFrame = frameNumber;
     608           1 :     _setAffinity();
     609             : 
     610           1 :     _impl->transmitter.start();
     611           1 :     const uint64_t result = configInit(initID);
     612             : 
     613           1 :     if (getIAttribute(IATTR_THREAD_MODEL) == eq::UNDEFINED)
     614           1 :         setIAttribute(IATTR_THREAD_MODEL, eq::DRAW_SYNC);
     615             : 
     616           1 :     _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
     617             : 
     618           1 :     commit();
     619           1 :     send(command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY) << result;
     620           2 :     return true;
     621             : }
     622             : 
     623           1 : bool Node::_cmdConfigExit(co::ICommand& cmd)
     624             : {
     625           2 :     co::ObjectICommand command(cmd);
     626             : 
     627           1 :     LB_TS_THREAD(_nodeThread);
     628           1 :     LBLOG(LOG_INIT) << "Node exit " << command << std::endl;
     629             : 
     630           1 :     const Pipes& pipes = getPipes();
     631           3 :     for (PipesCIter i = pipes.begin(); i != pipes.end(); ++i)
     632             :     {
     633           2 :         Pipe* pipe = *i;
     634           2 :         pipe->waitExited();
     635             :     }
     636             : 
     637           1 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
     638           1 :     getTransmitterQueue()->push(co::ICommand()); // wake up to exit
     639           1 :     _impl->transmitter.join();
     640           1 :     _flushObjects();
     641             : 
     642           2 :     getConfig()->send(getLocalNode(), fabric::CMD_CONFIG_DESTROY_NODE)
     643           2 :         << getID();
     644           2 :     return true;
     645             : }
     646             : 
     647           1 : bool Node::_cmdFrameStart(co::ICommand& cmd)
     648             : {
     649           1 :     LB_TS_THREAD(_nodeThread);
     650             : 
     651           2 :     co::ObjectICommand command(cmd);
     652           1 :     const uint128_t& version = command.read<uint128_t>();
     653           1 :     const uint128_t& configVersion = command.read<uint128_t>();
     654           1 :     const uint128_t& frameID = command.read<uint128_t>();
     655           1 :     const uint32_t frameNumber = command.read<uint32_t>();
     656             : 
     657           1 :     LBVERB << "handle node frame start " << command << " frame " << frameNumber
     658           1 :            << " id " << frameID << std::endl;
     659             : 
     660           1 :     LBASSERT(_impl->currentFrame == frameNumber - 1);
     661             : 
     662           1 :     LBLOG(LOG_TASKS) << "----- Begin Frame ----- " << frameNumber << std::endl;
     663             : 
     664           1 :     Config* config = getConfig();
     665             : 
     666           1 :     if (configVersion != co::VERSION_INVALID)
     667           0 :         config->sync(configVersion);
     668           1 :     sync(version);
     669             : 
     670           1 :     config->_frameStart();
     671           1 :     frameStart(frameID, frameNumber);
     672             : 
     673           1 :     LBASSERTINFO(_impl->currentFrame >= frameNumber,
     674             :                  "Node::frameStart() did not start frame " << frameNumber);
     675           2 :     return true;
     676             : }
     677             : 
     678           1 : bool Node::_cmdFrameFinish(co::ICommand& cmd)
     679             : {
     680           1 :     LB_TS_THREAD(_nodeThread);
     681             : 
     682           2 :     co::ObjectICommand command(cmd);
     683           1 :     const uint128_t& frameID = command.read<uint128_t>();
     684           1 :     const uint32_t frameNumber = command.read<uint32_t>();
     685             : 
     686           1 :     LBLOG(LOG_TASKS) << "TASK frame finish " << getName() << " " << command
     687           0 :                      << " frame " << frameNumber << " id " << frameID
     688           1 :                      << std::endl;
     689             : 
     690           1 :     _finishFrame(frameNumber);
     691           1 :     _frameFinish(frameID, frameNumber);
     692             : 
     693           1 :     const uint128_t version = commit();
     694           1 :     if (version != co::VERSION_NONE)
     695           0 :         send(command.getNode(), fabric::CMD_OBJECT_SYNC);
     696           2 :     return true;
     697             : }
     698             : 
     699           1 : bool Node::_cmdFrameDrawFinish(co::ICommand& cmd)
     700             : {
     701           2 :     co::ObjectICommand command(cmd);
     702           1 :     const uint128_t& frameID = command.read<uint128_t>();
     703           1 :     const uint32_t frameNumber = command.read<uint32_t>();
     704             : 
     705           1 :     LBLOG(LOG_TASKS) << "TASK draw finish " << getName() << " " << command
     706           0 :                      << " frame " << frameNumber << " id " << frameID
     707           1 :                      << std::endl;
     708             : 
     709           1 :     frameDrawFinish(frameID, frameNumber);
     710           2 :     return true;
     711             : }
     712             : 
     713           1 : bool Node::_cmdFrameTasksFinish(co::ICommand& cmd)
     714             : {
     715           2 :     co::ObjectICommand command(cmd);
     716           1 :     const uint128_t& frameID = command.read<uint128_t>();
     717           1 :     const uint32_t frameNumber = command.read<uint32_t>();
     718             : 
     719           1 :     LBLOG(LOG_TASKS) << "TASK tasks finish " << getName() << " " << command
     720           1 :                      << std::endl;
     721             : 
     722           1 :     frameTasksFinish(frameID, frameNumber);
     723           2 :     return true;
     724             : }
     725             : 
     726           0 : bool Node::_cmdFrameDataTransmit(co::ICommand& cmd)
     727             : {
     728           0 :     co::ObjectICommand command(cmd);
     729             : 
     730             :     const co::ObjectVersion& frameDataVersion =
     731           0 :         command.read<co::ObjectVersion>();
     732           0 :     const PixelViewport& pvp = command.read<PixelViewport>();
     733           0 :     const Zoom& zoom = command.read<Zoom>();
     734           0 :     const RenderContext& context = command.read<RenderContext>();
     735           0 :     const Frame::Buffer buffers = command.read<Frame::Buffer>();
     736           0 :     const uint32_t frameNumber = command.read<uint32_t>();
     737           0 :     const bool useAlpha = command.read<bool>();
     738             :     const uint8_t* data = reinterpret_cast<const uint8_t*>(
     739           0 :         command.getRemainingBuffer(command.getRemainingBufferSize()));
     740             : 
     741           0 :     LBLOG(LOG_ASSEMBLY) << "received image data for " << frameDataVersion
     742           0 :                         << ", buffers " << buffers << " pvp " << pvp
     743           0 :                         << std::endl;
     744             : 
     745           0 :     LBASSERT(pvp.isValid());
     746             : 
     747           0 :     FrameDataPtr frameData = getFrameData(frameDataVersion);
     748           0 :     LBASSERT(!frameData->isReady());
     749             : 
     750           0 :     NodeStatistics event(Statistic::NODE_FRAME_DECOMPRESS, this, frameNumber);
     751             : 
     752             :     // Note on the const_cast: since the PixelData structure stores non-const
     753             :     // pointers, we have to go non-const at some point, even though we do not
     754             :     // modify the data.
     755           0 :     LBCHECK(frameData->addImage(frameDataVersion, pvp, zoom, context, buffers,
     756             :                                 useAlpha, const_cast<uint8_t*>(data)));
     757           0 :     return true;
     758             : }
     759             : 
     760           0 : bool Node::_cmdFrameDataReady(co::ICommand& cmd)
     761             : {
     762           0 :     co::ObjectICommand command(cmd);
     763             : 
     764             :     const co::ObjectVersion& frameDataVersion =
     765           0 :         command.read<co::ObjectVersion>();
     766           0 :     fabric::FrameData data;
     767           0 :     data.deserialize(command);
     768             : 
     769           0 :     LBLOG(LOG_ASSEMBLY) << "received ready for " << frameDataVersion
     770           0 :                         << std::endl;
     771           0 :     FrameDataPtr frameData = getFrameData(frameDataVersion);
     772           0 :     LBASSERT(frameData);
     773           0 :     LBASSERT(!frameData->isReady());
     774           0 :     frameData->setReady(frameDataVersion, data);
     775           0 :     LBASSERT(frameData->isReady());
     776           0 :     return true;
     777             : }
     778             : 
     779           0 : bool Node::_cmdSetAffinity(co::ICommand& cmd)
     780             : {
     781           0 :     co::ObjectICommand command(cmd);
     782             : 
     783           0 :     lunchbox::Thread::setAffinity(command.read<int32_t>());
     784           0 :     return true;
     785             : }
     786             : }
     787             : 
     788             : #include <eq/fabric/node.ipp>
     789             : template class eq::fabric::Node<eq::Config, eq::Node, eq::Pipe,
     790             :                                 eq::NodeVisitor>;
     791             : 
     792             : /** @cond IGNORE */
     793             : template EQFABRIC_API std::ostream& eq::fabric::operator<<(std::ostream&,
     794          30 :                                                            const eq::Super&);
     795             : /** @endcond */

Generated by: LCOV version 1.11