ABONAMENTE VIDEO REDACȚIA
RO
EN
×
▼ LISTĂ EDIȚII ▼
Numărul 20
Abonament PDF

Multithreading în standardul C++11 (II)

Dumitrița Munteanu
Software engineer
@Arobs
PROGRAMARE

În exemplele anterioare am prezentat și analizat metode de protejare a datelor comune între mai multe thread-uri. Uneori însă nu este suficientă doar protejarea datelor comune, fiind necesară și sincronizarea operațiilor executate de diferite thread-uri. În general se dorește ca un thread să aștepte până când are loc un anumit eveniment sau până când o anumită condiție devine adevărată. În acest scop, librăria standard C++ oferă primitive precum variabilele condiționale și futures.

Variabilele condiționale au în standardul C++11 nu o singură implementare, ci două: std::condition_variable și std::condition_variable_any. Ambele implementări pot fi folosite prin includerea header-ului <condition_variable>. Pentru a facilita comunicarea între thread-uri, variabilele condiționale sunt, de obicei, asociate cu un mutex, pentru std::condition_variable sau cu orice alt mecanism care oferă excludere mutuală, pentru std::condition_variable_any.
Thread-ul care așteaptă ca o variabilă condițională să devină adevărată trebuie, mai întâi, să blocheze un mutex, folosind primitiva std::unique_lock, a cărei necesitate o vom vedea ulterior. Mutex-ul este deblocat atomic atunci când thread-ul începe să aștepte ca variabila condițională să devină adevărată. În momentul în care se primește o notificare relativă la variabila condițională, thread-ul este repornit, blocând din nou mutex-ul.
Un exemplu practic poate fi un buffer care este folosit pentru a transmite date între două thread-uri:

std::mutex mutex;
std::queue buffer;              
std::condition_variable buffer_cond;

void data_preparation_thread()
{
    while(has_data_to_prepare())                //--  (1)    
    {
      buffer_data data = prepare_data();
      std::lock_quard lock(mutex);  //--  (2) 
      buffer.push(data);                        
      buffer_cond.notify_one();                 //-- (3)  
    }
}

void data_processing_thread()
{
    while(true)
    {
      std::unique_lock lock(mutex);              //-- (4)  
      buffer_cond.wait(lock, []{return ! buffer.empty()})    //-- (5)   
      buffer_data data = buffer.front();
      buffer.pop();
      lock.unlock();                                         //-- (6)   
      process(data);
      if(is_last_data_entry(data)) 
          break;         
    }  
}

Atunci când datele sunt pregătite de procesare (1), thread-ul care pregătește datele blochează mutex-ul (2), pentru a proteja buffer-ul când adaugă noile valori. Apoi apelează medota notify_one() asupra variabilei condiționale buffer_cond (3) pentru a notifica thread-ul care aștepta date (dacă este vreunul) că buffer-ul conține date ce pot fi procesate.
Thread-ul care procesează datele din buffer mai întâi blochează mutex-ul dar, de data aceasta, folosește un std::unique_lock (4). Thread-ul apelează apoi metoda wait() asupra variabilei condiționale buff_cond, trimițându-i ca parametri obiectul lock și o funcție lambda care reprezintă condiția pentru care se așteaptă (5). Funcțiile lambda sunt o altă caracteristică specifică standardului C++11 care permit ca funcții anonime să fie parte din alte expresii. În acest caz funcția lambda []{return ! buffer.empty()} este scrisă inline în codul sursă și verifică dacă în buffer sunt date ce pot fi procesate. Metoda wait() verifică apoi dacă condiția este adevărată (apelând funcția lambda care i-a fost transmisă) și returnează rezultatul. Dacă condiția nu este îndeplinită (funcția lambda returnează false), atunci funcția wait deblochează mutex-ul și pune thread-ul într-o stare de blocare sau așteptare. Când variabila condițională este notificată prin apelul funcției notify_one() din data_preparetion_thread(), thread-ul care procesează datele este deblocat, reblochează mutex-ul și verifică condiția din nou, ieșind din metoda wait() cu mutex-ul încă blocat dacă condiția este îndeplinită. Dacă condiția nu este îndeplinită, thread-ul deblochează mutex-ul și așteaptă din nou. Acesta este motivul pentru care se folosește std::unique_lock, deoarece thread-ul care procesează date trebuie să deblocheze mutex-ul în timp ce așteaptă, pentru ca apoi să îl blocheze din nou. În acest caz std::lock_guard nu furnizează această flexibilitate. Dacă mutex-ul ar rămâne blocat în timp ce thread-ul care așteaptă date de procesare este blocat, atunci thread-ul care pregătește datele nu ar putea bloca mutex-ul pentru a insera în buffer noile valori iar thread-ul ce procesează datele nu ar avea niciodată condiția îndeplinită.

Flexibilitatea de a debloca un obiect std::unique_lock nu este folosită doar în apelarea metodei wait(), ci este, de asemenea, folosită atunci când datele sunt pregătite de procesare dar înainte de a fi procesate (6). Aceasta deoarece buffer-ul este folosit doar pentru a transfera datele de la un thread către celălalt. Dar în acest caz nu este indicat să blocăm mutex-ul pe durata procesării datelor, deoarece ar putea fi o operație costisitoare în timp.

Futures

Un alt mecanism de sincronizare este un future, adică un obiect returnat asincron (un obiect care citește un rezultat al unei stări comune mai multor thread-uri) implementat în librăria standard C++11 prin intermediul a doua clase template, declarate în header-ul : unique futures (std::future<>) și shared futures (std::shared_future<>), ambele fiind modelate după mecanismele std::unique_ptr si std::shared_ptr.
Spre exemplu, să presupunem că avem o operație care efectuează un calcul foarte costisitor în timp iar rezultatul operației nu este imediat necesar. În acest caz putem porni un nou thread care să efectueze operația în background , dar aceasta presupune că este nevoie ca rezultatul să fie transferat înapoi metodei din care thread-ul a fost lansat, deoarece obiectul std::thread nu include un mecanism pentru această situație. Aici intervine funcția template std::async, inclusă de asemenea în header-ul .

Un obiect std::async este folosit pentru a lansa o operație asincronă alrei rezultat nu este imediat necesar. În loc să așteptăm ca un obiect std::thread să își încheie execuția furnizând rezultatul operației, funcția std::async returnează un std::future, care poate incapsula rezultatul operației. Când rezultatul este necesar, se poate apela metoda get() pe obiectul std::future(), iar thread-ul se blochează până când obiectul future este ready, adică poate furniza rezultatul operației. Spre exemplu:

#include 
#include 

int  long_time_computation();
void do_other_stuff();

int main()
{
   std::future the_result = std::async(long_time_computation);

   do_other_stuff();
 
   std::cout << "The result is " << the_result.get() << std::endl;
} 

Un obiect std::async este o utilitate de nivel înalt care furnizează un rezultat asincron și care se ocupă intern de crearea unui provider asincron și de pregătirea datelor comune când operația se finalizează. Acest obiect poate fi emulat prin intermediul unui obiect std::package_task (sau std::bind si std::promise) și un std::thread, însă folosirea unui obiect std::async este mai sigură și mai ușoară.

Packages

Un obiect std::package face legătura dintre o funcție și un obiect apelabil. Atunci când obiectul std::package<> este apelat, acesta apelează la rândul său funcția asociată sau obiectul apelabil și pregătește obiectul future în starea ready, cu valoarea returnată de către operația efectuată ca valoare asociată. Acest mecanism poate fi utilizat spre exemplu atunci când se dorește ca fiecare operație să fie executată de un thread separat sau să ruleze secvențial pe un thread în background. Dacă o operație de mari dimensiuni se poate divide în mai multe suboperații, fiecare dintre acestea poate fi mapată într-o instanță std::package_task<>, care va fi returnată managerului de operații. Astfel, se abstractizează detaliile operațiilor iar managerul operează doar cu instanțe std::package_task<>, în loc de funcții individuale. Spre exemplu:

#include 
#include 
int execute(int x, int y) { return std::pow(x,y); }

void main()
{
    std::packaged_task task(std::bind(execute, 2, 10));
    std::future result = task.get_future();     //-- (1)   
 
    task();  //-- (2)
 
    std::cout << "task_bind:	" << result.get() << "
"; //-- (4)
}

Când obiectul std::packaged_task este apelat (2), implicit este apelată și funcția asociată cu acesta, execute, căreia i se transmit parametrii 2 și 10 iar rezultatul operației va fi salvat asincron în obiectul std::future (1). Astfel, este posibil să incapsulăm o operație într-un std::package_task și să obținem obiectul std::future, care conține rezultul operației înainte ca obiectul std::package_task să fie apelat. Când rezultatul operației este necesar, acesta se poate obține atunci când obiectul std::future este în starea ready (3).

Promises

Așa cum am văzut la secțiunea Futures, transmiterea datelor între thread-uri se poate efectua prin transmiterea acestora ca parametri către funcția thread-ului iar obținerea rezultatului se poate obține prin returnarea argumentelor prin referință, utilizând metoda async().
Un alt mecanism pentru transmiterea datelor rezultate în urma operatiilor efectuate de diferite thread-uri este folosirea unei perechi std::promise/std::future. Un obiect std::promise oferă un mecanism pentru a seta o valoare de tip T, care poate fi ulterior citită prin intermediul unui obiect std::future. În timp ce un obiect std::future permite accesarea datelor rezultat (folosind metoda get()), obiectul promise este responsabil pentru furnizarea datelor (folosind una dintre medotele set_...()). Spre exemplu:

#include 
#include 

void execute(std::promise& promise) 
{
   std::string str("processed data"); 
   promise.set_value(std::move(str));	//-- (3)   
}

void main()
{
    std::promise promise; //-- (1)   
    std::thread thread(execute, std::ref(promise)); //-- (2)
    std::future result(promise.get_future()); //-- (4)
    std::cout << "result: " << result.get() << std::endl; //-- (5)
}

După includerea header-ului , unde sunt declarate obiectele std::promise, se declară un obiect promise specializat pentru valoarea pe care trebuie să o păstreze, std::string (1). Intern, obiectul std::promise creează o stare comună (shared state) care este utilizată pentru a salva valoarea corespunzătoare tipului std::string și care este utilizată de către obiectul std::future pentru a obține această valoare, ca rezultat al operației thread-ului.
Această promisiune este apoi transferată cu rol de parametru către funcția unui thread separat (2). În interiorul thread-ului se setează valoarea obiectului promise (3), moment în care starea comună devine automat ready. Pentru a obține valoarea setată în funcția execute, este necesară utilizarea unui obiect std::future care să aibă aceeași stare comună cu obiectul std::promise (4). Odată creat obiectul future, valoarea acestuia se poate obține prin apelarea metodei get() (5). Este important de știut faptul ca thread-ul curent (main) rămâne blocat până când starea comună este ready (atunci când este executată metoda set_value (3)), adică datele sunt disponibile.

Utilizarea obiectelor std::promise nu se adresează în exclusivitate programării multithreading. Acestea se pot folosi și în aplicațiile cu un singur fir de execuție, pentru a păstra o valoare sau o excepție care urmează să fie procesată mai târziu prin intermediul unui std::future.

Atomics

Pe lângă mecanismele de excludere mutuală prezentate anterior, standardul C++11 introduce și tipurile atomice.

Un tip atomic std::atomic se poate folosi cu orice tip T și garantează că orice operație asupra obiectului std::amotic va fi atomică, adică se va executa în întregime sau deloc.

Un avantaj al folosirii tipurilor atomice pentru excludere mutuală este performanța, deoarece în acest caz se folosește o tehnică lock-free, mult mai economică decât utilizarea unui mutex care poate fi relativ costisitor în termeni de resurse și latență datorată excluderii mutuale.
Operațiile principale oferite de clasa std::atomic sunt funcțiile de store și load, care setează și returnează atomic valorile stocate în obiectul std::atomic. O altă metodă specifică acestor obiecte este funcția exchange, care setează o nouă valoare pentru obiectul atomic returnând în același timp valoarea setată anterior. De asemenea, mai sunt două metode compare_exchange_weak și compare_exchange_strong care efectuează schimbări atomice, numai dacă valoarea curentă este egală cu valoarea actuală așteptată. Aceste ultime două funcții pot fi folosite pentru implementarea algoritmilor lock-free. Spre exemplu:

#include 
#include 

void execute(std::promise& promise) 
{
   std::string str("processed data"); 
   promise.set_value(std::move(str));	//-- (3)   
}

void main()
{
    std::promise promise; //-- (1)   
    std::thread thread(execute, std::ref(promise)); //-- (2)
    std::future result(promise.get_future()); //-- (4)
    std::cout << "result: " << result.get() << std::endl; //-- (5)
}

În acest exemplu se include mai întâi header-ul unde este declarată clasa template std::atomic<>. Apoi se declară un obiect atomic counter (1). În principiu, se poate folosi orice tip trivial, integral sau un tip pointer cu rol de parametru pentru template. Atenție la inițializarea obiectului std::atomic ! Acesta trebuie inițializat întotdeauna, deoarece constructorul default nu îl inițializează complet. Spre deosebire de exemplul de la secțiunea Mutex, în acest caz variabila counter poate fi incrementată direct, fără necesitatea utilizării mutex (2), deoarece atât funcțiile membre obiectului std::atomict și operațiile triviale precum asignările, conversiile automate, incrementările, decrementările sunt garantate să se execute atomic.

Este indicat să se folosească tipurile atomice atunci când se dorește utilizarea operațiilor atomice, în special asupra tipurilor integrale.

Concluzii

Am prezentat în linii generale modalitățile de folosire a thread-urilor în standardul C++11, acoperind atât aspecte despre managementul thread-urilor cât și mecanisme de sincronizare a datelor și operațiilor folosind mutex-uri, variabile condiționale, futures, promises, packed tasks și tipuri atomice. După cum se poate observa, utilizarea thread-urilor din librăria standard C++ nu este dificilă, urmând practic aceleași mecanisme de utilizare ca și thread-urile din libraria Boost. În schimb, complexitatea crește odată cu complexitatea și design-ul codului care trebuie să se comporte conform așteptărilor. Pentru o aprofundare a celor discutate dar și o extindere a cunoștințelor referitoare la noile concepte disponibile în standardul C++11, recomand cu încredere cartea lui Anthony Williams, C++ Concurency in Action, precum și ultima ediție a clasicei The C++ Standard Library, de Nicolai Josuttis. Veți găsi acolo nu doar o detaliere a subiectelor prezentate anterior ci și descrierea altor caracteristici specifice standardului C++11, incluzând tehnici de utilizare ale acestora, pentru programarea multithreding la un nivel avansat.

LANSAREA NUMĂRULUI 89

Prezentări articole și
Panel: Programming

Joi, 21 Noiembrie, ora 18:00
Liberty Technology Park

Înregistrează-te

Facebook Meetup

Sponsori

  • comply advantage
  • ntt data
  • 3PillarGlobal
  • Betfair
  • Telenav
  • Accenture
  • Siemens
  • Bosch
  • FlowTraders
  • MHP
  • Connatix
  • UIPatj
  • MetroSystems
  • Globant
  • Colors in projects