Tutoriel pour apprendre à intégrer Spark et Cassandra

Image non disponible

Ce tutoriel s'intéresse à présenter Spark et l'intégration avec la base de données NoSQL Cassandra.

Pour réagir à ce tutoriel, un espace de dialogue vous est proposé sur le forum : Commentez Donner une note à l'article (5)

Article lu   fois.

Les deux auteurs

Site personnel

Liens sociaux

Viadeo Twitter Facebook Share on Google+   

I. Introduction à Spark

Spark est un framework pour traiter de gros volumes de données de manière distribuée. Initialement imaginé dans le laboratoire AMPLab de l'université de Berkeley pour répondre à certains manques des frameworks Map/Reduce, c'est aujourd'hui un projet open source hébergé par la fondation Apache, supporté par Databricks et les principales distributions Hadoop (Cloudera, Hortonworks, MapR).

I-A. Des Streams aux RDD

Si vous maîtrisez l'API Stream introduite dans Java 8, ou les FluentIterables de Guava, vous ne serez pas déboussolés, car Spark reprend les mêmes concepts :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
// Java 8
  metrics.stream()
    .filter(metric -> metric.getName().equals("cpu.user"))
    .mapToLong(metric -> metric.getValue())
    .summaryStatistics().getAverage();
 
  // Spark
  metricRdd
    .filter(metric -> metric.getName().equals("cpu.user"))
    .mapToDouble(metric -> metric.getValue())
    .mean();

On part d'un jeu de données sur lequel on applique en chaîne des opérations. Spark propose dans son DSL plus d'une vingtaine d'opérateurs dont filter, map, flatMap, reduce , count, distinct, collect, forEach comme dans les Streams Java 8 ou join, leftOuterJoin, union ou groupBy comme dans les bases de données relationnelles.

Un RDD, ou Resilient Distributed Dataset, est le concept central du framework Spark. C'est un jeu de données qui se parcourt comme une collection. Il est distribué, car il sera vraisemblablement partitionné (découpé en partitions), et chacune des partitions traitée sur un nœud du cluster. Il est résilient, car il sera peut-être partiellement relu en cas de problème (perte d'un nœud par exemple).

Comme une collection, un RDD peut être constitué de types simples (Int, String…) ou structurés. Les types structurés (comme l'objet Metric dans l'exemple ci-dessus) seront immuables pour permettre la parallélisation et sérialisables, car ils seront amenés à voyager d'un nœud à l'autre.

Pour créer un RDD, on peut partir de :

  • une collection (List, Set), transformée en RDD avec l'opérateur parallelize ;
  • un fichier local ou distribué (HDFS) dont le format est configurable: texte brut, SequenceFile Hadoop, JSON, ProtoBuf (via Elephant Bird)… ;
  • une base de données : JDBC, Cassandra, HBase… ;
  • un autre RDD auquel on aura appliqué une transformation comme filter, map

Le chemin inverse, exporter un RDD dans un fichier, dans une base de données ou une collection est aussi possible.

I-B. Du localisé au distribué

Lancer Spark en local est très utile pour développer gentiment sur son poste et tester unitairement ses chaînes de traitement. Il suffit de quelques lignes de code pour démarrer Spark :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
11.
12.
13.
14.
15.
16.
17.
18.
19.
public class MetricSparkTest {
        private JavaSparkContext sparkContext;
        @Before
        public void setUp() {
            SparkConf conf = new SparkConf(true)
                    .setMaster("local")
                    .setAppName("Zenika");
            this.sparkContext = new JavaSparkContext(conf);
        }
        @Test
        public void testMeanCpuUser() throws Exception {
            Double mean = sparkContext.textFile("file://metric.txt")
                    .map(Metric::parseLine)
                    .filter(metric -> metric.getName().equals("cpu.user"))
                    .mapToDouble(metric -> metric.getValue())
                    .mean();
            assertEquals(74.0D, mean, 0.1D);
        }
    }

Néanmoins, ne perdons pas de vue que l'objectif de Spark est de pouvoir répartir les traitements sur plusieurs machines. Dans ce but, une machine maître écoute les demandes de traitement clientes, découpe chaque traitement et, comme tout bon chef, délègue sa réalisation à des machines esclaves. Ces dernières vont accomplir le travail demandé en parallèle.

Image non disponible

Pour passer d'une exécution locale à un cluster, seule la configuration Spark (voir l'objet SparkConf) change, l'emplacement du nœud maître n'est plus local mais spark://…

 
Sélectionnez
1.
2.
SparkConf conf = new SparkConf(true)
            .setMaster("spark://maitre:7077")

Pour paralléliser un traitement, il faut être capable de le découper. Ce partitionnement des données sera déterminé par la source de données ou bien pourra être imposé par le développeur. Premier exemple, pour HDFS on obtient par défaut une partition par bloc (64 Mo):

 
Sélectionnez
1.
sparkContext.textFile("hdfs://metric.txt");

Second exemple, sur un RDD de type couple clé/valeur, on peut se servir de la clé (et d'une fonction de hachage) pour forcer le partitionnement :

 
Sélectionnez
1.
2.
3.
metricRdd
    .mapToPair(metric -> new Tuple2<>(metric.getName(), metric))
    .partitionBy(new HashPartitioner(100))

Sur des opérations comme les join, groupBy par exemple, la maîtrise du partitionnement est vitale pour limiter le volume de données brassé entre les nœuds et ainsi garantir des performances acceptables.

I-C. Caching et lazy-evaluation

Revenons à l'exemple complet:

 
Sélectionnez
1.
2.
3.
4.
5.
Double mean = sparkContext.textFile("file://metric.txt") // 0
    .map(Metric::parseLine) // 1
    .filter(metric ->  metric.getName().equals("cpu.user") // 2
    .mapToDouble(metric -> metric.getValue()) // 3
    .mean() // 4;

Lorsque les opérations 1 à 3 se déclenchent, rien ne se passe immédiatement, ce sont des transformations : elles convertissent un RDD en un autre RDD. En pratique, elles ne font qu'assembler des composants entre eux, chacun d'eux venant envelopper et décorer son ou ses prédécesseurs pour obtenir un nouveau RDD :

Image non disponible

Seule la quatrième et dernière étape va réellement déclencher l'exécution de toutes les précédentes, c'est une action. Dans une architecture distribuée, cette action va se traduire par la soumission d'un job à l'ensemble du cluster.

En procédant ainsi, pour effectuer un autre traitement sur le même jeu de données issu de l'étape 1, je serai amené à relire et reparser le fichier source (étapes 0 et 1). Avec les opérations cache et persist, Spark permet de conserver temporairement un résultat de transformation, cela permettra de réutiliser des résultats intermédiaires et d'éviter des recalculs superflus :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
10.
JavaRDD<Metric> metricRdd = sparkContext.textFile("file://metric.txt")
        .map(Metric::parse)
        .cache();
    Double mean = metricRdd
        .filter(metric ->  metric.getName().equals("cpu.user") // 2
        .mapToDouble(metric -> metric.getValue())
        .mean();
    Map<String, Long> metricByHost = metricRdd
        .map(metric -> metric.getHost())
        .countByValue();
Image non disponible

Spark permet ainsi de conserver temporairement un RDD en mémoire (on heap ou off heap avec Tachyon) et/ou sur le disque local. Si le résultat n'a pas pu être conservé (cache miss), il sera recalculé.

I-D. Déploiement

Image non disponible

Exécuter des traitements lourds sur un cluster, piloter les nœuds esclaves, leur distribuer les tâches équitablement, et arbitrer la quantité de CPU et de mémoire qui sera allouée à chacun des traitements, tel est le rôle d'un gestionnaire de cluster. Spark offre pour l'instant trois solutions pour cela : Spark standalone, YARN et Mesos. Livré avec Spark, Spark Standalone est le moyen le plus simple à mettre en place. Ce gestionnaire de cluster s'appuie sur Akka pour les échanges et sur Zookeeper pour garantir la haute disponibilité du nœud maître. Ce n'est pas un jouet, c'est un réel outil paré pour la production: il dispose d'une console pour superviser les traitements, d'un mécanisme pour collecter les logs des esclaves…

Pour lancer les process :

 
Sélectionnez
1.
2.
3.
4.
5.
// Spark Master
    spark-class org.apache.spark.deploy.master.Master
    
    // Spark Slave (Worker)
    spark-class org.apache.spark.deploy.worker.Worker spark://maitre:7077

Autre possibilité, YARN le gestionnaire de cluster Hadoop, Spark peut s'exécuter dessus, et aux côtés de jobs Hadoop. Enfin, plus sophistiqué et plus généraliste, Mesos permet de configurer plus finement l'allocation des ressources (mémoire, CPU) aux différentes applications.

Une fois notre traitement mis au point et notre cluster prêt, il ne reste plus qu'à lancer le traitement. On commence par empaqueter tout le code et les bibliothèques utilisées dans un gros Jar (un fatjar/uberjar). Puis on utilise la commande spark-submit pour soumettre le traitement au cluster :

 
Sélectionnez
1.
2.
3.
4.
5.
    spark-submit --master spark://maitre:7077  \
        --class com.zenika.metric.spark.Main   \
        --executor-memory 1G                   \
        --total-executor-cores 8               \
        metric-spark.jar

I-E. Multilangage

Les exemples présentés jusqu'ici étaient écrits en Java pour être accessibles au plus grand nombre. Cependant Spark est écrit en Scala et peut être indifféremment utilisé en Java, en Scala et en Python :

  • Scala : certaines fonctionnalités du langage comme les tuples, l'inférence de type, les case classes, les conversions implicites, rendent l'utilisation de Spark fluide. De plus, le Spark Shell, qui s'appuie sur le REPL Scala, permet l'écriture et l'exécution de traitements en direct ;
  • Python : en attendant SparkR, c'est un des langages préférés des data scientists, on dispose, comme en Scala, d'un REPL (PySpark), de tuples, d'un typage flexible… Mais certaines fonctionnalités purement Java ou Hadoop ne sont pas accessibles ;
  • Java : la version 8 est presque obligatoire pour tirer parti des expressions lambda. La couche d'adaptation Java/Scala et le typage explicite nécessaire rend l'utilisation un peu plus lourde, mais reste acceptable.

La plupart des exemples de la documentation sont décrits dans les trois langages.

I-F. Modules additionnels

Au-dessus de Spark, des bibliothèques additionnelles apportent des fonctionnalités supplémentaires :

  • Spark SQL : permet d'exprimer les traitements (map, filter, reduce) sous la forme d'un langage inspiré de SQL ;
  • MLLib : est une bibliothèque d'algorithmes de Machine Learning pour classifier, regrouper les données (k-Means), faire des recommandations… ;
  • Spark Stream : là où Spark excelle dans les traitements en masse de gros volumes de données, Spark Stream applique des recettes semblables pour des traitements au fil de l'eau ;
  • GraphX : apporte les outils pour explorer les graphes.

Dans cette courte introduction à Spark, nous avons parcouru les principaux concepts de ce framework de traitement distribué : RDD, transformations, actions, partitionnement… ainsi que quelques-unes de ses qualités : développement simple et maintenable, possibilités d'optimisation… Je détaillerai un peu plus Spark SQL et Spark Stream dans un prochain épisode.Spark et Cassandra

II. Spark et Cassandra

Cassandra est une base de données distribuée capable de stocker de gros volumes de données. Si son modèle de données constitué de tables et de colonnes, et son langage de requêtage CQL imite fortement les bases de données relationnelles, la ressemblance s'arrête là.

Les possibilités de requêtage dépendent intrinsèquement de la manière dont sont stockées/modélisées les données.

Datastax fournit connecteur Spark qui permet de manipuler des tables Cassandra sous forme de RDD, concept que nous avons présenté lors de la précédente section d'introduction à Spark. Par le biais de ce connecteur, Spark apporte à Cassandra des capacités de requêtage analytique.

D'une manière plus générale, nous étudierons l'intégration de Cassandra et Spark. Ce sera aussi l'occasion de découvrir les possibilités de Spark SQL et Spark Streaming.

II-A. Spark classique

Pour commencer avec le connecteur Cassandra Spark, les Jobs Spark doivent pouvoir se connecter à Cassandra. Pour cela on place dans la configuration les identifiants de connexion à Cassandra :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
SparkConf conf = new SparkConf(true)
        .setMaster("local")
        .setAppName("Zenika")
        .set("spark.cassandra.connection.host", "node1")
        .set("spark.cassandra.connection.username", "spark")
        .set("spark.cassandra.connection.password", "spark");

Pour lire les données dans Cassandra, on décrit au moyen de la classe CassandraJavaUtil une requête de type select comme en CQL. On obtient ainsi un RDD de lignes Cassandra :

 
Sélectionnez
1.
2.
3.
CassandraJavaRDD<CassandraRow> metricRdd = javaFunctions(sparkContext)
        .cassandraTable("metrics", "metric")
        .select("host", "name", "date", "value");

Au passage, notez qu'en Scala, les conversions implicites auraient permis d'éviter l'appel à javaFunctions.

Le connecteur sait automatiser le mapping ligne/objet à la manière d'un ORM (voir mapRowTo), la columnMap contient la correspondance colonne/propriété :

 
Sélectionnez
1.
2.
3.
CassandraJavaRDD<Metric> metricRdd = javaFunctions(sparkContext)
        .cassandraTable("metrics", "metric", mapRowTo(Metric.class, columnMap))
        .select("host", "name", "date", "value");

Le chemin inverse, l'écriture d'un RDD dans Cassandra, est très similaire à la lecture. La méthode saveToCassandra est une action Spark tout comme saveAsTextFile :

 
Sélectionnez
1.
2.
3.
javaFunctions(metricStatRdd)
        .writerBuilder("metrics", "metric_stat", mapToRow(MetricStat.class, columnMap))
        .saveToCassandra();

De manière à optimiser les accès base, il est de bon ton de placer les esclaves Spark sur les mêmes machines que les nœuds Cassandra.

Image non disponible

Le connecteur Cassandra-Spark amène chaque exécutant Spark à préférer lire les partitions Cassandra du nœud sur lequel il est, évitant ainsi de coûteux allers-retours. Autrement dit, le connecteur sait optimiser les lectures, dès lors que Spark et Cassandra cohabitent sur un même nœud.

II-B. Spark SQL

Pour mémoire, Spark SQL est une extension de Spark qui permet d'exprimer les traitements sur un RDD dans un langage inspiré de SQL. Sous le capot, Spark SQL utilise un type de RDD spécialisé, nommé SchemaRDD, dans lequel les données seront structurées sous forme d'un tableau. Le SchemaRDD est constitué de lignes (Row), toutes ayant les mêmes colonnes. Pour passer d'un RDD tout simple à un SchemaRDD, on décrit les colonnes, puis on lui affecte un nom de table :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
7.
8.
9.
// Un RDD tout simple
    List<Metric> metrics = ...
    JavaRDD<Metric> simpleRdd = sparkContext.parallelize(metrics);
 
    // Un SchemaRDD
    JavaSQLContext sqlContext = new JavaSQLContext(sparkContext);
    JavaSchemaRDD schemaRDD = sqlContext.applySchema(simpleRdd, Metric.class);
    schemaRDD.registerTempTable("metric");
    JavaSchemaRDD resultRDD = sqlContext.sql("select host, avg(value) from metric where name='cpu.total' group by host");

Une fois tables et colonnes déclarées dans le contexte, Spark SQL permet d'exécuter du pseudo-SQL sur n'importe quel RDD. En enregistrant plusieurs tables dans le contexte Spark SQL, on aurait pu faire faire des jointures entre elles.

Revenons à Cassandra, avec le connecteur Cassandra Spark, on peut utiliser Spark SQL pour requêter directement la base de données et produire des SchemaRDD.

 
Sélectionnez
1.
2.
3.
4.
5.
JavaCassandraSQLContext cassandraSQLContext = new JavaCassandraSQLContext(sc);
    JavaSchemaRDD metricRDD = cassandraSQLContext.sql("select name from metrics.metric where name like 'cpu%'");
    long count = metricRDD 
    .map(row -> row.getString(0))
    distinct().count();

On peut aussi appliquer des transformations habituelles (map, filter), du cache, etc. sur le résultat de la requête comme sur tout RDD qui se respecte.

Pourquoi utiliser Spark SQL alors qu'on dispose déjà de CQL ? Spark SQL permet :

  • de requêter les données même si le modèle de données ne s'y prête pas. En effet, en CQL, on ne pourra pas exprimer une requête si le modèle ne s'y prête pas ;
  • de faire des jointures et d'utiliser de nombreux opérateurs comme like ou group by dans les exemples ci-dessous.

Bien que plus riche et permissive, l'approche Spark SQL est aussi beaucoup plus coûteuse que du CQL. En effet, Spark va généralement balayer un gros volume de données à la façon d'un full scan. Pour limiter l'impact de Spark sur les performances de Cassandra, il est vivement conseillé de créer un datacenter logique au niveau de Cassandra de manière à isoler cette charge.

Finalement, Spark SQL ne remplace pas CQL, mais le complète pour tout ce qui est requêtes analytiques.

II-C. Spark Streaming

Si le cœur de Spark est focalisé sur les traitements en masse de gros volumes de données, Spark Streaming propose une API similaire pour des traitements de données au fil de l'eau. On troque les forts volumes de données pour de forts débits, et les RDD pour des DStreams (Discrete Streams). Mais l'API reste très proche, map, filter sont toujours de la partie :

 
Sélectionnez
1.
2.
3.
4.
5.
6.
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkContext, 
    new Duration(10000));
    JavaDStream<Metric> metricDStream = streamingContext
        .socketTextStream("metric-source", 7075)
        .map(Metric::parseStream)
        .filter(metric -> metric.getValue() != null);

Ces DStreams seront alimentés par des systèmes orientés message/événement comme Kafka, ZeroMQ, Akka, etc. Cependant, les traitements seront effectués de manière périodique (toutes les dix secondes dans l'exemple ci-dessus). Spark Streaming est un framework de micro-batch, il traite les données sous forme de petits RDD toutes les N secondes (N entre une seconde et une minute) :

Image non disponible

Le fait d'avoir des RDD ouvre pas mal de possibilités comme factoriser du code avec les traitements batch classiques, faire des jointures…

Et Cassandra dans tout ça ? Et bien le connecteur permet de persister un DStream dans une table de la même manière qu'un RDD :

 
Sélectionnez
1.
2.
3.
javaFunctions(metricDStream)
                .writerBuilder("metrics", "metric", mapRowTo(Metric.class, columnMap))
                .saveToCassandra();

Cassandra pour le stockage et le requêtage temps réel, Spark pour le requêtage analytique, les traitements de masse et l'intégration au fil de l'eau. Ce redoutable tandem est capable d'absorber des données en grande quantité et procéder à des analyses de données à grande échelle. Spark apporte à Cassandra des capacités de requêtage et de fouille des données, Cassandra apporte à Spark une solution pour persister les RDD de manière distribuée et structurée.

III. Remerciements

Cet article a été publié avec l'aimable autorisation de Zenika, experts en technologies open source et méthodes Agiles.

Nous tenons à remercier Claude Leloup pour sa relecture orthographique et Malick SECK pour la mise au gabarit.

Vous avez aimé ce tutoriel ? Alors partagez-le en cliquant sur les boutons suivants : Viadeo Twitter Facebook Share on Google+   

  

Copyright © 2016 Gérald Quintana (Zenika). Aucune reproduction, même partielle, ne peut être faite de ce site et de l'ensemble de son contenu : textes, documents, images, etc. sans l'autorisation expresse de l'auteur. Sinon vous encourez selon la loi jusqu'à trois ans de prison et jusqu'à 300 000 € de dommages et intérêts.