Line data Source code
1 :
2 : /* Copyright (c) 2011-2016, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "dataIStreamQueue.h"
22 :
23 : #include "objectDataICommand.h"
24 : #include "objectDataIStream.h"
25 :
26 : namespace co
27 : {
28 66 : DataIStreamQueue::DataIStreamQueue()
29 : {
30 66 : }
31 :
32 128 : DataIStreamQueue::~DataIStreamQueue()
33 : {
34 64 : LBASSERTINFO(_pending.empty(), "Incomplete commits pending");
35 64 : LBASSERTINFO(_queued.isEmpty(), _queued.getSize() << " unapplied commits")
36 :
37 64 : for (PendingStreamsCIter i = _pending.begin(); i != _pending.end(); ++i)
38 0 : delete i->second;
39 64 : _pending.clear();
40 :
41 64 : QueuedStream stream;
42 64 : while (_queued.tryPop(stream))
43 0 : delete stream.second;
44 64 : }
45 :
46 0 : ObjectDataIStream* DataIStreamQueue::tryPop()
47 : {
48 0 : QueuedStream stream(uint128_t(), (ObjectDataIStream*)0);
49 0 : _queued.tryPop(stream);
50 0 : return stream.second;
51 : }
52 :
53 5 : ObjectDataIStream* DataIStreamQueue::pull(const uint128_t& key)
54 : {
55 5 : ObjectDataIStream* is = 0;
56 10 : QueuedStreams unusedStreams;
57 15 : while (!is)
58 : {
59 5 : QueuedStream candidate = _queued.pop();
60 5 : if (candidate.first == key)
61 5 : is = candidate.second;
62 : else
63 0 : unusedStreams.push_back(candidate);
64 : }
65 :
66 5 : _queued.pushFront(unusedStreams);
67 10 : return is;
68 : }
69 :
70 5 : void DataIStreamQueue::recycle(ObjectDataIStream* stream)
71 : {
72 : #ifdef CO_AGGRESSIVE_CACHING
73 : stream->reset();
74 : _iStreamCache.release(stream);
75 : #else
76 5 : delete stream;
77 : #endif
78 5 : }
79 :
80 7 : bool DataIStreamQueue::addDataCommand(const uint128_t& key, ICommand& command)
81 : {
82 7 : LB_TS_THREAD(_thread);
83 7 : LBASSERTINFO(_pending.size() < 100, "More than 100 pending commits");
84 :
85 7 : ObjectDataIStream* is = 0;
86 7 : PendingStreams::iterator i = _pending.find(key);
87 7 : if (i == _pending.end())
88 5 : is = _iStreamCache.alloc();
89 : else
90 2 : is = i->second;
91 :
92 7 : is->addDataCommand(command);
93 7 : if (is->isReady())
94 : {
95 5 : if (i != _pending.end())
96 2 : _pending.erase(i);
97 :
98 5 : _queued.push(QueuedStream(key, is));
99 5 : return true;
100 : }
101 :
102 2 : if (i == _pending.end())
103 : {
104 2 : _pending[key] = is;
105 2 : return false;
106 : }
107 :
108 0 : return false;
109 : }
110 63 : }
|