Line data Source code
1 :
2 : /* Copyright (c) 2007-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Daniel Nachbaur <danielnachbaur@gmail.com>
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataIStream.h"
22 :
23 : #include "commands.h"
24 : #include "objectCommand.h"
25 : #include "objectDataICommand.h"
26 : #include <pression/data/CompressorInfo.h>
27 :
28 : namespace co
29 : {
30 185779 : ObjectDataIStream::ObjectDataIStream()
31 185779 : : DataIStream()
32 : {
33 185779 : _reset();
34 185779 : }
35 :
36 0 : ObjectDataIStream::ObjectDataIStream(const ObjectDataIStream& rhs)
37 0 : : DataIStream()
38 : {
39 0 : *this = rhs;
40 0 : }
41 :
42 0 : ObjectDataIStream& ObjectDataIStream::operator=(const ObjectDataIStream& rhs)
43 : {
44 0 : if (this != &rhs)
45 : {
46 0 : _commands = rhs._commands;
47 0 : _version = rhs._version;
48 : }
49 0 : return *this;
50 : }
51 :
52 557337 : ObjectDataIStream::~ObjectDataIStream()
53 : {
54 185779 : _reset();
55 371558 : }
56 :
57 0 : void ObjectDataIStream::reset()
58 : {
59 0 : DataIStream::reset();
60 0 : _reset();
61 0 : }
62 :
63 371558 : void ObjectDataIStream::_reset()
64 : {
65 371558 : _usedCommand.clear();
66 371558 : _commands.clear();
67 371558 : _version = VERSION_INVALID;
68 371558 : }
69 :
70 185789 : void ObjectDataIStream::addDataCommand(ObjectDataICommand command)
71 : {
72 185789 : LB_TS_THREAD(_thread);
73 185789 : LBASSERT(!isReady());
74 :
75 : #ifndef NDEBUG
76 185789 : const uint128_t& version = command.getVersion();
77 185789 : const uint32_t sequence = command.getSequence();
78 :
79 185789 : if (_commands.empty())
80 : {
81 185779 : LBASSERTINFO(sequence == 0, sequence << " in " << command);
82 : }
83 : else
84 : {
85 20 : ObjectDataICommand previous(_commands.back());
86 10 : const uint128_t& previousVersion = previous.getVersion();
87 10 : const uint32_t previousSequence = previous.getSequence();
88 10 : LBASSERTINFO(sequence == previousSequence + 1,
89 : sequence << ", " << previousSequence);
90 10 : LBASSERT(version == previousVersion);
91 : }
92 : #endif
93 :
94 185789 : _commands.push_back(command);
95 185789 : if (command.isLast())
96 185779 : _setReady();
97 185789 : }
98 :
99 146 : bool ObjectDataIStream::hasInstanceData() const
100 : {
101 146 : if (!_usedCommand.isValid() && _commands.empty())
102 : {
103 0 : LBUNREACHABLE;
104 0 : return false;
105 : }
106 :
107 : const ICommand& command =
108 146 : _usedCommand.isValid() ? _usedCommand : _commands.front();
109 146 : return (command.getCommand() == CMD_OBJECT_INSTANCE);
110 : }
111 :
112 0 : NodePtr ObjectDataIStream::getRemoteNode() const
113 : {
114 0 : if (!_usedCommand.isValid() && _commands.empty())
115 0 : return 0;
116 :
117 : const ICommand& command =
118 0 : _usedCommand.isValid() ? _usedCommand : _commands.front();
119 0 : return command.getRemoteNode();
120 : }
121 :
122 0 : LocalNodePtr ObjectDataIStream::getLocalNode() const
123 : {
124 0 : if (!_usedCommand.isValid() && _commands.empty())
125 0 : return 0;
126 :
127 : const ICommand& command =
128 0 : _usedCommand.isValid() ? _usedCommand : _commands.front();
129 0 : return command.getLocalNode();
130 : }
131 :
132 556872 : size_t ObjectDataIStream::getDataSize() const
133 : {
134 556872 : size_t size = 0;
135 : typedef CommandDeque::const_iterator CommandDequeCIter;
136 1113756 : for (CommandDequeCIter i = _commands.begin(); i != _commands.end(); ++i)
137 : {
138 556884 : const ICommand& command = *i;
139 556884 : size += command.getSize();
140 : }
141 556872 : return size;
142 : }
143 :
144 185791 : uint128_t ObjectDataIStream::getPendingVersion() const
145 : {
146 185791 : if (_commands.empty())
147 0 : return VERSION_INVALID;
148 :
149 371582 : const ObjectDataICommand& cmd(_commands.back());
150 185791 : return cmd.getVersion();
151 : }
152 :
153 316 : bool ObjectDataIStream::getNextBuffer(CompressorInfo& info, uint32_t& nChunks,
154 : const void*& chunkData, uint64_t& size)
155 : {
156 316 : if (_commands.empty())
157 : {
158 155 : _usedCommand.clear();
159 155 : return false;
160 : }
161 :
162 161 : _usedCommand = _commands.front();
163 161 : _commands.pop_front();
164 161 : if (!_usedCommand.isValid())
165 0 : return false;
166 :
167 161 : LBASSERT(_usedCommand.getCommand() == CMD_OBJECT_INSTANCE ||
168 : _usedCommand.getCommand() == CMD_OBJECT_DELTA ||
169 : _usedCommand.getCommand() == CMD_OBJECT_SLAVE_DELTA);
170 :
171 322 : ObjectDataICommand command(_usedCommand);
172 161 : const uint64_t dataSize = command.getDataSize();
173 :
174 161 : if (dataSize == 0) // empty command
175 4 : return getNextBuffer(info, nChunks, chunkData, size);
176 :
177 157 : size = dataSize;
178 157 : info = command.getCompressorInfo();
179 157 : nChunks = command.getChunks();
180 157 : switch (command.getCommand())
181 : {
182 : case CMD_OBJECT_INSTANCE:
183 51 : command.get<NodeID>(); // nodeID
184 51 : command.get<uint32_t>(); // instanceID
185 51 : break;
186 : case CMD_OBJECT_SLAVE_DELTA:
187 0 : command.get<uint128_t>(); // commit UUID
188 0 : break;
189 : }
190 157 : chunkData = command.getRemainingBuffer(command.getRemainingBufferSize());
191 157 : return true;
192 : }
193 63 : }
|