Line data Source code
1 :
2 : /* Copyright (c) 2009-2014, Stefan Eilemann <eile@equalizergraphics.com>
3 : * 2010-2012, Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "instanceCache.h"
22 :
23 : #include "objectDataICommand.h"
24 : #include "objectDataIStream.h"
25 : #include "objectVersion.h"
26 :
27 : #include <lunchbox/debug.h>
28 : #include <lunchbox/scopedMutex.h>
29 :
30 : namespace co
31 : {
32 : //#define CO_INSTRUMENT_CACHE
33 : #ifdef CO_INSTRUMENT_CACHE
34 : namespace
35 : {
36 : lunchbox::a_int32_t nRead;
37 : lunchbox::a_int32_t nReadHit;
38 : lunchbox::a_int32_t nWrite;
39 : lunchbox::a_int32_t nWriteHit;
40 : lunchbox::a_int32_t nWriteMiss;
41 : lunchbox::a_int32_t nWriteReady;
42 : lunchbox::a_int32_t nWriteOld;
43 : lunchbox::a_int32_t nUsedRelease;
44 : lunchbox::a_int32_t nUnusedRelease;
45 : }
46 : #endif
47 :
48 22 : const InstanceCache::Data InstanceCache::Data::NONE;
49 :
50 56 : InstanceCache::InstanceCache( const uint64_t maxSize )
51 : : _maxSize( maxSize )
52 56 : , _size( 0 )
53 56 : {}
54 :
55 108 : InstanceCache::~InstanceCache()
56 : {
57 54 : for( ItemHash::iterator i = _items->begin(); i != _items->end(); ++i )
58 : {
59 0 : Item& item = i->second;
60 0 : _releaseStreams( item );
61 : }
62 :
63 54 : _items->clear();
64 54 : _size = 0;
65 54 : }
66 :
67 197299 : InstanceCache::Data::Data()
68 197299 : : masterInstanceID( CO_INSTANCE_INVALID )
69 197299 : {}
70 :
71 338330 : bool InstanceCache::Data::operator != ( const InstanceCache::Data& rhs ) const
72 : {
73 479425 : return ( masterInstanceID != rhs.masterInstanceID ||
74 479425 : versions != rhs.versions );
75 : }
76 :
77 65582 : bool InstanceCache::Data::operator == ( const InstanceCache::Data& rhs ) const
78 : {
79 131164 : return ( masterInstanceID == rhs.masterInstanceID &&
80 131164 : versions == rhs.versions );
81 : }
82 :
83 197277 : InstanceCache::Item::Item()
84 : : used( 0 )
85 197277 : , access( 0 )
86 197277 : {}
87 :
88 197285 : bool InstanceCache::add( const ObjectVersion& rev, const uint32_t instanceID,
89 : ICommand& command, const uint32_t usage )
90 : {
91 197285 : LBASSERTINFO( command.isValid(), command );
92 :
93 : #ifdef CO_INSTRUMENT_CACHE
94 : ++nWrite;
95 : #endif
96 :
97 197285 : const NodeID nodeID = command.getNode()->getNodeID();
98 :
99 394570 : lunchbox::ScopedMutex<> mutex( _items );
100 197285 : ItemHash::const_iterator i = _items->find( rev.identifier );
101 197285 : if( i == _items->end( ))
102 : {
103 197277 : Item& item = _items.data[ rev.identifier ];
104 197277 : item.data.masterInstanceID = instanceID;
105 197277 : item.from = nodeID;
106 : }
107 :
108 197285 : Item& item = _items.data[ rev.identifier ] ;
109 197285 : if( item.data.masterInstanceID != instanceID || item.from != nodeID )
110 : {
111 0 : LBASSERT( !item.access ); // same master with different instance ID?!
112 0 : if( item.access != 0 ) // are accessed - don't add
113 0 : return false;
114 : // trash data from different master mapping
115 0 : _releaseStreams( item );
116 0 : item.data.masterInstanceID = instanceID;
117 0 : item.from = nodeID;
118 0 : item.used = usage;
119 : }
120 : else
121 197285 : item.used = LB_MAX( item.used, usage );
122 :
123 197285 : if( item.data.versions.empty( ))
124 : {
125 197277 : item.data.versions.push_back( new ObjectDataIStream );
126 197277 : item.times.push_back( _clock.getTime64( ));
127 : }
128 8 : else if( item.data.versions.back()->getPendingVersion() == rev.version )
129 : {
130 4 : if( item.data.versions.back()->isReady( ))
131 : {
132 : #ifdef CO_INSTRUMENT_CACHE
133 : ++nWriteReady;
134 : #endif
135 0 : return false; // Already have stream
136 : }
137 : // else append data to stream
138 : }
139 : else
140 : {
141 4 : const ObjectDataIStream* previous = item.data.versions.back();
142 4 : LBASSERT( previous->isReady( ));
143 :
144 4 : const uint128_t previousVersion = previous->getPendingVersion();
145 4 : if( previousVersion > rev.version )
146 : {
147 : #ifdef CO_INSTRUMENT_CACHE
148 : ++nWriteOld;
149 : #endif
150 0 : return false;
151 : }
152 4 : if( ( previousVersion + 1 ) != rev.version ) // hole
153 : {
154 0 : LBASSERT( previousVersion < rev.version );
155 :
156 0 : if( item.access != 0 ) // are accessed - don't add
157 0 : return false;
158 :
159 0 : _releaseStreams( item );
160 : }
161 : else
162 : {
163 4 : LBASSERT( previous->isReady( ));
164 : }
165 4 : item.data.versions.push_back( new ObjectDataIStream );
166 4 : item.times.push_back( _clock.getTime64( ));
167 : }
168 :
169 197285 : LBASSERT( !item.data.versions.empty( ));
170 197285 : ObjectDataIStream* stream = item.data.versions.back();
171 :
172 197285 : stream->addDataCommand( command );
173 :
174 197285 : if( stream->isReady( ))
175 197281 : _size += stream->getDataSize();
176 :
177 197285 : _releaseItems( 1 );
178 197285 : _releaseItems( 0 );
179 :
180 : #ifdef CO_INSTRUMENT_CACHE
181 : if( _items->find( rev.identifier ) != _items->end( ))
182 : ++nWriteHit;
183 : else
184 : ++nWriteMiss;
185 : #endif
186 197285 : return true;
187 : }
188 :
189 115 : void InstanceCache::remove( const NodeID& nodeID )
190 : {
191 230 : std::vector< uint128_t > keys;
192 :
193 230 : lunchbox::ScopedWrite mutex( _items );
194 151 : for( ItemHash::iterator i = _items->begin(); i != _items->end(); ++i )
195 : {
196 36 : Item& item = i->second;
197 36 : if( item.from != nodeID )
198 0 : continue;
199 :
200 36 : LBASSERT( !item.access );
201 36 : if( item.access != 0 )
202 0 : continue;
203 :
204 36 : _releaseStreams( item );
205 36 : keys.push_back( i->first );
206 : }
207 :
208 453 : for( std::vector< uint128_t >::const_iterator i = keys.begin();
209 302 : i != keys.end(); ++i )
210 : {
211 36 : _items->erase( *i );
212 : }
213 115 : }
214 :
215 403903 : const InstanceCache::Data& InstanceCache::operator[]( const uint128_t& id )
216 : {
217 : #ifdef CO_INSTRUMENT_CACHE
218 : ++nRead;
219 : #endif
220 :
221 807818 : lunchbox::ScopedWrite mutex( _items );
222 403915 : ItemHash::iterator i = _items->find( id );
223 403915 : if( i == _items->end( ))
224 206677 : return Data::NONE;
225 :
226 197238 : Item& item = i->second;
227 197238 : LBASSERT( !item.data.versions.empty( ));
228 197238 : ++item.access;
229 197238 : ++item.used;
230 :
231 : #ifdef CO_INSTRUMENT_CACHE
232 : ++nReadHit;
233 : #endif
234 197238 : return item.data;
235 : }
236 :
237 197238 : bool InstanceCache::release( const uint128_t& id, const uint32_t count )
238 : {
239 394476 : lunchbox::ScopedWrite mutex( _items );
240 197238 : ItemHash::iterator i = _items->find( id );
241 197238 : if( i == _items->end( ))
242 0 : return false;
243 :
244 197238 : Item& item = i->second;
245 197238 : LBASSERT( !item.data.versions.empty( ));
246 197238 : LBASSERT( item.access >= count );
247 :
248 197238 : item.access -= count;
249 197238 : _releaseItems( 1 );
250 197238 : return true;
251 : }
252 :
253 197257 : bool InstanceCache::erase( const uint128_t& id )
254 : {
255 394514 : lunchbox::ScopedWrite mutex( _items );
256 197257 : ItemHash::iterator i = _items->find( id );
257 197257 : if( i == _items->end( ))
258 16 : return false;
259 :
260 197241 : Item& item = i->second;
261 197241 : if( item.access != 0 )
262 0 : return false;
263 :
264 197241 : _releaseStreams( item );
265 197241 : _items->erase( i );
266 197241 : return true;
267 : }
268 :
269 102 : void InstanceCache::expire( const int64_t timeout )
270 : {
271 102 : const int64_t time = _clock.getTime64() - timeout;
272 102 : if( time <= 0 )
273 1 : return;
274 :
275 202 : std::vector< uint128_t > keys;
276 :
277 202 : lunchbox::ScopedWrite mutex( _items );
278 101 : for( ItemHash::iterator i = _items->begin(); i != _items->end(); ++i )
279 : {
280 0 : Item& item = i->second;
281 0 : if( item.access != 0 )
282 0 : continue;
283 :
284 0 : _releaseStreams( item, time );
285 0 : if( item.data.versions.empty( ))
286 0 : keys.push_back( i->first );
287 : }
288 :
289 303 : for( std::vector< uint128_t >::const_iterator i = keys.begin();
290 202 : i != keys.end(); ++i )
291 : {
292 0 : _items->erase( *i );
293 : }
294 : }
295 :
296 0 : void InstanceCache::_releaseStreams( InstanceCache::Item& item,
297 : const int64_t minTime )
298 : {
299 0 : LBASSERT( item.access == 0 );
300 0 : while( !item.data.versions.empty() && item.times.front() <= minTime &&
301 0 : item.data.versions.front()->isReady( ))
302 : {
303 0 : _releaseFirstStream( item );
304 : }
305 0 : }
306 :
307 197277 : void InstanceCache::_releaseStreams( InstanceCache::Item& item )
308 : {
309 197277 : LBASSERT( item.access == 0 );
310 197277 : LBASSERT( !item.data.versions.empty( ));
311 :
312 591839 : while( !item.data.versions.empty( ))
313 : {
314 197281 : ObjectDataIStream* stream = item.data.versions.back();
315 197281 : item.data.versions.pop_back();
316 197281 : _deleteStream( stream );
317 : }
318 197277 : item.times.clear();
319 197277 : }
320 :
321 0 : void InstanceCache::_releaseFirstStream( InstanceCache::Item& item )
322 : {
323 0 : LBASSERT( item.access == 0 );
324 0 : LBASSERT( !item.data.versions.empty( ));
325 0 : if( item.data.versions.empty( ))
326 0 : return;
327 :
328 0 : ObjectDataIStream* stream = item.data.versions.front();
329 0 : item.data.versions.pop_front();
330 0 : item.times.pop_front();
331 0 : _deleteStream( stream );
332 : }
333 :
334 197281 : void InstanceCache::_deleteStream( ObjectDataIStream* stream )
335 : {
336 197281 : LBASSERT( stream->isReady( ));
337 197281 : LBASSERT( _size >= stream->getDataSize( ));
338 :
339 197281 : _size -= stream->getDataSize();
340 197281 : delete stream;
341 197281 : }
342 :
343 591808 : void InstanceCache::_releaseItems( const uint32_t minUsage )
344 : {
345 591808 : if( _size <= _maxSize )
346 591808 : return;
347 :
348 0 : LB_TS_SCOPED( _thread );
349 :
350 0 : std::vector< uint128_t > keys;
351 0 : const uint64_t target = uint64_t( float( _maxSize ) * 0.8f );
352 :
353 : // Release used items (first stream)
354 0 : bool streamsLeft = false;
355 0 : for( ItemHashIter i = _items->begin();
356 0 : i != _items->end() && _size > target; ++i )
357 : {
358 0 : Item& item = i->second;
359 0 : LBASSERT( !item.data.versions.empty( ));
360 :
361 0 : if( item.access == 0 && item.used >= minUsage )
362 : {
363 0 : _releaseFirstStream( item );
364 0 : if( !item.data.versions.empty( ))
365 0 : streamsLeft = true;
366 :
367 0 : keys.push_back( i->first );
368 : #ifdef CO_INSTRUMENT_CACHE
369 : ++nUsedRelease;
370 : #endif
371 : }
372 : }
373 :
374 : // release used items (second..n streams)
375 0 : while( streamsLeft && _size > target )
376 : {
377 0 : streamsLeft = false;
378 :
379 0 : for( std::vector< uint128_t >::const_iterator i = keys.begin();
380 0 : i != keys.end() && _size > target; ++i )
381 : {
382 0 : Item& item = _items.data[ *i ];
383 :
384 0 : if( !item.data.versions.empty() && item.access == 0 &&
385 0 : item.used >= minUsage )
386 : {
387 0 : _releaseFirstStream( item );
388 0 : if( !item.data.versions.empty( ))
389 0 : streamsLeft = true;
390 : #ifdef CO_INSTRUMENT_CACHE
391 : ++nUsedRelease;
392 : #endif
393 : }
394 : }
395 : }
396 :
397 0 : for( std::vector< uint128_t >::const_iterator i = keys.begin();
398 0 : i != keys.end(); ++i )
399 : {
400 0 : Item& item = _items.data[ *i ];
401 0 : if( item.data.versions.empty( ))
402 0 : _items->erase( *i );
403 : }
404 :
405 0 : if( _size > target && minUsage == 0 )
406 0 : LBWARN << "Overfull instance cache, too many pinned items, size "
407 0 : << _size << " target " << target << " max " << _maxSize
408 0 : << " " << _items->size() << " entries"
409 : #ifdef CO_INSTRUMENT_CACHE
410 : << ": " << *this
411 : #endif
412 0 : << std::endl;
413 : }
414 :
415 2 : std::ostream& operator << ( std::ostream& os,
416 : const InstanceCache& instanceCache )
417 : {
418 2 : os << "InstanceCache " << instanceCache.getSize() / 1048576 << "/"
419 4 : << instanceCache.getMaxSize() / 1048576 << " MB"
420 : #ifdef CO_INSTRUMENT_CACHE
421 : << ", " << nReadHit << "/" << nRead << " reads, " << nWriteHit
422 : << "/" << nWrite << " writes (" << nWriteMiss << " misses, " << nWriteOld
423 : << " old, " << nWriteReady << " dups) " << nUsedRelease << " used, "
424 : << nUnusedRelease << " unused releases"
425 : #endif
426 2 : ;
427 2 : return os;
428 : }
429 :
430 66 : }
|