Line data Source code
1 :
2 : /* Copyright (c) 2007-2017, Stefan Eilemann <eile@equalizergraphics.com>
3 : * Cedric Stalder <cedric.stalder@gmail.com>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #ifndef CO_DATAOSTREAM_H
23 : #define CO_DATAOSTREAM_H
24 :
25 : #include <co/api.h>
26 : #include <co/global.h>
27 : #include <co/types.h>
28 :
29 : #include <lunchbox/array.h> // used inline
30 :
31 : #include <boost/noncopyable.hpp>
32 : #include <boost/type_traits.hpp>
33 : #include <map>
34 : #include <set>
35 : #include <unordered_map>
36 : #include <unordered_set>
37 : #include <vector>
38 :
39 : namespace co
40 : {
41 : namespace detail
42 : {
43 : class DataOStream;
44 : }
45 : namespace DataStreamTest
46 : {
47 : class Sender;
48 : }
49 :
50 : /**
51 : * A std::ostream-like interface for object serialization.
52 : *
53 : * Implements buffering, retaining and compressing data in a binary format.
54 : * Derived classes send the data using the appropriate commands.
55 : */
56 : class DataOStream : public boost::noncopyable
57 : {
58 : public:
59 : /** @name Internal */
60 : //@{
61 : /** @internal Disable and flush the output to the current receivers. */
62 : CO_API void disable();
63 :
64 : /** @internal Enable copying of all data into a saved buffer. */
65 : void enableSave();
66 :
67 : /** @internal Disable copying of all data into a saved buffer. */
68 : void disableSave();
69 :
70 : /** @internal @return if data was sent since the last enable() */
71 : CO_API bool hasSentData() const;
72 :
73 : /** @internal */
74 : CO_API const Connections& getConnections() const;
75 :
76 : /** @internal Stream the data header (compressor, nChunks). */
77 : DataOStream& streamDataHeader(DataOStream& os);
78 :
79 : /** @internal Send the (compressed) data using the given connection. */
80 : void sendBody(ConnectionPtr connection, const void* data,
81 : const uint64_t dataSize);
82 :
83 : /** @internal @return the compressed data size, 0 if uncompressed.*/
84 : uint64_t getCompressedDataSize() const;
85 : //@}
86 :
87 : /** @name Data output */
88 : //@{
89 : /** Write a plain data item by copying it to the stream. @version 1.0 */
90 : template <class T>
91 291457 : DataOStream& operator<<(const T& value)
92 : {
93 291457 : _write(value, boost::has_trivial_copy<T>());
94 291479 : return *this;
95 : }
96 :
97 : /** Write a C array. @version 1.0 */
98 : template <class T>
99 7 : DataOStream& operator<<(const Array<T> array)
100 : {
101 7 : _writeArray(array, boost::has_trivial_copy<T>());
102 7 : return *this;
103 : }
104 :
105 : /**
106 : * Write a lunchbox::RefPtr. Refcount has to managed by caller.
107 : * @version 1.1
108 : */
109 : template <class T>
110 : DataOStream& operator<<(const lunchbox::RefPtr<T>& ptr);
111 :
112 : /** Write a lunchbox::Buffer. @version 1.0 */
113 : template <class T>
114 : DataOStream& operator<<(const lunchbox::Buffer<T>& buffer);
115 :
116 : /** Transmit a request identifier. @version 1.1.1 */
117 : template <class T>
118 20161 : DataOStream& operator<<(const lunchbox::Request<T>& request)
119 : {
120 20161 : return (*this) << request.getID();
121 : }
122 :
123 : /** Write a std::vector of serializable items. @version 1.0 */
124 : template <class T>
125 : DataOStream& operator<<(const std::vector<T>& value);
126 :
127 : /** Write a std::map of serializable items. @version 1.0 */
128 : template <class K, class V>
129 : DataOStream& operator<<(const std::map<K, V>& value);
130 :
131 : /** Write a std::set of serializable items. @version 1.0 */
132 : template <class T>
133 : DataOStream& operator<<(const std::set<T>& value);
134 :
135 : /** Write an unordered_map of serializable items. @version 1.0 */
136 : template <class K, class V>
137 : DataOStream& operator<<(const std::unordered_map<K, V>& value);
138 :
139 : /** Write an unordered_set of serializable items. @version 1.0 */
140 : template <class T>
141 : DataOStream& operator<<(const std::unordered_set<T>& value);
142 :
143 : /** @internal
144 : * Serialize child objects.
145 : *
146 : * The DataIStream has a deserialize counterpart to this method. All
147 : * child objects have to be registered or mapped beforehand.
148 : */
149 : template <typename C>
150 : void serializeChildren(const std::vector<C*>& children);
151 : //@}
152 :
153 : CO_API static std::ostream& printStatistics(std::ostream&); //!< @internal
154 : CO_API static void clearStatistics(); //!< @internal
155 :
156 : protected:
157 : CO_API explicit DataOStream(
158 177 : size_t chunkSize = Global::getObjectBufferSize()); //!< @internal
159 : explicit DataOStream(DataOStream& rhs); //!< @internal
160 : virtual CO_API ~DataOStream(); //!< @internal
161 :
162 : /** @internal */
163 : CO_API lunchbox::Bufferb& getBuffer();
164 :
165 : /** @internal Initialize the given compressor. */
166 : void _setCompressor(const pression::data::CompressorInfo& info);
167 :
168 : /** @internal Enable output. */
169 : CO_API void _enable();
170 :
171 : /** @internal Flush remaining data in the buffer. */
172 : void flush(const bool last);
173 :
174 : /** @internal Set up the connection list for a group of nodes, using
175 : * multicast where possible.
176 : */
177 : void _setupConnections(const Nodes& receivers);
178 :
179 : void _setupConnections(const Connections& connections);
180 :
181 : /** @internal Set up the connection (list) for one node. */
182 : void _setupConnection(NodePtr node, const bool useMulticast);
183 :
184 : /** @internal Needed by unit test. */
185 : CO_API void _setupConnection(ConnectionPtr connection);
186 : friend class DataStreamTest::Sender;
187 :
188 : /** @internal Resend the saved buffer to all enabled connections. */
189 : void _resend();
190 :
191 : void _clearConnections(); //!< @internal
192 :
193 : /** @internal @name Data sending, used by the subclasses */
194 : //@{
195 : /** @internal Send a data buffer (command) to the receivers. */
196 : virtual void sendData(const void* buffer, const uint64_t size,
197 : const bool last) = 0;
198 : //@}
199 :
200 : /** @internal Reset the whole stream. */
201 : virtual CO_API void reset();
202 :
203 : private:
204 : detail::DataOStream* const _impl;
205 :
206 : /** Collect compressed data. */
207 : CO_API uint64_t _getCompressedData(const uint8_t** chunks,
208 : uint64_t* chunkSizes) const;
209 :
210 : /** Write a number of bytes from data into the stream. */
211 : CO_API void _write(const void* data, uint64_t size);
212 :
213 : /** Helper function preparing data for sendData() as needed. */
214 : void _sendData(const void* data, const uint64_t size);
215 :
216 : /** Reset after sending a buffer. */
217 : void _resetBuffer();
218 :
219 : /** Write a vector of trivial data. */
220 : template <class T>
221 4 : DataOStream& _writeFlatVector(const std::vector<T>& value)
222 : {
223 4 : const uint64_t nElems = value.size();
224 4 : _write(&nElems, sizeof(nElems));
225 4 : if (nElems > 0)
226 4 : _write(&value.front(), nElems * sizeof(T));
227 4 : return *this;
228 : }
229 :
230 : /** Write a plain data item. */
231 : template <class T>
232 291461 : void _write(const T& value, const boost::true_type&)
233 : {
234 291461 : _write(&value, sizeof(value));
235 291479 : }
236 :
237 : /** Write a non-plain data item. */
238 : template <class T>
239 : void _write(const T& value, const boost::false_type&)
240 : {
241 : _writeSerializable(value, boost::is_base_of<servus::Serializable, T>());
242 : }
243 :
244 : /** Write a serializable object. */
245 : template <class T>
246 : void _writeSerializable(const T& value, const boost::true_type&);
247 :
248 : /** Write an Array of POD data */
249 : template <class T>
250 : void _writeArray(const Array<T> array, const boost::true_type&)
251 : {
252 : _write(array.data, array.getNumBytes());
253 : }
254 :
255 : /** Write an Array of non-POD data */
256 : template <class T>
257 1 : void _writeArray(const Array<T> array, const boost::false_type&)
258 : {
259 3 : for (size_t i = 0; i < array.num; ++i)
260 2 : *this << array.data[i];
261 1 : }
262 :
263 : /** Send the trailing data (command) to the receivers */
264 : void _sendFooter(const void* buffer, const uint64_t size);
265 : };
266 : }
267 :
268 : #include "dataOStream.ipp" // template implementation
269 :
270 : #endif // CO_DATAOSTREAM_H
|