LCOV - code coverage report
Current view: top level - eq - pipe.cpp (source / functions) Hit Total Coverage
Test: Equalizer Lines: 468 587 79.7 %
Date: 2017-12-16 05:07:20 Functions: 79 91 86.8 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       4             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       5             :  *
       6             :  * This library is free software; you can redistribute it and/or modify it under
       7             :  * the terms of the GNU Lesser General Public License version 2.1 as published
       8             :  * by the Free Software Foundation.
       9             :  *
      10             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      11             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      12             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      13             :  * details.
      14             :  *
      15             :  * You should have received a copy of the GNU Lesser General Public License
      16             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      17             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      18             :  */
      19             : 
      20             : #include "pipe.h"
      21             : 
      22             : // must be included before any header defining Bool
      23             : #ifdef EQ_QT_USED
      24             : #include "qt/window.h"
      25             : #include <QThread>
      26             : #endif
      27             : 
      28             : #include "channel.h"
      29             : #include "client.h"
      30             : #include "config.h"
      31             : #include "exception.h"
      32             : #include "frame.h"
      33             : #include "frameData.h"
      34             : #include "global.h"
      35             : #include "log.h"
      36             : #include "node.h"
      37             : #include "nodeFactory.h"
      38             : #include "pipeStatistics.h"
      39             : #include "server.h"
      40             : #include "view.h"
      41             : #include "window.h"
      42             : 
      43             : #include "messagePump.h"
      44             : #include "systemPipe.h"
      45             : 
      46             : #include <eq/fabric/commands.h>
      47             : #include <eq/fabric/elementVisitor.h>
      48             : #include <eq/fabric/leafVisitor.h>
      49             : #include <eq/fabric/task.h>
      50             : 
      51             : #include <boost/lexical_cast.hpp>
      52             : #include <co/global.h>
      53             : #include <co/objectICommand.h>
      54             : #include <co/queueSlave.h>
      55             : #include <co/worker.h>
      56             : #include <sstream>
      57             : 
      58             : #ifdef EQUALIZER_USE_HWLOC_GL
      59             : #include <hwloc.h>
      60             : #include <hwloc/gl.h>
      61             : #endif
      62             : 
      63             : #ifdef EQUALIZER_USE_QT5WIDGETS
      64             : #include <QGuiApplication>
      65             : #include <QRegularExpression>
      66             : #endif
      67             : 
      68             : namespace eq
      69             : {
      70             : /** @cond IGNORE */
      71             : typedef fabric::Pipe<Node, Pipe, Window, PipeVisitor> Super;
      72             : typedef co::CommandFunc<Pipe> PipeFunc;
      73             : /** @endcond */
      74             : 
      75             : namespace
      76             : {
      77             : enum State
      78             : {
      79             :     STATE_MAPPED,
      80             :     STATE_INITIALIZING,
      81             :     STATE_RUNNING,
      82             :     STATE_STOPPING, // must come after running
      83             :     STATE_STOPPED,  // must come after running
      84             :     STATE_FAILED
      85             : };
      86             : 
      87             : typedef std::unordered_map<uint128_t, Frame*> FrameHash;
      88             : typedef std::unordered_map<uint128_t, FrameDataPtr> FrameDataHash;
      89             : typedef std::unordered_map<uint128_t, View*> ViewHash;
      90             : typedef std::unordered_map<uint128_t, co::QueueSlave*> QueueHash;
      91             : typedef FrameHash::const_iterator FrameHashCIter;
      92             : typedef FrameDataHash::const_iterator FrameDataHashCIter;
      93             : typedef ViewHash::const_iterator ViewHashCIter;
      94             : typedef ViewHash::iterator ViewHashIter;
      95             : typedef QueueHash::const_iterator QueueHashCIter;
      96             : }
      97             : 
      98             : namespace detail
      99             : {
     100           4 : class RenderThread : public eq::Worker
     101             : {
     102             : public:
     103           2 :     explicit RenderThread(eq::Pipe* pipe)
     104           2 :         : eq::Worker(co::Global::getCommandQueueLimit())
     105           2 :         , _pipe(pipe)
     106             :     {
     107           2 :     }
     108             : 
     109             : protected:
     110           2 :     bool init() override
     111             :     {
     112           4 :         setName(std::string("Draw") +
     113           6 :                 std::to_string(_pipe->getPath().pipeIndex));
     114           2 :         return true;
     115             :     }
     116             : 
     117             :     void run() override;
     118          74 :     bool stopRunning() override { return !_pipe; }
     119             : private:
     120             :     eq::Pipe* _pipe;
     121             :     friend class eq::Pipe;
     122             : };
     123             : 
     124             : /** Asynchronous, per-pipe readback thread. */
     125           2 : class TransferThread : public co::Worker
     126             : {
     127             : public:
     128           2 :     explicit TransferThread(const uint32_t index)
     129           2 :         : co::Worker(co::Global::getCommandQueueLimit())
     130             :         , _index(index)
     131             :         , _qThread(nullptr)
     132           2 :         , _stop(false)
     133             :     {
     134           2 :     }
     135             : 
     136           1 :     bool init() override
     137             :     {
     138           1 :         if (!co::Worker::init())
     139           0 :             return false;
     140           1 :         setName(std::string("Tfer") + std::to_string(_index));
     141             : #ifdef EQ_QT_USED
     142           1 :         _qThread = QThread::currentThread();
     143             : #endif
     144           1 :         return true;
     145             :     }
     146             : 
     147           9 :     bool stopRunning() override { return _stop; }
     148           1 :     void postStop() { _stop = true; }
     149           1 :     QThread* getQThread() { return _qThread; }
     150             : private:
     151             :     uint32_t _index;
     152             :     QThread* _qThread;
     153             :     bool _stop; // thread will exit if this is true
     154             : };
     155             : 
     156             : class Pipe
     157             : {
     158             : public:
     159           2 :     explicit Pipe(const uint32_t index)
     160           2 :         : systemPipe(0)
     161             : #ifdef AGL
     162             :         , windowSystem("AGL")
     163             : #elif GLX
     164             :         , windowSystem("GLX")
     165             : #elif WGL
     166             :         , windowSystem("WGL")
     167             : #elif EQ_QT_USED
     168             :         , windowSystem("Qt")
     169             : #endif
     170             :         , state(STATE_STOPPED)
     171             :         , currentFrame(0)
     172             :         , frameTime(0)
     173             :         , thread(0)
     174           2 :         , transferThread(index)
     175             :     {
     176           2 :     }
     177             : 
     178           2 :     ~Pipe()
     179           2 :     {
     180           2 :         delete thread;
     181           2 :         thread = 0;
     182           2 :     }
     183             : 
     184             :     /** Window-system specific functions class */
     185             :     SystemPipe* systemPipe;
     186             : 
     187             :     /** The current window system. */
     188             :     WindowSystem windowSystem;
     189             : 
     190             :     /** The configInit/configExit state. */
     191             :     lunchbox::Monitor<State> state;
     192             : 
     193             :     /** The last started frame. */
     194             :     uint32_t currentFrame;
     195             : 
     196             :     /** The number of the last finished frame. */
     197             :     lunchbox::Monitor<uint32_t> finishedFrame;
     198             : 
     199             :     /** The number of the last locally unlocked frame. */
     200             :     lunchbox::Monitor<uint32_t> unlockedFrame;
     201             : 
     202             :     /** The running per-frame statistic clocks. */
     203             :     std::deque<int64_t> frameTimes;
     204             :     std::mutex frameTimeMutex;
     205             : 
     206             :     /** The base time for the currently active frame. */
     207             :     int64_t frameTime;
     208             : 
     209             :     /** All assembly frames used by the pipe during rendering. */
     210             :     FrameHash frames;
     211             : 
     212             :     /** All output frame datas used by the pipe during rendering. */
     213             :     FrameDataHash outputFrameDatas;
     214             : 
     215             :     /** All input frame datas used by the pipe during rendering. */
     216             :     FrameDataHash inputFrameDatas;
     217             : 
     218             :     /** All views used by the pipe's channels during rendering. */
     219             :     ViewHash views;
     220             : 
     221             :     /** All queues used by the pipe's channels during rendering. */
     222             :     QueueHash queues;
     223             : 
     224             :     /** The pipe thread. */
     225             :     RenderThread* thread;
     226             : 
     227             :     detail::TransferThread transferThread;
     228             : };
     229             : 
     230           2 : void RenderThread::run()
     231             : {
     232           2 :     LB_TS_THREAD(_pipe->_pipeThread);
     233           2 :     LBDEBUG << "Entered pipe thread" << std::endl;
     234             : 
     235           2 :     eq::Pipe* pipe = _pipe; // _pipe gets cleared on exit
     236           2 :     pipe->_impl->state.waitEQ(STATE_MAPPED);
     237           2 :     pipe->_impl->windowSystem = pipe->selectWindowSystem();
     238           2 :     pipe->_setupCommandQueue();
     239           2 :     pipe->_setupAffinity();
     240             : 
     241           2 :     Worker::run();
     242             : 
     243           2 :     pipe->_exitCommandQueue();
     244           2 : }
     245             : }
     246             : 
     247           2 : Pipe::Pipe(Node* parent)
     248             :     : Super(parent)
     249           2 :     , _impl(new detail::Pipe(getPath().pipeIndex))
     250             : {
     251           2 : }
     252             : 
     253           4 : Pipe::~Pipe()
     254             : {
     255           2 :     LBASSERT(getWindows().empty());
     256           2 :     delete _impl;
     257           2 : }
     258             : 
     259          61 : Config* Pipe::getConfig()
     260             : {
     261          61 :     Node* node = getNode();
     262          61 :     LBASSERT(node);
     263          61 :     return (node ? node->getConfig() : 0);
     264             : }
     265             : 
     266           0 : const Config* Pipe::getConfig() const
     267             : {
     268           0 :     const Node* node = getNode();
     269           0 :     LBASSERT(node);
     270           0 :     return (node ? node->getConfig() : 0);
     271             : }
     272             : 
     273          10 : ClientPtr Pipe::getClient()
     274             : {
     275          10 :     Node* node = getNode();
     276          10 :     LBASSERT(node);
     277          10 :     return (node ? node->getClient() : 0);
     278             : }
     279             : 
     280          11 : ServerPtr Pipe::getServer()
     281             : {
     282          11 :     Node* node = getNode();
     283          11 :     LBASSERT(node);
     284          11 :     return (node ? node->getServer() : 0);
     285             : }
     286             : 
     287           2 : void Pipe::attach(const uint128_t& id, const uint32_t instanceID)
     288             : {
     289           2 :     Super::attach(id, instanceID);
     290             : 
     291           2 :     co::CommandQueue* queue = getPipeThreadQueue();
     292           2 :     co::CommandQueue* transferQ = getTransferThreadQueue();
     293             : 
     294           2 :     registerCommand(fabric::CMD_PIPE_CONFIG_INIT,
     295           4 :                     PipeFunc(this, &Pipe::_cmdConfigInit), queue);
     296           2 :     registerCommand(fabric::CMD_PIPE_CONFIG_EXIT,
     297           4 :                     PipeFunc(this, &Pipe::_cmdConfigExit), queue);
     298           2 :     registerCommand(fabric::CMD_PIPE_CREATE_WINDOW,
     299           4 :                     PipeFunc(this, &Pipe::_cmdCreateWindow), queue);
     300           2 :     registerCommand(fabric::CMD_PIPE_DESTROY_WINDOW,
     301           4 :                     PipeFunc(this, &Pipe::_cmdDestroyWindow), queue);
     302           2 :     registerCommand(fabric::CMD_PIPE_FRAME_START,
     303           4 :                     PipeFunc(this, &Pipe::_cmdFrameStart), queue);
     304           2 :     registerCommand(fabric::CMD_PIPE_FRAME_FINISH,
     305           4 :                     PipeFunc(this, &Pipe::_cmdFrameFinish), queue);
     306           2 :     registerCommand(fabric::CMD_PIPE_FRAME_DRAW_FINISH,
     307           4 :                     PipeFunc(this, &Pipe::_cmdFrameDrawFinish), queue);
     308           2 :     registerCommand(fabric::CMD_PIPE_FRAME_START_CLOCK,
     309           4 :                     PipeFunc(this, &Pipe::_cmdFrameStartClock), 0);
     310           2 :     registerCommand(fabric::CMD_PIPE_EXIT_THREAD,
     311           4 :                     PipeFunc(this, &Pipe::_cmdExitThread), queue);
     312           2 :     registerCommand(fabric::CMD_PIPE_DETACH_VIEW,
     313           4 :                     PipeFunc(this, &Pipe::_cmdDetachView), queue);
     314           2 :     registerCommand(fabric::CMD_PIPE_EXIT_TRANSFER_THREAD,
     315           4 :                     PipeFunc(this, &Pipe::_cmdExitTransferThread), transferQ);
     316           2 : }
     317             : 
     318          25 : void Pipe::setDirty(const uint64_t bits)
     319             : {
     320             :     // jump over fabric setDirty to avoid dirty'ing node pipes list
     321             :     // pipes are individually synced in frame finish for thread-safety
     322          25 :     Object::setDirty(bits);
     323          25 : }
     324             : 
     325           0 : bool Pipe::isWindowSystemAvailable(const std::string& name) const
     326             : {
     327           0 :     bool available = false;
     328           0 :     if (name != "Qt")
     329             :     {
     330             : #ifdef AGL
     331             :         available = name == "AGL";
     332             : #elif GLX
     333           0 :         available = name == "GLX";
     334             : #elif WGL
     335             :         available = name == "WGL";
     336             : #endif
     337             :     }
     338             : 
     339             : #ifdef EQ_QT_USED
     340           0 :     if (name != "Qt")
     341           0 :         return available;
     342             : 
     343             : // Qt can only use ports that refer to QScreens available to the
     344             : // QApplication. In Windows and Mac all physical displays are
     345             : // queriable using platform dependent functions (no idea about virtual
     346             : // displays like Xming or XQuartz), so we will assume that if Equalizer
     347             : // was built with Qt support, then all devices can be used by Qt.
     348             : // (How to choose the right screen for a given device it's a different
     349             : // story)
     350             : #ifdef AGL
     351             :     available = true;
     352             : #elif WGL
     353             :     available = true;
     354             : #else
     355             : // For X it's different. Qt can only use screens that are part of the
     356             : // display server referred by the DISPLAY environmental variable (in
     357             : // particular its value at the moment of the creation of the QApplication)
     358             : #ifdef __APPLE__
     359             :     // In MAC this is simpler, because there can only be one display server.
     360             :     return true;
     361             : #else
     362             :     // There's no way to infer the X server number from the Qt API except
     363             :     // QScreen names, but only under certain conditions. In regular desktop
     364             :     // usage QScreen::name gives a name related to what xrandr prints, so it's
     365             :     // not usable. The names seems to follow the X naming convention
     366             :     // (e.g. host:0.0) when XRANDR is not available (e.g. Xnest) or doesn't
     367             :     // provide information about where the displays are connected (e.g.
     368             :     // headless or Xvnc). These are corner cases so we cannot rely on QScreen.
     369             :     // Therefore, we will infer which port is Qt using directly from DISPLAY.
     370             : 
     371             :     // Is the port is undefined that means that the default server must
     372             :     // be used. This should match the Qt display server.
     373           0 :     if (getPort() == LB_UNDEFINED_UINT32)
     374           0 :         return true;
     375             : 
     376             :     QGuiApplication* app =
     377           0 :         dynamic_cast<QGuiApplication*>(QCoreApplication::instance());
     378           0 :     if (!app || !app->primaryScreen())
     379           0 :         return false; // Qt won't be able to access anything anyway
     380             : 
     381           0 :     QRegularExpression regex("^[a-z]*\\:([0-9]+)(\\.[0-9]+)?$");
     382             :     QRegularExpressionMatch match =
     383           0 :         regex.match(getenv("DISPLAY") ? getenv("DISPLAY") : "");
     384           0 :     available = match.captured(1) == QString::number(getPort(), 10);
     385             : #endif
     386             : #endif
     387             : #endif
     388             : 
     389           0 :     return available;
     390             : }
     391             : 
     392           2 : WindowSystem Pipe::selectWindowSystem() const
     393             : {
     394             : #ifdef AGL
     395             :     return WindowSystem("AGL");
     396             : #elif GLX
     397           2 :     return WindowSystem("GLX");
     398             : #elif WGL
     399             :     return WindowSystem("WGL");
     400             : #elif EQ_QT_USED
     401             :     if (!isWindowSystemAvailable("Qt"))
     402             :     {
     403             :         // Throwing because there's no reasonable alternative.
     404             :         std::stringstream msg;
     405             :         msg << "Cannot choose windowing system for pipe at port " << getPort()
     406             :             << ". Qt was set as the default, but it cannot be used. In X"
     407             :                " based systems this means that the value of DISPLAY taken by"
     408             :                " Qt refers to a different display server."
     409             :             << std::endl;
     410             :         LBTHROW(std::runtime_error(msg.str()));
     411             :     }
     412             :     return WindowSystem("Qt");
     413             : #endif
     414             : }
     415             : 
     416           2 : void Pipe::_setupCommandQueue()
     417             : {
     418           6 :     LBDEBUG << "Set up pipe message pump for " << _impl->windowSystem
     419           6 :             << std::endl;
     420             : 
     421           2 :     Config* config = getConfig();
     422           2 :     config->setupMessagePump(this);
     423             : 
     424           2 :     if (!_impl->thread) // Non-threaded pipes have no pipe thread message pump
     425           0 :         return;
     426             : 
     427           2 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     428           2 :     LBASSERT(queue);
     429           2 :     LBASSERT(!queue->getMessagePump());
     430             : 
     431           2 :     Global::enterCarbon();
     432           2 :     MessagePump* pump = createMessagePump();
     433           2 :     if (pump)
     434           2 :         pump->dispatchAll(); // initializes _impl->receiverQueue
     435             : 
     436           2 :     queue->setMessagePump(pump);
     437           2 :     Global::leaveCarbon();
     438             : }
     439             : 
     440           2 : int32_t Pipe::_getAutoAffinity() const
     441             : {
     442             : #ifdef EQUALIZER_USE_HWLOC_GL
     443             :     uint32_t port = getPort();
     444             :     uint32_t device = getDevice();
     445             : 
     446             :     if (port == LB_UNDEFINED_UINT32 && device == LB_UNDEFINED_UINT32)
     447             :         return lunchbox::Thread::NONE;
     448             : 
     449             :     if (port == LB_UNDEFINED_UINT32)
     450             :         port = 0;
     451             :     if (device == LB_UNDEFINED_UINT32)
     452             :         device = 0;
     453             : 
     454             :     hwloc_topology_t topology;
     455             :     if (hwloc_topology_init(&topology) < 0)
     456             :     {
     457             :         LBINFO << "Automatic pipe thread placement failed: "
     458             :                << "hwloc_topology_init() failed" << std::endl;
     459             :         return lunchbox::Thread::NONE;
     460             :     }
     461             : 
     462             :     // Load I/O devices, bridges and their relevant info
     463             :     const unsigned long loading_flags =
     464             :         HWLOC_TOPOLOGY_FLAG_IO_BRIDGES | HWLOC_TOPOLOGY_FLAG_IO_DEVICES;
     465             :     if (hwloc_topology_set_flags(topology, loading_flags) < 0)
     466             :     {
     467             :         LBINFO << "Automatic pipe thread placement failed: "
     468             :                << "hwloc_topology_set_flags() failed" << std::endl;
     469             :         hwloc_topology_destroy(topology);
     470             :         return lunchbox::Thread::NONE;
     471             :     }
     472             : 
     473             :     if (hwloc_topology_load(topology) < 0)
     474             :     {
     475             :         LBINFO << "Automatic pipe thread placement failed: "
     476             :                << "hwloc_topology_load() failed" << std::endl;
     477             :         hwloc_topology_destroy(topology);
     478             :         return lunchbox::Thread::NONE;
     479             :     }
     480             : 
     481             :     const hwloc_obj_t osdev =
     482             :         hwloc_gl_get_display_osdev_by_port_device(topology, int(port),
     483             :                                                   int(device));
     484             :     if (!osdev)
     485             :     {
     486             :         LBINFO << "Automatic pipe thread placement failed: GPU not found"
     487             :                << std::endl;
     488             :         hwloc_topology_destroy(topology);
     489             :         return lunchbox::Thread::NONE;
     490             :     }
     491             : 
     492             :     const hwloc_obj_t pcidev = osdev->parent;
     493             :     const hwloc_obj_t parent = hwloc_get_non_io_ancestor_obj(topology, pcidev);
     494             :     const int numCpus =
     495             :         hwloc_get_nbobjs_inside_cpuset_by_type(topology, parent->cpuset,
     496             :                                                HWLOC_OBJ_SOCKET);
     497             :     if (numCpus != 1)
     498             :     {
     499             :         LBINFO << "Automatic pipe thread placement failed: GPU attached to "
     500             :                << numCpus << " processors?" << std::endl;
     501             :         hwloc_topology_destroy(topology);
     502             :         return lunchbox::Thread::NONE;
     503             :     }
     504             : 
     505             :     const hwloc_obj_t cpuObj =
     506             :         hwloc_get_obj_inside_cpuset_by_type(topology, parent->cpuset,
     507             :                                             HWLOC_OBJ_SOCKET, 0);
     508             :     if (cpuObj == 0)
     509             :     {
     510             :         LBINFO << "Automatic pipe thread placement failed: "
     511             :                << "hwloc_get_obj_inside_cpuset_by_type() failed" << std::endl;
     512             :         hwloc_topology_destroy(topology);
     513             :         return lunchbox::Thread::NONE;
     514             :     }
     515             : 
     516             :     const int cpuIndex = cpuObj->logical_index;
     517             :     hwloc_topology_destroy(topology);
     518             :     return cpuIndex + lunchbox::Thread::SOCKET;
     519             : #else
     520           6 :     LBDEBUG << "Automatic thread placement not supported, no hwloc GL support"
     521           6 :             << std::endl;
     522             : #endif
     523           2 :     return lunchbox::Thread::NONE;
     524             : }
     525             : 
     526           2 : void Pipe::_setupAffinity()
     527             : {
     528           2 :     const int32_t affinity = getIAttribute(IATTR_HINT_AFFINITY);
     529           2 :     switch (affinity)
     530             :     {
     531             :     case AUTO:
     532           2 :         lunchbox::Thread::setAffinity(_getAutoAffinity());
     533           2 :         break;
     534             : 
     535             :     case OFF:
     536             :     default:
     537           0 :         lunchbox::Thread::setAffinity(affinity);
     538           0 :         break;
     539             :     }
     540           2 : }
     541             : 
     542           2 : void Pipe::_exitCommandQueue()
     543             : {
     544             :     // Non-threaded pipes have no pipe thread message pump
     545           2 :     if (!_impl->thread)
     546           0 :         return;
     547             : 
     548           2 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     549           2 :     LBASSERT(queue);
     550             : 
     551           2 :     MessagePump* pump = queue->getMessagePump();
     552           2 :     queue->setMessagePump(0);
     553           2 :     delete pump;
     554             : }
     555             : 
     556           2 : MessagePump* Pipe::createMessagePump()
     557             : {
     558           2 :     return _impl->windowSystem.createMessagePump();
     559             : }
     560             : 
     561           2 : MessagePump* Pipe::getMessagePump()
     562             : {
     563           2 :     LB_TS_THREAD(_pipeThread);
     564           2 :     if (!_impl->thread)
     565           0 :         return 0;
     566             : 
     567           2 :     CommandQueue* queue = _impl->thread->getWorkerQueue();
     568           2 :     return queue->getMessagePump();
     569             : }
     570             : 
     571           6 : co::CommandQueue* Pipe::getPipeThreadQueue()
     572             : {
     573           6 :     if (_impl->thread)
     574           6 :         return _impl->thread->getWorkerQueue();
     575             : 
     576           0 :     return getNode()->getMainThreadQueue();
     577             : }
     578             : 
     579           4 : co::CommandQueue* Pipe::getTransferThreadQueue()
     580             : {
     581           4 :     return _impl->transferThread.getWorkerQueue();
     582             : }
     583             : 
     584           2 : co::CommandQueue* Pipe::getMainThreadQueue()
     585             : {
     586           2 :     return getServer()->getMainThreadQueue();
     587             : }
     588             : 
     589           2 : co::CommandQueue* Pipe::getCommandThreadQueue()
     590             : {
     591           2 :     return getServer()->getCommandThreadQueue();
     592             : }
     593             : 
     594           2 : Frame* Pipe::getFrame(const co::ObjectVersion& frameVersion, const Eye eye,
     595             :                       const bool isOutput)
     596             : {
     597           2 :     LB_TS_THREAD(_pipeThread);
     598           2 :     Frame* frame = _impl->frames[frameVersion.identifier];
     599             : 
     600           2 :     if (!frame)
     601             :     {
     602           4 :         ClientPtr client = getClient();
     603           2 :         frame = new Frame();
     604             : 
     605           2 :         LBCHECK(client->mapObject(frame, frameVersion));
     606           2 :         _impl->frames[frameVersion.identifier] = frame;
     607             :     }
     608             :     else
     609           0 :         frame->sync(frameVersion.version);
     610             : 
     611           2 :     const co::ObjectVersion& dataVersion = frame->getDataVersion(eye);
     612           2 :     LBLOG(LOG_ASSEMBLY) << "Use " << dataVersion << std::endl;
     613             : 
     614           4 :     FrameDataPtr frameData = getNode()->getFrameData(dataVersion);
     615           2 :     LBASSERT(frameData);
     616             : 
     617           2 :     if (isOutput)
     618             :     {
     619           1 :         if (!frameData->isAttached())
     620             :         {
     621           2 :             ClientPtr client = getClient();
     622           1 :             LBCHECK(client->mapObject(frameData.get(), dataVersion));
     623             :         }
     624           0 :         else if (frameData->getVersion() < dataVersion.version)
     625           0 :             frameData->sync(dataVersion.version);
     626             : 
     627           1 :         _impl->outputFrameDatas[dataVersion.identifier] = frameData;
     628             :     }
     629             :     else
     630           1 :         _impl->inputFrameDatas[dataVersion.identifier] = frameData;
     631             : 
     632           2 :     frame->setFrameData(frameData);
     633           4 :     return frame;
     634             : }
     635             : 
     636           2 : void Pipe::flushFrames(util::ObjectManager& om)
     637             : {
     638           2 :     LB_TS_THREAD(_pipeThread);
     639           4 :     ClientPtr client = getClient();
     640           4 :     for (FrameHashCIter i = _impl->frames.begin(); i != _impl->frames.end();
     641             :          ++i)
     642             :     {
     643           2 :         Frame* frame = i->second;
     644           2 :         frame->setFrameData(0); // datas are flushed below
     645           2 :         client->unmapObject(frame);
     646           2 :         delete frame;
     647             :     }
     648           2 :     _impl->frames.clear();
     649             : 
     650           9 :     for (FrameDataHashCIter i = _impl->inputFrameDatas.begin();
     651           6 :          i != _impl->inputFrameDatas.end(); ++i)
     652             :     {
     653           2 :         FrameDataPtr data = i->second;
     654           1 :         data->deleteGLObjects(om);
     655             :     }
     656           2 :     _impl->inputFrameDatas.clear();
     657             : 
     658           9 :     for (FrameDataHashCIter i = _impl->outputFrameDatas.begin();
     659           6 :          i != _impl->outputFrameDatas.end(); ++i)
     660             :     {
     661           2 :         FrameDataPtr data = i->second;
     662           1 :         data->resetPlugins();
     663           1 :         data->deleteGLObjects(om);
     664           1 :         client->unmapObject(data.get());
     665           1 :         getNode()->releaseFrameData(data);
     666             :     }
     667           2 :     _impl->outputFrameDatas.clear();
     668           2 : }
     669             : 
     670           0 : co::QueueSlave* Pipe::getQueue(const uint128_t& queueID)
     671             : {
     672           0 :     LB_TS_THREAD(_pipeThread);
     673           0 :     if (queueID == 0)
     674           0 :         return 0;
     675             : 
     676           0 :     co::QueueSlave* queue = _impl->queues[queueID];
     677           0 :     if (!queue)
     678             :     {
     679           0 :         queue = new co::QueueSlave;
     680           0 :         ClientPtr client = getClient();
     681           0 :         LBCHECK(client->mapObject(queue, queueID));
     682             : 
     683           0 :         _impl->queues[queueID] = queue;
     684             :     }
     685             : 
     686           0 :     return queue;
     687             : }
     688             : 
     689           2 : void Pipe::_flushQueues()
     690             : {
     691           2 :     LB_TS_THREAD(_pipeThread);
     692           4 :     ClientPtr client = getClient();
     693             : 
     694           2 :     for (QueueHashCIter i = _impl->queues.begin(); i != _impl->queues.end();
     695             :          ++i)
     696             :     {
     697           0 :         co::QueueSlave* queue = i->second;
     698           0 :         client->unmapObject(queue);
     699           0 :         delete queue;
     700             :     }
     701           2 :     _impl->queues.clear();
     702           2 : }
     703             : 
     704           1 : const View* Pipe::getView(const co::ObjectVersion& viewVersion) const
     705             : {
     706             :     // Yie-ha: we want to have a const-interface to get a view on the render
     707             :     //         clients, but view mapping is by definition non-const.
     708           1 :     return const_cast<Pipe*>(this)->getView(viewVersion);
     709             : }
     710             : 
     711           6 : View* Pipe::getView(const co::ObjectVersion& viewVersion)
     712             : {
     713           6 :     LB_TS_THREAD(_pipeThread);
     714           6 :     if (viewVersion.identifier == 0)
     715           1 :         return 0;
     716             : 
     717           5 :     View* view = _impl->views[viewVersion.identifier];
     718           5 :     if (!view)
     719             :     {
     720           1 :         NodeFactory* nodeFactory = Global::getNodeFactory();
     721           1 :         view = nodeFactory->createView(0);
     722           1 :         LBASSERT(view);
     723           1 :         view->_pipe = this;
     724           2 :         ClientPtr client = getClient();
     725           1 :         LBCHECK(client->mapObject(view, viewVersion));
     726             : 
     727           1 :         _impl->views[viewVersion.identifier] = view;
     728             :     }
     729             : 
     730           5 :     view->sync(viewVersion.version);
     731           5 :     return view;
     732             : }
     733             : 
     734           2 : void Pipe::_releaseViews()
     735             : {
     736           2 :     LB_TS_THREAD(_pipeThread);
     737           4 :     for (bool changed = true; changed;)
     738             :     {
     739           2 :         changed = false;
     740           3 :         for (ViewHashIter i = _impl->views.begin(); i != _impl->views.end();
     741             :              ++i)
     742             :         {
     743           1 :             View* view = i->second;
     744           1 :             view->commit();
     745           1 :             if (view->getVersion() + 20 > view->getHeadVersion())
     746           1 :                 continue;
     747             : 
     748             :             // release unused view to avoid memory leaks due to deltas piling up
     749           0 :             view->_pipe = 0;
     750             : 
     751           0 :             ClientPtr client = getClient();
     752           0 :             client->unmapObject(view);
     753           0 :             _impl->views.erase(i);
     754             : 
     755           0 :             NodeFactory* nodeFactory = Global::getNodeFactory();
     756           0 :             nodeFactory->releaseView(view);
     757             : 
     758           0 :             changed = true;
     759           0 :             break;
     760             :         }
     761             :     }
     762           2 : }
     763             : 
     764           2 : void Pipe::_flushViews()
     765             : {
     766           2 :     LB_TS_THREAD(_pipeThread);
     767           2 :     NodeFactory* nodeFactory = Global::getNodeFactory();
     768           4 :     ClientPtr client = getClient();
     769             : 
     770           3 :     for (ViewHashCIter i = _impl->views.begin(); i != _impl->views.end(); ++i)
     771             :     {
     772           1 :         View* view = i->second;
     773             : 
     774           1 :         client->unmapObject(view);
     775           1 :         view->_pipe = 0;
     776           1 :         nodeFactory->releaseView(view);
     777             :     }
     778           2 :     _impl->views.clear();
     779           2 : }
     780             : 
     781           2 : void Pipe::startThread()
     782             : {
     783           2 :     _impl->thread = new detail::RenderThread(this);
     784           2 :     _impl->thread->start();
     785           2 : }
     786             : 
     787           2 : void Pipe::exitThread()
     788             : {
     789           2 :     _stopTransferThread();
     790             : 
     791           2 :     if (!_impl->thread)
     792           0 :         return;
     793             : 
     794           2 :     send(getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD);
     795             : 
     796           2 :     _impl->thread->join();
     797           2 :     delete _impl->thread;
     798           2 :     _impl->thread = 0;
     799             : }
     800             : 
     801           0 : void Pipe::cancelThread()
     802             : {
     803           0 :     _stopTransferThread();
     804             : 
     805           0 :     if (!_impl->thread)
     806           0 :         return;
     807             : 
     808             :     // local command dispatching
     809           0 :     co::ObjectOCommand(this, getLocalNode(), fabric::CMD_PIPE_EXIT_THREAD,
     810           0 :                        co::COMMANDTYPE_OBJECT, getID(), CO_INSTANCE_ALL);
     811             : }
     812             : 
     813           2 : void Pipe::waitExited() const
     814             : {
     815           2 :     _impl->state.waitGE(STATE_STOPPED);
     816           2 : }
     817             : 
     818           5 : bool Pipe::isRunning() const
     819             : {
     820           5 :     return (_impl->state == STATE_RUNNING);
     821             : }
     822             : 
     823           2 : bool Pipe::isStopped() const
     824             : {
     825           2 :     return (_impl->state == STATE_STOPPED);
     826             : }
     827             : 
     828           2 : void Pipe::notifyMapped()
     829             : {
     830           2 :     LBASSERT(_impl->state == STATE_STOPPED);
     831           2 :     _impl->state = STATE_MAPPED;
     832           2 : }
     833             : 
     834             : namespace
     835             : {
     836           2 : class WaitFinishedVisitor : public PipeVisitor
     837             : {
     838             : public:
     839           2 :     WaitFinishedVisitor(const uint32_t frame, MessagePump* pump)
     840           2 :         : _frame(frame)
     841           2 :         , _pump(pump)
     842             :     {
     843           2 :     }
     844             : 
     845           2 :     virtual VisitorResult visit(Channel* channel)
     846             :     {
     847           2 :         while (!channel->waitFrameFinished(_frame, 100))
     848             :         {
     849             :             // process potential pending Qt slots
     850           0 :             if (_pump)
     851           0 :                 _pump->dispatchAll();
     852             :         }
     853             : 
     854           2 :         return TRAVERSE_CONTINUE;
     855             :     }
     856             : 
     857             : private:
     858             :     const uint32_t _frame;
     859             :     MessagePump* _pump;
     860             : };
     861             : }
     862             : 
     863           2 : void Pipe::waitFrameFinished(const uint32_t frameNumber)
     864             : {
     865           2 :     MessagePump* pump = getConfig()->getMessagePump();
     866           4 :     while (!_impl->finishedFrame.timedWaitGE(frameNumber, 100))
     867             :     {
     868             :         // process potential pending Qt slots
     869           1 :         if (pump)
     870           0 :             pump->dispatchAll();
     871             :     }
     872             : 
     873           4 :     WaitFinishedVisitor waiter(frameNumber, pump);
     874           2 :     accept(waiter);
     875           2 : }
     876             : 
     877           4 : void Pipe::waitFrameLocal(const uint32_t frameNumber) const
     878             : {
     879           4 :     _impl->unlockedFrame.waitGE(frameNumber);
     880           4 : }
     881             : 
     882          15 : uint32_t Pipe::getCurrentFrame() const
     883             : {
     884          15 :     LB_TS_THREAD(_pipeThread);
     885          15 :     return _impl->currentFrame;
     886             : }
     887             : 
     888           0 : uint32_t Pipe::getFinishedFrame() const
     889             : {
     890           0 :     return _impl->finishedFrame.get();
     891             : }
     892             : 
     893           5 : WindowSystem Pipe::getWindowSystem() const
     894             : {
     895           5 :     return _impl->windowSystem;
     896             : }
     897             : 
     898           0 : EventOCommand Pipe::sendError(const uint32_t error)
     899             : {
     900           0 :     return getConfig()->sendError(EVENT_PIPE_ERROR, Error(error, getID()));
     901             : }
     902             : 
     903           0 : bool Pipe::processEvent(Statistic& event)
     904             : {
     905           0 :     Config* config = getConfig();
     906           0 :     updateEvent(event, config->getTime());
     907           0 :     config->sendEvent(EVENT_STATISTIC) << event;
     908           0 :     return true;
     909             : }
     910             : 
     911             : //---------------------------------------------------------------------------
     912             : // pipe-thread methods
     913             : //---------------------------------------------------------------------------
     914           2 : bool Pipe::configInit(const uint128_t& initID)
     915             : {
     916           2 :     LB_TS_THREAD(_pipeThread);
     917           2 :     LBASSERT(!_impl->systemPipe);
     918           2 :     return configInitSystemPipe(initID);
     919             : }
     920             : 
     921           2 : bool Pipe::configInitSystemPipe(const uint128_t&)
     922             : {
     923           2 :     SystemPipe* systemPipe = _impl->windowSystem.createPipe(this);
     924           2 :     LBASSERT(systemPipe);
     925             : 
     926           2 :     if (!systemPipe->configInit())
     927             :     {
     928           0 :         LBERROR << "System pipe context initialization failed" << std::endl;
     929           0 :         delete systemPipe;
     930           0 :         return false;
     931             :     }
     932             : 
     933           2 :     setSystemPipe(systemPipe);
     934           2 :     return true;
     935             : }
     936             : 
     937           2 : bool Pipe::configExit()
     938             : {
     939           2 :     LB_TS_THREAD(_pipeThread);
     940             : 
     941           2 :     if (_impl->systemPipe)
     942             :     {
     943           2 :         _impl->systemPipe->configExit();
     944           2 :         delete _impl->systemPipe;
     945           2 :         _impl->systemPipe = 0;
     946             :     }
     947           2 :     return true;
     948             : }
     949             : 
     950           2 : void Pipe::frameStart(const uint128_t&, const uint32_t frameNumber)
     951             : {
     952           2 :     LB_TS_THREAD(_pipeThread);
     953             : 
     954           2 :     const Node* node = getNode();
     955           2 :     switch (node->getIAttribute(Node::IATTR_THREAD_MODEL))
     956             :     {
     957             :     case ASYNC: // No sync, release immediately
     958           0 :         releaseFrameLocal(frameNumber);
     959           0 :         break;
     960             : 
     961             :     case DRAW_SYNC:  // Sync, release in frameDrawFinish
     962             :     case LOCAL_SYNC: // Sync, release in frameFinish
     963           2 :         node->waitFrameStarted(frameNumber);
     964           2 :         break;
     965             : 
     966             :     default:
     967           0 :         LBUNIMPLEMENTED;
     968             :     }
     969             : 
     970           2 :     startFrame(frameNumber);
     971           2 : }
     972             : 
     973           2 : void Pipe::frameDrawFinish(const uint128_t&, const uint32_t frameNumber)
     974             : {
     975           2 :     const Node* node = getNode();
     976           2 :     switch (node->getIAttribute(Node::IATTR_THREAD_MODEL))
     977             :     {
     978             :     case ASYNC: // released in frameStart
     979           0 :         break;
     980             : 
     981             :     case DRAW_SYNC: // release
     982           2 :         releaseFrameLocal(frameNumber);
     983           2 :         break;
     984             : 
     985             :     case LOCAL_SYNC: // release in frameFinish
     986           0 :         break;
     987             : 
     988             :     default:
     989           0 :         LBUNIMPLEMENTED;
     990             :     }
     991           2 : }
     992             : 
     993           2 : void Pipe::frameFinish(const uint128_t&, const uint32_t frameNumber)
     994             : {
     995           2 :     const Node* node = getNode();
     996           2 :     switch (node->getIAttribute(Node::IATTR_THREAD_MODEL))
     997             :     {
     998             :     case ASYNC: // released in frameStart
     999           0 :         break;
    1000             : 
    1001             :     case DRAW_SYNC: // released in frameDrawFinish
    1002           2 :         break;
    1003             : 
    1004             :     case LOCAL_SYNC: // release
    1005           0 :         releaseFrameLocal(frameNumber);
    1006           0 :         break;
    1007             : 
    1008             :     default:
    1009           0 :         LBUNIMPLEMENTED;
    1010             :     }
    1011             : 
    1012             :     // Global release
    1013           2 :     releaseFrame(frameNumber);
    1014           2 : }
    1015             : 
    1016           2 : void Pipe::startFrame(const uint32_t frameNumber)
    1017             : {
    1018           2 :     LB_TS_THREAD(_pipeThread);
    1019           2 :     _impl->currentFrame = frameNumber;
    1020           2 :     LBLOG(LOG_TASKS) << "---- Started Frame ---- " << frameNumber << std::endl;
    1021           2 : }
    1022             : 
    1023           2 : void Pipe::releaseFrame(const uint32_t frameNumber)
    1024             : {
    1025           2 :     LB_TS_THREAD(_pipeThread);
    1026           2 :     _impl->finishedFrame = frameNumber;
    1027           2 :     LBLOG(LOG_TASKS) << "---- Finished Frame --- " << frameNumber << std::endl;
    1028           2 : }
    1029             : 
    1030           2 : void Pipe::releaseFrameLocal(const uint32_t frameNumber)
    1031             : {
    1032           2 :     LB_TS_THREAD(_pipeThread);
    1033           2 :     LBASSERTINFO(_impl->unlockedFrame + 1 == frameNumber,
    1034             :                  _impl->unlockedFrame << ", " << frameNumber);
    1035             : 
    1036           2 :     _impl->unlockedFrame = frameNumber;
    1037           2 :     LBLOG(LOG_TASKS) << "---- Unlocked Frame --- " << _impl->unlockedFrame.get()
    1038           2 :                      << std::endl;
    1039           2 : }
    1040             : 
    1041           1 : bool Pipe::startTransferThread()
    1042             : {
    1043           1 :     if (_impl->transferThread.isRunning())
    1044           0 :         return true;
    1045             : 
    1046           1 :     return _impl->transferThread.start();
    1047             : }
    1048             : 
    1049           1 : QThread* Pipe::getTransferQThread()
    1050             : {
    1051           1 :     return _impl->transferThread.getQThread();
    1052             : }
    1053             : 
    1054           2 : bool Pipe::hasTransferThread() const
    1055             : {
    1056           2 :     return _impl->transferThread.isRunning();
    1057             : }
    1058             : 
    1059           2 : void Pipe::_stopTransferThread()
    1060             : {
    1061           2 :     if (_impl->transferThread.isStopped())
    1062           1 :         return;
    1063             : 
    1064           1 :     send(getLocalNode(), fabric::CMD_PIPE_EXIT_TRANSFER_THREAD);
    1065           1 :     _impl->transferThread.join();
    1066             : }
    1067             : 
    1068           2 : void Pipe::setSystemPipe(SystemPipe* pipe)
    1069             : {
    1070           2 :     _impl->systemPipe = pipe;
    1071           2 : }
    1072             : 
    1073           7 : SystemPipe* Pipe::getSystemPipe()
    1074             : {
    1075           7 :     return _impl->systemPipe;
    1076             : }
    1077             : 
    1078           0 : const SystemPipe* Pipe::getSystemPipe() const
    1079             : {
    1080           0 :     return _impl->systemPipe;
    1081             : }
    1082             : 
    1083             : //---------------------------------------------------------------------------
    1084             : // command handlers
    1085             : //---------------------------------------------------------------------------
    1086           2 : bool Pipe::_cmdCreateWindow(co::ICommand& cmd)
    1087             : {
    1088           4 :     co::ObjectICommand command(cmd);
    1089           2 :     const uint128_t& windowID = command.read<uint128_t>();
    1090             : 
    1091           2 :     LBLOG(LOG_INIT) << "Create window " << command << " id " << windowID
    1092           2 :                     << std::endl;
    1093             : 
    1094           2 :     Window* window = Global::getNodeFactory()->createWindow(this);
    1095           2 :     window->init(); // not in ctor, virtual method
    1096             : 
    1097           2 :     Config* config = getConfig();
    1098           2 :     LBCHECK(config->mapObject(window, windowID));
    1099             : 
    1100           4 :     return true;
    1101             : }
    1102             : 
    1103           2 : bool Pipe::_cmdDestroyWindow(co::ICommand& cmd)
    1104             : {
    1105           4 :     co::ObjectICommand command(cmd);
    1106             : 
    1107           2 :     LBLOG(LOG_INIT) << "Destroy window " << command << std::endl;
    1108             : 
    1109           2 :     Window* window = _findWindow(command.read<uint128_t>());
    1110           2 :     LBASSERT(window);
    1111             : 
    1112             :     // re-set shared windows accordingly
    1113           2 :     Window* newSharedWindow = 0;
    1114           2 :     const Windows& windows = getWindows();
    1115           4 :     for (Windows::const_iterator i = windows.begin(); i != windows.end(); ++i)
    1116             :     {
    1117           2 :         Window* candidate = *i;
    1118             : 
    1119           2 :         if (candidate == window)
    1120           2 :             continue; // ignore
    1121             : 
    1122           0 :         if (candidate->getSharedContextWindow() == window)
    1123             :         {
    1124           0 :             if (newSharedWindow)
    1125           0 :                 candidate->setSharedContextWindow(newSharedWindow);
    1126             :             else
    1127             :             {
    1128           0 :                 newSharedWindow = candidate;
    1129           0 :                 newSharedWindow->setSharedContextWindow(candidate);
    1130             :             }
    1131             :         }
    1132             : 
    1133           0 :         LBASSERT(candidate->getSharedContextWindow() != window);
    1134             :     }
    1135             : 
    1136           2 :     const bool stopped = window->isStopped();
    1137           2 :     window->send(getServer(), fabric::CMD_WINDOW_CONFIG_EXIT_REPLY) << stopped;
    1138             : 
    1139           2 :     Config* config = getConfig();
    1140           2 :     config->unmapObject(window);
    1141           2 :     Global::getNodeFactory()->releaseWindow(window);
    1142             : 
    1143           4 :     return true;
    1144             : }
    1145             : 
    1146           2 : bool Pipe::_cmdConfigInit(co::ICommand& cmd)
    1147             : {
    1148           2 :     LB_TS_THREAD(_pipeThread);
    1149             : 
    1150           4 :     co::ObjectICommand command(cmd);
    1151           2 :     const uint128_t& initID = command.read<uint128_t>();
    1152           2 :     const uint32_t frameNumber = command.read<uint32_t>();
    1153             : 
    1154           2 :     LBLOG(LOG_INIT) << "Init pipe " << command << " init id " << initID
    1155           2 :                     << " frame " << frameNumber << std::endl;
    1156             : 
    1157           2 :     if (!isThreaded())
    1158             :     {
    1159           0 :         _impl->windowSystem = selectWindowSystem();
    1160           0 :         _setupCommandQueue();
    1161             :     }
    1162             : 
    1163           2 :     Node* node = getNode();
    1164           2 :     LBASSERT(node);
    1165           2 :     node->waitInitialized();
    1166             : 
    1167           2 :     bool result = false;
    1168           2 :     if (node->isRunning())
    1169             :     {
    1170           2 :         _impl->currentFrame = frameNumber;
    1171           2 :         _impl->finishedFrame = frameNumber;
    1172           2 :         _impl->unlockedFrame = frameNumber;
    1173           2 :         _impl->state = STATE_INITIALIZING;
    1174             : 
    1175           2 :         result = configInit(initID);
    1176             : 
    1177           2 :         if (result)
    1178           2 :             _impl->state = STATE_RUNNING;
    1179             :     }
    1180             :     else
    1181           0 :         sendError(ERROR_PIPE_NODE_NOTRUNNING);
    1182             : 
    1183           2 :     LBLOG(LOG_INIT) << "TASK pipe config init reply result " << result
    1184           2 :                     << std::endl;
    1185             : 
    1186           2 :     commit();
    1187           2 :     send(command.getRemoteNode(), fabric::CMD_PIPE_CONFIG_INIT_REPLY) << result;
    1188           4 :     return true;
    1189             : }
    1190             : 
    1191           2 : bool Pipe::_cmdConfigExit(co::ICommand& cmd)
    1192             : {
    1193           4 :     co::ObjectICommand command(cmd);
    1194             : 
    1195           2 :     LB_TS_THREAD(_pipeThread);
    1196           2 :     LBLOG(LOG_INIT) << "TASK pipe config exit " << command << std::endl;
    1197             : 
    1198           2 :     _impl->state = STATE_STOPPING; // needed in View::detach (from _flushViews)
    1199             : 
    1200             :     // send before node gets a chance to send its destroy command
    1201           2 :     getNode()->send(getLocalNode(), fabric::CMD_NODE_DESTROY_PIPE) << getID();
    1202             : 
    1203             :     // Flush views before exit since they are created after init
    1204             :     // - application may need initialized pipe to exit
    1205             :     // - configExit can't access views since all channels are gone already
    1206           2 :     _flushViews();
    1207           2 :     _flushQueues();
    1208           2 :     _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
    1209           4 :     return true;
    1210             : }
    1211             : 
    1212           2 : bool Pipe::_cmdExitThread(co::ICommand&)
    1213             : {
    1214           2 :     LBASSERT(_impl->thread);
    1215           2 :     _impl->thread->_pipe = 0;
    1216           2 :     return true;
    1217             : }
    1218             : 
    1219           1 : bool Pipe::_cmdExitTransferThread(co::ICommand&)
    1220             : {
    1221           1 :     _impl->transferThread.postStop();
    1222           1 :     return true;
    1223             : }
    1224             : 
    1225           2 : bool Pipe::_cmdFrameStartClock(co::ICommand&)
    1226             : {
    1227           2 :     LBVERB << "start frame clock" << std::endl;
    1228           2 :     _impl->frameTimeMutex.lock();
    1229           2 :     _impl->frameTimes.push_back(getConfig()->getTime());
    1230           2 :     _impl->frameTimeMutex.unlock();
    1231           2 :     return true;
    1232             : }
    1233             : 
    1234           2 : bool Pipe::_cmdFrameStart(co::ICommand& cmd)
    1235             : {
    1236           2 :     LB_TS_THREAD(_pipeThread);
    1237             : 
    1238           4 :     co::ObjectICommand command(cmd);
    1239           2 :     const uint128_t& version = command.read<uint128_t>();
    1240           2 :     const uint128_t& frameID = command.read<uint128_t>();
    1241           2 :     const uint32_t frameNumber = command.read<uint32_t>();
    1242             : 
    1243           2 :     LBVERB << "handle pipe frame start " << command << " frame " << frameNumber
    1244           2 :            << " id " << frameID << std::endl;
    1245             : 
    1246           2 :     LBLOG(LOG_TASKS) << "---- TASK start frame ---- frame " << frameNumber
    1247           2 :                      << " id " << frameID << std::endl;
    1248           2 :     sync(version);
    1249           2 :     const int64_t lastFrameTime = _impl->frameTime;
    1250             : 
    1251           2 :     _impl->frameTimeMutex.lock();
    1252           2 :     LBASSERT(!_impl->frameTimes.empty());
    1253             : 
    1254           2 :     _impl->frameTime = _impl->frameTimes.front();
    1255           2 :     _impl->frameTimes.pop_front();
    1256           2 :     _impl->frameTimeMutex.unlock();
    1257             : 
    1258           2 :     if (lastFrameTime > 0)
    1259             :     {
    1260           0 :         PipeStatistics waitEvent(Statistic::PIPE_IDLE, this);
    1261           0 :         waitEvent.statistic.idleTime =
    1262           0 :             _impl->thread ? _impl->thread->getWorkerQueue()->resetWaitTime()
    1263             :                           : 0;
    1264           0 :         waitEvent.statistic.totalTime =
    1265           0 :             std::max(_impl->frameTime - lastFrameTime,
    1266           0 :                      int64_t(1)); // avoid SIGFPE
    1267             :     }
    1268             : 
    1269           2 :     LBASSERTINFO(_impl->currentFrame + 1 == frameNumber,
    1270             :                  "current " << _impl->currentFrame << " start " << frameNumber);
    1271             : 
    1272           2 :     frameStart(frameID, frameNumber);
    1273           4 :     return true;
    1274             : }
    1275             : 
    1276           2 : bool Pipe::_cmdFrameFinish(co::ICommand& cmd)
    1277             : {
    1278           2 :     LB_TS_THREAD(_pipeThread);
    1279             : 
    1280           4 :     co::ObjectICommand command(cmd);
    1281           2 :     const uint128_t& frameID = command.read<uint128_t>();
    1282           2 :     const uint32_t frameNumber = command.read<uint32_t>();
    1283             : 
    1284           2 :     LBLOG(LOG_TASKS) << "---- TASK finish frame --- " << command << " frame "
    1285           2 :                      << frameNumber << " id " << frameID << std::endl;
    1286             : 
    1287           2 :     LBASSERTINFO(_impl->currentFrame >= frameNumber,
    1288             :                  "current " << _impl->currentFrame << " finish "
    1289             :                             << frameNumber);
    1290             : 
    1291           2 :     frameFinish(frameID, frameNumber);
    1292             : 
    1293           2 :     LBASSERTINFO(_impl->finishedFrame >= frameNumber,
    1294             :                  "Pipe::frameFinish() did not release frame " << frameNumber);
    1295             : 
    1296           2 :     if (_impl->unlockedFrame < frameNumber)
    1297             :     {
    1298           0 :         LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
    1299           0 :                << std::endl
    1300           0 :                << "    unlocked " << _impl->unlockedFrame.get() << " done "
    1301           0 :                << frameNumber << std::endl;
    1302           0 :         releaseFrameLocal(frameNumber);
    1303             :     }
    1304             : 
    1305           2 :     if (_impl->finishedFrame < frameNumber)
    1306             :     {
    1307           0 :         LBWARN << "Finished frame was not released, enforcing unlock"
    1308           0 :                << std::endl;
    1309           0 :         releaseFrame(frameNumber);
    1310             :     }
    1311             : 
    1312           2 :     _releaseViews();
    1313             : 
    1314           2 :     const uint128_t version = commit();
    1315           2 :     if (version != co::VERSION_NONE)
    1316           2 :         send(command.getRemoteNode(), fabric::CMD_OBJECT_SYNC);
    1317           4 :     return true;
    1318             : }
    1319             : 
    1320           2 : bool Pipe::_cmdFrameDrawFinish(co::ICommand& cmd)
    1321             : {
    1322           2 :     LB_TS_THREAD(_pipeThread);
    1323             : 
    1324           4 :     co::ObjectICommand command(cmd);
    1325           2 :     const uint128_t& frameID = command.read<uint128_t>();
    1326           2 :     const uint32_t frameNumber = command.read<uint32_t>();
    1327             : 
    1328           2 :     LBLOG(LOG_TASKS) << "TASK draw finish " << getName() << " frame "
    1329           2 :                      << frameNumber << " id " << frameID << std::endl;
    1330             : 
    1331           2 :     frameDrawFinish(frameID, frameNumber);
    1332           4 :     return true;
    1333             : }
    1334             : 
    1335           0 : bool Pipe::_cmdDetachView(co::ICommand& cmd)
    1336             : {
    1337           0 :     co::ObjectICommand command(cmd);
    1338             : 
    1339           0 :     LB_TS_THREAD(_pipeThread);
    1340             : 
    1341           0 :     ViewHash::iterator i = _impl->views.find(command.read<uint128_t>());
    1342           0 :     if (i != _impl->views.end())
    1343             :     {
    1344           0 :         View* view = i->second;
    1345           0 :         _impl->views.erase(i);
    1346             : 
    1347           0 :         NodeFactory* nodeFactory = Global::getNodeFactory();
    1348           0 :         nodeFactory->releaseView(view);
    1349             :     }
    1350           0 :     return true;
    1351             : }
    1352             : }
    1353             : 
    1354             : #include <eq/fabric/pipe.ipp>
    1355             : template class eq::fabric::Pipe<eq::Node, eq::Pipe, eq::Window,
    1356             :                                 eq::PipeVisitor>;
    1357             : 
    1358             : /** @cond IGNORE */
    1359             : template EQFABRIC_API std::ostream& eq::fabric::operator<<(std::ostream&,
    1360          30 :                                                            const eq::Super&);
    1361             : /** @endcond */

Generated by: LCOV version 1.11