Line data Source code
1 :
2 : /* Copyright (c) 2011-2016, Stefan Eilemann <eile@eyescale.ch>
3 : * Carsten Rohn <carsten.rohn@rtt.ag>
4 : * Daniel Nachbaur <danielnachbaur@gmail.com>
5 : *
6 : * This file is part of Collage <https://github.com/Eyescale/Collage>
7 : *
8 : * This library is free software; you can redistribute it and/or modify it under
9 : * the terms of the GNU Lesser General Public License version 2.1 as published
10 : * by the Free Software Foundation.
11 : *
12 : * This library is distributed in the hope that it will be useful, but WITHOUT
13 : * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
14 : * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
15 : * details.
16 : *
17 : * You should have received a copy of the GNU Lesser General Public License
18 : * along with this library; if not, write to the Free Software Foundation, Inc.,
19 : * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
20 : */
21 :
22 : #include "queueMaster.h"
23 :
24 : #include "dataOStream.h"
25 : #include "objectICommand.h"
26 : #include "objectOCommand.h"
27 : #include "queueCommand.h"
28 : #include "queueItem.h"
29 :
30 : #include <lunchbox/buffer.h>
31 : #include <lunchbox/mtQueue.h>
32 :
33 : namespace co
34 : {
35 :
36 : namespace detail
37 : {
38 :
39 : class ItemBuffer : public lunchbox::Bufferb, public lunchbox::Referenced
40 : {
41 : public:
42 4 : explicit ItemBuffer( lunchbox::Bufferb&& from )
43 4 : : lunchbox::Bufferb( std::move( from ))
44 4 : , lunchbox::Referenced()
45 4 : {}
46 :
47 8 : ~ItemBuffer() {}
48 : };
49 :
50 : typedef lunchbox::RefPtr< ItemBuffer > ItemBufferPtr;
51 :
52 2 : class QueueMaster : public co::Dispatcher
53 : {
54 : public:
55 1 : explicit QueueMaster( const co::QueueMaster& parent )
56 1 : : co::Dispatcher()
57 1 : , _parent( parent )
58 1 : {}
59 :
60 : /** The command handler functions. */
61 5 : bool cmdGetItem( co::ICommand& comd )
62 : {
63 10 : co::ObjectICommand command( comd );
64 :
65 5 : const uint32_t itemsRequested = command.get< uint32_t >();
66 5 : const uint32_t slaveInstanceID = command.get< uint32_t >();
67 5 : const int32_t requestID = command.get< int32_t >();
68 :
69 : typedef std::vector< ItemBufferPtr > Items;
70 10 : Items items;
71 5 : queue.tryPop( itemsRequested, items );
72 :
73 10 : Connections connections( 1, command.getNode()->getConnection( ));
74 9 : for( Items::const_iterator i = items.begin(); i != items.end(); ++i )
75 : {
76 : co::ObjectOCommand cmd( connections, CMD_QUEUE_ITEM,
77 4 : COMMANDTYPE_OBJECT, _parent.getID(),
78 8 : slaveInstanceID );
79 :
80 8 : const ItemBufferPtr item = *i;
81 4 : if( !item->isEmpty( ))
82 3 : cmd << Array< const void >( item->getData(), item->getSize( ));
83 : }
84 :
85 5 : if( itemsRequested > items.size( ))
86 2 : co::ObjectOCommand( connections, CMD_QUEUE_EMPTY,
87 : COMMANDTYPE_OBJECT, command.getObjectID(),
88 1 : slaveInstanceID ) << requestID;
89 10 : return true;
90 : }
91 :
92 : typedef lunchbox::MTQueue< ItemBufferPtr > ItemQueue;
93 :
94 : ItemQueue queue;
95 :
96 : private:
97 : const co::QueueMaster& _parent;
98 : };
99 : }
100 :
101 1 : QueueMaster::QueueMaster()
102 : #pragma warning(push)
103 : #pragma warning(disable: 4355)
104 1 : : _impl( new detail::QueueMaster( *this ))
105 : #pragma warning(pop)
106 : {
107 1 : }
108 :
109 3 : QueueMaster::~QueueMaster()
110 : {
111 1 : clear();
112 1 : delete _impl;
113 2 : }
114 :
115 1 : void QueueMaster::attach( const uint128_t& id, const uint32_t instanceID )
116 : {
117 1 : Object::attach( id, instanceID );
118 :
119 1 : CommandQueue* queue = getLocalNode()->getCommandThreadQueue();
120 1 : registerCommand( CMD_QUEUE_GET_ITEM,
121 3 : CommandFunc< detail::QueueMaster >(
122 2 : _impl, &detail::QueueMaster::cmdGetItem ), queue );
123 1 : }
124 :
125 1 : void QueueMaster::clear()
126 : {
127 1 : _impl->queue.clear();
128 1 : }
129 :
130 1 : void QueueMaster::getInstanceData( co::DataOStream& os )
131 : {
132 1 : os << getInstanceID() << getLocalNode()->getNodeID();
133 1 : }
134 :
135 4 : QueueItem QueueMaster::push()
136 : {
137 4 : return QueueItem( *this );
138 : }
139 :
140 4 : void QueueMaster::_addItem( QueueItem& item )
141 : {
142 : detail::ItemBufferPtr newBuffer =
143 8 : new detail::ItemBuffer( std::move( item.getBuffer( )));
144 4 : _impl->queue.push( newBuffer );
145 4 : }
146 :
147 66 : }
|