LCOV - code coverage report
Current view: top level - co - dataOStream.h (source / functions) Hit Total Coverage
Test: Collage Lines: 22 22 100.0 %
Date: 2018-01-09 16:37:03 Functions: 34 37 91.9 %

          Line data    Source code
       1             : 
       2             : /* Copyright (c) 2007-2017, Stefan Eilemann <eile@equalizergraphics.com>
       3             :  *                          Cedric Stalder <cedric.stalder@gmail.com>
       4             :  *                          Daniel Nachbaur <danielnachbaur@gmail.com>
       5             :  *
       6             :  * This file is part of Collage <https://github.com/Eyescale/Collage>
       7             :  *
       8             :  * This library is free software; you can redistribute it and/or modify it under
       9             :  * the terms of the GNU Lesser General Public License version 2.1 as published
      10             :  * by the Free Software Foundation.
      11             :  *
      12             :  * This library is distributed in the hope that it will be useful, but WITHOUT
      13             :  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
      14             :  * FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
      15             :  * details.
      16             :  *
      17             :  * You should have received a copy of the GNU Lesser General Public License
      18             :  * along with this library; if not, write to the Free Software Foundation, Inc.,
      19             :  * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
      20             :  */
      21             : 
      22             : #ifndef CO_DATAOSTREAM_H
      23             : #define CO_DATAOSTREAM_H
      24             : 
      25             : #include <co/api.h>
      26             : #include <co/global.h>
      27             : #include <co/types.h>
      28             : 
      29             : #include <lunchbox/array.h> // used inline
      30             : 
      31             : #include <boost/noncopyable.hpp>
      32             : #include <boost/type_traits.hpp>
      33             : #include <map>
      34             : #include <set>
      35             : #include <unordered_map>
      36             : #include <unordered_set>
      37             : #include <vector>
      38             : 
      39             : namespace co
      40             : {
      41             : namespace detail
      42             : {
      43             : class DataOStream;
      44             : }
      45             : namespace DataStreamTest
      46             : {
      47             : class Sender;
      48             : }
      49             : 
      50             : /**
      51             :  * A std::ostream-like interface for object serialization.
      52             :  *
      53             :  * Implements buffering, retaining and compressing data in a binary format.
      54             :  * Derived classes send the data using the appropriate commands.
      55             :  */
      56             : class DataOStream : public boost::noncopyable
      57             : {
      58             : public:
      59             :     /** @name Internal */
      60             :     //@{
      61             :     /** @internal Disable and flush the output to the current receivers. */
      62             :     CO_API void disable();
      63             : 
      64             :     /** @internal Enable copying of all data into a saved buffer. */
      65             :     void enableSave();
      66             : 
      67             :     /** @internal Disable copying of all data into a saved buffer. */
      68             :     void disableSave();
      69             : 
      70             :     /** @internal @return if data was sent since the last enable() */
      71             :     CO_API bool hasSentData() const;
      72             : 
      73             :     /** @internal */
      74             :     CO_API const Connections& getConnections() const;
      75             : 
      76             :     /** @internal Stream the data header (compressor, nChunks). */
      77             :     DataOStream& streamDataHeader(DataOStream& os);
      78             : 
      79             :     /** @internal Send the (compressed) data using the given connection. */
      80             :     void sendBody(ConnectionPtr connection, const void* data,
      81             :                   const uint64_t dataSize);
      82             : 
      83             :     /** @internal @return the compressed data size, 0 if uncompressed. */
      84             :     uint64_t getCompressedDataSize() const;
      85             :     //@}
      86             : 
      87             :     /** @name Data output */
      88             :     //@{
      89             :     /** Write a plain data item by copying it to the stream. @version 1.0 */
      90             :     template <class T>
      91      291457 :     DataOStream& operator<<(const T& value)
      92             :     {
      93      291457 :         _write(value, boost::has_trivial_copy<T>());
      94      291479 :         return *this;
      95             :     }
      96             : 
      97             :     /** Write a C array. @version 1.0 */
      98             :     template <class T>
      99           7 :     DataOStream& operator<<(const Array<T> array)
     100             :     {
     101           7 :         _writeArray(array, boost::has_trivial_copy<T>());
     102           7 :         return *this;
     103             :     }
     104             : 
     105             :     /**
     106             :      * Write a lunchbox::RefPtr. Refcount has to be managed by the caller.
     107             :      * @version 1.1
     108             :      */
     109             :     template <class T>
     110             :     DataOStream& operator<<(const lunchbox::RefPtr<T>& ptr);
     111             : 
     112             :     /** Write a lunchbox::Buffer. @version 1.0 */
     113             :     template <class T>
     114             :     DataOStream& operator<<(const lunchbox::Buffer<T>& buffer);
     115             : 
     116             :     /** Transmit a request identifier. @version 1.1.1 */
     117             :     template <class T>
     118       20161 :     DataOStream& operator<<(const lunchbox::Request<T>& request)
     119             :     {
     120       20161 :         return (*this) << request.getID();
     121             :     }
     122             : 
     123             :     /** Write a std::vector of serializable items. @version 1.0 */
     124             :     template <class T>
     125             :     DataOStream& operator<<(const std::vector<T>& value);
     126             : 
     127             :     /** Write a std::map of serializable items. @version 1.0 */
     128             :     template <class K, class V>
     129             :     DataOStream& operator<<(const std::map<K, V>& value);
     130             : 
     131             :     /** Write a std::set of serializable items. @version 1.0 */
     132             :     template <class T>
     133             :     DataOStream& operator<<(const std::set<T>& value);
     134             : 
     135             :     /** Write an unordered_map of serializable items. @version 1.0 */
     136             :     template <class K, class V>
     137             :     DataOStream& operator<<(const std::unordered_map<K, V>& value);
     138             : 
     139             :     /** Write an unordered_set of serializable items. @version 1.0 */
     140             :     template <class T>
     141             :     DataOStream& operator<<(const std::unordered_set<T>& value);
     142             : 
     143             :     /** @internal
     144             :      * Serialize child objects.
     145             :      *
     146             :      * The DataIStream has a deserialize counterpart to this method. All
     147             :      * child objects have to be registered or mapped beforehand.
     148             :      */
     149             :     template <typename C>
     150             :     void serializeChildren(const std::vector<C*>& children);
     151             :     //@}
     152             : 
     153             :     CO_API static std::ostream& printStatistics(std::ostream&); //!< @internal
     154             :     CO_API static void clearStatistics();                       //!< @internal
     155             : 
     156             : protected:
     157             :     CO_API explicit DataOStream(
     158         177 :         size_t chunkSize = Global::getObjectBufferSize()); //!< @internal
     159             :     explicit DataOStream(DataOStream& rhs);                //!< @internal
     160             :     virtual CO_API ~DataOStream();                         //!< @internal
     161             : 
     162             :     /** @internal */
     163             :     CO_API lunchbox::Bufferb& getBuffer();
     164             : 
     165             :     /** @internal Initialize the given compressor. */
     166             :     void _setCompressor(const pression::data::CompressorInfo& info);
     167             : 
     168             :     /** @internal Enable output. */
     169             :     CO_API void _enable();
     170             : 
     171             :     /** @internal Flush remaining data in the buffer. */
     172             :     void flush(const bool last);
     173             : 
     174             :     /** @internal Set up the connection list for a group of nodes, using
     175             :      * multicast where possible.
     176             :      */
     177             :     void _setupConnections(const Nodes& receivers);
     178             : 
     179             :     void _setupConnections(const Connections& connections);
     180             : 
     181             :     /** @internal Set up the connection (list) for one node. */
     182             :     void _setupConnection(NodePtr node, const bool useMulticast);
     183             : 
     184             :     /** @internal Needed by unit test. */
     185             :     CO_API void _setupConnection(ConnectionPtr connection);
     186             :     friend class DataStreamTest::Sender;
     187             : 
     188             :     /** @internal Resend the saved buffer to all enabled connections. */
     189             :     void _resend();
     190             : 
     191             :     void _clearConnections(); //!< @internal
     192             : 
     193             :     /** @internal @name Data sending, used by the subclasses */
     194             :     //@{
     195             :     /** @internal Send a data buffer (command) to the receivers. */
     196             :     virtual void sendData(const void* buffer, const uint64_t size,
     197             :                           const bool last) = 0;
     198             :     //@}
     199             : 
     200             :     /** @internal Reset the whole stream. */
     201             :     virtual CO_API void reset();
     202             : 
     203             : private:
     204             :     detail::DataOStream* const _impl;
     205             : 
     206             :     /** Collect compressed data. */
     207             :     CO_API uint64_t _getCompressedData(const uint8_t** chunks,
     208             :                                        uint64_t* chunkSizes) const;
     209             : 
     210             :     /** Write a number of bytes from data into the stream. */
     211             :     CO_API void _write(const void* data, uint64_t size);
     212             : 
     213             :     /** Helper function preparing data for sendData() as needed. */
     214             :     void _sendData(const void* data, const uint64_t size);
     215             : 
     216             :     /** Reset after sending a buffer. */
     217             :     void _resetBuffer();
     218             : 
     219             :     /** Write a vector of trivial data. */
     220             :     template <class T>
     221           4 :     DataOStream& _writeFlatVector(const std::vector<T>& value)
     222             :     {
     223           4 :         const uint64_t nElems = value.size();
     224           4 :         _write(&nElems, sizeof(nElems));
     225           4 :         if (nElems > 0)
     226           4 :             _write(&value.front(), nElems * sizeof(T));
     227           4 :         return *this;
     228             :     }
     229             : 
     230             :     /** Write a plain data item. */
     231             :     template <class T>
     232      291461 :     void _write(const T& value, const boost::true_type&)
     233             :     {
     234      291461 :         _write(&value, sizeof(value));
     235      291479 :     }
     236             : 
     237             :     /** Write a non-plain data item. */
     238             :     template <class T>
     239             :     void _write(const T& value, const boost::false_type&)
     240             :     {
     241             :         _writeSerializable(value, boost::is_base_of<servus::Serializable, T>());
     242             :     }
     243             : 
     244             :     /** Write a serializable object. */
     245             :     template <class T>
     246             :     void _writeSerializable(const T& value, const boost::true_type&);
     247             : 
     248             :     /** Write an Array of POD data */
     249             :     template <class T>
     250             :     void _writeArray(const Array<T> array, const boost::true_type&)
     251             :     {
     252             :         _write(array.data, array.getNumBytes());
     253             :     }
     254             : 
     255             :     /** Write an Array of non-POD data */
     256             :     template <class T>
     257           1 :     void _writeArray(const Array<T> array, const boost::false_type&)
     258             :     {
     259           3 :         for (size_t i = 0; i < array.num; ++i)
     260           2 :             *this << array.data[i];
     261           1 :     }
     262             : 
     263             :     /** Send the trailing data (command) to the receivers */
     264             :     void _sendFooter(const void* buffer, const uint64_t size);
     265             : };
     266             : }
     267             : 
     268             : #include "dataOStream.ipp" // template implementation
     269             : 
     270             : #endif // CO_DATAOSTREAM_H

Generated by: LCOV version 1.11