Pool di thread con boost asio

Sto cercando di creare una class di pool di thread limitato utilizzando boost :: asio. Ma sono bloccato a un certo punto qualcuno può aiutarmi.

L’unico problema è il posto dove dovrei diminuire il contatore?

il codice non funziona come previsto.

il problema è che non so quando il mio thread finirà l’esecuzione e come potrò sapere che è tornato al pool

#include  #include  #include  #include  #include  #include  using namespace std; using namespace boost; class ThreadPool { static int count; int NoOfThread; thread_group grp; mutex mutex_; asio::io_service io_service; int counter; stack thStk ; public: ThreadPool(int num) { NoOfThread = num; counter = 0; mutex::scoped_lock lock(mutex_); if(count == 0) count++; else return; for(int i=0 ; i NoOfThread) { cout<<"run out of threads \n"; return NULL; } counter++; thread* ptr = thStk.top(); thStk.pop(); return ptr; } }; int ThreadPool::count = 0; struct callable { void operator()() { cout<<"some task for thread \n"; } }; int main( int argc, char * argv[] ) { callable x; ThreadPool pool(10); thread* p = pool.getThread(); cout<get_id(); //how i can assign some function to thread pointer ? //how i can return thread pointer after work done so i can add //it back to stack? return 0; } 

In breve, è necessario avvolgere l’attività fornita dall’utente con un’altra funzione che:

  • Richiama la funzione utente o object richiamabile.
  • Blocca il mutex e decrementa il contatore.

Potrei non capire tutti i requisiti per questo pool di thread. Quindi, per chiarezza, ecco un elenco esplicito di ciò che ritengo siano i requisiti:

  • Il pool gestisce la durata dei thread. L’utente non dovrebbe essere in grado di eliminare i thread che risiedono nel pool.
  • L’utente può assegnare un’attività al pool in modo non intrusivo.
  • Quando viene assegnata un’attività, se tutti i thread nel pool stanno attualmente eseguendo altre attività, l’attività viene scartata.

Prima di fornire un’implementazione, ci sono alcuni punti chiave che vorrei sottolineare:

  • Una volta che un thread è stato avviato, verrà eseguito fino al completamento, alla cancellazione o alla chiusura. La funzione che il thread sta eseguendo non può essere riassegnata. Per consentire a un singolo thread di eseguire più funzioni nel corso della sua vita, il thread verrà avviato con una funzione che leggerà da una coda, come io_service::run() , e i tipi callable vengono registrati nell’evento coda, come da io_service::post() .
  • io_service::run() restituisce se non ci sono lavori in sospeso nel io_service , il servizio io_service viene fermato, oppure viene generata un’eccezione da un gestore che il thread era in esecuzione. Per impedire a io_serivce::run() di tornare quando non c’è lavoro non finito, è ansible utilizzare la class di io_service::work .
  • Definire i requisiti del tipo di attività (ovvero il tipo di attività deve essere chiamabile mediante la syntax object() ) invece di richiedere un tipo (ad esempio, l’attività deve ereditare dal process ), offre maggiore flessibilità all’utente. Consente all’utente di fornire un’attività come puntatore a funzione o un tipo che fornisce un operator() null operator() .

Implementazione usando boost::asio :

 #include  #include  class thread_pool { private: boost::asio::io_service io_service_; boost::asio::io_service::work work_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : work_( io_service_ ), available_( pool_size ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &boost::asio::io_service::run, &io_service_ ) ); } } /// @brief Destructor. ~thread_pool() { // Force all threads to return from io_service::run(). io_service_.stop(); // Suppress all exceptions. try { threads_.join_all(); } catch ( const std::exception& ) {} } /// @brief Adds a task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Post a wrapped task into the queue. io_service_.post( boost::bind( &thread_pool::wrap_task, this, boost::function< void() >( task ) ) ); } private: /// @brief Wrap a task so that the available count can be increased once /// the user provided task has completed. void wrap_task( boost::function< void() > task ) { // Run the user supplied task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} // Task has finished, so increment count of available threads. boost::unique_lock< boost::mutex > lock( mutex_ ); ++available_; } }; 

Alcuni commenti sull’implementazione:

  • La gestione delle eccezioni deve essere eseguita intorno all’attività dell’utente. Se la funzione dell’utente o l’object callable genera un’eccezione che non è di tipo boost::thread_interrupted , viene chiamato std::terminate() . Questo è il risultato delle eccezioni di Boost.Thread nel comportamento delle funzioni dei thread . Vale anche la pena di leggere l’ effetto Boost.Asio delle eccezioni generate dagli handler .
  • Se l’utente fornisce l’ task tramite boost::bind , il boost::bind nested boost::bind non verrà compilato. È richiesta una delle seguenti opzioni:
    • Attività non supportata creata da boost::bind .
    • Meta-programmazione per eseguire il branching in fase di compilazione in base al tipo dell’utente se il risultato di boost::bind tale che boost::protect può essere utilizzato, poiché boost::protect funziona correttamente solo su determinati oggetti funzione.
    • Utilizzare un altro tipo per passare indirettamente l’object dell’attività. Ho optato per utilizzare boost::function per la leggibilità al costo di perdere il tipo esatto. boost::tuple , sebbene leggermente meno leggibile, potrebbe anche essere usato per preservare il tipo esatto, come mostrato nell’esempio di serializzazione di Boost.Asio.

Il codice dell’applicazione ora può utilizzare il tipo thread_pool non intrusivo:

 void work() {}; struct worker { void operator()() {}; }; void more_work( int ) {}; int main() { thread_pool pool( 2 ); pool.run_task( work ); // Function pointer. pool.run_task( worker() ); // Callable object. pool.run_task( boost::bind( more_work, 5 ) ); // Callable object. } 

Il thread_pool potrebbe essere creato senza Boost.Asio e potrebbe essere leggermente più semplice per i manutentori, poiché non devono più conoscere i comportamenti Boost.Asio , ad esempio quando viene io_service::run() e cos’è il servizio io_service::work object di io_service::work :

 #include  #include  #include  class thread_pool { private: std::queue< boost::function< void() > > tasks_; boost::thread_group threads_; std::size_t available_; boost::mutex mutex_; boost::condition_variable condition_; bool running_; public: /// @brief Constructor. thread_pool( std::size_t pool_size ) : available_( pool_size ), running_( true ) { for ( std::size_t i = 0; i < pool_size; ++i ) { threads_.create_thread( boost::bind( &thread_pool::pool_main, this ) ) ; } } /// @brief Destructor. ~thread_pool() { // Set running flag to false then notify all threads. { boost::unique_lock< boost::mutex > lock( mutex_ ); running_ = false; condition_.notify_all(); } try { threads_.join_all(); } // Suppress all exceptions. catch ( const std::exception& ) {} } /// @brief Add task to the thread pool if a thread is currently available. template < typename Task > void run_task( Task task ) { boost::unique_lock< boost::mutex > lock( mutex_ ); // If no threads are available, then return. if ( 0 == available_ ) return; // Decrement count, indicating thread is no longer available. --available_; // Set task and signal condition variable so that a worker thread will // wake up andl use the task. tasks_.push( boost::function< void() >( task ) ); condition_.notify_one(); } private: /// @brief Entry point for pool threads. void pool_main() { while( running_ ) { // Wait on condition variable while the task is empty and the pool is // still running. boost::unique_lock< boost::mutex > lock( mutex_ ); while ( tasks_.empty() && running_ ) { condition_.wait( lock ); } // If pool is no longer running, break out. if ( !running_ ) break; // Copy task locally and remove from the queue. This is done within // its own scope so that the task object is destructed immediately // after running the task. This is useful in the event that the // function contains shared_ptr arguments bound via bind. { boost::function< void() > task = tasks_.front(); tasks_.pop(); lock.unlock(); // Run the task. try { task(); } // Suppress all exceptions. catch ( const std::exception& ) {} } // Task has finished, so increment count of available threads. lock.lock(); ++available_; } // while running_ } };