Line data Source code
1 :
2 : /* Copyright (c) 2011-2017, Stefan Eilemann <eile@eyescale.ch>
3 : * Carsten Rohn <carsten.rohn@rtt.ag>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "queueSlave.h"
23 :
24 : #include "buffer.h"
25 : #include "commandQueue.h"
26 : #include "dataIStream.h"
27 : #include "exception.h"
28 : #include "global.h"
29 : #include "objectICommand.h"
30 : #include "objectOCommand.h"
31 : #include "queueCommand.h"
32 :
33 : namespace co
34 : {
35 : namespace detail
36 : {
37 1 : class QueueSlave
38 : {
39 : public:
40 1 : QueueSlave(const uint32_t mark, const uint32_t amount)
41 1 : : masterInstanceID(CO_INSTANCE_ALL)
42 : , prefetchMark(
43 : mark == LB_UNDEFINED_UINT32
44 1 : ? Global::getIAttribute(Global::IATTR_TILE_QUEUE_MIN_SIZE)
45 : : mark)
46 : , prefetchAmount(
47 : amount == LB_UNDEFINED_UINT32
48 1 : ? Global::getIAttribute(Global::IATTR_TILE_QUEUE_REFILL)
49 3 : : amount)
50 : {
51 1 : }
52 :
53 : co::CommandQueue queue;
54 : NodePtr master;
55 : uint32_t masterInstanceID;
56 :
57 : const uint32_t prefetchMark;
58 : const uint32_t prefetchAmount;
59 : };
60 : }
61 :
62 1 : QueueSlave::QueueSlave(const uint32_t prefetchMark,
63 1 : const uint32_t prefetchAmount)
64 1 : : _impl(new detail::QueueSlave(prefetchMark, prefetchAmount))
65 : {
66 1 : }
67 :
68 3 : QueueSlave::~QueueSlave()
69 : {
70 1 : delete _impl;
71 2 : }
72 :
73 1 : void QueueSlave::attach(const uint128_t& id, const uint32_t instanceID)
74 : {
75 1 : Object::attach(id, instanceID);
76 1 : registerCommand(CMD_QUEUE_ITEM, CommandFunc<Object>(0, 0), &_impl->queue);
77 1 : registerCommand(CMD_QUEUE_EMPTY, CommandFunc<Object>(0, 0), &_impl->queue);
78 1 : }
79 :
80 1 : void QueueSlave::applyInstanceData(co::DataIStream& is)
81 : {
82 1 : NodeID masterNodeID;
83 1 : is >> _impl->masterInstanceID >> masterNodeID;
84 :
85 1 : LBASSERT(masterNodeID != 0);
86 1 : LBASSERT(!_impl->master);
87 2 : LocalNodePtr localNode = getLocalNode();
88 1 : _impl->master = localNode->connect(masterNodeID);
89 1 : }
90 :
91 5 : ObjectICommand QueueSlave::pop(const uint32_t timeout)
92 : {
93 5 : static lunchbox::a_int32_t _request;
94 5 : const int32_t request = ++_request;
95 :
96 : while (true)
97 : {
98 5 : const size_t queueSize = _impl->queue.getSize();
99 5 : if (queueSize <= _impl->prefetchMark)
100 : {
101 10 : send(_impl->master, CMD_QUEUE_GET_ITEM, _impl->masterInstanceID)
102 15 : << _impl->prefetchAmount << getInstanceID() << request;
103 : }
104 :
105 5 : ObjectICommand cmd(_impl->queue.pop(timeout));
106 5 : if (!cmd.isValid())
107 : {
108 0 : LBERROR << "Timeout during QueueSlave::pop()" << std::endl;
109 0 : return ObjectICommand(0, 0, 0);
110 : }
111 :
112 5 : switch (cmd.getCommand())
113 : {
114 : case CMD_QUEUE_ITEM:
115 4 : return ObjectICommand(cmd);
116 :
117 : default:
118 0 : LBUNIMPLEMENTED;
119 : case CMD_QUEUE_EMPTY:
120 1 : if (cmd.get<int32_t>() == request)
121 1 : return ObjectICommand(0, 0, 0);
122 : // else left-over or not our empty command, discard and retry
123 0 : break;
124 : }
125 0 : }
126 : }
127 63 : }
|