Line data Source code
1 :
2 : /* Copyright (c) 2010-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "versionedMasterCM.h"
22 :
23 : #include "log.h"
24 : #include "object.h"
25 : #include "objectCommand.h"
26 : #include "objectDataICommand.h"
27 : #include "objectDataIStream.h"
28 :
29 : namespace co
30 : {
31 : typedef CommandFunc<VersionedMasterCM> CmdFunc;
32 :
33 13 : VersionedMasterCM::VersionedMasterCM(Object* object)
34 : : ObjectCM(object)
35 : , _version(VERSION_NONE)
36 13 : , _maxVersion(std::numeric_limits<uint64_t>::max())
37 : {
38 13 : LBASSERT(object);
39 13 : LBASSERT(object->getLocalNode());
40 :
41 : // sync commands are send to all instances, even the master gets it
42 13 : object->registerCommand(CMD_OBJECT_INSTANCE,
43 26 : CmdFunc(this, &VersionedMasterCM::_cmdDiscard), 0);
44 13 : object->registerCommand(CMD_OBJECT_DELTA,
45 26 : CmdFunc(this, &VersionedMasterCM::_cmdDiscard), 0);
46 :
47 13 : object->registerCommand(CMD_OBJECT_SLAVE_DELTA,
48 26 : CmdFunc(this, &VersionedMasterCM::_cmdSlaveDelta),
49 13 : 0);
50 13 : object->registerCommand(CMD_OBJECT_MAX_VERSION,
51 26 : CmdFunc(this, &VersionedMasterCM::_cmdMaxVersion),
52 13 : 0);
53 13 : }
54 :
55 26 : VersionedMasterCM::~VersionedMasterCM()
56 : {
57 13 : _slaves->clear();
58 13 : }
59 :
60 0 : uint128_t VersionedMasterCM::sync(const uint128_t& inVersion)
61 : {
62 0 : LBASSERTINFO(inVersion.high() != 0 || inVersion == VERSION_NEXT ||
63 : inVersion == VERSION_HEAD,
64 : inVersion);
65 : #if 0
66 : LBLOG( LOG_OBJECTS ) << "sync to v" << inVersion << ", id "
67 : << _object->getID() << "." << _object->getInstanceID()
68 : << std::endl;
69 : #endif
70 :
71 0 : if (inVersion == VERSION_NEXT)
72 0 : return _apply(_slaveCommits.pop());
73 :
74 0 : if (inVersion == VERSION_HEAD)
75 : {
76 0 : uint128_t version = VERSION_NONE;
77 0 : for (ObjectDataIStream* is = _slaveCommits.tryPop(); is;
78 0 : is = _slaveCommits.tryPop())
79 : {
80 0 : version = _apply(is);
81 : }
82 0 : return version;
83 : }
84 : // else apply only concrete slave commit
85 :
86 0 : return _apply(_slaveCommits.pull(inVersion));
87 : }
88 :
89 0 : uint128_t VersionedMasterCM::_apply(ObjectDataIStream* is)
90 : {
91 0 : LBASSERT(!is->hasInstanceData());
92 0 : _object->unpack(*is);
93 0 : LBASSERTINFO(is->getRemainingBufferSize() == 0 &&
94 : is->nRemainingBuffers() == 0,
95 : "Object " << lunchbox::className(_object)
96 : << " did not unpack all data");
97 :
98 0 : const uint128_t version = is->getVersion();
99 0 : is->reset();
100 0 : _slaveCommits.recycle(is);
101 0 : return version;
102 : }
103 :
104 34 : bool VersionedMasterCM::addSlave(const MasterCMCommand& command)
105 : {
106 34 : LB_TS_THREAD(_cmdThread);
107 68 : Mutex mutex(_slaves);
108 :
109 34 : if (!ObjectCM::_addSlave(command, _version))
110 0 : return false;
111 :
112 68 : SlaveData data;
113 34 : data.node = command.getNode();
114 34 : data.instanceID = command.getInstanceID();
115 34 : data.maxVersion = command.getMaxVersion();
116 34 : if (data.maxVersion == 0)
117 0 : data.maxVersion = std::numeric_limits<uint64_t>::max();
118 34 : else if (data.maxVersion < std::numeric_limits<uint64_t>::max())
119 1 : data.maxVersion += _version.low();
120 :
121 34 : _slaveData.push_back(data);
122 34 : _updateMaxVersion();
123 :
124 34 : _slaves->push_back(data.node);
125 34 : lunchbox::usort(*_slaves);
126 34 : return true;
127 : }
128 :
129 33 : void VersionedMasterCM::removeSlave(NodePtr node, const uint32_t instanceID)
130 : {
131 33 : LB_TS_THREAD(_cmdThread);
132 66 : Mutex mutex(_slaves);
133 :
134 : // remove from subscribers
135 66 : SlaveData data;
136 33 : data.node = node;
137 33 : data.instanceID = instanceID;
138 33 : SlaveDatasIter i = lunchbox::find(_slaveData, data);
139 33 : LBASSERTINFO(i != _slaveData.end(), lunchbox::className(_object));
140 33 : if (i == _slaveData.end())
141 0 : return;
142 :
143 33 : _slaveData.erase(i);
144 :
145 : // update _slaves node vector
146 33 : _slaves->clear();
147 124 : for (i = _slaveData.begin(); i != _slaveData.end(); ++i)
148 91 : _slaves->push_back(i->node);
149 33 : lunchbox::usort(*_slaves);
150 33 : _updateMaxVersion();
151 : }
152 :
153 25 : void VersionedMasterCM::removeSlaves(NodePtr node)
154 : {
155 25 : LB_TS_THREAD(_cmdThread);
156 :
157 25 : Mutex mutex(_slaves);
158 :
159 25 : NodesIter i = lunchbox::find(*_slaves, node);
160 25 : if (i == _slaves->end())
161 25 : return;
162 0 : _slaves->erase(i);
163 :
164 0 : for (SlaveDatasIter j = _slaveData.begin(); j != _slaveData.end();)
165 : {
166 0 : if (j->node == node)
167 0 : j = _slaveData.erase(j);
168 : else
169 0 : ++j;
170 : }
171 0 : _updateMaxVersion();
172 : }
173 :
174 69 : void VersionedMasterCM::_updateMaxVersion()
175 : {
176 69 : uint64_t maxVersion = std::numeric_limits<uint64_t>::max();
177 287 : for (SlaveDatasCIter i = _slaveData.begin(); i != _slaveData.end(); ++i)
178 : {
179 221 : if (i->maxVersion != std::numeric_limits<uint64_t>::max() &&
180 3 : maxVersion > i->maxVersion)
181 : {
182 3 : maxVersion = i->maxVersion;
183 : }
184 : }
185 :
186 69 : if (_maxVersion != maxVersion)
187 4 : _maxVersion = maxVersion;
188 69 : }
189 :
190 : //---------------------------------------------------------------------------
191 : // command handlers
192 : //---------------------------------------------------------------------------
193 0 : bool VersionedMasterCM::_cmdSlaveDelta(ICommand& cmd)
194 : {
195 0 : ObjectDataICommand command(cmd);
196 :
197 0 : LB_TS_THREAD(_rcvThread);
198 :
199 0 : if (_slaveCommits.addDataCommand(command.get<uint128_t>(), command))
200 0 : _object->notifyNewVersion();
201 0 : return true;
202 : }
203 :
204 2 : bool VersionedMasterCM::_cmdMaxVersion(ICommand& cmd)
205 : {
206 4 : ObjectICommand command(cmd);
207 2 : const uint64_t version = command.get<uint64_t>();
208 2 : const uint32_t slaveID = command.get<uint32_t>();
209 :
210 4 : Mutex mutex(_slaves);
211 :
212 : // Update slave's max version
213 4 : SlaveData data;
214 2 : data.node = command.getNode();
215 2 : data.instanceID = slaveID;
216 2 : SlaveDatasIter i = lunchbox::find(_slaveData, data);
217 2 : if (i == _slaveData.end())
218 : {
219 0 : LBWARN << "Got max version from unmapped slave" << std::endl;
220 0 : return true;
221 : }
222 :
223 2 : i->maxVersion = version;
224 2 : _updateMaxVersion();
225 2 : return true;
226 : }
227 63 : }
|