Libthreadar 1.4.0
ratelier_scatter.hpp
Go to the documentation of this file.
1/*********************************************************************/
2// libthreadar - is a library providing several C++ classes to work with threads
3// Copyright (C) 2014-2020 Denis Corbin
4//
5// This file is part of libthreadar
6//
7// libthreadar is free software: you can redistribute it and/or modify
8// it under the terms of the GNU Lesser General Public License as published by
9// the Free Software Foundation, either version 3 of the License, or
10// (at your option) any later version.
11//
12// libhtreadar is distributed in the hope that it will be useful,
13// but WITHOUT ANY WARRANTY; without even the implied warranty of
14// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15// GNU Lesser General Public License for more details.
16//
17// You should have received a copy of the GNU Lesser General Public License
18// along with libthreadar. If not, see <http://www.gnu.org/licenses/>
19//
20//----
21// to contact the author: dar.linux@free.fr
22/*********************************************************************/
23
24#ifndef LIBTHREADAR_RATELIER_SCATTER_HPP
25#define LIBTHREADAR_RATELIER_SCATTER_HPP
26
33
34#include "config.h"
35
36 // C system headers
37extern "C"
38{
39}
40 // C++ standard headers
41#include <vector>
42#include <map>
43#include <deque>
44#include <memory>
45
46 // libthreadar headers
47#include "mutex.hpp"
48
49
50namespace libthreadar
51{
53
62
63 template <class T> class ratelier_scatter
64 {
65 public:
66 ratelier_scatter(unsigned int size, signed int flag = 0);
67 ratelier_scatter(const ratelier_scatter & ref) = delete;
68 ratelier_scatter(ratelier_scatter && ref) = default;
69 ratelier_scatter & operator = (const ratelier_scatter & ref) = delete;
70 ratelier_scatter & operator = (ratelier_scatter && ref) noexcept = default;
71 virtual ~ratelier_scatter() = default;
72
81 void scatter(std::unique_ptr<T> & one, signed int flag = 0);
82
90 std::unique_ptr<T> worker_get_one(unsigned int & slot, signed int & flag);
91
93 void reset();
94
95 private:
96
97 static const unsigned int cond_empty = 0;
98 static const unsigned int cond_full = 1;
99
100 struct slot
101 {
102 std::unique_ptr<T> obj;
103 bool empty;
104 unsigned int index;
105 signed int flag;
106
107 slot(signed int val) { empty = true; flag = val; };
108 slot(const slot & ref) { obj.reset(); empty = ref.empty; index = ref.index; flag = ref.flag; };
109 };
110
111 unsigned int next_index;
112 unsigned int lowest_index;
113 std::vector<slot> table;
114 std::map<unsigned int, unsigned int> corres;
115 std::deque<unsigned int> empty_slot;
117 };
118
119 template <class T> ratelier_scatter<T>::ratelier_scatter(unsigned int size, signed int flag):
120 table(size, slot(flag)),
121 verrou(2)
122 {
123 next_index = 0;
124 lowest_index = 0;
125
126 for(unsigned int i = 0; i < size; ++i)
127 empty_slot.push_back(i);
128 }
129
130 template <class T> void ratelier_scatter<T>::scatter(std::unique_ptr<T> & one, signed int flag)
131 {
132 unsigned int tableindex;
133
134 verrou.lock();
135 try
136 {
137 while(empty_slot.empty()) // ratelier_scatter is full
138 verrou.wait(cond_full);
139
140 tableindex = empty_slot.back();
141
142 // sanity checks
143
144 if(tableindex >= table.size())
145 throw THREADAR_BUG;
146 if( ! table[tableindex].empty)
147 throw THREADAR_BUG;
148
149 // recording the change
150
151 table[tableindex].empty = false;
152 table[tableindex].obj = std::move(one);
153 table[tableindex].index = next_index;
154 table[tableindex].flag = flag;
155
156 corres[next_index] = tableindex;
157 ++next_index;
158
159 empty_slot.pop_back();
160 if(verrou.get_waiting_thread_count(cond_empty) > 0)
161 verrou.signal(cond_empty);
162 }
163 catch(...)
164 {
165 verrou.unlock();
166 verrou.broadcast(cond_empty);
167 verrou.broadcast(cond_full);
168 throw;
169 }
170 verrou.unlock();
171 }
172
173 template <class T> std::unique_ptr<T> ratelier_scatter<T>::worker_get_one(unsigned int & slot, signed int & flag)
174 {
175 std::unique_ptr<T> ret;
176
177 verrou.lock();
178 try
179 {
180 std::map<unsigned int, unsigned int>::iterator it = corres.begin();
181 // using sequential reading provides sorted scanning
182 // of the map, looking first for the lowest index available (oldest entries)
183
184 do
185 {
186 if(it != corres.end())
187 {
188 if(it->first < lowest_index) // overflooding occured
189 ++it; // ignoring this slot
190 else
191 {
192
193 // sanity checks
194
195 if(it->second >= table.size())
196 throw THREADAR_BUG;
197 if(table[it->second].empty)
198 throw THREADAR_BUG;
199 if( ! table[it->second].obj)
200 throw THREADAR_BUG;
201
202 // recording the change
203
204 ret = std::move(table[it->second].obj);
205 slot = table[it->second].index;
206 flag = table[it->second].flag;
207 table[it->second].empty = true;
208
209 if(lowest_index != slot)
210 throw THREADAR_BUG;
211 ++lowest_index;
212
213 // reusing quicker the last block used
214 // as the back() be used first
215 empty_slot.push_back(it->second);
216 corres.erase(it); // removing the correspondance
217
218 if(verrou.get_waiting_thread_count(cond_full) > 0)
219 verrou.signal(cond_full);
220 }
221 }
222 else
223 {
224 // ratelier_scatter is empty
225
226 verrou.wait(cond_empty);
227 it = corres.begin();
228 }
229 }
230 while( ! ret);
231 }
232 catch(...)
233 {
234 verrou.unlock();
235 verrou.broadcast(cond_empty);
236 verrou.broadcast(cond_full);
237 throw;
238 }
239 verrou.unlock();
240
241 return ret;
242 }
243
244 template <class T> void ratelier_scatter<T>::reset()
245 {
246 unsigned int size = table.size();
247 next_index = 0;
248 lowest_index = 0;
249 corres.clear();
250 empty_slot.clear();
251
252 for(unsigned int i = 0; i < size; ++i)
253 {
254 table[i].obj.reset();
255 table[i].empty = true;
256 empty_slot.push_back(i);
257 }
258
259 verrou.lock();
260 verrou.broadcast(cond_empty);
261 verrou.broadcast(cond_full);
262 verrou.unlock();
263 }
264
265} // end of namespace
266
267#endif
Wrapper around the Posix pthread_cond_t object and its associated mutex.
Definition: condition.hpp:46
the class ratelier_scatter has a fixed length range of slots of arbitrary defined object type
std::unique_ptr< T > worker_get_one(unsigned int &slot, signed int &flag)
void scatter(std::unique_ptr< T > &one, signed int flag=0)
void reset()
reset the object in its prestine state
#define THREADAR_BUG
Macro used to throw an exception_bug when execution reach that statement.
Definition: exceptions.hpp:164
defines the mutex C++ class
This is the only namespace used in libthreadar and all symbols provided by libthreadar are member of ...
Definition: barrier.hpp:46