Line data Source code
1 :
2 : /* Copyright (c) 2007-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "fullMasterCM.h"
22 :
23 : #include "log.h"
24 : #include "node.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 : #include "object.h"
28 : #include "objectDataIStream.h"
29 :
30 : //#define CO_INSTRUMENT
31 :
32 : namespace co
33 : {
34 : namespace
35 : {
36 : #ifdef CO_INSTRUMENT
// Instrumentation counter, compiled in only when CO_INSTRUMENT is defined.
// It is increased by the save-buffer size of each instance data that gets
// retained and decreased again when such an instance data is released.
37 : lunchbox::a_int32_t _bytesBuffered;
38 : #endif
39 : }
40 :
41 11 : FullMasterCM::FullMasterCM( Object* object )
42 : : VersionedMasterCM( object )
43 : , _commitCount( 0 )
44 11 : , _nVersions( 0 )
45 11 : {}
46 :
47 26 : FullMasterCM::~FullMasterCM()
48 : {
49 72 : for( InstanceDataDeque::const_iterator i = _instanceDatas.begin();
50 48 : i != _instanceDatas.end(); ++i )
51 : {
52 13 : delete *i;
53 : }
54 11 : _instanceDatas.clear();
55 :
56 33 : for( InstanceDatas::const_iterator i = _instanceDataCache.begin();
57 22 : i != _instanceDataCache.end(); ++i )
58 : {
59 0 : delete *i;
60 : }
61 11 : _instanceDataCache.clear();
62 15 : }
63 :
64 0 : void FullMasterCM::sendInstanceData( const Nodes& nodes )
65 : {
66 0 : LB_TS_THREAD( _cmdThread );
67 0 : Mutex mutex( _slaves );
68 0 : if( !_slaves->empty( ))
69 0 : return;
70 :
71 0 : InstanceData* data = _instanceDatas.back();
72 0 : data->os.sendInstanceData( nodes );
73 : }
74 :
75 11 : void FullMasterCM::init()
76 : {
77 11 : LBASSERT( _commitCount == 0 );
78 11 : VersionedMasterCM::init();
79 :
80 11 : InstanceData* data = _newInstanceData();
81 :
82 11 : data->os.enableCommit( VERSION_FIRST, *_slaves );
83 11 : _object->getInstanceData( data->os );
84 11 : data->os.disable();
85 :
86 11 : _instanceDatas.push_back( data );
87 11 : ++_version;
88 11 : ++_commitCount;
89 11 : }
90 :
91 1 : void FullMasterCM::setAutoObsolete( const uint32_t count )
92 : {
93 1 : Mutex mutex( _slaves );
94 1 : _nVersions = count;
95 1 : _obsolete();
96 1 : }
97 :
98 110 : void FullMasterCM::_updateCommitCount( const uint32_t incarnation )
99 : {
100 110 : LBASSERT( !_instanceDatas.empty( ));
101 110 : if( incarnation == CO_COMMIT_NEXT )
102 : {
103 110 : ++_commitCount;
104 110 : return;
105 : }
106 :
107 0 : if( incarnation >= _commitCount )
108 : {
109 0 : _commitCount = incarnation;
110 0 : return;
111 : }
112 :
113 0 : LBASSERTINFO( incarnation >= _commitCount,
114 : "Detected decreasing commit incarnation counter" );
115 0 : _commitCount = incarnation;
116 :
117 : // obsolete 'future' old packages
118 0 : while( _instanceDatas.size() > 1 )
119 : {
120 0 : InstanceData* data = _instanceDatas.back();
121 0 : if( data->commitCount <= _commitCount )
122 0 : break;
123 :
124 : #ifdef CO_INSTRUMENT
125 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
126 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
127 : #endif
128 0 : _releaseInstanceData( data );
129 0 : _instanceDatas.pop_back();
130 : }
131 :
132 0 : InstanceData* data = _instanceDatas.back();
133 0 : if( data->commitCount > _commitCount )
134 : {
135 : // tweak commitCount of minimum retained version for correct obsoletion
136 0 : data->commitCount = 0;
137 0 : _version = data->os.getVersion();
138 : }
139 : }
140 :
141 111 : void FullMasterCM::_obsolete()
142 : {
143 111 : LBASSERT( !_instanceDatas.empty( ));
144 330 : while( _instanceDatas.size() > 1 && _commitCount > _nVersions )
145 : {
146 207 : InstanceData* data = _instanceDatas.front();
147 207 : if( data->commitCount >= (_commitCount - _nVersions))
148 99 : break;
149 :
150 : #ifdef CO_INSTRUMENT
151 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
152 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
153 : #endif
154 : #if 0
155 : LBINFO
156 : << "Remove v" << data->os.getVersion() << " c" << data->commitCount
157 : << "@" << _commitCount << "/" << _nVersions << " from "
158 : << lunchbox::className( _object ) << " " << ObjectVersion( _object )
159 : << std::endl;
160 : #endif
161 108 : _releaseInstanceData( data );
162 108 : _instanceDatas.pop_front();
163 : }
164 111 : _checkConsistency();
165 111 : }
166 :
167 30 : bool FullMasterCM::_initSlave( const MasterCMCommand& command,
168 : const uint128_t& /*replyVersion*/,
169 : bool replyUseCache )
170 : {
171 30 : _checkConsistency();
172 :
173 30 : const uint128_t& version = command.getRequestedVersion();
174 30 : const uint128_t oldest = _instanceDatas.front()->os.getVersion();
175 59 : uint128_t start = (version == VERSION_OLDEST || version < oldest ) ?
176 31 : oldest : version;
177 30 : uint128_t end = _version;
178 :
179 : #ifndef NDEBUG
180 30 : if( version != VERSION_OLDEST && version < start )
181 0 : LBINFO << "Mapping version " << start << " instead of requested "
182 0 : << version << " for " << lunchbox::className( _object )
183 0 : << " " << ObjectVersion( _object->getID(), _version ) << " of "
184 0 : << _instanceDatas.size() << "/" << _nVersions << std::endl;
185 : #endif
186 :
187 30 : const uint128_t& minCachedVersion = command.getMinCachedVersion();
188 30 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
189 30 : const uint128_t replyVersion = start;
190 30 : if( replyUseCache )
191 : {
192 0 : if( minCachedVersion <= start && maxCachedVersion >= start )
193 : {
194 : #ifdef CO_INSTRUMENT_MULTICAST
195 : _hit += maxCachedVersion + 1 - start;
196 : #endif
197 0 : start = maxCachedVersion + 1;
198 : }
199 0 : else if( maxCachedVersion == end )
200 : {
201 0 : end = LB_MAX( start, minCachedVersion - 1 );
202 : #ifdef CO_INSTRUMENT_MULTICAST
203 : _hit += _version - end;
204 : #endif
205 : }
206 : // TODO else cached block in the middle, send head and tail elements
207 : }
208 :
209 : #if 0
210 : LBLOG( LOG_OBJECTS )
211 : << *_object << ", instantiate on " << node->getNodeID() << " with v"
212 : << ((requested == VERSION_OLDEST) ? oldest : requested) << " ("
213 : << requested << ") sending " << start << ".." << end << " have "
214 : << _version - _nVersions << ".." << _version << " "
215 : << _instanceDatas.size() << std::endl;
216 : #endif
217 30 : LBASSERT( start >= oldest );
218 :
219 30 : bool dataSent = false;
220 :
221 : // send all instance datas from start..end
222 30 : InstanceDataDeque::iterator i = _instanceDatas.begin();
223 60 : while( i != _instanceDatas.end() && (*i)->os.getVersion() < start )
224 0 : ++i;
225 :
226 60 : for( ; i != _instanceDatas.end() && (*i)->os.getVersion() <= end; ++i )
227 : {
228 30 : if( !dataSent )
229 : {
230 30 : _sendMapSuccess( command, true );
231 30 : dataSent = true;
232 : }
233 :
234 30 : InstanceData* data = *i;
235 30 : LBASSERT( data );
236 30 : data->os.sendMapData( command.getRemoteNode(),
237 60 : command.getInstanceID( ));
238 :
239 : #ifdef CO_INSTRUMENT_MULTICAST
240 : ++_miss;
241 : #endif
242 : }
243 :
244 30 : if( !dataSent )
245 : {
246 0 : _sendMapSuccess( command, false );
247 0 : _sendMapReply( command, replyVersion, true, replyUseCache, false );
248 : }
249 : else
250 30 : _sendMapReply( command, replyVersion, true, replyUseCache, true );
251 :
252 : #ifdef CO_INSTRUMENT_MULTICAST
253 : if( _miss % 100 == 0 )
254 : LBINFO << "Cached " << _hit << "/" << _hit + _miss
255 : << " instance data transmissions" << std::endl;
256 : #endif
257 30 : return true;
258 : }
259 :
260 141 : void FullMasterCM::_checkConsistency() const
261 : {
262 : #ifndef NDEBUG
263 141 : LBASSERT( !_instanceDatas.empty( ));
264 141 : LBASSERT( _object->isAttached() );
265 :
266 141 : if( _version == VERSION_NONE )
267 141 : return;
268 :
269 141 : uint128_t version = _version;
270 1440 : for( InstanceDataDeque::const_reverse_iterator i = _instanceDatas.rbegin();
271 960 : i != _instanceDatas.rend(); ++i )
272 : {
273 339 : const InstanceData* data = *i;
274 339 : LBASSERT( data->os.getVersion() != VERSION_NONE );
275 339 : LBASSERTINFO( data->os.getVersion() == version,
276 : data->os.getVersion() << " != " << version );
277 339 : if( data != _instanceDatas.front( ))
278 : {
279 198 : LBASSERTINFO( data->commitCount + _nVersions >= _commitCount,
280 : data->commitCount << ", " << _commitCount << " [" <<
281 : _nVersions << "]" );
282 : }
283 339 : --version;
284 : }
285 : #endif
286 : }
287 :
288 : //---------------------------------------------------------------------------
289 : // cache handling
290 : //---------------------------------------------------------------------------
291 121 : FullMasterCM::InstanceData* FullMasterCM::_newInstanceData()
292 : {
293 : InstanceData* instanceData;
294 :
295 121 : if( _instanceDataCache.empty( ))
296 121 : instanceData = new InstanceData( this );
297 : else
298 : {
299 0 : instanceData = _instanceDataCache.back();
300 0 : _instanceDataCache.pop_back();
301 : }
302 :
303 121 : instanceData->commitCount = _commitCount;
304 121 : instanceData->os.reset();
305 121 : instanceData->os.enableSave();
306 121 : return instanceData;
307 : }
308 :
309 110 : void FullMasterCM::_addInstanceData( InstanceData* data )
310 : {
311 110 : LBASSERT( data->os.getVersion() != VERSION_NONE );
312 110 : LBASSERT( data->os.getVersion() != VERSION_INVALID );
313 :
314 110 : _instanceDatas.push_back( data );
315 : #ifdef CO_INSTRUMENT
316 : _bytesBuffered += data->os.getSaveBuffer().getSize();
317 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
318 : #endif
319 110 : }
320 :
321 108 : void FullMasterCM::_releaseInstanceData( InstanceData* data )
322 : {
323 : #ifdef CO_AGGRESSIVE_CACHING
324 : _instanceDataCache.push_back( data );
325 : #else
326 108 : delete data;
327 : #endif
328 108 : }
329 :
330 110 : uint128_t FullMasterCM::commit( const uint32_t incarnation )
331 : {
332 110 : LBASSERT( _version != VERSION_NONE );
333 :
334 110 : if( !_object->isDirty( ))
335 : {
336 0 : Mutex mutex( _slaves );
337 0 : _updateCommitCount( incarnation );
338 0 : _obsolete();
339 0 : return _version;
340 : }
341 :
342 110 : _maxVersion.waitGE( _version.low() + 1 );
343 110 : Mutex mutex( _slaves );
344 : #if 0
345 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
346 : << std::endl;
347 : #endif
348 110 : _updateCommitCount( incarnation );
349 110 : _commit();
350 110 : _obsolete();
351 110 : return _version;
352 : }
353 :
354 6 : void FullMasterCM::_commit()
355 : {
356 6 : InstanceData* instanceData = _newInstanceData();
357 6 : instanceData->os.enableCommit( _version + 1, *_slaves );
358 6 : _object->getInstanceData( instanceData->os );
359 6 : instanceData->os.disable();
360 :
361 6 : if( instanceData->os.hasSentData( ))
362 : {
363 6 : ++_version;
364 6 : LBASSERT( _version != VERSION_NONE );
365 : #if 0
366 : LBINFO << "Committed v" << _version << "@" << _commitCount << ", id "
367 : << _object->getID() << std::endl;
368 : #endif
369 6 : _addInstanceData( instanceData );
370 : }
371 : else
372 0 : _instanceDataCache.push_back( instanceData );
373 6 : }
374 :
375 3 : void FullMasterCM::push( const uint128_t& groupID, const uint128_t& typeID,
376 : const Nodes& nodes )
377 : {
378 3 : Mutex mutex( _slaves );
379 3 : InstanceData* instanceData = _instanceDatas.back();
380 3 : instanceData->os.push( nodes, _object->getID(), groupID, typeID );
381 3 : }
382 :
383 0 : bool FullMasterCM::sendSync( const MasterCMCommand& command )
384 : {
385 : //const uint128_t& version = command.getRequestedVersion();
386 0 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
387 : const bool useCache =
388 0 : command.useCache() &&
389 0 : command.getMasterInstanceID() == _object->getInstanceID() &&
390 0 : maxCachedVersion == _version;
391 :
392 0 : if( !useCache )
393 : {
394 0 : Mutex mutex( _slaves );
395 0 : InstanceData* instanceData = _instanceDatas.back();
396 0 : instanceData->os.sync( command );
397 : }
398 :
399 0 : NodePtr node = command.getNode();
400 : node->send( CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/ )
401 0 : << node->getNodeID() << command.getObjectID() << command.getRequestID()
402 0 : << true << command.useCache() << useCache;
403 0 : return true;
404 : }
405 :
406 66 : }
|