c ++ code di lavoro con blocco

Questa domanda dovrebbe essere un po ‘più semplice dei miei ultimi. Ho implementato la seguente coda di lavoro nel mio programma:

Pool.h:

// tpool class // It's always closed. :glasses: #ifndef __POOL_H #define __POOL_H class tpool { public: tpool( std::size_t tpool_size ); ~tpool(); template void run_task( Task task ){ boost::unique_lock lock( mutex_ ); if( 0 < available_ ) { --available_; io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function ( task ) ) ); } } private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; void wrap_task( boost::function task ); }; extern tpool dbpool; #endif 

pool.cpp:

 #include  #include  #include  #include  #include "pool.h" tpool::tpool( std::size_t tpool_size ) : work_( io_service_ ), available_( tpool_size ) { for ( std::size_t i = 0; i < tpool_size; ++i ){ threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } tpool::~tpool() { io_service_.stop(); try { threads_.join_all(); } catch( ... ) {} } void tpool::wrap_task( boost::function task ) { // run the supplied task try { task(); } // suppress exceptions catch( ... ) { } boost::unique_lock lock( mutex_ ); ++available_; } tpool dbpool( 50 ); 

Il problema è, tuttavia, che non tutte le mie chiamate a run_task() vengono completate dai thread worker. Non sono sicuro se è perché non sta entrando in coda o perché l’attività si annulla quando il thread che lo ha creato è terminato.

Quindi la mia domanda è, c’è qualcosa di speciale che devo dare a boost::thread per farlo aspettare fino a quando la coda non viene sbloccata? e qual è la durata prevista di un’attività inserita in una coda? Le attività escono dall’ambito quando viene chiuso il thread che le ha create? Se è così, come posso evitare che ciò accada?

Modifica: ho apportato le seguenti modifiche al mio codice:

 template void run_task( Task task ){ // add item to the queue io_service_.post( boost::bind( &tpool::wrap_task, this, boost::function ( task ) ) ); } 

e ora sto vedendo tutte le voci inserite correttamente. Tuttavia, mi rimane una domanda persistente: qual è la durata delle attività aggiunte alla coda? Cessano di esistere una volta che esce il thread che li ha creati?

Bene. Questo è davvero abbastanza semplice; Stai rifiutando le attività pubblicate!

 template< typename Task > void run_task(task task){ boost::unique_lock lock( mutex_ ); if(0 < available_) { --available_; io_service_.post(boost::bind(&tpool::wrap_task, this, boost::function< void() > ( task ))); } } 

Si noti che il lock “attende” fino a quando il mutex non è di proprietà di un thread. Questo potrebbe essere già il caso, e probabilmente quando available_ è già 0. Ora la linea

 if(0 < available_) { 

Questa linea è semplicemente la condizione. Non è "magico" perché stai tenendo bloccato il mutex_ . (Il programma non sa nemmeno che esiste una relazione tra mutex_ e available_ ). Quindi, se available_ <= 0 , salterai semplicemente pubblicando il lavoro.


Soluzione n. 1

Dovresti usare io_service per fare la coda per te. Questo è probabilmente ciò che volevi ottenere in primo luogo. Invece di tenere traccia dei thread "disponibili", io_service fa il lavoro per voi. È ansible controllare quanti thread può utilizzare eseguendo il servizio io_service su più thread. Semplice.

Poiché io_service è già thread-safe, puoi fare a meno del lock.

 #include  #include  #include  // tpool class // It's always closed. :glasses: #ifndef __POOL_H #define __POOL_H class tpool { public: tpool( std::size_t tpool_size ); ~tpool(); template void run_task(Task task){ io_service_.post(task); } private: // note the order of destruction of members boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; }; extern tpool dbpool; #endif #include  #include  #include  #include  //#include "pool.h" tpool::tpool(std::size_t tpool_size) : work_(io_service_) { for (std::size_t i = 0; i < tpool_size; ++i) { threads_.create_thread( boost::bind(&boost::asio::io_service::run, &io_service_) ); } } tpool::~tpool() { io_service_.stop(); try { threads_.join_all(); } catch(...) {} } void foo() { std::cout << __PRETTY_FUNCTION__ << "\n"; } void bar() { std::cout << __PRETTY_FUNCTION__ << "\n"; } int main() { tpool dbpool(50); dbpool.run_task(foo); dbpool.run_task(bar); boost::this_thread::sleep_for(boost::chrono::seconds(1)); } 

Ai fini dell'arresto, si desidera abilitare la "cancellazione" dell'object io_service::work , altrimenti il ​​pool non uscirà mai.


Soluzione n. 2

Non utilizzare io_service , ma implementare la propria implementazione di coda con una variabile di condizione per notificare a un thread di lavoro il nuovo lavoro inviato. Di nuovo, il numero di lavoratori è determinato dal numero di thread nel gruppo.

 #include  #include  #include  using namespace boost; using namespace boost::phoenix::arg_names; class thread_pool { private: mutex mx; condition_variable cv; typedef function job_t; std::deque _queue; thread_group pool; boost::atomic_bool shutdown; static void worker_thread(thread_pool& q) { while (auto job = q.dequeue()) (*job)(); } public: thread_pool() : shutdown(false) { for (unsigned i = 0; i < boost::thread::hardware_concurrency(); ++i) pool.create_thread(bind(worker_thread, ref(*this))); } void enqueue(job_t job) { lock_guard lk(mx); _queue.push_back(std::move(job)); cv.notify_one(); } optional dequeue() { unique_lock lk(mx); namespace phx = boost::phoenix; cv.wait(lk, phx::ref(shutdown) || !phx::empty(phx::ref(_queue))); if (_queue.empty()) return none; auto job = std::move(_queue.front()); _queue.pop_front(); return std::move(job); } ~thread_pool() { shutdown = true; { lock_guard lk(mx); cv.notify_all(); } pool.join_all(); } }; void the_work(int id) { std::cout << "worker " << id << " entered\n"; // no more synchronization; the pool size determines max concurrency std::cout << "worker " << id << " start work\n"; this_thread::sleep_for(chrono::seconds(2)); std::cout << "worker " << id << " done\n"; } int main() { thread_pool pool; // uses 1 thread per core for (int i = 0; i < 10; ++i) pool.enqueue(bind(the_work, i)); }