Line data Source code
1 :
2 : /* Copyright (c) 2007-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2011-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "fullMasterCM.h"
22 :
23 : #include "log.h"
24 : #include "node.h"
25 : #include "nodeCommand.h"
26 : #include "oCommand.h"
27 : #include "object.h"
28 : #include "objectDataIStream.h"
29 :
30 : //#define CO_INSTRUMENT
31 :
32 : namespace co
33 : {
34 : namespace
35 : {
36 : #ifdef CO_INSTRUMENT
37 : lunchbox::a_int32_t _bytesBuffered;
38 : #endif
39 : }
40 :
41 10 : FullMasterCM::FullMasterCM( Object* object )
42 : : VersionedMasterCM( object )
43 : , _commitCount( 0 )
44 10 : , _nVersions( 0 )
45 10 : {}
46 :
47 23 : FullMasterCM::~FullMasterCM()
48 : {
49 66 : for( InstanceDataDeque::const_iterator i = _instanceDatas.begin();
50 44 : i != _instanceDatas.end(); ++i )
51 : {
52 12 : delete *i;
53 : }
54 10 : _instanceDatas.clear();
55 :
56 30 : for( InstanceDatas::const_iterator i = _instanceDataCache.begin();
57 20 : i != _instanceDataCache.end(); ++i )
58 : {
59 0 : delete *i;
60 : }
61 10 : _instanceDataCache.clear();
62 13 : }
63 :
64 0 : void FullMasterCM::sendInstanceData( Nodes& nodes )
65 : {
66 0 : LB_TS_THREAD( _cmdThread );
67 0 : Mutex mutex( _slaves );
68 0 : if( !_slaves->empty( ))
69 0 : return;
70 :
71 0 : InstanceData* data = _instanceDatas.back();
72 0 : data->os.sendInstanceData( nodes );
73 : }
74 :
75 10 : void FullMasterCM::init()
76 : {
77 10 : LBASSERT( _commitCount == 0 );
78 10 : VersionedMasterCM::init();
79 :
80 10 : InstanceData* data = _newInstanceData();
81 :
82 10 : data->os.enableCommit( VERSION_FIRST, *_slaves );
83 10 : _object->getInstanceData( data->os );
84 10 : data->os.disable();
85 :
86 10 : _instanceDatas.push_back( data );
87 10 : ++_version;
88 10 : ++_commitCount;
89 10 : }
90 :
91 1 : void FullMasterCM::setAutoObsolete( const uint32_t count )
92 : {
93 1 : Mutex mutex( _slaves );
94 1 : _nVersions = count;
95 1 : _obsolete();
96 1 : }
97 :
98 112 : void FullMasterCM::_updateCommitCount( const uint32_t incarnation )
99 : {
100 112 : LBASSERT( !_instanceDatas.empty( ));
101 112 : if( incarnation == CO_COMMIT_NEXT )
102 : {
103 112 : ++_commitCount;
104 112 : return;
105 : }
106 :
107 0 : if( incarnation >= _commitCount )
108 : {
109 0 : _commitCount = incarnation;
110 0 : return;
111 : }
112 :
113 0 : LBASSERTINFO( incarnation >= _commitCount,
114 : "Detected decreasing commit incarnation counter" );
115 0 : _commitCount = incarnation;
116 :
117 : // obsolete 'future' old packages
118 0 : while( _instanceDatas.size() > 1 )
119 : {
120 0 : InstanceData* data = _instanceDatas.back();
121 0 : if( data->commitCount <= _commitCount )
122 0 : break;
123 :
124 : #ifdef CO_INSTRUMENT
125 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
126 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
127 : #endif
128 0 : _releaseInstanceData( data );
129 0 : _instanceDatas.pop_back();
130 : }
131 :
132 0 : InstanceData* data = _instanceDatas.back();
133 0 : if( data->commitCount > _commitCount )
134 : {
135 : // tweak commitCount of minimum retained version for correct obsoletion
136 0 : data->commitCount = 0;
137 0 : _version = data->os.getVersion();
138 : }
139 : }
140 :
141 113 : void FullMasterCM::_obsolete()
142 : {
143 113 : LBASSERT( !_instanceDatas.empty( ));
144 336 : while( _instanceDatas.size() > 1 && _commitCount > _nVersions )
145 : {
146 212 : InstanceData* data = _instanceDatas.front();
147 212 : if( data->commitCount >= (_commitCount - _nVersions))
148 102 : break;
149 :
150 : #ifdef CO_INSTRUMENT
151 : _bytesBuffered -= data->os.getSaveBuffer().getSize();
152 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
153 : #endif
154 : #if 0
155 : LBINFO
156 : << "Remove v" << data->os.getVersion() << " c" << data->commitCount
157 : << "@" << _commitCount << "/" << _nVersions << " from "
158 : << lunchbox::className( _object ) << " " << ObjectVersion( _object )
159 : << std::endl;
160 : #endif
161 110 : _releaseInstanceData( data );
162 110 : _instanceDatas.pop_front();
163 : }
164 113 : _checkConsistency();
165 113 : }
166 :
167 29 : bool FullMasterCM::_initSlave( const MasterCMCommand& command,
168 : const uint128_t& /*replyVersion*/,
169 : bool replyUseCache )
170 : {
171 29 : _checkConsistency();
172 :
173 29 : const uint128_t& version = command.getRequestedVersion();
174 29 : const uint128_t oldest = _instanceDatas.front()->os.getVersion();
175 58 : uint128_t start = (version == VERSION_OLDEST || version < oldest ) ?
176 29 : oldest : version;
177 29 : uint128_t end = _version;
178 :
179 : #ifndef NDEBUG
180 29 : if( version != VERSION_OLDEST && version < start )
181 0 : LBINFO << "Mapping version " << start << " instead of requested "
182 0 : << version << " for " << lunchbox::className( _object )
183 0 : << " " << ObjectVersion( _object->getID(), _version ) << " of "
184 0 : << _instanceDatas.size() << "/" << _nVersions << std::endl;
185 : #endif
186 :
187 29 : const uint128_t& minCachedVersion = command.getMinCachedVersion();
188 29 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
189 29 : const uint128_t replyVersion = start;
190 29 : if( replyUseCache )
191 : {
192 0 : if( minCachedVersion <= start && maxCachedVersion >= start )
193 : {
194 : #ifdef CO_INSTRUMENT_MULTICAST
195 : _hit += maxCachedVersion + 1 - start;
196 : #endif
197 0 : start = maxCachedVersion + 1;
198 : }
199 0 : else if( maxCachedVersion == end )
200 : {
201 0 : end = LB_MAX( start, minCachedVersion - 1 );
202 : #ifdef CO_INSTRUMENT_MULTICAST
203 : _hit += _version - end;
204 : #endif
205 : }
206 : // TODO else cached block in the middle, send head and tail elements
207 : }
208 :
209 : #if 0
210 : LBLOG( LOG_OBJECTS )
211 : << *_object << ", instantiate on " << node->getNodeID() << " with v"
212 : << ((requested == VERSION_OLDEST) ? oldest : requested) << " ("
213 : << requested << ") sending " << start << ".." << end << " have "
214 : << _version - _nVersions << ".." << _version << " "
215 : << _instanceDatas.size() << std::endl;
216 : #endif
217 29 : LBASSERT( start >= oldest );
218 :
219 29 : bool dataSent = false;
220 :
221 : // send all instance datas from start..end
222 29 : InstanceDataDeque::iterator i = _instanceDatas.begin();
223 58 : while( i != _instanceDatas.end() && (*i)->os.getVersion() < start )
224 0 : ++i;
225 :
226 58 : for( ; i != _instanceDatas.end() && (*i)->os.getVersion() <= end; ++i )
227 : {
228 29 : if( !dataSent )
229 : {
230 29 : _sendMapSuccess( command, true );
231 29 : dataSent = true;
232 : }
233 :
234 29 : InstanceData* data = *i;
235 29 : LBASSERT( data );
236 29 : data->os.sendMapData( command.getRemoteNode(),
237 58 : command.getInstanceID( ));
238 :
239 : #ifdef CO_INSTRUMENT_MULTICAST
240 : ++_miss;
241 : #endif
242 : }
243 :
244 29 : if( !dataSent )
245 : {
246 0 : _sendMapSuccess( command, false );
247 0 : _sendMapReply( command, replyVersion, true, replyUseCache, false );
248 : }
249 : else
250 29 : _sendMapReply( command, replyVersion, true, replyUseCache, true );
251 :
252 : #ifdef CO_INSTRUMENT_MULTICAST
253 : if( _miss % 100 == 0 )
254 : LBINFO << "Cached " << _hit << "/" << _hit + _miss
255 : << " instance data transmissions" << std::endl;
256 : #endif
257 29 : return true;
258 : }
259 :
260 142 : void FullMasterCM::_checkConsistency() const
261 : {
262 : #ifndef NDEBUG
263 142 : LBASSERT( !_instanceDatas.empty( ));
264 142 : LBASSERT( _object->isAttached() );
265 :
266 142 : if( _version == VERSION_NONE )
267 142 : return;
268 :
269 142 : uint128_t version = _version;
270 1464 : for( InstanceDataDeque::const_reverse_iterator i = _instanceDatas.rbegin();
271 976 : i != _instanceDatas.rend(); ++i )
272 : {
273 346 : const InstanceData* data = *i;
274 346 : LBASSERT( data->os.getVersion() != VERSION_NONE );
275 346 : LBASSERTINFO( data->os.getVersion() == version,
276 : data->os.getVersion() << " != " << version );
277 346 : if( data != _instanceDatas.front( ))
278 : {
279 204 : LBASSERTINFO( data->commitCount + _nVersions >= _commitCount,
280 : data->commitCount << ", " << _commitCount << " [" <<
281 : _nVersions << "]" );
282 : }
283 346 : --version;
284 : }
285 : #endif
286 : }
287 :
288 : //---------------------------------------------------------------------------
289 : // cache handling
290 : //---------------------------------------------------------------------------
291 122 : FullMasterCM::InstanceData* FullMasterCM::_newInstanceData()
292 : {
293 : InstanceData* instanceData;
294 :
295 122 : if( _instanceDataCache.empty( ))
296 122 : instanceData = new InstanceData( this );
297 : else
298 : {
299 0 : instanceData = _instanceDataCache.back();
300 0 : _instanceDataCache.pop_back();
301 : }
302 :
303 122 : instanceData->commitCount = _commitCount;
304 122 : instanceData->os.reset();
305 122 : instanceData->os.enableSave();
306 122 : return instanceData;
307 : }
308 :
309 112 : void FullMasterCM::_addInstanceData( InstanceData* data )
310 : {
311 112 : LBASSERT( data->os.getVersion() != VERSION_NONE );
312 112 : LBASSERT( data->os.getVersion() != VERSION_INVALID );
313 :
314 112 : _instanceDatas.push_back( data );
315 : #ifdef CO_INSTRUMENT
316 : _bytesBuffered += data->os.getSaveBuffer().getSize();
317 : LBINFO << _bytesBuffered << " bytes used" << std::endl;
318 : #endif
319 112 : }
320 :
321 110 : void FullMasterCM::_releaseInstanceData( InstanceData* data )
322 : {
323 : #ifdef CO_AGGRESSIVE_CACHING
324 : _instanceDataCache.push_back( data );
325 : #else
326 110 : delete data;
327 : #endif
328 110 : }
329 :
330 112 : uint128_t FullMasterCM::commit( const uint32_t incarnation )
331 : {
332 112 : LBASSERT( _version != VERSION_NONE );
333 :
334 112 : if( !_object->isDirty( ))
335 : {
336 0 : Mutex mutex( _slaves );
337 0 : _updateCommitCount( incarnation );
338 0 : _obsolete();
339 0 : return _version;
340 : }
341 :
342 112 : _maxVersion.waitGE( _version.low() + 1 );
343 112 : Mutex mutex( _slaves );
344 : #if 0
345 : LBLOG( LOG_OBJECTS ) << "commit v" << _version << " " << command
346 : << std::endl;
347 : #endif
348 112 : _updateCommitCount( incarnation );
349 112 : _commit();
350 112 : _obsolete();
351 112 : return _version;
352 : }
353 :
354 5 : void FullMasterCM::_commit()
355 : {
356 5 : InstanceData* instanceData = _newInstanceData();
357 5 : instanceData->os.enableCommit( _version + 1, *_slaves );
358 5 : _object->getInstanceData( instanceData->os );
359 5 : instanceData->os.disable();
360 :
361 5 : if( instanceData->os.hasSentData( ))
362 : {
363 5 : ++_version;
364 5 : LBASSERT( _version != VERSION_NONE );
365 : #if 0
366 : LBINFO << "Committed v" << _version << "@" << _commitCount << ", id "
367 : << _object->getID() << std::endl;
368 : #endif
369 5 : _addInstanceData( instanceData );
370 : }
371 : else
372 0 : _instanceDataCache.push_back( instanceData );
373 5 : }
374 :
375 2 : void FullMasterCM::push( const uint128_t& groupID, const uint128_t& typeID,
376 : const Nodes& nodes )
377 : {
378 2 : Mutex mutex( _slaves );
379 2 : InstanceData* instanceData = _instanceDatas.back();
380 2 : instanceData->os.push( nodes, _object->getID(), groupID, typeID );
381 2 : }
382 :
383 0 : bool FullMasterCM::sendSync( const MasterCMCommand& command )
384 : {
385 : //const uint128_t& version = command.getRequestedVersion();
386 0 : const uint128_t& maxCachedVersion = command.getMaxCachedVersion();
387 : const bool useCache =
388 0 : command.useCache() &&
389 0 : command.getMasterInstanceID() == _object->getInstanceID() &&
390 0 : maxCachedVersion == _version;
391 :
392 0 : if( !useCache )
393 : {
394 0 : Mutex mutex( _slaves );
395 0 : InstanceData* instanceData = _instanceDatas.back();
396 0 : instanceData->os.sync( command );
397 : }
398 :
399 0 : NodePtr node = command.getNode();
400 : node->send( CMD_NODE_SYNC_OBJECT_REPLY, useCache /*preferMulticast*/ )
401 0 : << node->getNodeID() << command.getObjectID() << command.getRequestID()
402 0 : << true << command.useCache() << useCache;
403 0 : return true;
404 : }
405 :
406 60 : }
|