Apache Storm auf Microsoft Azure

Nachdem die letzten Male Azure Data Factory und Azure HDInsight im Fokus standen geht es heute um eine Technologie, die sich in diesen Bereich nahtlos einreiht: Apache Storm.

Dieser Artikel erklärt die Grundidee und die wichtigsten Begriffe bezogen auf Storm und welche Möglichkeiten es auf Azure gibt.

Was ist Apache Storm?

Storm wurde ursprünglich von Twitter und BackType entwickelt und ist ein echtzeitfähiges, verteiltes System für die Verarbeitung von Streams. Ursprünglich entwickelt um den Umgang mit Queues und Workern zu vereinfachen hat sich Storm über die Jahre weiterentwickelt. Es kann unter anderem genutzt werden um Daten aus Streams auszufiltern bevor sie überhaupt persistiert werden. Weiterhin ist es gut geeignet für den Aufbau einer Datensenke indem verschiedene Quellen angesprochen werden und nach beliebigen Verarbeitungsschritten in die entsprechenden Ziele geladen werden kann. Hierzu gibt es Alternativen, aber der große Unterschied ist die Echtzeitfähigkeit von Storm im Gegensatz zur Batch-Verarbeitung anderer Technologien.

Typische Einsatzgebiete sind unter anderem die Missbrauchserkennung bspw. im Bereich der Versicherungen. Weiterhin ermöglicht Storm die Erkennung von gerade beliebten Themen in sozialen Medien (siehe Twitter).

Storm umfasst vier grundlegende Begriffe: Streams, Topologien, Spouts and Bolts.
Streams sind dabei die Hauptabstraktion von Storm und beschreiben eine Sequenz von Tupeln.

Diese Tupel werden in einem Graphen, der sogenannten Topologie, verarbeitet.

In einer Topologie ist jeder Knoten entweder ein Spout oder ein Bolt. Ein Spout wird genutzt um Streams in die Topologie zu laden indem Tupel von externen Quellen gelesen werden. Bolts verarbeiten im Anschluss die eingehenden Stream. Dies reicht von einfachen Filtern, Aggregationen und einfache Transformationen bis hin zum Aufbau von Datenbankverbindungen für das Speichern der Informationen. Um komplexere Transformationen durchführen zu können werden mehrere Bolts hintereinander geschaltet. Ein Bolt kann dabei mehr als einen Stream von Spouts oder anderen Bolts verarbeiten.

Das berühmte Word Count-Beispiel könnte in Storm beispielsweise folgendermaßen implementiert werden:

Hierbei werden Tupel aus einer (sehr großen) Datei von einem Spout gelesen. Dieser schickt jede einzelne Zeile der Datei an einen Bolt, der für die Aufteilung der Zeilen in einzelne Wörter zuständig ist. Diese Wörter werden dann an einen weiteren Bolt geschickt, der die Anzahl der eingehenden Wörter ermittelt.

Der entsprechende Code in Java setzt sich folgendermaßen zusammen (basierend auf dem Buch Storm Blueprints, welches eine absolute Empfehlung bezüglich Storm ist). Bevor es allerdings los geht definieren wir eine Hilfsklasse, die bei der Verarbeitung der Streams unterstützt.

public class Utils {

    public static void waitForSeconds(int seconds) {
        try {
            Thread.sleep(seconds * 1000);
        } catch (InterruptedException e) {

        }
    }

    public static void waitForMillis(long milliseconds) {
        try {
            Thread.sleep(milliseconds);
        } catch (InterruptedException e) {

        }
    }
}

Die Klasse SentenceSpout dient als Datenquelle ähnlich zu dem File-Spout in der oberen Abbildung.

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import storm.blueprints.utils.Utils;

import java.util.Map;

public class SentenceSpout extends BaseRichSpout {

    private SpoutOutputCollector collector;
    private String[] sentences = {
            "my dog has fleas",
            "i like cold beverages",
            "the dog ate my homework",
            "don't have a cow man",
            "i don't think i like fleas"
    };
    private int index = 0;

    /**
     * Tells Storm what streams a component will emit and the fields each
     * Stream's tuples will contain.
     *
     * @param outputFieldsDeclarer
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    /**
     * Called whenever a spout component is initialized.
     *
     * @param map
     * @param topologyContext
     * @param spoutOutputCollector
     */
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    /**
     * Core of any spout implementation. Storm calls this method to request that
     * the spout emit tuples to the output collector.
     */
    public void nextTuple() {
        this.collector.emit(new Values(sentences[index]));
        index++;

        if (index >= sentences.length) {
            index = 0;
        }

        Utils.waitForMillis(1);
    }
}

Die Klasse SplitSentenceBolt spaltet im nächsten Schritt alle Satze auf.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.Map;

public class SplitSentenceBolt extends BaseRichBolt {

    private OutputCollector collector;

    /**
     * Analogous to the open() method of a spout. Here are resources, such as database
     * connections, are prepared during bolt initialization.
     *
     * @param map
     * @param topologyContext
     * @param outputCollector
     */
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
    }

    /**
     * Is called every time the bolt receives a tuple from a stream to which it subscribes.
     *
     * @param tuple
     */
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");

        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    /**
     * Declares a single stream of tuples.
     *
     * @param outputFieldsDeclarer
     */
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word"));
    }
}

Aus den vorher aufgeteilten Sätzen zählt die Klasse WordCountBolt alle Vorkommen eines Wortes.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt{

    private OutputCollector collector;
    private HashMap counts = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        collector = outputCollector;
        counts = new HashMap();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = counts.get(word);

        if (count == null) {
            count = 0L;
        }

        count++;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}

Als zusätzlicher Schritt wird die Klasse ReportBolt an das Ende der Topology gesetzt um sinnvolle Ausgaben auf der Konsole zu ermöglichen.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

import java.util.*;

public class ReportBolt extends BaseRichBolt {

    private HashMap counts = null;

    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        counts = new HashMap();
    }

    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = tuple.getLongByField("count");
        counts.put(word, count);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

    }

    public void cleanup() {
        System.out.println("--- FINAL COUNTS ---");

        List keys = new ArrayList();
        keys.addAll(counts.keySet());
        Collections.sort(keys);

        for (String key : keys) {
            System.out.println(key + " : " + counts.get(key));
        }

        System.out.println("--------------");
    }
}

Nachdem alle Spouts and Bolts definiert wurden geht es nun an die Implementierung der Topology. Hierbei werden alle Spouts und Bolts in eine sinnvolle Reihenfolge gebracht und miteinander verknüpft.

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import storm.blueprints.utils.Utils;

public class WordCountTopology {

    private static final String  SENTENCE_SPOUT_ID = "sentence-spout";
    private static final String SPLIT_BOLT_ID = "split-bolt";
    private static final String COUNT_BOLT_ID = "count-bolt";
    private static final String REPORT_BOLT_ID = "report-bolt";
    private static final String TOPOLOGY_NAME = "word-count-topology";

    public static void main(String[] args) throws Exception {

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();
        ReportBolt reportBolt = new ReportBolt();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        // SentenceSpout --> SplitSentenceBolt
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2)
                .setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // SplitSentenceBolt --> WordCountBolt
        builder.setBolt(COUNT_BOLT_ID, countBolt, 4)
                .fieldsGrouping(SPLIT_BOLT_ID, new Fields("word"));
        // WordCountBolt --> ReportBolt
        builder.setBolt(REPORT_BOLT_ID, reportBolt).globalGrouping(COUNT_BOLT_ID);

        Config config = new Config();
        config.setNumWorkers(2);
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        Utils.waitForSeconds(10);
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();
    }
}

Dieses Setup nutzt die KLasse LocalCluster, welche es ermöglicht die Topology auch lokal zu testen ohne dass sie auf ein Cluster deployt werden muss. Um ein Deployment auf ein Remote-Cluster durchzuführen wird die Klasse StormSubmitter genutzt. Dabei wird folgender Aufruf…

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

…durch die Nutzung der StormSubmitter-Klasse ersetzt

StormSubmitter.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());

Nachdem die Topology deployt wurde (entweder lokal oder remote) sollte der Output ähnlich zu dem folgenden sein (getestet mit den Storm-Versionen 0.9.1-incubating und 0.9.3):

--- FINAL COUNTS ---
a : 2193
ate : 2193
beverages : 2194
cold : 2194
cow : 2193
dog : 4387
don't : 4386
fleas : 4388
has : 2194
have : 2193
homework : 2193
i : 6580
like : 4387
man : 2193
my : 4387
the : 2193
think : 2193
--------------

Nach einer umfangreicheren Einführung in Apache Storm, zusammen mit einem Beispiel, geht es im folgenden darum die Einsatzmöglichkeiten von Storm in Azure zu betrachten.

Welche Möglichkeiten bestehen auf Azure?

Azure bietet generell drei Möglichkeiten um Apache Storm einzusetzen:

  • Ein eigenes Setup
  • HDInsight
  • Nutzung eines Distributors über den Azure Marketplace

Die aufwändigste Lösung ist das Aufsetzen eines eigenen Storm-Clusters anhand der Azure Virtual Machines. Etwas angenehmer ist die Nutzung von HDInsight, wobei es hier einen eigenen Clustertypen für Storm gibt. Für einen Zwischenweg zwischen Aufwand und Kontrolle bieten seit kurzem die Hadoop-Distributoren Cloudera und Hortonworks eigene Umgebungen über den Azure Marketplace an.

Option #1 – Eigenes Setup

Das eigene Setup ist die umfangreichste Möglichkeit Storm auf Azure zu betreiben. Sie bietet zwar feingranulare Kontrolle, hat aber den entscheidenden Nachteil, dass das Cluster von Hand deployt und konfiguriert werden muss. Hierzu zählen in der Regel mindestens drei Zookeeper-Instanzen plus die gewünschte Anzahl an Storm-Knoten. In der Regel umfasst ein Storm-Cluster in der Minimalkonfiguration einen Nimbus (Master-Knoten) und zwei Supervisors (Worker-Knoten).

Auf Azure würde dieses Setup typischerweise auf einer Linux-Distrubtion in IaaS basieren.

Option #2 – HDInsight

Die zweite Möglichkeit ist derzeit die gängigste auf Azure: Die Nutzung von HDInsight. Seit diesem Jahr bietet HDInsight auch spezialisierte Cluster-Version an, die HBase auf der einen und Storm auf der anderen Seite umfassen.

Wie bereits in einem anderen Artikel diskutiert basiert HDInsight unter der Haube auf Hortonworks Data Platform (HDP).

Option #3 – Nutzung eines Distributors

Möchte man sich die Mühe sparen ein eigenens Cluster aufzusetzen (Option #1), aber mehr Kontrolle über sein Cluster zu haben als in HDInsight (Option #2) bieten die Distributoren Cloudera und Hortonworks über den Azure Marketplace ihre eigenen Distributionen im Schnell-Setup an.

Hortonworks

Cloudera

Wie man in den obenstehenden Abbildungen bereits sieht, kommt man bei der Wahl dieser Option nicht drum herum die Kerne-Limitierung der eigenen Subscription zu erhöhen (siehe rote Kästchen). Dies ist in einem gewissen Rahmen ohne Probleme möglich. Beachten muss man hierbei allerdings, dass die erhöhte Kernanzahl in der Regel auch eine erhöhte Gesamtabrechnung mit sich zieht.

Zusammenfassung

Storm ist eines der interessanteren Projekte aus dem Hadoop-Ökosystem, welches aber Alternativen bereithält (bspw. Spark, wenn Batch-Verarbeitung anstatt von Stream-Verarbeitung gewünscht ist oder weitere Komponenten, wie Machine Learning, eingesetzt werden sollen).

In Azure haben wir die Möglichkeit Storm auf drei verschiedenen Wegen zu nutzen. Entweder setzt man sein eigenes Cluster basierend auf Azure IaaS auf, nutzt HDInsight in der normalen Variante oder in der Storm-Version oder geht mit einem der etablierten Distributoren über den Azure Marketplace.

Lesenswertes

Jan (@Horizon_Net)

Überblick Azure Data Factory

Microsoft hat innerhalb des letztens Jahres viele interessante Dienste auf Azure veröffentlicht. Einen dieser Dienste habe ich mit HDInsight das letzte Mal kurz vorgestellt. Ein weiterer Dienst, der sich sehr gut mit HDInsight ergänzt, ist die Azure Data Factory.

Dieser Artikel erklärt, was die Azure Data Factory ist und wofür sie genutzt werden kann. Zusätzlich werden die erste Schritte mit dem neuen Dienst erklärt.

Continue reading

Überblick Azure HDInsight

Die Schlagworte Big Data und Hadoop sind seit einiger Zeit in aller Munde. Viele reden darüber, manche praktizieren es … und Microsoft bietet in Zusammenarbeit mit Hortonworks Hadoop auf seiner Cloud-Plattform Azure an, genannt HDInsight.
Mit HDInsight bringt Microsoft keine eigene Hadoop-Distribution raus, so wie es Cloudera und MapR machen, sondern baut auf Hortonworks Data Platform (HDP) auf.

Dieser Artikel bietet einen Kurzüberblick über Hadoop, wie es anhand von Azure HDInsight in der Cloud betrieben werden kann und welche Alternativen es zu HDInsight auf Azure oder anderen Cloud-Anbietern gibt um Hadoop zu betreiben.

Continue reading

Links of the month (November Edition)

This month was an(other) exciting one for all .NET developers, especially the ones working in the open source space. Earlier this year the .NET Foundation showed a way Microsoft could possible go in the future. The connect() event two weeks ago has exceeded the expectations made with the .NET Foundation. This Links of the month will share some posts related to the announcements.

Continue reading

Links of the month (October Edition)

An exciting month with a lot of announcements, especially related to Microsoft Azure, has passed. This Links of the month will focus mainly on the new features added to Azure.

Microsoft Azure

ASP.NET

Power BI

TypeScript

Conferences

Miscellaneous

Have fun!

Jan (@Horizon_Net)

Running RethinkDB on Azure

Over a year ago a friend of mine aroused my interest for RethinkDB. At that time I played a little bit with it and thought that it appears to be a nice database. After that I focused on other topics, but recently I came back to RethinkDB. In this post I would like to explain how you can setup RethinkDB on Azure and play around with it.

Continue reading

Big Data Latest and Greatest

Recently I had the opportunity to give a talk with the topic Big Data Latest and Greatest. Sounds like a marketing session? No, it is not … and I hope you will see it the same way. This blog post tries to extend this session.

What was the big idea behind this session? The Hadoop ecosystem is increasing every month and it is hard to keep in step with the latest versions and products.

That is just an excerpt of a whole bunch of technologies.

In this post we will discuss the following pieces:

Instead of digging deep into the technologies I will try to compare some of them and try to picture some use cases.

Continue reading