Line data Source code
1 :
2 : /* Copyright (c) 2009-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "instanceCache.h"
22 :
23 : #include "objectDataICommand.h"
24 : #include "objectDataIStream.h"
25 : #include "objectVersion.h"
26 :
27 : #include <lunchbox/debug.h>
28 : #include <lunchbox/scopedMutex.h>
29 :
30 : namespace co
31 : {
32 : //#define CO_INSTRUMENT_CACHE
33 : #ifdef CO_INSTRUMENT_CACHE
34 : namespace
35 : {
36 : lunchbox::a_int32_t nRead;
37 : lunchbox::a_int32_t nReadHit;
38 : lunchbox::a_int32_t nWrite;
39 : lunchbox::a_int32_t nWriteHit;
40 : lunchbox::a_int32_t nWriteMiss;
41 : lunchbox::a_int32_t nWriteReady;
42 : lunchbox::a_int32_t nWriteOld;
43 : lunchbox::a_int32_t nUsedRelease;
44 : lunchbox::a_int32_t nUnusedRelease;
45 : }
46 : #endif
47 :
48 21 : const InstanceCache::Data InstanceCache::Data::NONE;
49 :
50 54 : InstanceCache::InstanceCache(const uint64_t maxSize)
51 : : _maxSize(maxSize)
52 54 : , _size(0)
53 : {
54 54 : }
55 :
56 104 : InstanceCache::~InstanceCache()
57 : {
58 52 : for (ItemHash::iterator i = _items->begin(); i != _items->end(); ++i)
59 : {
60 0 : Item& item = i->second;
61 0 : _releaseStreams(item);
62 : }
63 :
64 52 : _items->clear();
65 52 : _size = 0;
66 52 : }
67 :
68 185641 : InstanceCache::Data::Data()
69 185641 : : masterInstanceID(CO_INSTANCE_INVALID)
70 : {
71 185641 : }
72 :
73 331521 : bool InstanceCache::Data::operator!=(const InstanceCache::Data& rhs) const
74 : {
75 477459 : return (masterInstanceID != rhs.masterInstanceID ||
76 477459 : versions != rhs.versions);
77 : }
78 :
79 65576 : bool InstanceCache::Data::operator==(const InstanceCache::Data& rhs) const
80 : {
81 131152 : return (masterInstanceID == rhs.masterInstanceID &&
82 131152 : versions == rhs.versions);
83 : }
84 :
85 185620 : InstanceCache::Item::Item()
86 : : used(0)
87 185620 : , access(0)
88 : {
89 185620 : }
90 :
91 185628 : bool InstanceCache::add(const ObjectVersion& rev, const uint32_t instanceID,
92 : ICommand& command, const uint32_t usage)
93 : {
94 185628 : LBASSERTINFO(command.isValid(), command);
95 :
96 : #ifdef CO_INSTRUMENT_CACHE
97 : ++nWrite;
98 : #endif
99 :
100 185628 : const NodeID nodeID = command.getNode()->getNodeID();
101 :
102 371256 : lunchbox::ScopedWrite mutex(_items);
103 185628 : ItemHash::const_iterator i = _items->find(rev.identifier);
104 185628 : if (i == _items->end())
105 : {
106 185620 : Item& item = _items.data[rev.identifier];
107 185620 : item.data.masterInstanceID = instanceID;
108 185620 : item.from = nodeID;
109 : }
110 :
111 185628 : Item& item = _items.data[rev.identifier];
112 185628 : if (item.data.masterInstanceID != instanceID || item.from != nodeID)
113 : {
114 0 : LBASSERT(!item.access); // same master with different instance ID?!
115 0 : if (item.access != 0) // are accessed - don't add
116 0 : return false;
117 : // trash data from different master mapping
118 0 : _releaseStreams(item);
119 0 : item.data.masterInstanceID = instanceID;
120 0 : item.from = nodeID;
121 0 : item.used = usage;
122 : }
123 : else
124 185628 : item.used = LB_MAX(item.used, usage);
125 :
126 185628 : if (item.data.versions.empty())
127 : {
128 185620 : item.data.versions.push_back(new ObjectDataIStream);
129 185620 : item.times.push_back(_clock.getTime64());
130 : }
131 8 : else if (item.data.versions.back()->getPendingVersion() == rev.version)
132 : {
133 4 : if (item.data.versions.back()->isReady())
134 : {
135 : #ifdef CO_INSTRUMENT_CACHE
136 : ++nWriteReady;
137 : #endif
138 0 : return false; // Already have stream
139 : }
140 : // else append data to stream
141 : }
142 : else
143 : {
144 4 : const ObjectDataIStream* previous = item.data.versions.back();
145 4 : LBASSERT(previous->isReady());
146 :
147 4 : const uint128_t previousVersion = previous->getPendingVersion();
148 4 : if (previousVersion > rev.version)
149 : {
150 : #ifdef CO_INSTRUMENT_CACHE
151 : ++nWriteOld;
152 : #endif
153 0 : return false;
154 : }
155 4 : if ((previousVersion + 1) != rev.version) // hole
156 : {
157 0 : LBASSERT(previousVersion < rev.version);
158 :
159 0 : if (item.access != 0) // are accessed - don't add
160 0 : return false;
161 :
162 0 : _releaseStreams(item);
163 : }
164 : else
165 : {
166 4 : LBASSERT(previous->isReady());
167 : }
168 4 : item.data.versions.push_back(new ObjectDataIStream);
169 4 : item.times.push_back(_clock.getTime64());
170 : }
171 :
172 185628 : LBASSERT(!item.data.versions.empty());
173 185628 : ObjectDataIStream* stream = item.data.versions.back();
174 :
175 185628 : stream->addDataCommand(command);
176 :
177 185628 : if (stream->isReady())
178 185624 : _size += stream->getDataSize();
179 :
180 185628 : _releaseItems(1);
181 185628 : _releaseItems(0);
182 :
183 : #ifdef CO_INSTRUMENT_CACHE
184 : if (_items->find(rev.identifier) != _items->end())
185 : ++nWriteHit;
186 : else
187 : ++nWriteMiss;
188 : #endif
189 185628 : return true;
190 : }
191 :
192 111 : void InstanceCache::remove(const NodeID& nodeID)
193 : {
194 222 : std::vector<uint128_t> keys;
195 :
196 222 : lunchbox::ScopedWrite mutex(_items);
197 141 : for (ItemHash::iterator i = _items->begin(); i != _items->end(); ++i)
198 : {
199 30 : Item& item = i->second;
200 30 : if (item.from != nodeID)
201 0 : continue;
202 :
203 30 : LBASSERT(!item.access);
204 30 : if (item.access != 0)
205 0 : continue;
206 :
207 30 : _releaseStreams(item);
208 30 : keys.push_back(i->first);
209 : }
210 :
211 423 : for (std::vector<uint128_t>::const_iterator i = keys.begin();
212 282 : i != keys.end(); ++i)
213 : {
214 30 : _items->erase(*i);
215 : }
216 111 : }
217 :
218 397078 : const InstanceCache::Data& InstanceCache::operator[](const uint128_t& id)
219 : {
220 : #ifdef CO_INSTRUMENT_CACHE
221 : ++nRead;
222 : #endif
223 :
224 794177 : lunchbox::ScopedWrite mutex(_items);
225 397099 : ItemHash::iterator i = _items->find(id);
226 397099 : if (i == _items->end())
227 211514 : return Data::NONE;
228 :
229 185585 : Item& item = i->second;
230 185585 : LBASSERT(!item.data.versions.empty());
231 185585 : ++item.access;
232 185585 : ++item.used;
233 :
234 : #ifdef CO_INSTRUMENT_CACHE
235 : ++nReadHit;
236 : #endif
237 185585 : return item.data;
238 : }
239 :
240 185585 : bool InstanceCache::release(const uint128_t& id, const uint32_t count)
241 : {
242 371170 : lunchbox::ScopedWrite mutex(_items);
243 185585 : ItemHash::iterator i = _items->find(id);
244 185585 : if (i == _items->end())
245 0 : return false;
246 :
247 185585 : Item& item = i->second;
248 185585 : LBASSERT(!item.data.versions.empty());
249 185585 : LBASSERT(item.access >= count);
250 :
251 185585 : item.access -= count;
252 185585 : _releaseItems(1);
253 185585 : return true;
254 : }
255 :
256 185600 : bool InstanceCache::erase(const uint128_t& id)
257 : {
258 371200 : lunchbox::ScopedWrite mutex(_items);
259 185600 : ItemHash::iterator i = _items->find(id);
260 185600 : if (i == _items->end())
261 10 : return false;
262 :
263 185590 : Item& item = i->second;
264 185590 : if (item.access != 0)
265 0 : return false;
266 :
267 185590 : _releaseStreams(item);
268 185590 : _items->erase(i);
269 185590 : return true;
270 : }
271 :
272 98 : void InstanceCache::expire(const int64_t timeout)
273 : {
274 98 : const int64_t time = _clock.getTime64() - timeout;
275 98 : if (time <= 0)
276 1 : return;
277 :
278 194 : std::vector<uint128_t> keys;
279 :
280 194 : lunchbox::ScopedWrite mutex(_items);
281 97 : for (ItemHash::iterator i = _items->begin(); i != _items->end(); ++i)
282 : {
283 0 : Item& item = i->second;
284 0 : if (item.access != 0)
285 0 : continue;
286 :
287 0 : _releaseStreams(item, time);
288 0 : if (item.data.versions.empty())
289 0 : keys.push_back(i->first);
290 : }
291 :
292 291 : for (std::vector<uint128_t>::const_iterator i = keys.begin();
293 194 : i != keys.end(); ++i)
294 : {
295 0 : _items->erase(*i);
296 : }
297 : }
298 :
299 0 : void InstanceCache::_releaseStreams(InstanceCache::Item& item,
300 : const int64_t minTime)
301 : {
302 0 : LBASSERT(item.access == 0);
303 0 : while (!item.data.versions.empty() && item.times.front() <= minTime &&
304 0 : item.data.versions.front()->isReady())
305 : {
306 0 : _releaseFirstStream(item);
307 : }
308 0 : }
309 :
310 185620 : void InstanceCache::_releaseStreams(InstanceCache::Item& item)
311 : {
312 185620 : LBASSERT(item.access == 0);
313 185620 : LBASSERT(!item.data.versions.empty());
314 :
315 556868 : while (!item.data.versions.empty())
316 : {
317 185624 : ObjectDataIStream* stream = item.data.versions.back();
318 185624 : item.data.versions.pop_back();
319 185624 : _deleteStream(stream);
320 : }
321 185620 : item.times.clear();
322 185620 : }
323 :
324 0 : void InstanceCache::_releaseFirstStream(InstanceCache::Item& item)
325 : {
326 0 : LBASSERT(item.access == 0);
327 0 : LBASSERT(!item.data.versions.empty());
328 0 : if (item.data.versions.empty())
329 0 : return;
330 :
331 0 : ObjectDataIStream* stream = item.data.versions.front();
332 0 : item.data.versions.pop_front();
333 0 : item.times.pop_front();
334 0 : _deleteStream(stream);
335 : }
336 :
337 185624 : void InstanceCache::_deleteStream(ObjectDataIStream* stream)
338 : {
339 185624 : LBASSERT(stream->isReady());
340 185624 : LBASSERT(_size >= stream->getDataSize());
341 :
342 185624 : _size -= stream->getDataSize();
343 185624 : delete stream;
344 185624 : }
345 :
346 556841 : void InstanceCache::_releaseItems(const uint32_t minUsage)
347 : {
348 556841 : if (_size <= _maxSize)
349 556841 : return;
350 :
351 0 : LB_TS_SCOPED(_thread);
352 :
353 0 : std::vector<uint128_t> keys;
354 0 : const uint64_t target = uint64_t(float(_maxSize) * 0.8f);
355 :
356 : // Release used items (first stream)
357 0 : bool streamsLeft = false;
358 0 : for (ItemHashIter i = _items->begin(); i != _items->end() && _size > target;
359 : ++i)
360 : {
361 0 : Item& item = i->second;
362 0 : LBASSERT(!item.data.versions.empty());
363 :
364 0 : if (item.access == 0 && item.used >= minUsage)
365 : {
366 0 : _releaseFirstStream(item);
367 0 : if (!item.data.versions.empty())
368 0 : streamsLeft = true;
369 :
370 0 : keys.push_back(i->first);
371 : #ifdef CO_INSTRUMENT_CACHE
372 : ++nUsedRelease;
373 : #endif
374 : }
375 : }
376 :
377 : // release used items (second..n streams)
378 0 : while (streamsLeft && _size > target)
379 : {
380 0 : streamsLeft = false;
381 :
382 0 : for (std::vector<uint128_t>::const_iterator i = keys.begin();
383 0 : i != keys.end() && _size > target; ++i)
384 : {
385 0 : Item& item = _items.data[*i];
386 :
387 0 : if (!item.data.versions.empty() && item.access == 0 &&
388 0 : item.used >= minUsage)
389 : {
390 0 : _releaseFirstStream(item);
391 0 : if (!item.data.versions.empty())
392 0 : streamsLeft = true;
393 : #ifdef CO_INSTRUMENT_CACHE
394 : ++nUsedRelease;
395 : #endif
396 : }
397 : }
398 : }
399 :
400 0 : for (std::vector<uint128_t>::const_iterator i = keys.begin();
401 0 : i != keys.end(); ++i)
402 : {
403 0 : Item& item = _items.data[*i];
404 0 : if (item.data.versions.empty())
405 0 : _items->erase(*i);
406 : }
407 :
408 0 : if (_size > target && minUsage == 0)
409 0 : LBWARN << "Overfull instance cache, too many pinned items, size "
410 0 : << _size << " target " << target << " max " << _maxSize << " "
411 0 : << _items->size() << " entries"
412 : #ifdef CO_INSTRUMENT_CACHE
413 : << ": " << *this
414 : #endif
415 0 : << std::endl;
416 : }
417 :
418 2 : std::ostream& operator<<(std::ostream& os, const InstanceCache& instanceCache)
419 : {
420 2 : os << "InstanceCache " << instanceCache.getSize() / 1048576 << "/"
421 4 : << instanceCache.getMaxSize() / 1048576 << " MB"
422 : #ifdef CO_INSTRUMENT_CACHE
423 : << ", " << nReadHit << "/" << nRead << " reads, " << nWriteHit << "/"
424 : << nWrite << " writes (" << nWriteMiss << " misses, " << nWriteOld
425 : << " old, " << nWriteReady << " dups) " << nUsedRelease << " used, "
426 : << nUnusedRelease << " unused releases"
427 : #endif
428 2 : ;
429 2 : return os;
430 : }
431 63 : }
|