Questa domanda dovrebbe essere un po ‘più semplice dei miei ultimi. Ho implementato la seguente coda di lavoro nel mio programma:
Pool.h:
// tpool class // It's always closed. :glasses: #ifndef __POOL_H #define __POOL_H class tpool { public: tpool( std::size_t tpool_size ); ~tpool(); template void run_task( Task task ){ boost::unique_lock lock( mutex_ ); if( 0 < available_ ) { --available_; io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function ( task ) ) ); } } private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; void wrap_task( boost::function task ); }; extern tpool dbpool; #endif
pool.cpp:
#include #include #include #include #include "pool.h" tpool::tpool( std::size_t tpool_size ) : work_( io_service_ ), available_( tpool_size ) { for ( std::size_t i = 0; i < tpool_size; ++i ){ threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } tpool::~tpool() { io_service_.stop(); try { threads_.join_all(); } catch( ... ) {} } void tpool::wrap_task( boost::function task ) { // run the supplied task try { task(); } // suppress exceptions catch( ... ) { } boost::unique_lock lock( mutex_ ); ++available_; } tpool dbpool( 50 );
Il problema è, tuttavia, che non tutte le mie chiamate a run_task()
vengono completate dai thread worker. Non sono sicuro se è perché non sta entrando in coda o perché l’attività si annulla quando il thread che lo ha creato è terminato.
Quindi la mia domanda è, c’è qualcosa di speciale che devo dare a boost::thread
per farlo aspettare fino a quando la coda non viene sbloccata? e qual è la durata prevista di un’attività inserita in una coda? Le attività escono dall’ambito quando viene chiuso il thread che le ha create? Se è così, come posso evitare che ciò accada?
Modifica: ho apportato le seguenti modifiche al mio codice:
template void run_task( Task task ){ // add item to the queue io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function ( task ) ) ); }
e ora sto vedendo tutte le voci inserite correttamente. Tuttavia, mi rimane una domanda persistente: qual è la durata delle attività aggiunte alla coda? Cessano di esistere una volta che esce il thread che li ha creati?
Bene. Questo è davvero abbastanza semplice; Stai rifiutando le attività pubblicate!
template< typename Task > void run_task(task task){ boost::unique_lock lock( mutex_ ); if(0 < available_) { --available_; io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task ))); } }
Si noti che il lock
“attende” fino a quando il mutex non è di proprietà di un thread. Questo potrebbe essere già il caso, e probabilmente quando available_
è già 0. Ora la linea
if(0 < available_) {
Questa linea è semplicemente la condizione. Non è "magico" perché stai tenendo bloccato il mutex_
. (Il programma non sa nemmeno che esiste una relazione tra mutex_
e available_
). Quindi, se available_ <= 0
, salterai semplicemente pubblicando il lavoro.
Dovresti usare io_service
per fare la coda per te. Questo è probabilmente ciò che volevi ottenere in primo luogo. Invece di tenere traccia dei thread "disponibili", io_service
fa il lavoro per voi. È ansible controllare quanti thread può utilizzare eseguendo il servizio io_service
su più thread. Semplice.
Poiché io_service
è già thread-safe, puoi fare a meno del lock.
#include #include #include // tpool class // It's always closed. :glasses: #ifndef __POOL_H #define __POOL_H class tpool { public: tpool( std::size_t tpool_size ); ~tpool(); template void run_task(Task task){ io_service_.post(task); } private: // note the order of destruction of members boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; }; extern tpool dbpool; #endif #include #include #include #include //#include "pool.h" tpool::tpool(std::size_t tpool_size) : work_(io_service_) { for (std::size_t i = 0; i < tpool_size; ++i) { threads_.create_thread( boost::bind(&boost::asio::io_service::run, &io_service_) ); } } tpool::~tpool() { io_service_.stop(); try { threads_.join_all(); } catch(...) {} } void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; } void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; } int main() { tpool dbpool(50); dbpool.run_task(foo); dbpool.run_task(bar); boost::this_thread::sleep_for(boost::chrono::seconds(1)); }
Ai fini dell'arresto, si desidera abilitare la "cancellazione" dell'object io_service::work
, altrimenti il pool non uscirà mai.
Non utilizzare io_service
, ma implementare la propria implementazione di coda con una variabile di condizione per notificare a un thread di lavoro il nuovo lavoro inviato. Di nuovo, il numero di lavoratori è determinato dal numero di thread nel gruppo.
#include #include #include using namespace boost; using namespace boost::phoenix::arg_names; class thread_pool { private: mutex mx; condition_variable cv; typedef function job_t; std::deque _queue; thread_group pool; boost::atomic_bool shutdown; static void worker_thread(thread_pool& q) { while (auto job = q.dequeue()) (*job)(); } public: thread_pool() : shutdown(false) { for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i) pool.create_thread(bind(worker_thread, ref(*this))); } void enqueue(job_t job) { lock_guard lk(mx); _queue.push_back(std::move(job)); cv.notify_one(); } optional dequeue() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue))); if (_queue.empty()) return none; auto job = std::move(_queue.front()); _queue.pop_front(); return std::move(job); } ~thread_pool() { shutdown = true; { lock_guard lk(mx); cv.notify_all(); } pool.join_all(); } }; void the_work(int id) { std::cout << "worker " << id << " entered\n"; // no more synchronization; the pool size determines max concurrency std::cout << "worker " << id << " start work\n"; this_thread::sleep_for(chrono::seconds(2)); std::cout << "worker " << id << " done\n"; } int main() { thread_pool pool; // uses 1 thread per core for (int i = 0; i < 10; ++i) pool.enqueue(bind(the_work, i)); }