Traitement de données volumineuses (big data)
Introduction
Supposons que je doive compter le nombre de requêtes Web effectuées par adresse IP sur mon serveur Apache.
Le plus simple est d'extraire les adresses IP du fichier de log (awk), de trier ces adresses (sort), puis de compter
le nombre d'occurences (uniq -c).
Voici comment je procéderais sur une seule machine avec un OS standard (POSIX):
(cd /var/log/apache2 && zcat access.log.*.gz && cat access.log.1 access.log) \
| awk '{print $2;}' | sort | uniq -c > /scratch/bigf
Ici, les données ne sont pas gigantesques, mais elles pourraient l'être. Le traitement ci-dessus prend en fait
relativement peu de temps pour environ 1 an de données sur un petit serveur Web (30 secondes, dont 124% de CPU
et environ 105 MBytes de mémoire), pour en tout 11.3M connexions (120k adresses uniques). Le traitement est
complètement séquentiel (une seule machine, un seul CPU, un tout petit peu de parallélisme à certains
moments) et c'est le sort qui prend le plus de temps.
NB: sort travaille notamment en utilisant des fichiers temporaires, d'où l'usage modéré de mémoire (tri par baquet).
Passons-nous du tri
Une autre idée est de passer les données séquentiellement, sans tri, et de remplir un tableau de hachage.
Potentiellement ce tableau pourrait contenir 2^32 entrées (4 GBytes x taille du compteur), ce qui pourrait être
un peu abusif. Mais on voit ci-dessus qu'il n'y a que 120'000 adresses uniques, donc cela pourrait être
gérable.
(cd /var/log/apache2 && zcat access.log.*.gz && cat access.log.1 access.log) | awk '{print $2;}' | perl -e 'my %h; while (my $line = <STDIN>) { chomp $line; $h{$line}++; } while (my ($k, $v) = each(%h)) { print $v, " ", $k, "\n"; }' > /scratch/bigf2
Cela consomme un peu plus de mémoire (41 MB), mais cela prend largement moins de temps (10 secondes, dont 4 de CPU).
Si on part d'un fichier temporaire, Perl seul prend 3 secondes, dont 99% CPU.
Requête supplémentaire
Et maintenant, si l'on veut les 3 clients ayant fait le plus de connexions ?
sort -nr < /scratch/bigf2 | head -3
Cela ne prend quasi pas de temps, car il n'y a qu'environ 120k lignes.
Et s'il y avait beaucoup plus de données ?
S'il y avait beaucoup plus de données
- la méthode sort serait probablement très lente
- la méthode séquentielle perl serait probablement toujours efficace, tant que les résultats (adresses IP) n'étaient qu'une partie de l'address-space Internet; idem pour le tri par fréquence.
On pourrait répartir l'effort!
- découper les données en N parties
- donner ces N parties à plusieurs ordinateurs (ou au moins plusieurs threads/processus) à traiter
- faire la somme des résultats partiels (toujours envisageable si les adresses ne forment qu'une partie de l'address-space Internet)
Exemple:
(cd /var/log/apache2 && zcat access.log.*.gz && cat access.log.1 access.log) | awk '{print $2;}' > /scratch/bigf-all # 9 secondes
split -l $(wc -l /scratch/bigf-all | awk '{printf "%d", ($1 / 4) + 1000;}') /scratch/bigf-all /scratch/bigf- # 1 seconde
for i in /scratch/bigf-??
do
(perl -e 'my %h; while (my $line = <STDIN>) { chomp $line; $h{$line}++; } while (my ($k, $v) = each(%h)) { print $v, " ", $k, "\n"; }' \
< $i > $i.RESULTS &)
done
wait # environ < 1 seconde (exécuté en parallèle), contre 3 pour la variante séquentielle!
# il faut encore résumer les résultats, qui dans ce cas montrent une localité non négligeable.
wc -l *.RESULTS
41103 bigf-aa.RESULTS
19460 bigf-ab.RESULTS
37315 bigf-ac.RESULTS
46357 bigf-ad.RESULTS
On voit que l'amélioration est surtout dans le traitement, les tâches préparatoires prenant à peu près le même temps, et le split coûtant.
Map/Reduce
- map: splitter les données de manière intelligente
- reduce: synthétiser les résultats partiels
Performance comparable à la version sort initiale.
map.pl:
#! /usr/bin/perl
use strict;
use warnings;
while (my $line = <STDIN>) {
if ($line =~ /^([^\s]+)\s+([^\s]+)\s+/) {
print $2, "\t", "1", "\n";
}
else {
print STDERR $0, ": tossed: ", $line, "\n";
}
}
reduce.pl:
#! /usr/bin/perl
# hadoop provides sorted data!
use strict;
use warnings;
my $last_ip = 'impossible';
my $count = 0;
while (my $line = <STDIN>) {
if ($line =~ /^([^\s]+)\s+([^\s]+)\s+/) {
if ($1 eq $last_ip) {
$count += $2;
}
else {
if ($count) {
print $last_ip, "\t", $count, "\n";
}
$last_ip = $1;
$count = $2;
}
}
else {
print STDERR $0, ": tossed: ", $line, "\n";
}
}
if ($count) {
print $last_ip, "\t", $count, "\n";
}
test-no-hadoop.sh:
#! /bin/bash
./map.pl < big-log \
| sort \
| ./reduce.pl > big-log.RESULTS
wc -l big-log.RESULTS
Avec Hadoop
voir démo sur vz203
Très lent; mais c'est normal (1 machine, Java, et granularité trop faible)
script hadoop:
#! /bin/bash
JAR=share/hadoop/tools/lib/hadoop-streaming-2.2.0.jar
HSTREAMING="$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/$JAR"
$HSTREAMING \
-mapper 'perl map.pl' \
-reducer 'perl reduce.pl' \
-file map.pl \
-file reduce.pl \
-input perl/big-log \
-output perl/big-log.RESULTS
hadoop fs -get hdfs://localhost:54310/user/hadoop/perl/big-log.RESULTS
Références
--
MarcSCHAEFER - 29 Apr 2014