Line data Source code
1 :
2 : /* Copyright (c) 2012-2017, Daniel Nachbaur <danielnachbaur@gmail.com>
3 : * Stefan.Eilemann@epfl.ch
4 : *
5 : * This file is part of Collage <https://github.com/Eyescale/Collage>
6 : *
7 : * This library is free software; you can redistribute it and/or modify it under
8 : * the terms of the GNU Lesser General Public License version 2.1 as published
9 : * by the Free Software Foundation.
10 : *
11 : * This library is distributed in the hope that it will be useful, but WITHOUT
12 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
13 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
14 : * details.
15 : *
16 : * You should have received a copy of the GNU Lesser General Public License
17 : * along with this library; if not, write to the Free Software Foundation, Inc.,
18 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
19 : */
20 :
21 : #include "oCommand.h"
22 :
23 : #include "buffer.h"
24 : #include "iCommand.h"
25 :
26 : namespace co
27 : {
28 : namespace detail
29 : {
30 72107 : class OCommand
31 : {
32 : public:
33 72104 : OCommand(co::Dispatcher* const dispatcher_, LocalNodePtr localNode_)
34 72104 : : isLocked(false)
35 : , size(0)
36 : , dispatcher(dispatcher_)
37 72104 : , localNode(localNode_)
38 : {
39 72105 : }
40 :
41 : bool isLocked;
42 : uint64_t size;
43 : co::Dispatcher* const dispatcher;
44 : LocalNodePtr localNode;
45 : };
46 : }
47 :
48 72074 : OCommand::OCommand(const Connections& receivers, const uint32_t cmd,
49 72074 : const uint32_t type)
50 : : DataOStream(std::numeric_limits<size_t>::max())
51 72074 : , _impl(new detail::OCommand(0, 0))
52 : {
53 72072 : _setupConnections(receivers);
54 72071 : _init(cmd, type);
55 72072 : }
56 :
57 33 : OCommand::OCommand(Dispatcher* const dispatcher, LocalNodePtr localNode,
58 33 : const uint32_t cmd, const uint32_t type)
59 : : DataOStream(std::numeric_limits<size_t>::max())
60 33 : , _impl(new detail::OCommand(dispatcher, localNode))
61 : {
62 33 : _init(cmd, type);
63 33 : }
64 :
65 0 : OCommand::OCommand(const OCommand& rhs)
66 : : DataOStream(const_cast<OCommand&>(rhs))
67 0 : , _impl(new detail::OCommand(*rhs._impl))
68 : {
69 0 : }
70 :
71 144212 : OCommand::~OCommand()
72 : {
73 72105 : if (_impl->isLocked)
74 : {
75 159 : LBASSERT(_impl->size > 0);
76 159 : const uint64_t size = _impl->size + getBuffer().getSize();
77 159 : const Connections& connections = getConnections();
78 159 : if (size < COMMAND_MINSIZE) // Fill send to minimal size
79 : {
80 146 : const size_t delta = COMMAND_MINSIZE - size;
81 146 : void* padding = alloca(delta);
82 876 : for (ConnectionsCIter i = connections.begin();
83 584 : i != connections.end(); ++i)
84 : {
85 292 : ConnectionPtr connection = *i;
86 146 : connection->send(padding, delta, true);
87 : }
88 : }
89 318 : for (ConnectionsCIter i = connections.begin(); i != connections.end();
90 : ++i)
91 : {
92 318 : ConnectionPtr connection = *i;
93 159 : connection->unlockSend();
94 : }
95 159 : _impl->isLocked = false;
96 159 : _impl->size = 0;
97 159 : reset();
98 : }
99 : else
100 71946 : disable();
101 :
102 72107 : if (_impl->dispatcher)
103 : {
104 33 : LBASSERT(_impl->localNode);
105 :
106 : // #145 proper local command dispatch?
107 33 : LBASSERT(_impl->size == 0);
108 33 : const uint64_t size = getBuffer().getSize();
109 66 : BufferPtr buffer = _impl->localNode->allocBuffer(size);
110 33 : buffer->swap(getBuffer());
111 33 : reinterpret_cast<uint64_t*>(buffer->getData())[0] = size;
112 :
113 66 : ICommand cmd(_impl->localNode, _impl->localNode, buffer);
114 33 : _impl->dispatcher->dispatchCommand(cmd);
115 : }
116 :
117 72107 : delete _impl;
118 72107 : }
119 :
120 72104 : void OCommand::_init(const uint32_t cmd, const uint32_t type)
121 : {
122 72104 : enableSave();
123 72104 : _enable();
124 72107 : *this << uint64_t(0) /* size */ << type << cmd;
125 72105 : }
126 :
127 159 : void OCommand::sendHeader(const uint64_t additionalSize)
128 : {
129 159 : LBASSERT(!_impl->dispatcher);
130 159 : LBASSERT(!_impl->isLocked);
131 159 : LBASSERT(additionalSize > 0);
132 :
133 159 : const Connections& connections = getConnections();
134 318 : for (ConnectionsCIter i = connections.begin(); i != connections.end(); ++i)
135 : {
136 318 : ConnectionPtr connection = *i;
137 159 : connection->lockSend();
138 : }
139 159 : _impl->isLocked = true;
140 159 : _impl->size = additionalSize;
141 159 : flush(true);
142 159 : }
143 :
144 3 : size_t OCommand::getSize()
145 : {
146 3 : return sizeof(uint64_t) + sizeof(uint32_t) + sizeof(uint32_t);
147 : }
148 :
149 72067 : void OCommand::sendData(const void* buffer LB_UNUSED, const uint64_t size,
150 : const bool last LB_UNUSED)
151 : {
152 72067 : LBASSERT(!_impl->dispatcher);
153 72069 : LBASSERT(last);
154 72069 : LBASSERTINFO(size >= 16, size);
155 72068 : LBASSERT(getBuffer().getData() == buffer);
156 72063 : LBASSERT(getBuffer().getSize() == size);
157 72056 : LBASSERT(getBuffer().getMaxSize() >= COMMAND_MINSIZE);
158 :
159 : // Update size field
160 : // cppcheck-suppress unreadVariable
161 72056 : uint8_t* bytes = getBuffer().getData();
162 72054 : reinterpret_cast<uint64_t*>(bytes)[0] = _impl->size + size;
163 : const uint64_t sendSize =
164 72054 : _impl->isLocked ? size : LB_MAX(size, COMMAND_MINSIZE);
165 72054 : const Connections& connections = getConnections();
166 144128 : for (auto connection : connections)
167 : {
168 72071 : if (connection)
169 72071 : connection->send(bytes, sendSize, _impl->isLocked);
170 : else
171 0 : LBERROR << "Can't send data, node is closed" << std::endl;
172 : }
173 72073 : }
174 63 : }
|