Line data Source code
1 :
2 : /* Copyright (c) 2005-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : * Cedric Stalder<cedric.stalder@gmail.com>
5 : *
6 : * This library is free software; you can redistribute it and/or modify it under
7 : * the terms of the GNU Lesser General Public License version 2.1 as published
8 : * by the Free Software Foundation.
9 : *
10 : * This library is distributed in the hope that it will be useful, but WITHOUT
11 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 : * details.
14 : *
15 : * You should have received a copy of the GNU Lesser General Public License
16 : * along with this library; if not, write to the Free Software Foundation, Inc.,
17 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 : */
19 :
20 : #include "node.h"
21 :
22 : #include "client.h"
23 : #include "config.h"
24 : #include "error.h"
25 : #include "exception.h"
26 : #include "frameData.h"
27 : #include "global.h"
28 : #include "log.h"
29 : #include "nodeFactory.h"
30 : #include "nodeStatistics.h"
31 : #include "pipe.h"
32 : #include "server.h"
33 :
34 : #include <eq/fabric/axisEvent.h>
35 : #include <eq/fabric/buttonEvent.h>
36 : #include <eq/fabric/commands.h>
37 : #include <eq/fabric/elementVisitor.h>
38 : #include <eq/fabric/frameData.h>
39 : #include <eq/fabric/task.h>
40 :
41 : #include <co/barrier.h>
42 : #include <co/connection.h>
43 : #include <co/global.h>
44 : #include <co/objectICommand.h>
45 : #include <lunchbox/scopedMutex.h>
46 :
47 : namespace eq
48 : {
49 : namespace
50 : {
51 : typedef std::unordered_map<uint128_t, co::Barrier*> BarrierHash;
52 : typedef std::unordered_map<uint128_t, FrameDataPtr> FrameDataHash;
53 : typedef FrameDataHash::const_iterator FrameDataHashCIter;
54 : typedef FrameDataHash::iterator FrameDataHashIter;
55 :
56 : enum State
57 : {
58 : STATE_STOPPED,
59 : STATE_INITIALIZING,
60 : STATE_INIT_FAILED,
61 : STATE_RUNNING,
62 : STATE_FAILED
63 : };
64 : }
65 :
66 : namespace detail
67 : {
68 : class TransmitThread : public lunchbox::Thread
69 : {
70 : public:
71 1 : TransmitThread()
72 1 : : _queue(co::Global::getCommandQueueLimit())
73 : {
74 1 : }
75 1 : virtual ~TransmitThread() {}
76 4 : co::CommandQueue& getQueue() { return _queue; }
77 : protected:
78 1 : bool init() override
79 : {
80 1 : setName("Xmit");
81 1 : return true;
82 : }
83 : void run() override;
84 :
85 : private:
86 : co::CommandQueue _queue;
87 : };
88 :
89 1 : class Node
90 : {
91 : public:
92 1 : Node()
93 1 : : state(STATE_STOPPED)
94 : , finishedFrame(0)
95 1 : , unlockedFrame(0)
96 : {
97 1 : }
98 :
99 : /** The configInit/configExit state. */
100 : lunchbox::Monitor<State> state;
101 :
102 : /** The number of the last started frame. */
103 : lunchbox::Monitor<uint32_t> currentFrame;
104 :
105 : /** The number of the last finished frame. */
106 : uint32_t finishedFrame;
107 :
108 : /** The number of the last locally released frame. */
109 : uint32_t unlockedFrame;
110 :
111 : /** All barriers mapped by the node. */
112 : lunchbox::Lockable<BarrierHash> barriers;
113 :
114 : /** All frame datas used by the node during rendering. */
115 : lunchbox::Lockable<FrameDataHash> frameDatas;
116 :
117 : TransmitThread transmitter;
118 : };
119 : }
120 :
121 : /** @cond IGNORE */
122 : typedef co::CommandFunc<Node> NodeFunc;
123 : typedef fabric::Node<Config, Node, Pipe, NodeVisitor> Super;
124 : /** @endcond */
125 :
126 1 : Node::Node(Config* parent)
127 : : Super(parent)
128 1 : , _impl(new detail::Node)
129 : {
130 1 : }
131 :
132 2 : Node::~Node()
133 : {
134 1 : LBASSERT(getPipes().empty());
135 1 : delete _impl;
136 1 : }
137 :
138 1 : void Node::attach(const uint128_t& id, const uint32_t instanceID)
139 : {
140 1 : Super::attach(id, instanceID);
141 :
142 1 : co::CommandQueue* queue = getMainThreadQueue();
143 1 : co::CommandQueue* commandQ = getCommandThreadQueue();
144 1 : co::CommandQueue* transmitQ = getTransmitterQueue();
145 :
146 1 : registerCommand(fabric::CMD_NODE_CREATE_PIPE,
147 2 : NodeFunc(this, &Node::_cmdCreatePipe), queue);
148 1 : registerCommand(fabric::CMD_NODE_DESTROY_PIPE,
149 2 : NodeFunc(this, &Node::_cmdDestroyPipe), queue);
150 1 : registerCommand(fabric::CMD_NODE_CONFIG_INIT,
151 2 : NodeFunc(this, &Node::_cmdConfigInit), queue);
152 1 : registerCommand(fabric::CMD_NODE_SET_AFFINITY,
153 2 : NodeFunc(this, &Node::_cmdSetAffinity), transmitQ);
154 1 : registerCommand(fabric::CMD_NODE_CONFIG_EXIT,
155 2 : NodeFunc(this, &Node::_cmdConfigExit), queue);
156 1 : registerCommand(fabric::CMD_NODE_FRAME_START,
157 2 : NodeFunc(this, &Node::_cmdFrameStart), queue);
158 1 : registerCommand(fabric::CMD_NODE_FRAME_FINISH,
159 2 : NodeFunc(this, &Node::_cmdFrameFinish), queue);
160 1 : registerCommand(fabric::CMD_NODE_FRAME_DRAW_FINISH,
161 2 : NodeFunc(this, &Node::_cmdFrameDrawFinish), queue);
162 1 : registerCommand(fabric::CMD_NODE_FRAME_TASKS_FINISH,
163 2 : NodeFunc(this, &Node::_cmdFrameTasksFinish), queue);
164 1 : registerCommand(fabric::CMD_NODE_FRAMEDATA_TRANSMIT,
165 2 : NodeFunc(this, &Node::_cmdFrameDataTransmit), commandQ);
166 1 : registerCommand(fabric::CMD_NODE_FRAMEDATA_READY,
167 2 : NodeFunc(this, &Node::_cmdFrameDataReady), commandQ);
168 1 : }
169 :
170 1 : void Node::setDirty(const uint64_t bits)
171 : {
172 : // jump over fabric setDirty to avoid dirty'ing config node list
173 : // nodes are individually synced in frame finish
174 1 : Object::setDirty(bits);
175 1 : }
176 :
177 11 : ClientPtr Node::getClient()
178 : {
179 11 : Config* config = getConfig();
180 11 : LBASSERT(config);
181 11 : return (config ? config->getClient() : 0);
182 : }
183 :
184 13 : ServerPtr Node::getServer()
185 : {
186 13 : Config* config = getConfig();
187 13 : LBASSERT(config);
188 13 : return (config ? config->getServer() : 0);
189 : }
190 :
191 1 : co::CommandQueue* Node::getMainThreadQueue()
192 : {
193 1 : return getConfig()->getMainThreadQueue();
194 : }
195 :
196 1 : co::CommandQueue* Node::getCommandThreadQueue()
197 : {
198 1 : return getConfig()->getCommandThreadQueue();
199 : }
200 :
201 4 : co::CommandQueue* Node::getTransmitterQueue()
202 : {
203 4 : return &_impl->transmitter.getQueue();
204 : }
205 :
206 0 : uint32_t Node::getCurrentFrame() const
207 : {
208 0 : return _impl->currentFrame.get();
209 : }
210 :
211 1 : uint32_t Node::getFinishedFrame() const
212 : {
213 1 : return _impl->finishedFrame;
214 : }
215 :
216 0 : co::Barrier* Node::getBarrier(const co::ObjectVersion& barrier)
217 : {
218 0 : lunchbox::ScopedWrite mutex(_impl->barriers);
219 0 : co::Barrier* netBarrier = _impl->barriers.data[barrier.identifier];
220 :
221 0 : if (netBarrier)
222 0 : netBarrier->sync(barrier.version);
223 : else
224 : {
225 0 : ClientPtr client = getClient();
226 :
227 0 : netBarrier = new co::Barrier(client, barrier);
228 0 : if (!netBarrier->isGood())
229 : {
230 0 : LBCHECK(netBarrier->isGood());
231 0 : LBWARN << "Could not map swap barrier" << std::endl;
232 0 : delete netBarrier;
233 0 : return 0;
234 : }
235 0 : _impl->barriers.data[barrier.identifier] = netBarrier;
236 : }
237 :
238 0 : return netBarrier;
239 : }
240 :
241 5 : FrameDataPtr Node::getFrameData(const co::ObjectVersion& frameDataVersion)
242 : {
243 10 : lunchbox::ScopedWrite mutex(_impl->frameDatas);
244 5 : FrameDataPtr data = _impl->frameDatas.data[frameDataVersion.identifier];
245 :
246 5 : if (!data)
247 : {
248 1 : data = new FrameData;
249 1 : data->setID(frameDataVersion.identifier);
250 1 : _impl->frameDatas.data[frameDataVersion.identifier] = data;
251 : }
252 :
253 5 : LBASSERT(frameDataVersion.version.high() == 0);
254 5 : data->setVersion(frameDataVersion.version.low());
255 10 : return data;
256 : }
257 :
258 1 : void Node::releaseFrameData(FrameDataPtr data)
259 : {
260 2 : lunchbox::ScopedWrite mutex(_impl->frameDatas);
261 1 : FrameDataHashIter i = _impl->frameDatas->find(data->getID());
262 1 : LBASSERT(i != _impl->frameDatas->end());
263 1 : if (i == _impl->frameDatas->end())
264 0 : return;
265 :
266 1 : _impl->frameDatas->erase(i);
267 : }
268 :
269 2 : void Node::waitInitialized() const
270 : {
271 2 : _impl->state.waitGE(STATE_INIT_FAILED);
272 2 : }
273 :
274 2 : bool Node::isRunning() const
275 : {
276 2 : return _impl->state == STATE_RUNNING;
277 : }
278 :
279 1 : bool Node::isStopped() const
280 : {
281 1 : return _impl->state == STATE_STOPPED;
282 : }
283 :
284 1 : bool Node::configInit(const uint128_t&)
285 : {
286 1 : WindowSystem::configInit(this);
287 1 : return true;
288 : }
289 :
290 1 : bool Node::configExit()
291 : {
292 1 : WindowSystem::configExit(this);
293 1 : return true;
294 : }
295 :
296 1 : void Node::_setAffinity()
297 : {
298 1 : const int32_t affinity = getIAttribute(IATTR_HINT_AFFINITY);
299 1 : switch (affinity)
300 : {
301 : case OFF:
302 0 : break;
303 :
304 : case AUTO:
305 : // TODO
306 1 : LBVERB << "No automatic thread placement for node threads "
307 1 : << std::endl;
308 1 : break;
309 :
310 : default:
311 0 : co::LocalNodePtr node = getLocalNode();
312 0 : send(node, fabric::CMD_NODE_SET_AFFINITY) << affinity;
313 :
314 0 : node->setAffinity(affinity);
315 0 : break;
316 : }
317 1 : }
318 :
319 2 : void Node::waitFrameStarted(const uint32_t frameNumber) const
320 : {
321 2 : _impl->currentFrame.waitGE(frameNumber);
322 2 : }
323 :
324 1 : void Node::startFrame(const uint32_t frameNumber)
325 : {
326 1 : _impl->currentFrame = frameNumber;
327 1 : }
328 :
329 1 : void Node::frameFinish(const uint128_t&, const uint32_t frameNumber)
330 : {
331 1 : releaseFrame(frameNumber);
332 1 : }
333 :
334 1 : void Node::_finishFrame(const uint32_t frameNumber) const
335 : {
336 1 : const Pipes& pipes = getPipes();
337 3 : for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
338 : {
339 2 : Pipe* pipe = *i;
340 2 : LBASSERT(pipe->isThreaded() || pipe->getFinishedFrame() >= frameNumber);
341 :
342 2 : pipe->waitFrameLocal(frameNumber);
343 2 : pipe->waitFrameFinished(frameNumber);
344 : }
345 1 : }
346 :
347 1 : void Node::_frameFinish(const uint128_t& frameID, const uint32_t frameNumber)
348 : {
349 1 : frameFinish(frameID, frameNumber);
350 1 : LBLOG(LOG_TASKS) << "---- Finished Frame --- " << frameNumber << std::endl;
351 :
352 1 : if (_impl->unlockedFrame < frameNumber)
353 : {
354 0 : LBWARN << "Finished frame was not locally unlocked, enforcing unlock"
355 0 : << std::endl;
356 0 : releaseFrameLocal(frameNumber);
357 : }
358 :
359 1 : if (_impl->finishedFrame < frameNumber)
360 : {
361 0 : LBWARN << "Finished frame was not released, enforcing unlock"
362 0 : << std::endl;
363 0 : releaseFrame(frameNumber);
364 : }
365 1 : }
366 :
367 1 : void Node::releaseFrame(const uint32_t frameNumber)
368 : {
369 1 : LBASSERTINFO(_impl->currentFrame >= frameNumber,
370 : "current " << _impl->currentFrame << " release "
371 : << frameNumber);
372 :
373 1 : if (_impl->finishedFrame >= frameNumber)
374 0 : return;
375 1 : _impl->finishedFrame = frameNumber;
376 :
377 1 : Config* config = getConfig();
378 2 : ServerPtr server = config->getServer();
379 2 : co::NodePtr node = server.get();
380 1 : send(node, fabric::CMD_NODE_FRAME_FINISH_REPLY) << frameNumber;
381 : }
382 :
383 1 : void Node::releaseFrameLocal(const uint32_t frameNumber)
384 : {
385 1 : LBASSERT(_impl->unlockedFrame <= frameNumber);
386 1 : _impl->unlockedFrame = frameNumber;
387 :
388 1 : Config* config = getConfig();
389 1 : LBASSERT(config->getNodes().size() == 1);
390 1 : LBASSERT(config->getNodes()[0] == this);
391 1 : config->releaseFrameLocal(frameNumber);
392 :
393 1 : LBLOG(LOG_TASKS) << "---- Unlocked Frame --- " << _impl->unlockedFrame
394 1 : << std::endl;
395 1 : }
396 :
397 1 : void Node::frameStart(const uint128_t&, const uint32_t frameNumber)
398 : {
399 1 : startFrame(frameNumber); // unlock pipe threads
400 :
401 1 : switch (getIAttribute(IATTR_THREAD_MODEL))
402 : {
403 : case ASYNC:
404 : // Don't wait for pipes to release frame locally, sync not needed
405 0 : releaseFrameLocal(frameNumber);
406 0 : break;
407 :
408 : case DRAW_SYNC: // Sync and release in frameDrawFinish
409 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
410 1 : break;
411 :
412 : default:
413 0 : LBUNIMPLEMENTED;
414 : }
415 1 : }
416 :
417 1 : void Node::frameDrawFinish(const uint128_t&, const uint32_t frameNumber)
418 : {
419 1 : switch (getIAttribute(IATTR_THREAD_MODEL))
420 : {
421 : case ASYNC: // No sync, release in frameStart
422 : case LOCAL_SYNC: // Sync and release in frameTasksFinish
423 0 : break;
424 :
425 : case DRAW_SYNC:
426 : {
427 1 : const Pipes& pipes = getPipes();
428 3 : for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
429 : {
430 2 : const Pipe* pipe = *i;
431 2 : if (pipe->getTasks() & fabric::TASK_DRAW)
432 2 : pipe->waitFrameLocal(frameNumber);
433 : }
434 :
435 1 : releaseFrameLocal(frameNumber);
436 1 : break;
437 : }
438 : default:
439 0 : LBUNIMPLEMENTED;
440 : }
441 1 : }
442 :
443 1 : void Node::frameTasksFinish(const uint128_t&, const uint32_t frameNumber)
444 : {
445 1 : switch (getIAttribute(IATTR_THREAD_MODEL))
446 : {
447 : case ASYNC: // No sync, release in frameStart
448 : case DRAW_SYNC: // Sync and release in frameDrawFinish
449 1 : break;
450 :
451 : case LOCAL_SYNC:
452 : {
453 0 : const Pipes& pipes = getPipes();
454 0 : for (Pipes::const_iterator i = pipes.begin(); i != pipes.end(); ++i)
455 : {
456 0 : const Pipe* pipe = *i;
457 0 : if (pipe->getTasks() != fabric::TASK_NONE)
458 0 : pipe->waitFrameLocal(frameNumber);
459 : }
460 :
461 0 : releaseFrameLocal(frameNumber);
462 0 : break;
463 : }
464 : default:
465 0 : LBUNIMPLEMENTED;
466 : }
467 1 : }
468 :
469 0 : EventOCommand Node::sendError(const uint32_t error)
470 : {
471 0 : return getConfig()->sendError(EVENT_NODE_ERROR, Error(error, getID()));
472 : }
473 :
474 0 : bool Node::processEvent(AxisEvent& event)
475 : {
476 0 : Config* config = getConfig();
477 0 : updateEvent(event, config->getTime());
478 0 : config->sendEvent(EVENT_MAGELLAN_AXIS) << event;
479 0 : return true;
480 : }
481 :
482 0 : bool Node::processEvent(ButtonEvent& event)
483 : {
484 0 : Config* config = getConfig();
485 0 : updateEvent(event, config->getTime());
486 0 : config->sendEvent(EVENT_MAGELLAN_BUTTON) << event;
487 0 : return true;
488 : }
489 :
490 0 : bool Node::processEvent(Statistic& event)
491 : {
492 0 : Config* config = getConfig();
493 0 : updateEvent(event, config->getTime());
494 0 : config->sendEvent(EVENT_STATISTIC) << event;
495 0 : return true;
496 : }
497 :
498 1 : void Node::_flushObjects()
499 : {
500 2 : ClientPtr client = getClient();
501 : {
502 2 : lunchbox::ScopedWrite mutex(_impl->barriers);
503 3 : for (BarrierHash::const_iterator i = _impl->barriers->begin();
504 2 : i != _impl->barriers->end(); ++i)
505 : {
506 0 : delete i->second;
507 : }
508 1 : _impl->barriers->clear();
509 : }
510 :
511 2 : lunchbox::ScopedWrite mutex(_impl->frameDatas);
512 3 : for (FrameDataHashCIter i = _impl->frameDatas->begin();
513 2 : i != _impl->frameDatas->end(); ++i)
514 : {
515 0 : FrameDataPtr frameData = i->second;
516 0 : frameData->resetPlugins();
517 0 : client->unmapObject(frameData.get());
518 : }
519 1 : _impl->frameDatas->clear();
520 1 : }
521 :
522 2 : void detail::TransmitThread::run()
523 : {
524 : while (true)
525 : {
526 3 : co::ICommand command = _queue.pop();
527 2 : if (!command.isValid())
528 2 : return; // exit thread
529 :
530 1 : LBCHECK(command());
531 1 : }
532 : }
533 :
534 0 : void Node::dirtyClientExit()
535 : {
536 0 : const Pipes& pipes = getPipes();
537 0 : for (PipesCIter i = pipes.begin(); i != pipes.end(); ++i)
538 : {
539 0 : Pipe* pipe = *i;
540 0 : pipe->cancelThread();
541 : }
542 0 : getTransmitterQueue()->push(co::ICommand()); // wake up to exit
543 0 : _impl->transmitter.join();
544 0 : }
545 :
546 : //---------------------------------------------------------------------------
547 : // command handlers
548 : //---------------------------------------------------------------------------
549 2 : bool Node::_cmdCreatePipe(co::ICommand& cmd)
550 : {
551 2 : LB_TS_THREAD(_nodeThread);
552 2 : LBASSERT(_impl->state >= STATE_INIT_FAILED);
553 :
554 4 : co::ObjectICommand command(cmd);
555 2 : const uint128_t& pipeID = command.read<uint128_t>();
556 2 : const bool threaded = command.read<bool>();
557 :
558 2 : LBLOG(LOG_INIT) << "Create pipe " << command << " id " << pipeID
559 2 : << std::endl;
560 :
561 2 : Pipe* pipe = Global::getNodeFactory()->createPipe(this);
562 2 : if (threaded)
563 2 : pipe->startThread();
564 :
565 2 : Config* config = getConfig();
566 2 : LBCHECK(config->mapObject(pipe, pipeID));
567 2 : pipe->notifyMapped();
568 :
569 4 : return true;
570 : }
571 :
572 2 : bool Node::_cmdDestroyPipe(co::ICommand& cmd)
573 : {
574 4 : co::ObjectICommand command(cmd);
575 :
576 2 : LB_TS_THREAD(_nodeThread);
577 2 : LBLOG(LOG_INIT) << "Destroy pipe " << command << std::endl;
578 :
579 2 : Pipe* pipe = findPipe(command.read<uint128_t>());
580 2 : LBASSERT(pipe);
581 2 : pipe->exitThread();
582 :
583 2 : const bool stopped = pipe->isStopped();
584 :
585 2 : Config* config = getConfig();
586 2 : config->unmapObject(pipe);
587 2 : pipe->send(getServer(), fabric::CMD_PIPE_CONFIG_EXIT_REPLY) << stopped;
588 2 : Global::getNodeFactory()->releasePipe(pipe);
589 :
590 4 : return true;
591 : }
592 :
593 1 : bool Node::_cmdConfigInit(co::ICommand& cmd)
594 : {
595 2 : co::ObjectICommand command(cmd);
596 :
597 1 : LB_TS_THREAD(_nodeThread);
598 1 : LBLOG(LOG_INIT) << "Init node " << command << std::endl;
599 :
600 1 : _impl->state = STATE_INITIALIZING;
601 :
602 1 : const uint128_t& initID = command.read<uint128_t>();
603 1 : const uint32_t frameNumber = command.read<uint32_t>();
604 :
605 1 : _impl->currentFrame = frameNumber;
606 1 : _impl->unlockedFrame = frameNumber;
607 1 : _impl->finishedFrame = frameNumber;
608 1 : _setAffinity();
609 :
610 1 : _impl->transmitter.start();
611 1 : const uint64_t result = configInit(initID);
612 :
613 1 : if (getIAttribute(IATTR_THREAD_MODEL) == eq::UNDEFINED)
614 1 : setIAttribute(IATTR_THREAD_MODEL, eq::DRAW_SYNC);
615 :
616 1 : _impl->state = result ? STATE_RUNNING : STATE_INIT_FAILED;
617 :
618 1 : commit();
619 1 : send(command.getRemoteNode(), fabric::CMD_NODE_CONFIG_INIT_REPLY) << result;
620 2 : return true;
621 : }
622 :
623 1 : bool Node::_cmdConfigExit(co::ICommand& cmd)
624 : {
625 2 : co::ObjectICommand command(cmd);
626 :
627 1 : LB_TS_THREAD(_nodeThread);
628 1 : LBLOG(LOG_INIT) << "Node exit " << command << std::endl;
629 :
630 1 : const Pipes& pipes = getPipes();
631 3 : for (PipesCIter i = pipes.begin(); i != pipes.end(); ++i)
632 : {
633 2 : Pipe* pipe = *i;
634 2 : pipe->waitExited();
635 : }
636 :
637 1 : _impl->state = configExit() ? STATE_STOPPED : STATE_FAILED;
638 1 : getTransmitterQueue()->push(co::ICommand()); // wake up to exit
639 1 : _impl->transmitter.join();
640 1 : _flushObjects();
641 :
642 2 : getConfig()->send(getLocalNode(), fabric::CMD_CONFIG_DESTROY_NODE)
643 2 : << getID();
644 2 : return true;
645 : }
646 :
647 1 : bool Node::_cmdFrameStart(co::ICommand& cmd)
648 : {
649 1 : LB_TS_THREAD(_nodeThread);
650 :
651 2 : co::ObjectICommand command(cmd);
652 1 : const uint128_t& version = command.read<uint128_t>();
653 1 : const uint128_t& configVersion = command.read<uint128_t>();
654 1 : const uint128_t& frameID = command.read<uint128_t>();
655 1 : const uint32_t frameNumber = command.read<uint32_t>();
656 :
657 1 : LBVERB << "handle node frame start " << command << " frame " << frameNumber
658 1 : << " id " << frameID << std::endl;
659 :
660 1 : LBASSERT(_impl->currentFrame == frameNumber - 1);
661 :
662 1 : LBLOG(LOG_TASKS) << "----- Begin Frame ----- " << frameNumber << std::endl;
663 :
664 1 : Config* config = getConfig();
665 :
666 1 : if (configVersion != co::VERSION_INVALID)
667 0 : config->sync(configVersion);
668 1 : sync(version);
669 :
670 1 : config->_frameStart();
671 1 : frameStart(frameID, frameNumber);
672 :
673 1 : LBASSERTINFO(_impl->currentFrame >= frameNumber,
674 : "Node::frameStart() did not start frame " << frameNumber);
675 2 : return true;
676 : }
677 :
678 1 : bool Node::_cmdFrameFinish(co::ICommand& cmd)
679 : {
680 1 : LB_TS_THREAD(_nodeThread);
681 :
682 2 : co::ObjectICommand command(cmd);
683 1 : const uint128_t& frameID = command.read<uint128_t>();
684 1 : const uint32_t frameNumber = command.read<uint32_t>();
685 :
686 1 : LBLOG(LOG_TASKS) << "TASK frame finish " << getName() << " " << command
687 0 : << " frame " << frameNumber << " id " << frameID
688 1 : << std::endl;
689 :
690 1 : _finishFrame(frameNumber);
691 1 : _frameFinish(frameID, frameNumber);
692 :
693 1 : const uint128_t version = commit();
694 1 : if (version != co::VERSION_NONE)
695 0 : send(command.getNode(), fabric::CMD_OBJECT_SYNC);
696 2 : return true;
697 : }
698 :
699 1 : bool Node::_cmdFrameDrawFinish(co::ICommand& cmd)
700 : {
701 2 : co::ObjectICommand command(cmd);
702 1 : const uint128_t& frameID = command.read<uint128_t>();
703 1 : const uint32_t frameNumber = command.read<uint32_t>();
704 :
705 1 : LBLOG(LOG_TASKS) << "TASK draw finish " << getName() << " " << command
706 0 : << " frame " << frameNumber << " id " << frameID
707 1 : << std::endl;
708 :
709 1 : frameDrawFinish(frameID, frameNumber);
710 2 : return true;
711 : }
712 :
713 1 : bool Node::_cmdFrameTasksFinish(co::ICommand& cmd)
714 : {
715 2 : co::ObjectICommand command(cmd);
716 1 : const uint128_t& frameID = command.read<uint128_t>();
717 1 : const uint32_t frameNumber = command.read<uint32_t>();
718 :
719 1 : LBLOG(LOG_TASKS) << "TASK tasks finish " << getName() << " " << command
720 1 : << std::endl;
721 :
722 1 : frameTasksFinish(frameID, frameNumber);
723 2 : return true;
724 : }
725 :
726 0 : bool Node::_cmdFrameDataTransmit(co::ICommand& cmd)
727 : {
728 0 : co::ObjectICommand command(cmd);
729 :
730 : const co::ObjectVersion& frameDataVersion =
731 0 : command.read<co::ObjectVersion>();
732 0 : const PixelViewport& pvp = command.read<PixelViewport>();
733 0 : const Zoom& zoom = command.read<Zoom>();
734 0 : const RenderContext& context = command.read<RenderContext>();
735 0 : const Frame::Buffer buffers = command.read<Frame::Buffer>();
736 0 : const uint32_t frameNumber = command.read<uint32_t>();
737 0 : const bool useAlpha = command.read<bool>();
738 : const uint8_t* data = reinterpret_cast<const uint8_t*>(
739 0 : command.getRemainingBuffer(command.getRemainingBufferSize()));
740 :
741 0 : LBLOG(LOG_ASSEMBLY) << "received image data for " << frameDataVersion
742 0 : << ", buffers " << buffers << " pvp " << pvp
743 0 : << std::endl;
744 :
745 0 : LBASSERT(pvp.isValid());
746 :
747 0 : FrameDataPtr frameData = getFrameData(frameDataVersion);
748 0 : LBASSERT(!frameData->isReady());
749 :
750 0 : NodeStatistics event(Statistic::NODE_FRAME_DECOMPRESS, this, frameNumber);
751 :
752 : // Note on the const_cast: since the PixelData structure stores non-const
753 : // pointers, we have to go non-const at some point, even though we do not
754 : // modify the data.
755 0 : LBCHECK(frameData->addImage(frameDataVersion, pvp, zoom, context, buffers,
756 : useAlpha, const_cast<uint8_t*>(data)));
757 0 : return true;
758 : }
759 :
760 0 : bool Node::_cmdFrameDataReady(co::ICommand& cmd)
761 : {
762 0 : co::ObjectICommand command(cmd);
763 :
764 : const co::ObjectVersion& frameDataVersion =
765 0 : command.read<co::ObjectVersion>();
766 0 : fabric::FrameData data;
767 0 : data.deserialize(command);
768 :
769 0 : LBLOG(LOG_ASSEMBLY) << "received ready for " << frameDataVersion
770 0 : << std::endl;
771 0 : FrameDataPtr frameData = getFrameData(frameDataVersion);
772 0 : LBASSERT(frameData);
773 0 : LBASSERT(!frameData->isReady());
774 0 : frameData->setReady(frameDataVersion, data);
775 0 : LBASSERT(frameData->isReady());
776 0 : return true;
777 : }
778 :
779 0 : bool Node::_cmdSetAffinity(co::ICommand& cmd)
780 : {
781 0 : co::ObjectICommand command(cmd);
782 :
783 0 : lunchbox::Thread::setAffinity(command.read<int32_t>());
784 0 : return true;
785 : }
786 : }
787 :
788 : #include <eq/fabric/node.ipp>
789 : template class eq::fabric::Node<eq::Config, eq::Node, eq::Pipe,
790 : eq::NodeVisitor>;
791 :
792 : /** @cond IGNORE */
793 : template EQFABRIC_API std::ostream& eq::fabric::operator<<(std::ostream&,
794 30 : const eq::Super&);
795 : /** @endcond */
|