ForkJoin : Bienvenue dans les mondes parallèles

Par :
dmartin
mar, 23/12/2008 - 14:18
Niveau :
Facile

L'API ForkJoin issue de la JSR166y, menée par Doug Lea, sera intégrée à la prochaine version majeure de Java (Java 7) dont la date de sortie devrait être en 2009.

Elle va apporter aux développeurs un ensemble de classes permettant à moindre coût de tirer partie des architectures multi cores et/ou multi processeurs sur lesquelles les développements sont exécutés.

Ce tutoriel va présenter quelques cas d’utilisation mettant en avant forces et faiblesses de la chose.

Cette API est d’ores et déjà disponible dans une version de test, compatible avec Java 6 ; ce qui permet de pouvoir la tester facilement. Vous trouverez le jar à cette adresse : http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166y.jar

Les classes héritant de ForkJoinTask

Commençons par nous intéresser aux classes héritant de ForkJoinTask. Cette dernière propose une base aux tâches amenées à être parallélisées. Une tâche ainsi définie recoupe un peu la notion de Callable apportée par Java 5.

Prenons un exemple simple, souvent utilisé, concernant la résolution de la suite de Fibonacci (vous n'aimez pas les Maths ? Ne fuyez pas déjà ! L'exemple est simple :) ).

Il s'agit de résoudre, de façon récursive (*), la suite suivante :

f(n) = f(n-1) + (f(n-2) pour n > 1 avec f(0) = 0; f(1) = 1;

Ce qui donne pour les premiers entiers : 0, 1, 1, 2, 3, 5, 8, 13, 21, 34, ...

(*) : Vous trouverez partout un algo beaucoup plus puissant pour cette résolution, mais non récursif et donc pas intéressant dans le cadre de notre démonstration.

L'approche naturelle ici est de faire calculer chaque partie de la suite en parallèle de l'autre (f(n-1) serait calculé en parallèle de f(n-2).

Nous allons donc créer une classe étendant ForkJoinTask implémentant cette logique. Nous considérerons la classe intermédiaire RecursiveAction (étendant ForkJoinTask) et se prêtant justement aux opérations récursives.

Voici un exemple d'implémentation :

public class Fibonacci extends RecursiveAction {

 

      private final int number;

      private int result;

 

      public Fibonacci(int number) {

            this.number = number;

      }

 

      public int getResult() {

            return result;

      }

 

      public void compute() {

            int n = number;

            if (n < 2) {

                  result = n;

                  return;

            }

 

            final Fibonacci f1 = new Fibonacci(n - 1); // subdivision

            final Fibonacci f2 = new Fibonacci(n - 2); // en 2 sous taches

            forkJoin(f1, f2); // invocation en parallèle des sous-taches (fork & join)

            result = f1.getResult() + f2.getResult(); // agregation des resultats

      }

 

}

 Pour exécuter ce code, nous y ajoutons une méthode main() :

 

      public static void main(String[] args) {

            final ForkJoinPool pool = new ForkJoinPool(2);

// dual core, mono processeur : pas la peine de tailler le pool au dela

            Fibonacci f = new Fibonacci(44);

// 44 est une valeur interessante pour son temps de calcul exploitable

            pool.invoke(f);

            System.out.println(f.getResult());

      }  

Cette méthode comporte la déclaration d’un ForkJoinPool, pierre angulaire ici car responsable de la gestion des Threads liés à l’exécution. Le paramètre passé au constructeur n’est autre que la taille du pool. On veillera à fournir une taille inférieure ou égale au nombre d’unité de traitement disponibles sur le serveur. Une valeur supérieure n’apporterait rien : les threads devant attendre la disponibilité d’une unité de calcul pour y être exécutés.

 

En lançant cet exemple, on observe une occupation CPU totale pour peu que le paramètre passé au ForkJoinPool soit égal au nombre de cores total sur le serveur.

 

Maintenant comparons avec une exécution récursive mais non parallèle en ajoutant une méthode à notre classe :

[...]

      private int seqFib(int n) {

            if (n <= 1) {

                  return n;

            } else {

                  return seqFib(n - 1) + seqFib(n - 2);

            }

      }

 

      public void sequentialCompute() {

            result = seqFib(this.number);

      }

 

[...]

L'heure de la comparaison sonne ! Et là, c'est plutôt une grosse déception car l'approche classique est particulièrement plus rapide que l'approche parallélisée. 

Essayons de comprendre pourquoi :

En essayant de paralléliser le traitement, nous découpons le problème en tâches très petites car tant que n n'est pas inférieur (strict) à 2, deux nouvelles tâches sont créées.

Cette création a déjà un coût qu’on ne retrouve pas dans l'autre approche. Ensuite, les tâches vont être dépilées tour à tour (en parallèle) pour agréger les résultats. Tout ceci ajoute encore du traitement... accès à la queue des tâches, suppression des objets qu’elle contient… Au final, cela s’avère très pénalisant.

Notion de seuil

 

Il faut donc trouver un compromis : c'est là que la notion de seuil apparaît. Il faut qu’au dessous d'un certain niveau, le découpage en sous tâches n’ait plus lieu, évitant ainsi la multiplication des tâches à traiter.

Ajoutons à notre code les lignes ci-dessous et conservons notre méthode sequentialCompute() pour comparer le gain :

 

 

public class Fibonacci extends RecursiveAction {

 

      private final int number;

      private int result;

      private static final int THRESHOLD = 10;

 

      public Fibonacci(int number) {

            this.number = number;

      }

 

      public void compute() {

            int n = number;

            // seuil de granularité sous lequel on ne parallèlise plus

            if (n <= THRESHOLD) {

                  result = seqFib(n);

            } else {

                  final Fibonacci f1 = new Fibonacci(n - 1); // subdivision

                  final Fibonacci f2 = new Fibonacci(n - 2);

                  // invocation en parallèle des sous-taches (fork & join)

                  forkJoin(f1, f2);

                  // aggregation des resultats

                  result = f1.getResult() + f2.getResult();

            }

      }

 

      private int seqFib(int n) {

            if (n <= 1) {

                  return n;

            } else {

                  return seqFib(n - 1) + seqFib(n - 2);

            }

      }

 

      public void sequentialCompute() {

            result = seqFib(this.number);

      }

 

      public int getResult() {

            return result;

      }

 

      public static void main(String[] args) {

            final ForkJoinPool pool = new ForkJoinPool(2);

            Fibonacci f = new Fibonacci(44);

            pool.invoke(f);

            System.out.println(f.getResult());

      }

 

}

Cette fois, un test comparatif des 2 méthodes va montrer un gain significatif (proche de 2 fois plus rapide dans mon cas) en faveur de l'approche "forkjoin".

On peut donc imaginer quel aurait pu être le gain sur un serveur plus lourdement équipé.

 

Prenons un autre exemple de calcul récursif, toujours dans le cadre des mathématiques : la décomposition d'un entier en au plus N sommants. Un entier peut s'écrire comme étant la somme de plusieurs autres. Le but du calcul sera de déterminer toutes les combinaisons possibles, en limitant le nombre d'entiers contenus dans la somme.

Exemple simple : l’entier « 3 » peut s'écrire :

« 3 », « 1+2 », « 1+1+1 »

Si je souhaite limiter à 2 sommants, il ne me reste plus que « 3 » et « 1+2 », soit 2 possibilités. On écrira d(3, 2) = 2. Simple donc.

 

La fonction récursive qui permet d'aboutir à ce résultat est :

d(p, q) =

si p = 0 alors 1

         sinon si q = 0 alors 0

         sinon si q > p alors d(p, p)

         sinon  d(p-q, q) + d(p, q-1)

  

Lançons nous dans une implémentation parallélisée de la chose. Utilisons pour cela la classe RecursiveTask<T>, proche voisine de RecursiveAction qui s'en différencie en cela que sa méthode compute() retourne un objet de type T.

 

 

Voici un exemple d'implémentation de cette fonction :

public class DecompositionEntierTask extends RecursiveTask<Integer> {

 

      private int threshold;

 

      private final int p;

 

      private final int q;

 

      public DecompositionEntierTask(final Integer p, final Integer q) {

            this(p, q, p/2);

      }

 

      public DecompositionEntierTask(final Integer p, final Integer q, final Integer threshold) {

            super();

            this.p = p;

            this.q = q;

            this.threshold = threshold;

      }

 

      /**

       * d(p, q) = si p = 0 alors 1

     *   sinon si q = 0 alors 0

     *   sinon si q > p alors d(p, p)

     *   sinon  d(p-q, q) + d(p, q-1)

       */

      protected Integer compute() {

 

            Integer result = null;

            if (p == 0) {

                  result = Integer.valueOf(1);

            } else if (q == 0) {

                  result = Integer.valueOf(0);

            } else if (q > p) {

                  if (p < threshold) {

                        result = exec(p, p);

                  } else {

                        result = new DecompositionEntierTask(p, p, threshold).forkJoin();

                  }

            } else {

                  if (p < threshold) {

                        result = exec(p - q, q) + exec(pq - 1);

                  } else {

                        DecompositionEntierTask d1 = new DecompositionEntierTask(p - q, q, threshold);

                        d1.fork();

                        result = new DecompositionEntierTask(pq - 1, threshold).forkJoin() + d1.join();

                  }

            }

            return result;

      }

 

      public Integer sequentialCompute() {

            return exec(this.p, this.q);

      }

 

      private Integer exec(final Integer p, final Integer q) {

            if (p == 0) {

                  return Integer.valueOf(1);

            } else if (q == 0) {

                  return Integer.valueOf(0);

            } else if (q > p) {

                  return exec(p, p);

            } else {

                  return exec(p - q, q) + exec(p,  q - 1);

            }

      }

 

}

Ici, nous laissons le soin de choisir le seuil (en utilisant le constructeur comportant le paramètre « threshold ») ou bien de le laisser être évalué par l'autre constructeur (via une règle simple : seuil = p/2).

Il apparaît assez rapidement que cette valeur de seuil peut devenir critique car avoir une forte incidence sur les performances. Il appartient donc au développeur de trouver la bonne stratégie pour l'évaluer. Celle retenue ici n’est sans doute pas la meilleure mais pas la pire ;-)

Ces deux exemples permettent donc d’ouvrir l’esprit sur les développements possibles qu’il est permis de faire avec cet ensemble de classes de la JSR.

A coté de ces classes héritant de ForkJoinTask, la JSR166y propose quelques autres facilités, qui ne seront peut être pas incorporées dans Java 7 aux dernières nouvelles, mais qui seront accessibles via une librairie tierce si tel est le cas.

Les ParallelArrays

Le principe du ParallelArray est simple : proposer une structure pour manipuler un tableau de valeurs en s'appuyant de façon transparente sur les fonctionnalités de parallélisation pour le traitement de certaines tâches.

Exemple : déterminer la valeur maximale parmi les valeurs du tableau ou calculer la moyenne des valeurs du tableau...

 

L'exemple traité ci-après consiste en un service d'analyse de population d'individus. Les caractéristiques suivies vont être des indicateurs physiques : l'indice de masse corporelle et l'indice de masse grasse.

L'idée est de pouvoir identifier parmi une population de personnes, celles présentant un indice élevé ou faible (à risque dans les 2 cas), de subdiviser cette population selon des critères et pouvoir toujours travailler sur ces sous ensembles...

 

Un code valant mieux qu'un long discours, posons les bases de l'exemple :

 

public class Person {

 

    public enum Gender {

        MALE,

        FEMALE;

    }

 

    private double weight;

    private double height;

    private int age;

    private String name;

    private Person.Gender gender;

 

    public Person() {

    }

 

    public Person(final String name, final Gender gender, final int age, final double weight, final double height) {

        this.name = name;

        this.gender = gender;

        this.age = age;

        this.weight = weight;

        this.height = height;

    }

 

    [...]

    // Get/Setters...

    [...]

}

 

public class HealthAnalyzerService {

 

    private final ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());

 

    private Person[] people;

 

    private ParallelArray<Person> peopleArray;

 

    public void setPeople(final Person[] people) {

        this.people = people;

        this.peopleArray = ParallelArray.createFromCopy(people, pool);

    }

 

}

Nous avons donc une classe Person décrivant un individu, ainsi qu'une classe HealthAnalyzerService qui proposera le service d'analyse.

Cette classe contient plusieurs choses :

- un tableau d'objets Person : on pouvait s'y attendre

- un ParallelArray typé Person : on pressent qu'il wrappera le tableau "simple"

- le pool d'exécution dimensionné de façon "souple" : en récupérant le nombre d'unité de calcul visibles par la JVM. Le code sera ainsi adaptable sur des environnements totalement différents.

 

La méthode setPeople() va non seulement initialiser l'attribut people mais aussi le ParallelArray qui lui est associé.

 

Ce code ne peut pour le moment pas produire beaucoup de choses utiles. Ajoutons lui donc quelques méthodes :

 

    // Creation de l'opération de calcul de l'Indice de Masse Corporelle

    private final Ops.ObjectToDouble<Person> bmiOp = new Ops.ObjectToDouble<Person>() {

        public double op(final Person p) {

            final BigDecimal bd = new BigDecimal(p.getWeight() / (p.getHeight() * p.getHeight()));

            return bd.setScale(2, RoundingMode.HALF_UP).doubleValue();

        }

    };

 

    // Creation de l'opération de calcul de l'Indice de Masse Grasse

    private final Ops.ObjectToDouble<Person> fmiOp = new Ops.ObjectToDouble<Person>() {

 

        private static final double CST_1 = 1.2;

        private static final double CST_2 = 0.23;

        private static final double CST_3 = 10.83;

        private static final double CST_4 = 5.4;

 

        public double op(final Person p) {

            final BigDecimal bd1 = new BigDecimal(p.getWeight() / (p.getHeight() * p.getHeight()));

            final double bmi =  bd1.setScale(2, RoundingMode.HALF_UP).doubleValue();

            final BigDecimal bd2 = new BigDecimal((CST_1 * bmi) + (CST_2 * p.getAge()) - (CST_3 * (p.getGender() == Person.Gender.MALE ? 1 : 0)) - CST_4);

            return bd2.setScale(2, RoundingMode.HALF_UP).doubleValue();

        }

    };

 

 

    private ParallelArrayWithFilter<Person> getFilteredMappedArray(final Ops.Predicate<Person>... predicates) {

        if (predicates != null && predicates.length > 0) {

            ParallelArrayWithFilter<Person> p = peopleArray;

            for (Ops.Predicate<Person> predicate : predicates) {

                p = p.withFilter(predicate);

            }

            return p;

        } else {

            return peopleArray;

        }

    }

 

    private ParallelDoubleArray.SummaryStatistics getBMISummary(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(bmiOp).summary();

    }

 

    private ParallelDoubleArray.SummaryStatistics getFMISummary(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(fmiOp).summary();

    }

 

    public double getHighestBMI(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(bmiOp).max();

    }

 

    public double getLowestBMI(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(bmiOp).min();

    }

 

    public double getHighestFMI(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(fmiOp).max();

    }

 

    public double getLowestFMI(final Ops.Predicate<Person>... predicates) {

        return getFilteredMappedArray(predicates).withMapping(fmiOp).min();

    }

 

    public Person getPersonWithHighestBMI(final Ops.Predicate<Person>... predicates) {

        return peopleArray.get(getBMISummary(predicates).indexOfMax());

    }

 

    public Person getPersonWithLowestBMI(final Ops.Predicate<Person>... predicates) {

        return peopleArray.get(getBMISummary(predicates).indexOfMin());

    }

 

    public Person getPersonWithHighestFMI(final Ops.Predicate<Person>... predicates) {

        return peopleArray.get(getFMISummary(predicates).indexOfMax());

    }

 

    public Person getPersonWithLowestFMI(final Ops.Predicate<Person>... predicates) {

        return peopleArray.get(getFMISummary(predicates).indexOfMin());

    }

 

Cette fois-ci on entre dans le vif du sujet. Commençons les explications dans l'ordre de lecture :

 

Les attributs bmiOp et fmiOp sont les opérations (Ops.*) de base sur lesquelles le code du ParallelArray s'appuiera pour évaluer les 2 indices.

Pour une personne donnée, chaque opération évalue un indice selon divers paramètres.

NOTE : il existe un grand nombre d'opérations permettant d'effectuer un calcul sur un élément d'un ParallelArray.

 

La méthode getFilteredMappedArray(..) admet n paramètres de type Ops.Predicate. Ces paramètres vont servir à filtrer le tableau pour dégager une vue restreinte des données sur lesquelles travailler.

 

Les méthodes getLowest* / getHighest* qui suivent vont faire usage conjoint des éléments précités. Par exemple getHighestBMI(..) va d'abord extraire l'ensemble des données correspondant aux critères en entrée (les "Predicates") pour ensuite déterminer le max() résultant de l'opération bmiOp.

Si vous avez suivi ce cheminement, tout va être très clair maintenant pour la suite.

 

Les dernières méthodes de type getPersonWith* utilisent une autre fonctionnalité offerte par le ParallelArray : déterminer l'index d'un élément sur la base de critères.

Prenons getPersonWithHighestBMI(..), elle comporte un appel à la méthode getBMISummary, laquelle retourne un objet contenant divers indicateurs issus de calculs effectués conjointement avec un ensemble de "Predicates" et une opération.

Cet objet retourné permet d'exposer l'indice de l'élément du tableau initial ayant la valeur retournée par l'opération la plus élevée dans notre cas.

Cet indice va permettre de retrouver simplement l'objet en question dans le tableau initial (people).

 

Maintenant où se cache la parallélisation ? Et bien dans chaque méthode manipulant les données du tableau wrappé : la détermination d'un maximum, d'un minimum, d'un sous ensemble d'éléments répondant à des critères...

Ainsi de façon transparente, en ne coûtant que quelques lignes de code additionnelles, il est possible de traiter de façon optimisée (du point de vue temps d'exécution perçu) des ensembles volumineux de données.

 

Bien sûr l'ensemble des classes utilitaires fournies ici vont se prêter à la majorité des usages sans pouvoir couvrir la totalité. Dans ces cas particuliers, il faudra retourner à l'approche plus bas niveau proposée par l'ensemble des classes FortJoinTask.

Conclusion

Nous pouvons enfin conclure ce tutoriel sur la JSR166y. Les 2 aspects présentés devraient vous permettre de mieux cerner le potentiel offert. Le JDK 7 approchant, le périmètre des fonctionnalités intégrées à la future version de Java sera vite connu et une librairie plus stable comportant un backport pour Java 6 et les parties non intégrées devrait aussi être disponible. Il sera ainsi possible de profiter de ceci sans pour cela attendre la généralisation de Java 7 dans les environnements de production.

Ajouter un commentaire

Filtered HTML

Plain text

CAPTCHA
Cette question permet de vérifier que vous n'êtes pas un robot spammeur :-)
 ZZZZZ  M   M   SSS   DDD    AA  
Z MM MM S D D A A
Z M M M SSS D D AAAA
Z M M S D D A A
ZZZZZ M M SSSS DDD A A