0

Równoległe algorytmy sortowania – Strefa juniora

Witaj! Dzisiejszym wpisem zakończę cykl algorytmów sortowania przedstawiając dwie asynchroniczne wersje algorytmów sortowania zaimplementowanych wcześniej. Będą to algorytmy sortowania szybkiego oraz sortowania kubełkowego, wykorzystałem właśnie te algorytmy, gdyż dają się one łatwo zrównoleglić. Zapraszam do lektury.

Artykuł ten należy do cyklu przedstawiającego algorytmy rozwiązujące problem sortowania. Jeżeli temat Cię zainteresował, to koniecznie przeczytaj inne artykuły z tej tematyki, znajdziesz je w dziale algorytmów. Dodatkowo wpis ten zawiera informacje pomocne dla osoby zaczynającej swoją przygodę z programowaniem. Jeżeli jesteś taką osobą, to koniecznie odwiedź dział juniora.

Implementacja zrównoleglenia

Do zrównoleglenia wykorzystam mechanizm Tasków z języka C#. Przyjąłem regułę, że będę dzielić pracę algorytmu maksymalnie na 4 wątki, gdzie każdy dostanie mnie więcej tyle samo pracy. Założenie to będzie spełnione dla danych równomiernie rozłożonych, w praktyce niestety dane nie będą tak rozmieszczone, więc korzyści płynące z zrównoleglenia będą mniejsze.

Sposób działanie algorytmów nie ulega zmianie, więc odsyłam do wcześniejszych artykułów. Wykorzystuję tutaj jedynie zdolność algorytmu do delegowania pracy. Pracę mogę delegować do drugiego wątku tylko jeżeli dane opracowywane przez ten wątek nie będą wykorzystywane w innym wątku. Innymi słowy, chcę uniknąć wyścigów do danych lub  zakleszczenia. Obie sytuacje są problemem operacji wielowątkowych, jeżeli jesteś początkujący w tym temacie to koniecznie zapoznaj się z tymi terminami.

W swoich implementacjach, dzięki prawidłowemu ułożeniu danych, nie muszę się za bardzo martwić synchronizowaniem zapisu i odczytu. Każdy wątek dostaje dane odseparowane od innego wątku, dzięki czemu może na nich bezpiecznie działać. W dwóch algorytmach wybrałem odpowiednie operacje do zrównoleglenia, opisane one zostały w odpowiednich sekcjach.

Przystąpmy więc do implementacji tych algorytmów, jednak zanim to nastąpi należy napisać testy.

Cały kod użyty w tym przykładzie znajduje się również na GitHubie.
Kod napisany jest tak, aby był jak najbardziej czytelny. Oznacza to, że pominąłem wszelkie walidacje danych wejściowych. W kodzie produkcyjnym pamiętaj, aby zawsze upewniać się, że to co dostarcza Ci użytkownik ma prawidłowy format.

Testy

Testy są tak na prawdę kopią swoich synchronicznych odpowiedników. Na uwagę zasługuje jedynie wykorzystanie CancellationTokena, który to z dobrymi praktykami pociągnąłem po operacjach asynchronicznych.

Kod testów prezentuje się następująco:

[TestMethod]
public void ConcurrentStringBucketSort()
{
    StringBucketSort sort = new StringBucketSort(new SortableAlgorithms.BubbleSort.BubbleSort());
    string[] test = SortData.Example1StringsInput;
    sort.Sort(test, CancellationToken.None);
    for (int index = 0; index < SortData.Example1StringsOutput.Length; index++)
    {
        Assert.AreEqual(SortData.Example1StringsOutput[index], test[index]);
    }
}
 
[TestMethod]
public void ConcurrentQuickSort()
{
    QuickSort sort = new QuickSort();
    ComparableInt[] test1 = SortData.Example1Input;
    sort.Sort(test1, CancellationToken.None);
    Assert.IsTrue(Helpers.Equals(SortData.Example1Output, test1));
}

Jeżeli czytałeś moje poprzednie artykuły z tego cyklu, to kod ten powinien być dla Ciebie zrozumiały, gdyż stanowi on praktycznie kopię 1:1 z wcześniejszych wpisów.

Asynchroniczna wersja sortowania kubełkowego

W sortowaniu kubełkowym, przyjąłem jako element zrównolegnienia sortowanie kubełków. Uznałem, że czas przygotowywania kubełków i wrzucania do nich danych jest znacznie mniejszy niż w przypadku sortowania tych kubełków. Jeżeli się z tym nie zgadzasz, nic nie stoi na przeszkodzie, aby zrównoleglić również tą operację.

Skoro chciałem zrównoleglić jedynie faktyczne sortowanie, to duża część kodu pozostała bez zmian, prezentuje się on następująco:

private readonly ISortAlgorithm _sortAlgorithm;
private const int _bucketsCount = 26;
private const int _threadCount = 4;
 
public StringBucketSort(ISortAlgorithm sortAlgorithm)
{
    _sortAlgorithm = sortAlgorithm;
}
 
public void Sort(string[] table, CancellationToken cancellationToken)
{
    List<string>[] buckets = PrepareBuckets();
    PutToBuckets(buckets, table);
    int countPerTasks = _bucketsCount / _threadCount + 1;
    IEnumerable<string>[] output = new IEnumerable<string>[_bucketsCount];
    List<Task> tasks = new List<Task>();
    for (int i = 0; i< _threadCount; ++i)
    {
        List<string>[] bucketsToSort = buckets.Skip(i * countPerTasks).Take(countPerTasks).Where(x => x.Any()).ToArray();
        Task newTask = Task.Factory.StartNew(() => SortBuckets(bucketsToSort, cancellationToken), cancellationToken);
        tasks.Add(newTask);
    }
    Task.WaitAll(tasks.ToArray(), cancellationToken);
    List<string> sorted = new List<string>();
    foreach(var values in buckets)
    {
        sorted.AddRange(values);
    }
    for (int index = 0; index < table.Length; ++index)
    {
        table[index] = sorted[index];
    }
}
 
private List<string>[] PrepareBuckets()
{
    List<string>[] buckets = new List<string>[_bucketsCount];
    for (int i = 0; i < _bucketsCount; ++i)
    {
        buckets[i] = new List<string>();
    }
    return buckets;
}
 
private void PutToBuckets(List<string>[] buckets, IEnumerable<string> records)
{
    foreach (string str in records)
    {
        int bucket = (int)str.ToUpper()[0] - 65;
        buckets[bucket].Add(str);
    }
}
 
private void SortBuckets(IEnumerable<List<string>> buckets, CancellationToken cancellationToken)
{
    foreach (List<string> bucket in buckets)
    {
        cancellationToken.ThrowIfCancellationRequested();
        if (bucket.Any() == false)
        {
            continue;
        }
        string[] toSort = bucket.ToArray();
        _sortAlgorithm.Sort(toSort);
        bucket.Clear();
        bucket.AddRange(toSort);
    }
}

Jak widzisz jest on bardzo podobny do tego opisanego w swojej synchronicznej wersji. Różnicą tutaj jest wykorzystanie tasków do podzielenia pracy. Tak jak pisałem w założeniach, chciałem utworzyć 4 taski (pominąłem wszelkie walidacje tablicy tj. czy ma przynajmniej 4 elementy, w celu lepszej czytelności) i tak też zrobiłem. Każdy task dostał odpowiednią ilość kubełków do posortowania, a więc task nr. 1 dostał kubełki od 0 do X, task nr.2 dostał kubełki od X+1 do 2*X itd. Ostatni task dostał najprawdopodobniej najmniej kubełków, ponieważ mu przypadła ostatnia część zbioru. Jako X wyznaczyłem ilość kubełków podzieloną przez ilość wątków, a następnie dodatkowo zwiększyłem tą wartość o jeden, co gwarantuje mi nie pominięcie jakichś danych, jeżeli liczba kubełków była niepodzielna przez 4.

Każdy task utworzyłem i uruchomiłem za pomocą Task.Factory.StartTask, następnie zebrałem je do jednej kolekcji i poczekałem aż wszystkie się zakończą wykorzystując do tego funkcję Task.WaitAll. Po zakończeniu się tasków zebrałem wszystkie wyniki i odpowiednio przypisałem.

Dlaczego nie występują tutaj wyścigi do danych? Spieszę z odpowiedzią. Wyścigi nie występują, ponieważ każdy wątek pracuje na innym przedziale zbioru kubełków. Task nr.1 sortuje tylko pierwszą 1/4 ilość kubełków, task nr. 2 kolejną itd. Żaden wątek nie wykorzystuje danych, które są przeznaczone dla innego wątku, dzięki temu nie muszę synchronizować odczytu (wykorzystywać locków).

Asynchroniczna wersja sortowania szybkiego

Po wersji kubełkowej, sortowanie szybkie jest łatwiej zrównoleglić. Nawet w niektórych bibliotekach standardowych są dostępne asynchroniczne wersje tego algorytmu.

Kod w stosunku do synchronicznej wersji nie uległ większym zmianom. Doszedł tak na prawdę jeden if, ale o tym za chwilę, teraz zobacz proszę kod:

public void Sort(IComparable[] table, CancellationToken cancellationToken)
{
    QuickSortIntern(table, 0, table.Length - 1,2, cancellationToken);
}
 
private void QuickSortIntern(IComparable[] elements, int left, int right, int splitLeft, CancellationToken cancellationToken)
{
    if (left >= right)
    {
        return;
    }
    int onLeft = left;
    int onRight = right;
    IComparable comparableElement = elements[(onRight + onLeft) >> 1]; // (right + left) / 2
    do
    {
        cancellationToken.ThrowIfCancellationRequested();
        while (elements[onLeft].CompareTo(comparableElement) < 0) ++onLeft;
        while (elements[onRight].CompareTo(comparableElement) > 0) --onRight;
        if (onLeft <= onRight)
        {
            IComparable temp = elements[onLeft];
            elements[onLeft] = elements[onRight];
            elements[onRight] = temp;
            ++onLeft;
            --onRight;
        }
    }
    while (onLeft <= onRight);
    if (splitLeft > 0)
    {
        Task taskSortLeft = Task.Factory.StartNew(() => QuickSortIntern(elements, left, onRight, splitLeft - 1, cancellationToken), cancellationToken);
        Task taskSortRight = Task.Factory.StartNew(() => QuickSortIntern(elements, onLeft, right, splitLeft - 1, cancellationToken), cancellationToken);
        Task.WaitAll(new Task[] { taskSortLeft, taskSortRight }, cancellationToken);
    }
    else
    {
        QuickSortIntern(elements, left, onRight,0, cancellationToken);
        QuickSortIntern(elements, onLeft, right,0, cancellationToken);
    }
}

Tak jak pisałem, jedyną różnicą jest if na samym dole funkcji. W warunku tym wykorzystuję wartość zmiennej splitLeft, która to określa, ile jeszcze razy mogę podzielić pracę. Założenia wymagały maksymalnie 4 wątków i tak też tutaj to zostało zrealizowane, jednak nie widać tego w sposób jawny. Powodem tego jest rekurencja, każde wywołanie funkcji odpala dwie kolejne funkcje, więc mogę zrównoleglić pracę jedynie 2 razy (2*2 daje 4 ;)).

Wywołanie tasków i oczekiwanie na ich zakończenie jest identyczne jak wcześniej, więc nie będę tego opisywać ponownie.

Podsumowanie

Na początku zaznaczę, że trochę Cię okłamałem (co pewnie wychwycili co uważniejsi czytelnicy). Tak na prawdę algorytmy te nie pracują na 4 wątkach, ale na większej ilości. Algorytm sortowania kubełkowego wykorzystuje 5 wątków, a sortowanie szybkie wykorzystuje aż 7. Czy wiesz, dlaczego tak jest? A no dlatego, że należy podliczyć również wątek, który odpala nowe wątki i czeka na ich zakończenie. Można to w bardzo prosty sposób naprawić, wystarczy ostatnią część pracy zlecić wątkowi głównemu (temu, który odpala nowe), tak aby nie tylko oczekiwał na zakończenie pozostałych, ale też coś samemu zrobił.

Nie wprowadzałem jednak takiej implementacji, bo nie chciałem komplikować przykładu, a i Tobie przyda się dodatkowa praktyka. Możesz to zrobić w celu utrwalenie materiału, to na prawdę dosłownie kilka linijek kodu ;).

Na sam koniec cyklu algorytmów sortujących, przedstawiam testy wydajnościowe. Sprawdzam, jak sprawdzają się 2 moje implementacje tj. sortowanie szybkie i asynchroniczne sortowanie szybkie na tle algorytmu wykorzystanego w Array.Sort. W tym celu powstał taki kod testowy:

[TestMethod]
public void PerformanceTest()
{
    int length = 5000000;
    Random random = new Random();
    ComparableInt[] test = new ComparableInt[length];
    for(int i = 0; i < length; ++i)
    {
        test[i] = new ComparableInt(random.Next(-100000, 100000));
    }
    ComparableInt[] test1 = new ComparableInt[length];
    ComparableInt[] test2 = new ComparableInt[length];
    ComparableInt[] test3 = new ComparableInt[length];
    Array.Copy(test, test1, length);
    Array.Copy(test, test2, length);
    Array.Copy(test, test3, length);
    
    TimeSpan quickSort;
    TimeSpan quickSortAsync;
    TimeSpan main;
 
    Stopwatch sw = new Stopwatch();
    sw.Start();
    new SortableAlgorithms.QuickSort.QuickSort().Sort(test1);
    sw.Stop();
    quickSort = sw.Elapsed;
    Trace.WriteLine($"Quick Sort: {sw.Elapsed}");
 
    sw.Reset();
    sw.Start();
    new SortableAlgorithms.ConcurrencySort.QuickSort().Sort(test2, CancellationToken.None);
    sw.Stop();
    quickSortAsync = sw.Elapsed;
    Trace.WriteLine($"Quick Sort Async: {sw.Elapsed}");
 
    sw.Reset();
    sw.Start();
    Array.Sort(test3);
    sw.Stop();
    main = sw.Elapsed;
    Trace.WriteLine($"Array.Sort: {sw.Elapsed}");
}

Kod ten za zadanie ma wygenerować 5 milionów rekordów z przedziału [-100000,100000] i sprawdzić czas sortowania takiej tablicy. Test wypisywał 3 wartości, które informowały o czasie przetwarzania przez każdy algorytm. Przeprowadziłem 2 testy, które zwróciły takie wyniki:

Quick Sort: 00:00:08.4578274
Quick Sort Async: 00:00:07.9619596
Array.Sort: 00:00:09.0774643

Quick Sort: 00:00:10.9366832
Quick Sort Async: 00:00:9.3253238
Array.Sort: 00:00:10.6314131

Jak widzisz, moja implementacja QuickSort jest zbliżona prędkością do implementacji z Array.Sort, jednak implementacja asynchroniczna jest szybsza od dwóch poprzednich. Jeżeli dodatkowo poprawić działanie wątków (zejść z 7 na faktyczne 4), wynik byłby jeszcze lepszy. Po prowadzeniu tej poprawki wyniki wyglądają tak:

Quick Sort: 00:00:09.1811512
Quick Sort Async: 00:00:06.9047783
Array.Sort: 00:00:09.2373039

A kod, który poprawiłem i który miał zająć kilka linijek (jak pisałem wyżej) wygląda tak:

if (splitLeft > 0)
{
    Task taskSortRight = Task.Factory.StartNew(() => QuickSortIntern(elements, onLeft, right, splitLeft - 1, cancellationToken), cancellationToken);
    QuickSortIntern(elements, left, onRight, splitLeft - 1, cancellationToken);
    Task.WaitAll(new Task[] { taskSortRight }, cancellationToken);
}

Jeżeli udało Ci się samemu to poprawić to brawo!

Tym samym, zakończyliśmy cykl artykułów o algorytmach sortowania. W następnych wpisach poruszymy ciekawsze implementacje.

Pozdro!

Patryk Bogdański

Dodaj komentarz

Twój adres email nie zostanie opublikowany. Pola, których wypełnienie jest wymagane, są oznaczone symbolem *

This site uses Akismet to reduce spam. Learn how your comment data is processed.