TPL Teil 6 – Aggregationen

Mit Schleifen werden häufig Daten zusammengeführt, bzw. verdichtet. Solche Aggregationen werden innerhalb der Task Parallel Library (TPL) durch die Methoden Parallel.For() und Parallel.ForEach() direkt unterstützt.

Im Post TPL Teil 4 – Die Klasse Parallel habe ich unter Unabhängigkeit sicherstellen dargestellt, dass alle Abhängigkeiten zwischen den einzelnen Ausführungseinheiten beseitigt werden müssen. Ansonsten kann es zu fehlerhaften Berechnungen kommen. Hier ein entsprechendes (fehlerhaftes) Beispiel:

public void Run()
{
    Console.WriteLine("Start Run");

    double result01 = 0;
    for (int i = 1; i < 10000; i++)
    {
        result01 = result01 + DoSomeWork(i);
    }
    Console.WriteLine("Result: {0}", result01);

    double result02 = 0;
    Parallel.For(1, 10000, (i) =>
    {
        result02 = result02 + DoSomeWork(i);
    });
    Console.WriteLine("Result: {0}", result02);

    Console.WriteLine("End Run");
    Console.ReadLine();
}
private double DoSomeWork(int index)
{
    return Math.Sin(index) + Math.Sqrt(index) * Math.Pow(index, 0.14);
}

Beispiel 1 (Visual Studio 2012) auf GitHub

Die Ergebnisse der sequenziellen und der parallelen Ausführung unterscheiden sich deutlich. In der Methode Parallel.For() wurden die Schreibzugriffe auf die Variable result02 nicht synchronisiert. Dadurch wird die Summe falsch berechnet.

Picture01

Bei parallelen Berechnungen sollte das Ergebnis immer sorgfältig geprüft werden. Am besten wird das Ergebnis mit einer sequenziellen Berechnung verglichen. Auch sollte der Endwert der Schleife bei Tests möglichst groß gewählt werden. Ist die Spreizung der Laufvariable zu klein, so kann es vorkommen, dass die TPL-Bibliothek die Arbeit nicht auf mehrere Tasks aufteilt. Die Schleife wird dadurch sequenziell ausgeführt und fehlende Sperren wirken sich nicht negativ auf das Ergebnis aus.

Zugriffe auf gemeinsame Ressourcen müssen mit geeigneten Mitteln synchronisiert werden.

double result02 = 0;
object locker = new Object();
Parallel.For(1, 10000, (i) =>
{
    lock (locker)
    {
        result02 = result02 + DoSomeWork(i);
    }
});

Beispiel 2 (Visual Studio 2012) auf GitHub

Die Berechnung der Methode Parallel.For() ist jetzt Dank der synchronisierten Schreibzugriffe korrekt.

Picture02

Leider wirkt sich die notwendige Sperre sehr negativ auf das Laufzeitverhalten aus, da in jedem Schleifendurchlauf eine Sperre aktiv ist. In jedem Schleifendurchlauf ein Lock zu nutzen ist hier aber nicht notwendig. Es brauchen nur so viele Locks vorhanden sein, wie Tasks ausgeführt werden.

Optimal wäre für jeden Task eine lokale Hilfsvariable, in der jede Ausführungseinheit sein Zwischenergebnis ablegen kann, ohne Synchronisierungen durchführen zu müssen. Erst am Ende, wenn alle Ausführungseinheiten fertig sind, müssen die Teilergebnisse zusammengeführt werden.

Für diese Anforderungen bieten die Methoden Parallel.For() und Parallel.ForEach() entsprechende Überladungen an.

Aggregationen mit Parallel.For()

ParallelLoopResult For<TLocal>(int fromInclusive,
                               int toExclusive,
                               Func<TLocal> localInit,
                               Func<int, ParallelLoopState, TLocal, TLocal> body,
                               Action<TLocal> localFinally);

Es handelt sich hierbei um eine generische Methode, wobei TLocal den Datentyp der lokalen Variable des jeweiligen Task festlegt. Dadurch lassen sich auch komplexere Objekte als lokale Variable verwenden. Der Methode müssen folgende Parameter übergeben werden:

  • Startwert
  • Endwert
  • wird am Anfang jeder Ausführungseinheit aufgerufen
  • Funktion, die in der Task je Schleifendurchlauf ausgeführt wird
  • wird am Ende jeder Ausführungseinheit aufgerufen

Hierzu ein entsprechendes Beispiel:

double result02 = 0;
object locker = new Object();
Parallel.For<double>(1, 10000,
     () => 0,
     (i, pls, tls) =>
     {
         tls += DoSomeWork(i);
         return tls;
     },
     (tls) =>
     {
         lock (locker)
         {
             result02 += tls;
         }
     });

Die ersten beiden Parameter verhalten sich wie bekannt. Neu ist die Initialisierung der lokalen Variable. Laut Signatur handelt es sich um ein Delegate, der eine Variable vom Typ TLocal zurück gibt. Die Initialisierungsfunktion wird für jede Ausführungseinheit einmal aufgerufen. Wird die Schleife in 10 Tasks aufgeteilt, so wird die Initialisierungsfunktion 10 mal aufgerufen und somit 10 lokale Variablen (oder Objekte) angelegt.

Als Lambda-Ausdruck ist die Schreibweise besonders kompakt, man könnte aber auch schreiben:

Parallel.For<double>(1, 10000,
     () =>
     {
         double x = 0.0;
         return x;
     },
     (i, pls, tls) =>
     {
         tls += DoSomeWork(i);
         return tls;
     },
     (tls) =>
     {
         lock (locker)
         {
             result02 += tls;
         }
     });

Jetzt ist gut zu erkennen, dass in der Init-Funktion auch komplexere Initialisierungen möglich sind. Wichtig ist nur, dass der Rückgabewert dem Typ TLocal entspricht.

War bisher der Delegate für den Schleifen-Body immer vom Typ Action<>, so ist er jetzt vom Typ Func<>. An dieser Funktion werden drei Parameter übergeben:

  • die Laufvariable
  • ein Objekt vom Typ ParallelLoopState (wird hier nicht benötigt)
  • eine Variable/Objekt vom Typ TLocal, die als ThreadLocalState bezeichnet wird

Der Delegate gibt das aktuelle Zwischenergebnis der jeweiligen Schleifeniteration zurück. Dieser Wert wird bei der folgenden Iteration über den ThreadLocalState erneut übergeben. Der ThreadLocalState stellt also die lokale Variable da, auf die zugegriffen werden kann, ohne dass Synchronisierungen notwendig sind.

Wer eine kompakte Schreibweise bevorzug, könnte das Beispiel auch wie folgt schreiben:

Parallel.For<double>(1, 10000,
     () => 0.0,
     (i, pls, tls) => tls += DoSomeWork(i),
     (tls) =>
     {
         lock (locker)
         {
             result02 += tls;
         }
     });

Der letzte Parameter erwartet einen Action-Delegate. Dieser wird aufgerufen sobald eine Ausführungseinheit die letzte Iteration ausgeführt hat. Der Parameter enthält den aktuellen ThreadLocalState, der an dieser Stelle für die eigentliche Berechnung des Schleifenergebnisses genutzt werden kann.

Da nicht vorhersehbar ist, wann welche Ausführungseinheit seine Arbeit beendet, müssen u.U. Schreibzugriffe synchronisiert werden. Diese Sperren werden aber nur so häufig ausgeführt, wie es Ausführungseinheiten gibt.

Sollten in dem Delegate für die Initialisierung ebenfalls Schreibzugriffe auf gemeinsame Ressourcen stattfinden (warum auch immer), so müssen diese ebenfalls synchronisiert werden.

Der Delegate für die Initialisierung, ebenso wie der für die Aggregation, wird genauso häufig aufgerufen wie es Ausführungseinheiten gibt. Dieses kann durch einen Zähler einfach überprüft werden:

double result02 = 0;
object locker = new Object();
int inits = 0;
int aggregates = 0;
Parallel.For<double>(1, 10000,
() =>
{
    Interlocked.Increment(ref inits);
    double tls = 0.0;
    return tls;
},
(i, pls, tls) =>
{
    tls += DoSomeWork(i);
    return tls;
},
(tls) =>
{
    Interlocked.Increment(ref aggregates);
    lock (locker)
    {
        result02 += tls;
    }
});
Console.WriteLine("Result: {0}", result02);
Console.WriteLine("inits: {0}   aggregates: {1}", inits, aggregates);

Beispiel 3 (Visual Studio 2012) auf GitHub

Picture03

Aggregationen mit Parallel.ForEach()

ParallelLoopResult ForEach<TSource, TLocal>(IEnumerable<TSource> source,
                                            Func<TLocal> localInit,
                                            Func<TSource, ParallelLoopState, long, TLocal, TLocal> body,
                                            Action<TLocal> localFinally);

Die Methode Parallel.ForEach() hat einen ähnlichen Aufbau wie die entsprechende Methode Parallel.For(). Auch diese Methode ist generisch und definiert durch TLocal den Datentyp der lokalen Variable des jeweiligen Task. TSource legt den Datentyp der einzelnen Elemente der Datenquelle fest.

Der Methode müssen folgende Parameter übergeben werden:

  • Eine Aufzählung, die als Datenquelle dient
  • wird am Anfang jeder Ausführungseinheit aufgerufen
  • Funktion, die in der Task je Element ausgeführt wird
  • wird am Ende jeder Ausführungseinheit aufgerufen

An den Delegate, der die einzelnen Elemente der Datenquelle bearbeitet, werden vier Parameter übergeben:

  • das jeweilige Element aus der Datenquelle
  • ein Objekt vom Typ ParallelLoopState (wird hier nicht benötigt)
  • der Index des Datenelements
  • eine Variable/Objekt vom Typ TLocal, die als ThreadLocalState bezeichnet wird

Der Delegate gibt das aktuelle Zwischenergebnis der jeweiligen Bearbeitung zurück. Dieser Wert wird dann bei der folgenden Iteration über den ThreadLocalState erneut übergeben. Der ThreadLocalState stellt also auch hier die lokale Variable da, auf die ohne Synchronisierung zugegriffen werden kann.

ublic void Run()
{
    Console.WriteLine("Start Run");
    // create data source
    int[] data = new int[100000];
    for (int i = 0; i < 100000; i++)
        data[i] = i;

    double result01 = 0;
    foreach (int i in data)
        result01 += DoSomeWork(i);
    Console.WriteLine("Result: {0}", result01);

    double result02 = 0;
    object locker = new Object();
    Parallel.ForEach<int, double>(data,
    () =>
    {
        double tls = 0.0;
        return tls;
    },
    (item, pls, i, tls) =>
    {
        tls += DoSomeWork(item);
        return tls;
    },
    (tls) =>
    {
        result02 += tls;
    });
    Console.WriteLine("Result: {0}", result02);

    Console.WriteLine("End Run");
    Console.ReadLine();
}
private double DoSomeWork(int index)
{
    return Math.Sin(index) + Math.Sqrt(index) * Math.Pow(index, 0.14);
}

Beispiel 4 (Visual Studio 2012) auf GitHub

Picture04

Da die Anzahl der Additionen bei der sequenziellen und der parallelen Variante unterschiedlich ist, treten irgendwann Rundungsfehler auf. Dieses erklärt die Ungenauigkeit ab der 6. Nachkommastelle.

Aggregationen mit PLINQ

Wer mehr mit Aggregationen zu tun hat, sollte sich PLINQ (parallel LINQ) etwas genauer anschauen.

Hier das oben gezeigte Beispiel, umgesetzt mit PLINQ (in 2 verschiedenen Varianten):

public void Run()
{
    Console.WriteLine("Start Run");

    double result01 = 0;
    for (int i = 1; i < 10000; i++)
        result01 += DoSomeWork(i);
    Console.WriteLine("Result: {0}", result01);

    double result02 = 0;
    result02 = ParallelEnumerable.Range(0, 10000).Sum(i => DoSomeWork(i));
    Console.WriteLine("Result: {0}", result02);

    double result03 = 0;
    result03 = (from x in ParallelEnumerable.Range(0, 10000).AsParallel()
                select x).Sum(i => DoSomeWork(i));
    Console.WriteLine("Result: {0}", result03);

    Console.WriteLine("End Run");
    Console.ReadLine();
}
private double DoSomeWork(int index)
{
    return Math.Sin(index) + Math.Sqrt(index) * Math.Pow(index, 0.14);
}

Beispiel 5 (Visual Studio 2012) auf GitHub

Fazit

Die Task Parallel Library besitzt mit den Methoden Parallel.For() und Parallel.ForEach() eingebaute Mechanismen zur parallelen Aggregation.

PLINQ stellt eine weitere Möglichkeit dar, um Daten einfach zu aggregieren. Durch den deklarativen Ansatz bleibt der Quellcode strukturiert und gut lesbar. Positiv ist hierbei, dass auf zusätzliche Sperrobjekte ganz verzichtet werden kann.

Author: Stefan Henneken

I’m Stefan Henneken, a software developer based in Germany. This blog is just a collection of various articles I want to share, mostly related to Software Development.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: