Line data Source code
1 :
2 : /* Copyright (c) 2012, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * 2012-2014, Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataOCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "objectDataICommand.h"
25 : #include <pression/plugins/compressorTypes.h>
26 :
27 : namespace co
28 : {
29 :
30 : namespace detail
31 : {
32 :
33 : class ObjectDataOCommand // private state of co::ObjectDataOCommand: payload pointer, byte count, optional stream
34 : {
35 : public:
36 170 : ObjectDataOCommand( co::DataOStream* stream_, const void* data_, // stream_ may be null; size then stays 0
37 : const uint64_t size_ )
38 : : data( data_ )
39 : , size( 0 )
40 170 : , stream( stream_ )
41 : {
42 170 : if( stream )
43 : {
44 165 : size = stream->getCompressedDataSize(); // prefer the size reported by the stream
45 165 : if( size == 0 ) // stream reported 0: fall back to the caller-supplied size_
46 165 : size = size_;
47 : }
48 170 : }
49 :
50 0 : ObjectDataOCommand( const ObjectDataOCommand& rhs ) // member-wise copy: the pointers are copied, not what they point to
51 : : data( rhs.data )
52 : , size( rhs.size )
53 0 : , stream( rhs.stream )
54 0 : {}
55 :
56 : const void* const data; // payload pointer as given to the constructor; never freed by this class
57 : uint64_t size; // byte count chosen in the constructor; 0 when no stream was given
58 : co::DataOStream* stream; // may be null; never freed by this class
59 : };
60 :
61 : }
62 :
63 170 : ObjectDataOCommand::ObjectDataOCommand( const Connections& receivers, // builds the base command and private state, then writes the header fields
64 : const uint32_t cmd, const uint32_t type,
65 : const uint128_t& id,
66 : const uint32_t instanceID,
67 : const uint128_t& version,
68 : const uint32_t sequence,
69 : const void* data,
70 : const uint64_t size,
71 : const bool isLast,
72 : DataOStream* stream ) // may be null, see the else branch in _init()
73 : : ObjectOCommand( receivers, cmd, type, id, instanceID )
74 170 : , _impl( new detail::ObjectDataOCommand( stream, data, size )) // released with delete in the destructor
75 : {
76 170 : _init( version, sequence, size, isLast ); // passes the caller's size, not _impl->size
77 170 : }
78 :
79 0 : ObjectDataOCommand::ObjectDataOCommand( const ObjectDataOCommand& rhs ) // copy constructor: the new object gets its own _impl allocation
80 : : ObjectOCommand( rhs )
81 0 : , _impl( new detail::ObjectDataOCommand( *rhs._impl )) // copies the data/size/stream values, so both objects refer to the same data and stream
82 : {
83 0 : } // _init() is not called here. NOTE(review): the destructor's send condition holds for the copy as well as for rhs - confirm this is intended
84 :
85 170 : void ObjectDataOCommand::_init( const uint128_t& version, // streams the header fields of this command into itself
86 : const uint32_t sequence,
87 : const uint64_t size, const bool isLast )
88 : {
89 170 : *this << version << size << sequence << isLast; // written in this order: version, size, sequence, isLast
90 :
91 170 : if( _impl->stream )
92 165 : _impl->stream->streamDataHeader( *this ); // the stream writes its own data header into this command
93 : else
94 5 : *this << EQ_COMPRESSOR_NONE << 0u; // no stream: write compressor = EQ_COMPRESSOR_NONE and nChunks = 0
95 170 : }
96 :
97 340 : ObjectDataOCommand::~ObjectDataOCommand() // sends the payload, if there is one, and frees the private state
98 : {
99 170 : if( _impl->stream && _impl->size > 0 ) // nothing is sent without a stream or with a zero size
100 : {
101 165 : sendHeader( _impl->size ); // called once, before the per-connection body sends
102 165 : const Connections& connections = getConnections();
103 330 : for( ConnectionsCIter i = connections.begin(); i != connections.end();
104 : ++i )
105 : {
106 165 : ConnectionPtr conn = *i;
107 165 : _impl->stream->sendBody( conn, _impl->data, _impl->size ); // one sendBody call per connection, same data and size each time
108 165 : }
109 : }
110 :
111 170 : delete _impl; // allocated with new in the constructors
112 170 : }
113 :
114 1 : ObjectDataICommand ObjectDataOCommand::_getCommand( LocalNodePtr node ) // builds an ObjectDataICommand for node from this command's buffer
115 : {
116 1 : lunchbox::Bufferb& outBuffer = getBuffer();
117 : // cppcheck-suppress unreadVariable
118 1 : uint8_t* bytes = outBuffer.getData();
119 1 : reinterpret_cast< uint64_t* >( bytes )[ 0 ] = outBuffer.getSize(); // stores the buffer size as a uint64_t at the start of the buffer; assumes the buffer is large enough and suitably aligned - TODO confirm
120 :
121 1 : BufferPtr inBuffer = node->allocBuffer( outBuffer.getSize( )); // buffer obtained from node, requested with the current output size
122 1 : inBuffer->swap( outBuffer ); // presumably leaves the written bytes in inBuffer and the fresh storage in outBuffer - verify against Buffer::swap
123 1 : return ObjectDataICommand( node, node, inBuffer, false ); // node is passed for both of the first two arguments
124 : }
125 :
126 66 : }
|