Aplikacje przetwarzające wiele wątków
Rozważmy sytuację, gdy aplikacja serwera jest dostępna do publicznego użytku i potrzebuje np. 30 sekund ciągłych obliczeń do obsługi jednego zapytania od klienta. Pojawia się problem, gdy w tym czasie inne aplikacje klienckie spróbują połączyć się do naszego serwera.Najprostsze dwa rozwiązania to:
- wiele procesów – po otrzymaniu połączenia stworzenie procesu potomonego (fork) i przekazanie deskryptora tego procesu połączenia do nowego procesu, podczas gdy proces macierzysty powróci do oczekiwania na połączenie,
- wiele wątków – po otrzymaniu połączenia stworzenie procesu potomonego (fork) i przekazanie deskryptora tego procesu połączenia do nowego procesu, podczas gdy proces macierzysty powróci do oczekiwania na połączenie
W tym artykule skupiam się na rozwiązaniu problemu poprzez stworzneie aplikacji wielowątkowej. Wątki wykorzystują zdefiniowaną przez nas funkcję/procedurę, która będzie uruchomiona równolegle niezależnie od całości programu, tj. pozwoli na przetworzenie dwóch lub więcej instancji tej funkcji przez nasz program w taki sposób jak by działały równolegle.
Wprowadzenie i definicje
Biblioteka pthreads pozwala na uruchamianie rónolegle wielu funkcji zwanch wątkami.
Wątki te współdzielą:
- PID, PPID, id grupy procesów, id sesji,
- otwarty terminal,
- identyfikator właściciela i grupy,
- otwarte pliki, bieżący katalog,
- rlimity, priorytet nice,
- otwarte pliki (z blokadami odczytu – by konkurente wątki nie konkurowały o zasób), maskę tworzenia nowych plików,
- liczniki czasu – znane z polecenia time.
Wątki mają jednak różne:
- identyfikatory
- funkcje błędów,
- obsługę sygnałów,
- algorytm szeregowania grupy wątków,
- linux capabilities (obok procesów uprzywilejowanych i nieuprzywilejowanych, jądro linux od wersj 2.2 zaimplementowało odrębny zestaw zdolności procesów, które mogą być włączone lub wyłączone),
- cpu affinity (zbiór procesorów, na których zadanie może być przetwarzane).
Typy zmiennych:
- pthreads_t – identyfikator wątku
- pthread_attr_t – atrybuty wątków
Funkcje podstawowe:
- int pthread_create(pthread_t *id, const pthread_attr_t *attr, void* (*fun)(void*), void* arg) – uruchomienie wątku, nadanie mu identfikatora do id (przypisujemy do adresu wskaźnika), przekazanie atrybutów pthread_attr_t(można podać NULL – zostaną ustawione domyślne atrybuty), wskazanie adresu funkcji, która ma zostać wywołana fun, przekazanie przez wskaźnik argumentu arg (może to być wskaźnik na dowolną zmienną lub strukturę),
- int pthread_join(pthread_t id, void **retval) – oczekiwanie na zakończenie działania wątku o identyfikatorze id oraz pobranie wartości zwróconej przez ten wątek retval (może to być wskaźnik na strukturę)
Funkcje dodatkowe:
- int pthread_detach(pthread_t id) – zmiana wątku na typ detached (tj. taki, który zwalania pamięć po zakończeniu bez konieczności wywołania pthread_join)
- pthread_t pthread_self() – zwraca wartość id wątku, w którym jest wywołana
- int pthread_equal(pthread_t id1, pthread_t id2) – porównuje identyfikatory dwóch wątków (w różnych systemach pthread_t możę mieć inn
- void pthread_exit(void *retval) – zakończenie wątku, który wywołał tą funkcję
Argumenty:
- id – identyfikator wątku (jeśli stworzymy tablicę identyfikatorów – wówczas podajemy np. &tablica[pozycja])
- pthread_attr_t – atrybuty wątku (jeśli podamy NULL brane są domyślne)
- fun – wskaźnik na funkcję, która ma zostać wykonana, powinna przyjmować argumenty typu *void i zwracać *void
- arg – wskaźnik na argument przekazany do funkcji (w przypadku zadeklarowanej tablicy jest to po prostu jej nazwa)
- retval – wskaźnik na wartość wynikową wątku, jeśli NULL wynik będzie ignorowany
Rodzaje wątków:
- joinable – wątek, który zakończył swoje działanie, do czasu wywoł
- detached
Przykład
Przykład 1
Poniżej znajduje się przykład aplikacji, która będzie stworzy dwa wątki.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#include <stdio.h> #include <stdlib.h> #include <pthread.h> /* Definicja funkcji, której kod określony jest poniżej */ void *wypisz_wiadomosc( void *ptr ); int main() { pthread_t threads[2]; const char *wiadomosc1 = "Wątek 1"; const char *wiadomosc2 = "Wątek 2"; int tid[2]; /* Utworzenie dwóch niezależnych wątków, każdy uruchomi funkcję wypisz_wiadomosc */ tid[0] = pthread_create( &threads[0], NULL, wypisz_wiadomosc, (void*) wiadomosc1); tid[1] = pthread_create( &threads[1], NULL, wypisz_wiadomosc, (void*) wiadomosc2); /* Funkcja main musi poczekać na zakończenie obu wątków, po to by nie powstały procesy Zombie */ pthread_join( threads[0], NULL); pthread_join( threads[1], NULL); printf("Wątek 1 zwraca: %d\n",tid[0]); printf("Wątek 2 zwraca: %d\n",tid[1]); exit(0); } void *wypisz_wiadomosc( void *ptr ) { char *message; message = (char *) ptr; printf("%s \n", message); } |
Przykład 2
Aplikacja wielowątkowa z socketami:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
//.... // void *wypisz_wiadomosc(int socket); int main(){ //.... // int socket_client; //.... // while(1) { int addrlen=sizeof(struct sockaddr); socket_client = accept(socket_server,(struct sockaddr*)&client_addr,&addrlen); if(socket_client<0) continue; pthread_t pth; pthread_create(&pth,NULL,(void*)wyslij_wiadomosc,(void*) &socket_client); } } void *wyslij_wiadomosc(void* socket_client){ char bufor[1024]; int socket = *(int*)socket_client; sprintf(bufor,"Hello world.\n"); write(socket,bufor,1024); } |
Zwróćmy uwagę, że aplikacja w nieskońćzonej pętli oczekuje na połączenia, a w przypadku, gdy do połączenia dochodzi tworzy nowy wątek, który otrzymuje socket jako argument i może przeprowadzić bezpośrednią komunikację z klientem.
Zadania
- Zmień aplikację z przykładu 1 tak by powstało 5 równoległych wątków, z których każdy wypisze wiadomosc, odczeka 10 sekund i ponownie wypisze wiadomość.
- Napisz aplikację serwera, który po połączeniu klienta będzie wypisywał zawartość jakiegoś pliku, przesyłając 10bajtów na sekundę. Serwer po uruchomieniu będzie nasłuchiwał na porcie 10000. Napisz aplikację klienta, która będzie się łączyła z serwerem opisanym powyżej, a po połączeniu będzie odczytywała plik przesyłany przez serwer i zapisywała go w lokalnym katalogu pod losową nazwą.
Ciekawostki:
- zmienna PTHREAD_THREADS_MAX z limits.h określa maksymalną liczbę wątków, które można uruchomić,
- Przykładowe operacje na atrybutach wątków:
- int pthread_attr_init(pthread_attr_t *attr) – wypelnienie attr domyślnymi wartościami,
- int pthread_attr_destroy(pthread_attr_t *attr) – usunięcie struktury opisującej atrybuty wątków (bez wpływu na działanie wątków, które przy jej pomocy zostały utworzone),
- int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate) – ustawienie rodzaju wątku (PTHREAD_CREATE_JOINABLE, PTHREAD_CREATE_DETACHED),
- int pthread_attr_getdetachstate(const pthread_attr_t *attr, int *detachstate) – odczytanie rodzaju wątku,
- int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize) – ustawienei rozmiaru stos,
- int pthread_attr_getstacksize(const pthread_attr_t *attr, size_t *stacksize) – pobranie informacji o rozmiarze stosu,
- int pthread_attr_getinheritsched(const pthread_attr_t *attr, int *inheritsched) – odczytanie informacji o algorytmie szeregowania,
- int pthread_attr_setinheritsched(pthread_attr_t *attr, int inheritsched) – ustawienie algorytmu szeregowania np. SCHED_FIFO, SCHED_RR.