Line data Source code
1 :
2 : /* Copyright (c) 2012-2017, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "objectDataOCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "objectDataICommand.h"
25 :
26 : namespace co
27 : {
28 : namespace detail
29 : {
30 : class ObjectDataOCommand
31 : {
32 : public:
33 164 : ObjectDataOCommand(co::DataOStream* stream_, const void* data_,
34 : const uint64_t size_)
35 164 : : data(data_)
36 : , size(0)
37 164 : , stream(stream_)
38 : {
39 164 : if (stream)
40 : {
41 159 : size = stream->getCompressedDataSize();
42 159 : if (size == 0)
43 151 : size = size_;
44 : }
45 164 : }
46 :
47 0 : ObjectDataOCommand(const ObjectDataOCommand& rhs)
48 0 : : data(rhs.data)
49 0 : , size(rhs.size)
50 0 : , stream(rhs.stream)
51 : {
52 0 : }
53 :
54 : const void* const data;
55 : uint64_t size;
56 : co::DataOStream* stream;
57 : };
58 : }
59 :
60 164 : ObjectDataOCommand::ObjectDataOCommand(
61 : const Connections& receivers, const uint32_t cmd, const uint32_t type,
62 : const uint128_t& id, const uint32_t instanceID, const uint128_t& version,
63 : const uint32_t sequence, const void* data, const uint64_t size,
64 164 : const bool isLast, DataOStream* stream)
65 : : ObjectOCommand(receivers, cmd, type, id, instanceID)
66 164 : , _impl(new detail::ObjectDataOCommand(stream, data, size))
67 : {
68 164 : _init(version, sequence, size, isLast);
69 164 : }
70 :
71 0 : ObjectDataOCommand::ObjectDataOCommand(const ObjectDataOCommand& rhs)
72 : : ObjectOCommand(rhs)
73 0 : , _impl(new detail::ObjectDataOCommand(*rhs._impl))
74 : {
75 0 : }
76 :
77 164 : void ObjectDataOCommand::_init(const uint128_t& version,
78 : const uint32_t sequence, const uint64_t size,
79 : const bool isLast)
80 : {
81 164 : *this << version << size << sequence << isLast;
82 :
83 164 : if (_impl->stream)
84 159 : _impl->stream->streamDataHeader(*this);
85 : else
86 5 : *this << std::string() << uint32_t(0); // compressor, nChunks
87 164 : }
88 :
89 328 : ObjectDataOCommand::~ObjectDataOCommand()
90 : {
91 164 : if (_impl->stream && _impl->size > 0)
92 : {
93 159 : sendHeader(_impl->size);
94 159 : const Connections& connections = getConnections();
95 318 : for (ConnectionsCIter i = connections.begin(); i != connections.end();
96 : ++i)
97 : {
98 318 : ConnectionPtr conn = *i;
99 159 : _impl->stream->sendBody(conn, _impl->data, _impl->size);
100 : }
101 : }
102 :
103 164 : delete _impl;
104 164 : }
105 :
106 1 : ObjectDataICommand ObjectDataOCommand::_getCommand(LocalNodePtr node)
107 : {
108 1 : lunchbox::Bufferb& outBuffer = getBuffer();
109 : // cppcheck-suppress unreadVariable
110 1 : uint8_t* bytes = outBuffer.getData();
111 1 : reinterpret_cast<uint64_t*>(bytes)[0] = outBuffer.getSize();
112 :
113 2 : BufferPtr inBuffer = node->allocBuffer(outBuffer.getSize());
114 1 : inBuffer->swap(outBuffer);
115 2 : return ObjectDataICommand(node, node, inBuffer);
116 : }
117 63 : }
|