Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "fullMasterCM.h"
22 :
23 : #include "log.h"
24 : #include "node.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 : #include "object.h"
28 : #include "objectDataIStream.h"
29 :
30 : //#define CO_INSTRUMENT
31 :
32 : namespace co
33 : {
namespace
{
#if defined(CO_INSTRUMENT)
// Instrumentation counter: raised by the save buffer size of every instance
// data added to the retained versions and lowered again when one is released.
lunchbox::a_int32_t _bytesBuffered;
#endif
}
40 :
41 11 : FullMasterCM::FullMasterCM(Object* object)
42 : : VersionedMasterCM(object)
43 : , _commitCount(0)
44 11 : , _nVersions(0)
45 : {
46 11 : }
47 :
48 26 : FullMasterCM::~FullMasterCM()
49 : {
50 72 : for (InstanceDataDeque::const_iterator i = _instanceDatas.begin();
51 48 : i != _instanceDatas.end(); ++i)
52 : {
53 13 : delete *i;
54 : }
55 11 : _instanceDatas.clear();
56 :
57 33 : for (InstanceDatas::const_iterator i = _instanceDataCache.begin();
58 22 : i != _instanceDataCache.end(); ++i)
59 : {
60 0 : delete *i;
61 : }
62 11 : _instanceDataCache.clear();
63 15 : }
64 :
65 0 : void FullMasterCM::sendInstanceData(const Nodes& nodes)
66 : {
67 0 : LB_TS_THREAD(_cmdThread);
68 0 : Mutex mutex(_slaves);
69 0 : if (!_slaves->empty())
70 0 : return;
71 :
72 0 : InstanceData* data = _instanceDatas.back();
73 0 : data->os.sendInstanceData(nodes);
74 : }
75 :
76 11 : void FullMasterCM::init()
77 : {
78 11 : LBASSERT(_commitCount == 0);
79 11 : VersionedMasterCM::init();
80 :
81 11 : InstanceData* data = _newInstanceData();
82 :
83 11 : data->os.enableCommit(VERSION_FIRST, *_slaves);
84 11 : _object->getInstanceData(data->os);
85 11 : data->os.disable();
86 :
87 11 : _instanceDatas.push_back(data);
88 11 : ++_version;
89 11 : ++_commitCount;
90 11 : }
91 :
92 1 : void FullMasterCM::setAutoObsolete(const uint32_t count)
93 : {
94 2 : Mutex mutex(_slaves);
95 1 : _nVersions = count;
96 1 : _obsolete();
97 1 : }
98 :
99 110 : void FullMasterCM::_updateCommitCount(const uint32_t incarnation)
100 : {
101 110 : LBASSERT(!_instanceDatas.empty());
102 110 : if (incarnation == CO_COMMIT_NEXT)
103 : {
104 110 : ++_commitCount;
105 110 : return;
106 : }
107 :
108 0 : if (incarnation >= _commitCount)
109 : {
110 0 : _commitCount = incarnation;
111 0 : return;
112 : }
113 :
114 0 : LBASSERTINFO(incarnation >= _commitCount,
115 : "Detected decreasing commit incarnation counter");
116 0 : _commitCount = incarnation;
117 :
118 : // obsolete 'future' old packages
119 0 : while (_instanceDatas.size() > 1)
120 : {
121 0 : InstanceData* data = _instanceDatas.back();
122 0 : if (data->commitCount <= _commitCount)
123 0 : break;
124 :
125 : #ifdef CO_INSTRUMENT
126 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
127 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
128 : #endif
129 0 : _releaseInstanceData(data);
130 0 : _instanceDatas.pop_back();
131 : }
132 :
133 0 : InstanceData* data = _instanceDatas.back();
134 0 : if (data->commitCount > _commitCount)
135 : {
136 : // tweak commitCount of minimum retained version for correct obsoletion
137 0 : data->commitCount = 0;
138 0 : _version = data->os.getVersion();
139 : }
140 : }
141 :
142 111 : void FullMasterCM::_obsolete()
143 : {
144 111 : LBASSERT(!_instanceDatas.empty());
145 327 : while (_instanceDatas.size() > 1 && _commitCount > _nVersions)
146 : {
147 207 : InstanceData* data = _instanceDatas.front();
148 207 : if (data->commitCount >= (_commitCount - _nVersions))
149 99 : break;
150 :
151 : #ifdef CO_INSTRUMENT
152 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
153 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
154 : #endif
155 : #if 0
156 : LBINFO
157 : << "Remove v" << data->os.getVersion() << " c" << data->commitCount
158 : << "@" << _commitCount << "/" << _nVersions << " from "
159 : << lunchbox::className( _object ) << " " << ObjectVersion( _object )
160 : << std::endl;
161 : #endif
162 108 : _releaseInstanceData(data);
163 108 : _instanceDatas.pop_front();
164 : }
165 111 : _checkConsistency();
166 111 : }
167 :
168 30 : bool FullMasterCM::_initSlave(const MasterCMCommand& command,
169 : const uint128_t& /*replyVersion*/,
170 : bool replyUseCache)
171 : {
172 30 : _checkConsistency();
173 :
174 30 : const uint128_t& version = command.getRequestedVersion();
175 30 : const uint128_t oldest = _instanceDatas.front()->os.getVersion();
176 : uint128_t start =
177 30 : (version == VERSION_OLDEST || version < oldest) ? oldest : version;
178 30 : uint128_t end = _version;
179 :
180 : #ifndef NDEBUG
181 30 : if (version != VERSION_OLDEST && version < start)
182 0 : LBINFO << "Mapping version " << start << " instead of requested "
183 0 : << version << " for " << lunchbox::className(_object) << " "
184 0 : << ObjectVersion(_object->getID(), _version) << " of "
185 0 : << _instanceDatas.size() << "/" << _nVersions << std::endl;
186 : #endif
187 :
188 30 : const uint128_t& minCachedVersion = command.getMinCachedVersion();
189 30 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
190 30 : const uint128_t replyVersion = start;
191 30 : if (replyUseCache)
192 : {
193 0 : if (minCachedVersion <= start && maxCachedVersion >= start)
194 : {
195 : #ifdef CO_INSTRUMENT_MULTICAST
196 : _hit += maxCachedVersion + 1 - start;
197 : #endif
198 0 : start = maxCachedVersion + 1;
199 : }
200 0 : else if (maxCachedVersion == end)
201 : {
202 0 : end = LB_MAX(start, minCachedVersion - 1);
203 : #ifdef CO_INSTRUMENT_MULTICAST
204 : _hit += _version - end;
205 : #endif
206 : }
207 : // TODO else cached block in the middle, send head and tail elements
208 : }
209 :
210 : #if 0
211 : LBLOG( LOG_OBJECTS )
212 : << *_object << ", instantiate on " << node->getNodeID() << " with v"
213 : << ((requested == VERSION_OLDEST) ? oldest : requested) << " ("
214 : << requested << ") sending " << start << ".." << end << " have "
215 : << _version - _nVersions << ".." << _version << " "
216 : << _instanceDatas.size() << std::endl;
217 : #endif
218 30 : LBASSERT(start >= oldest);
219 :
220 30 : bool dataSent = false;
221 :
222 : // send all instance datas from start..end
223 30 : InstanceDataDeque::iterator i = _instanceDatas.begin();
224 30 : while (i != _instanceDatas.end() && (*i)->os.getVersion() < start)
225 0 : ++i;
226 :
227 90 : for (; i != _instanceDatas.end() && (*i)->os.getVersion() <= end; ++i)
228 : {
229 30 : if (!dataSent)
230 : {
231 30 : _sendMapSuccess(command, true);
232 30 : dataSent = true;
233 : }
234 :
235 30 : InstanceData* data = *i;
236 30 : LBASSERT(data);
237 30 : data->os.sendMapData(command.getRemoteNode(), command.getInstanceID());
238 :
239 : #ifdef CO_INSTRUMENT_MULTICAST
240 : ++_miss;
241 : #endif
242 : }
243 :
244 30 : if (!dataSent)
245 : {
246 0 : _sendMapSuccess(command, false);
247 0 : _sendMapReply(command, replyVersion, true, replyUseCache, false);
248 : }
249 : else
250 30 : _sendMapReply(command, replyVersion, true, replyUseCache, true);
251 :
252 : #ifdef CO_INSTRUMENT_MULTICAST
253 : if (_miss % 100 == 0)
254 : LBINFO << "Cached " << _hit << "/" << _hit + _miss
255 : << " instance data transmissions" << std::endl;
256 : #endif
257 30 : return true;
258 : }
259 :
260 141 : void FullMasterCM::_checkConsistency() const
261 : {
262 : #ifndef NDEBUG
263 141 : LBASSERT(!_instanceDatas.empty());
264 141 : LBASSERT(_object->isAttached());
265 :
266 141 : if (_version == VERSION_NONE)
267 0 : return;
268 :
269 141 : uint128_t version = _version;
270 1440 : for (InstanceDataDeque::const_reverse_iterator i = _instanceDatas.rbegin();
271 960 : i != _instanceDatas.rend(); ++i)
272 : {
273 339 : const InstanceData* data = *i;
274 339 : LBASSERT(data->os.getVersion() != VERSION_NONE);
275 339 : LBASSERTINFO(data->os.getVersion() == version,
276 : data->os.getVersion() << " != " << version);
277 339 : if (data != _instanceDatas.front())
278 : {
279 198 : LBASSERTINFO(data->commitCount + _nVersions >= _commitCount,
280 : data->commitCount << ", " << _commitCount << " ["
281 : << _nVersions << "]");
282 : }
283 339 : --version;
284 : }
285 : #endif
286 : }
287 :
288 : //---------------------------------------------------------------------------
289 : // cache handling
290 : //---------------------------------------------------------------------------
291 121 : FullMasterCM::InstanceData* FullMasterCM::_newInstanceData()
292 : {
293 : InstanceData* instanceData;
294 :
295 121 : if (_instanceDataCache.empty())
296 121 : instanceData = new InstanceData(this);
297 : else
298 : {
299 0 : instanceData = _instanceDataCache.back();
300 0 : _instanceDataCache.pop_back();
301 : }
302 :
303 121 : instanceData->commitCount = _commitCount;
304 121 : instanceData->os.reset();
305 121 : instanceData->os.enableSave();
306 121 : return instanceData;
307 : }
308 :
309 110 : void FullMasterCM::_addInstanceData(InstanceData* data)
310 : {
311 110 : LBASSERT(data->os.getVersion() != VERSION_NONE);
312 110 : LBASSERT(data->os.getVersion() != VERSION_INVALID);
313 :
314 110 : _instanceDatas.push_back(data);
315 : #ifdef CO_INSTRUMENT
316 : _bytesBuffered += data->os.getSaveBuffer().getSize();
317 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
318 : #endif
319 110 : }
320 :
321 108 : void FullMasterCM::_releaseInstanceData(InstanceData* data)
322 : {
323 : #ifdef CO_AGGRESSIVE_CACHING
324 : _instanceDataCache.push_back(data);
325 : #else
326 108 : delete data;
327 : #endif
328 108 : }
329 :
330 110 : uint128_t FullMasterCM::commit(const uint32_t incarnation)
331 : {
332 110 : LBASSERT(_version != VERSION_NONE);
333 :
334 110 : if (!_object->isDirty())
335 : {
336 0 : Mutex mutex(_slaves);
337 0 : _updateCommitCount(incarnation);
338 0 : _obsolete();
339 0 : return _version;
340 : }
341 :
342 110 : _maxVersion.waitGE(_version.low() + 1);
343 220 : Mutex mutex(_slaves);
344 : #if 0
345 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
346 : << std::endl;
347 : #endif
348 110 : _updateCommitCount(incarnation);
349 110 : _commit();
350 110 : _obsolete();
351 110 : return _version;
352 : }
353 :
354 6 : void FullMasterCM::_commit()
355 : {
356 6 : InstanceData* instanceData = _newInstanceData();
357 6 : instanceData->os.enableCommit(_version + 1, *_slaves);
358 6 : _object->getInstanceData(instanceData->os);
359 6 : instanceData->os.disable();
360 :
361 6 : if (instanceData->os.hasSentData())
362 : {
363 6 : ++_version;
364 6 : LBASSERT(_version != VERSION_NONE);
365 : #if 0
366 : LBINFO << "Committed v" << _version << "@" << _commitCount << ", id "
367 : << _object->getID() << std::endl;
368 : #endif
369 6 : _addInstanceData(instanceData);
370 : }
371 : else
372 0 : _instanceDataCache.push_back(instanceData);
373 6 : }
374 :
375 3 : void FullMasterCM::push(const uint128_t& groupID, const uint128_t& typeID,
376 : const Nodes& nodes)
377 : {
378 6 : Mutex mutex(_slaves);
379 3 : InstanceData* instanceData = _instanceDatas.back();
380 3 : instanceData->os.push(nodes, _object->getID(), groupID, typeID);
381 3 : }
382 :
383 0 : bool FullMasterCM::sendSync(const MasterCMCommand& command)
384 : {
385 : // const uint128_t& version = command.getRequestedVersion();
386 0 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
387 : const bool useCache =
388 0 : command.useCache() &&
389 0 : command.getMasterInstanceID() == _object->getInstanceID() &&
390 0 : maxCachedVersion == _version;
391 :
392 0 : if (!useCache)
393 : {
394 0 : Mutex mutex(_slaves);
395 0 : InstanceData* instanceData = _instanceDatas.back();
396 0 : instanceData->os.sync(command);
397 : }
398 :
399 0 : NodePtr node = command.getNode();
400 0 : node->send(CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/)
401 0 : << node->getNodeID() << command.getObjectID() << command.getRequestID()
402 0 : << true << command.useCache() << useCache;
403 0 : return true;
404 : }
405 63 : }
|