Collage  1.7.0
High-performance C++ library for developing object-oriented distributed applications.
dataIStream.ipp
1 
2 /* Copyright (c) 2012-2017, Daniel Nachbaur <danielnachbaur@gmail.com>
3  * Stefan.Eilemann@epfl.ch
4  *
5  * This library is free software; you can redistribute it and/or modify it under
6  * the terms of the GNU Lesser General Public License version 2.1 as published
7  * by the Free Software Foundation.
8  *
9  * This library is distributed in the hope that it will be useful, but WITHOUT
10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
11  * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
12  * details.
13  *
14  * You should have received a copy of the GNU Lesser General Public License
15  * along with this library; if not, write to the Free Software Foundation, Inc.,
16  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17  */
18 
19 #include <co/object.h>
20 #include <co/objectVersion.h>
21 #include <lunchbox/algorithm.h> // find_if
22 #include <servus/serializable.h> // used inline
23 
24 namespace co
25 {
29 template <>
30 inline DataIStream& DataIStream::operator>>(std::string& str)
31 {
32  uint64_t nElems = 0;
33  *this >> nElems;
34  const uint64_t maxElems = getRemainingBufferSize();
35  LBASSERTINFO(nElems <= maxElems, nElems << " > " << maxElems);
36  if (nElems == 0)
37  str.clear();
38  else if (nElems <= maxElems)
39  str.assign(static_cast<const char*>(getRemainingBuffer(nElems)),
40  size_t(nElems));
41  else
42  str.assign(static_cast<const char*>(getRemainingBuffer(maxElems)),
43  size_t(maxElems));
44  return *this;
45 }
46 
// Reads an ObjectVersion from the stream, asserts that its identifier equals
// object->getID(), then syncs the object to the version that was read.
// NOTE(review): the declarator line between `template <>` and `{` is absent
// from this listing (the embedded numbering jumps from 48 to 50), so the
// operand type is not visible here -- restore it from the original file.
48 template <>
50 {
51  ObjectVersion data;
52  *this >> data;
53  LBASSERT(object->getID() == data.identifier);
54  object->sync(data.version);
55  return *this;
56 }
57 
59 template <class T>
60 void DataIStream::_readSerializable(T& object, const boost::true_type&)
61 {
62  const size_t size = read<size_t>();
63  object.fromBinary(getRemainingBuffer(size), size);
64 }
65 
67 template <class T>
68 void DataIStream::_readArray(Array<T> array, const boost::true_type&)
69 {
70  _read(array.data, array.getNumBytes());
71 }
72 
74 template <class T>
75 void DataIStream::_readArray(Array<T> array, const boost::false_type&)
76 {
77  for (size_t i = 0; i < array.num; ++i)
78  *this >> array.data[i];
79 }
80 
81 template <>
82 inline void DataIStream::_readArray(Array<void> array, const boost::false_type&)
83 {
84  _read(array.data, array.getNumBytes());
85 }
86 
87 template <class T>
88 inline DataIStream& DataIStream::operator>>(lunchbox::RefPtr<T>& ptr)
89 {
90  T* object = 0;
91  *this >> object;
92  ptr = object;
93  return *this;
94 }
95 
96 template <class T>
97 inline DataIStream& DataIStream::operator>>(lunchbox::Buffer<T>& buffer)
98 {
99  uint64_t nElems = 0;
100  *this >> nElems;
101  LBASSERTINFO(nElems < LB_BIT48,
102  "Out-of-sync co::DataIStream: " << nElems << " elements?");
103  buffer.resize(nElems);
104  return *this >> Array<T>(buffer.getData(), nElems);
105 }
106 
107 template <class T>
108 inline DataIStream& DataIStream::operator>>(std::vector<T>& value)
109 {
110  uint64_t nElems = 0;
111  *this >> nElems;
112  value.resize(nElems);
113  for (uint64_t i = 0; i < nElems; ++i)
114  *this >> value[i];
115  return *this;
116 }
117 
118 template <class K, class V>
119 inline DataIStream& DataIStream::operator>>(std::map<K, V>& map)
120 {
121  map.clear();
122  uint64_t nElems = 0;
123  *this >> nElems;
124  for (uint64_t i = 0; i < nElems; ++i)
125  {
126  typename std::map<K, V>::key_type key;
127  typename std::map<K, V>::mapped_type value;
128  *this >> key >> value;
129  map.insert(std::make_pair(key, value));
130  }
131  return *this;
132 }
133 
134 template <class T>
135 inline DataIStream& DataIStream::operator>>(std::set<T>& value)
136 {
137  value.clear();
138  uint64_t nElems = 0;
139  *this >> nElems;
140  for (uint64_t i = 0; i < nElems; ++i)
141  {
142  T item;
143  *this >> item;
144  value.insert(item);
145  }
146  return *this;
147 }
148 
149 template <class K, class V>
150 inline DataIStream& DataIStream::operator>>(std::unordered_map<K, V>& map)
151 {
152  map.clear();
153  uint64_t nElems = 0;
154  *this >> nElems;
155  for (uint64_t i = 0; i < nElems; ++i)
156  map.insert({read<K>(), read<V>()});
157  return *this;
158 }
159 
160 template <class T>
161 inline DataIStream& DataIStream::operator>>(std::unordered_set<T>& value)
162 {
163  value.clear();
164  uint64_t nElems = 0;
165  *this >> nElems;
166  for (uint64_t i = 0; i < nElems; ++i)
167  value.insert(read<T>());
168  return *this;
169 }
170 
171 namespace
172 {
173 class ObjectFinder
174 {
175 public:
176  explicit ObjectFinder(const uint128_t& id)
177  : _id(id)
178  {
179  }
180  bool operator()(co::Object* candidate) { return candidate->getID() == _id; }
181 private:
182  const uint128_t _id;
183 };
184 }
185 
186 template <typename O, typename C>
187 inline void DataIStream::deserializeChildren(O* object,
188  const std::vector<C*>& old_,
189  std::vector<C*>& result)
190 {
191  ObjectVersions versions;
192  *this >> versions;
193  std::vector<C*> old = old_;
194 
195  // rebuild vector from serialized list
196  result.clear();
197  for (ObjectVersions::const_iterator i = versions.begin();
198  i != versions.end(); ++i)
199  {
200  const ObjectVersion& version = *i;
201 
202  if (version.identifier == uint128_t())
203  {
204  result.push_back(0);
205  continue;
206  }
207 
208  typename std::vector<C*>::iterator j =
209  lunchbox::find_if(old, ObjectFinder(version.identifier));
210 
211  if (j == old.end()) // previously unknown child
212  {
213  C* child = 0;
214  object->create(&child);
215  LocalNodePtr localNode = object->getLocalNode();
216  LBASSERT(child);
217  LBASSERT(!object->isMaster());
218 
219  LBCHECK(localNode->mapObject(child, version));
220  result.push_back(child);
221  }
222  else
223  {
224  C* child = *j;
225  old.erase(j);
226  if (object->isMaster())
227  child->sync(VERSION_HEAD);
228  else
229  child->sync(version.version);
230 
231  result.push_back(child);
232  }
233  }
234 
235  while (!old.empty()) // removed children
236  {
237  C* child = old.back();
238  old.pop_back();
239  if (!child)
240  continue;
241 
242  if (child->isAttached() && !child->isMaster())
243  {
244  LocalNodePtr localNode = object->getLocalNode();
245  localNode->unmapObject(child);
246  }
247  object->release(child);
248  }
249 }
253 template <>
254 inline DataIStream& DataIStream::operator>>(std::vector<uint8_t>& value)
255 {
256  return _readFlatVector(value);
257 }
258 
260 template <>
261 inline DataIStream& DataIStream::operator>>(std::vector<uint16_t>& value)
262 {
263  return _readFlatVector(value);
264 }
265 
267 template <>
268 inline DataIStream& DataIStream::operator>>(std::vector<int16_t>& value)
269 {
270  return _readFlatVector(value);
271 }
272 
274 template <>
275 inline DataIStream& DataIStream::operator>>(std::vector<uint32_t>& value)
276 {
277  return _readFlatVector(value);
278 }
279 
281 template <>
282 inline DataIStream& DataIStream::operator>>(std::vector<int32_t>& value)
283 {
284  return _readFlatVector(value);
285 }
286 
288 template <>
289 inline DataIStream& DataIStream::operator>>(std::vector<uint64_t>& value)
290 {
291  return _readFlatVector(value);
292 }
293 
295 template <>
296 inline DataIStream& DataIStream::operator>>(std::vector<int64_t>& value)
297 {
298  return _readFlatVector(value);
299 }
300 
302 template <>
303 inline DataIStream& DataIStream::operator>>(std::vector<float>& value)
304 {
305  return _readFlatVector(value);
306 }
307 
309 template <>
310 inline DataIStream& DataIStream::operator>>(std::vector<double>& value)
311 {
312  return _readFlatVector(value);
313 }
314 
316 template <>
317 inline DataIStream& DataIStream::operator>>(std::vector<ObjectVersion>& value)
318 {
319  return _readFlatVector(value);
320 }
322 }
CO_API const uint128_t & getID() const
A distributed object.
Definition: object.h:47
lunchbox::RefPtr< LocalNode > LocalNodePtr
A reference pointer for LocalNode pointers.
Definition: types.h:89
A helper struct bundling an object identifier and version.
Definition: objectVersion.h:46
uint128_t identifier
the object identifier
Object-oriented network library.
Definition: barrier.h:27
CO_API const void * getRemainingBuffer(const uint64_t size)
DataIStream & operator>>(T &value)
Read a plain data item.
Definition: dataIStream.h:70
uint128_t version
the object version
CO_API uint64_t getRemainingBufferSize()
A std::istream-like input data stream for binary data.
Definition: dataIStream.h:45