So schreiben Sie ein MapReduce-Framework in Python

Eine leicht zu befolgende Anleitung

"Ich habe versucht, so viel wie möglich zu erklären", sagt Poppet. "Ich glaube, ich habe eine Analogie über Kuchen gemacht." "Nun, das muss geklappt haben", sagt Widget. "Wer mag keine gute Kuchen-Analogie?" - Erin Morgenstern, The Night Circus

Ich weiß nicht, warum ich mich dazu entschlossen habe, diese Framework-Mapcakes zu benennen, aber ich liebe diesen Namen und jeder liebt Kuchen.

MapReduce ist ein elegantes Modell, das die Verarbeitung von Datensätzen mit vielen Dingen vereinfacht (a.k.a. große Datensätze). Als Ergebnis eines Wochenendprojekts folgt hier eine übermäßig vereinfachte Implementierung des Python MapReduce-Frameworks. In diesem Beitrag werden Sie durch die von mir ausgeführten Schritte und eine Beispielimplementierung zum Zählen von Wörtern für „Enthüllung einer Parallele: Eine Romanze“ aus dem Projekt gutenberg geführt. Die fertige Version des Codes liegt als Mapcakes auf Github vor. Hier sind einige Entscheidungen, die für die Implementierung getroffen wurden:

  • Wir werden CPython Version 2.7.6 verwenden.
  • Das Multiprocessing-Modul wird zum Erzeugen von Prozessen verwendet, indem die start () -Methode für ein erstelltes Process-Objekt aufgerufen wird.
  • Zu jedem Reduzierthread gibt es eine Ausgabedatei.
  • Die Ausgaben können am Ende zu einer einzigen Datei zusammengeführt werden.
  • Die Ergebnisse des Map-Schritts (sowie die Ausgabedateien für jeden reduzierten Thread) werden mithilfe von JavaScript Object Notation (JSON) im Speicher gespeichert.
  • Man kann wählen, diese Dateien zu löschen oder am Ende zu belassen.

Wenn Sie mit dem mapreduce-Framework noch nicht vertraut sind, finden Sie auf diesem Link bei Quora eine sanfte Einführung. Genießen :)

Implementieren der MapReduce-Klasse

Zunächst schreiben wir eine MapReduce-Klasse, die die Rolle einer vom Benutzer zu implementierenden Schnittstelle spielt. Diese Klasse verfügt über zwei Methoden: Mapper und Reducer, die später implementiert werden müssen (Eine Beispielimplementierung für eine Wortanzahl mit MapReduce wird unten im Abschnitt Beispiel für die Wortanzahl vorgestellt). Daher schreiben wir zunächst folgende Klasse:

Importeinstellungen
Klasse MapReduce (Objekt):
    "" MapReduce - Klasse, die das MapReduce - Modell darstellt
    hinweis: die methoden 'mapper' und 'reducer' müssen sein
    implementiert, um das mapreduce-Modell zu verwenden.
    "" "
    def __init __ (self, input_dir = settings.default_input_dir, output_dir = settings.default_output_dir,
                 n_mappers = settings.default_n_mappers, n_reducers = settings.default_n_reducers,
                 sauber = wahr):
        "" "
        : param input_dir: Verzeichnis der Eingabedateien,
        aus den Standardeinstellungen übernommen, falls nicht angegeben
        : param output_dir: Verzeichnis der Ausgabedateien,
        aus den Standardeinstellungen übernommen, falls nicht angegeben
        : param n_mappers: Anzahl der zu verwendenden Mapper-Threads,
        aus den Standardeinstellungen übernommen, falls nicht angegeben
        : param n_reducers: Anzahl der zu verwendenden Reducer-Threads,
        aus den Standardeinstellungen übernommen, falls nicht angegeben
        : param clean: optional, wenn True temporäre Dateien sind
        gelöscht, standardmäßig True.
        "" "
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        self.clean = sauber
    def mapper (self, key, value):
        "" "gibt eine Liste von Schlüssel-Wert-Paaren aus, in denen sich der Schlüssel befindet
        potenziell neu und die Werte sind von einem potenziell anderen Typ.
        Hinweis: Diese Funktion ist zu implementieren.
        : param key:
        : param value:
        "" "
        bestehen
    Def Reducer (self, key, values_list):
        "" "Gibt einen einzelnen Wert zusammen mit dem bereitgestellten Schlüssel aus.
        Hinweis: Diese Funktion ist zu implementieren.
        : param key:
        : param value_list:
        "" "
        bestehen

Erläuterungen zu den verschiedenen Einstellungen finden Sie im folgenden Abschnitt zum Einstellungsmodul. Als nächstes müssen wir eine run () -Methode für die MapReduce-Klasse hinzufügen, die die Map ausführt und Operationen reduziert. Dazu müssen wir eine run_mapper (index) -Methode definieren (wobei index sich auf den aktuellen Thread bezieht), die den Mapper verwendet und die Ergebnisse auf der Festplatte speichert, sowie einen run_reducer (index), der den Reduktor auf die Ergebnisse des anwendet Karte und speichern Sie die Ergebnisse auf der Festplatte. Die run () -Methode erzeugt die gewünschte Anzahl von Zuordnern und dann die gewünschte Anzahl von Reduzierern. Das Process-Objekt aus dem Multiprocessing-Modul wird wie folgt verwendet:

def run_mapper (self, index):
    "" Führt den implementierten Mapper aus
    : param index: Der Index des Threads, auf dem ausgeführt werden soll
    "" "
    # einen Schlüssel lesen
    # einen Wert lesen
    # erhalte das Ergebnis des Mapper
    # Speichern Sie das vom Reduzierer zu verwendende Ergebnis
    bestehen
def run_reducer (self, index):
    "" "Führt den implementierten Reduzierer aus
    : param index: Der Index des Threads, auf dem ausgeführt werden soll
    "" "
    # Lade die Ergebnisse der Karte
    # für jede Taste verringern Sie die Werte
    # Speichern Sie die Ergebnisse für diesen Reduzierer
    bestehen
def run (self):
    "" "Führt die Map aus und reduziert Operationen
    "" "
    # Mapper-Liste initialisieren
    map_workers = []
    # Reduziererliste initialisieren
    rdc_workers = []
    # Führen Sie den Kartenschritt aus
    für thread_id in range (self.n_mappers):
        p = Prozess (target = self.run_mapper, args = (thread_id,))
        p.start ()
        map_workers.append (p)
    [t.join () für t in map_workers]
    # Führen Sie den Reduktionsschritt aus
    für thread_id in range (self.n_reducers):
        p = Prozess (target = self.run_reducer, args = (thread_id,))
        p.start ()
        map_workers.append (p)
    [t.join () für t in rdc_workers]

Jetzt müssen wir unsere Methoden run_mapper und run_reducer vervollständigen. Da diese Methoden jedoch das Lesen und Speichern von Daten aus einer Eingabedatei erfordern, erstellen wir zunächst eine FileHandler-Klasse. Diese Klasse teilt die Eingabedatei mithilfe einer split_file-Methode (number_of_splits) (wobei number of splits die Gesamtzahl der Chunks ist, die als Ergebnis der Aufteilung gewünscht werden). Die FileHandler-Klasse verknüpft die Ausgaben auch mit der ajoin_files-Methode (Anzahl_der_Dateien, Bereinigen, Sortieren, Verringern) (wobei Anzahl_der_Dateien die Gesamtzahl der zu verbindenden Dateien ist, Bereinigen, Sortieren und Verringern sind in unserem Fall optionale boolesche Argumente, die standardmäßig auf True gesetzt sind. clean gibt an, ob die temporären Dateien nach dem Join gelöscht werden sollen, sort gibt an, ob die Ergebnisse sortiert werden sollen, und descending gibt an, ob in umgekehrter Reihenfolge sortiert werden soll. In diesem Sinne schreiben wir zunächst das FileHandler-Objekt wie folgt:

Klasse FileHandler (Objekt):
    "" FileHandler-Klasse
    Verwaltet das Aufteilen von Eingabedateien und das Zusammenfügen von Ausgaben.
    "" "
    def __init __ (self, input_file_path, output_dir):
        "" "
        Hinweis: Der Pfad der Eingabedatei sollte zum Teilen angegeben werden.
        Das Ausgabeverzeichnis wird zum Zusammenführen der Ausgaben benötigt.
        : param Eingabedateipfad: Eingabedateipfad
        : param output_dir: Ausgabeverzeichnispfad
        "" "
        self.input_file_path = Eingabedateipfad
        self.output_dir = output_dir
    def split_file (self, number_of_splits):
        "" eine Datei in mehrere Dateien aufteilen.
        : param number_of_splits: Die Anzahl der Teilungen.
        "" "
        bestehen
    def join_files (self, number_of_files, clean = None, sort = True, descending = True):
        "" "fügen Sie alle Dateien im Ausgabeverzeichnis zu einem
        einzelne Ausgabedatei.
        : param number_of_files: Gesamtzahl der Dateien.
        : param clean: Wenn True, werden die Reduce-Ausgaben gelöscht.
        Standardmäßig wird der Wert von self.clean verwendet.
        : param sort: sortiert die Ausgaben.
        : Parameter absteigend: Sortieren nach absteigender Reihenfolge, hoher Wert
        zu niedriger Wert.
        : return output_join_list: Eine Liste der Ausgaben
        "" "
        bestehen

Wir schließen dann das Schreiben der Split- und Join-Methoden ab:

import os
Importieren Sie JSON
Klasse FileHandler (Objekt):
    "" FileHandler-Klasse
    Verwaltet das Aufteilen von Eingabedateien und das Zusammenfügen von Ausgaben.
    "" "
    def __init __ (self, input_file_path, output_dir):
        "" "
        Hinweis: Der Pfad der Eingabedatei sollte zum Teilen angegeben werden.
        Das Ausgabeverzeichnis wird zum Zusammenführen der Ausgaben benötigt.
        : param Eingabedateipfad: Eingabedateipfad
        : param output_dir: Ausgabeverzeichnispfad
        "" "
        self.input_file_path = Eingabedateipfad
        self.output_dir = output_dir
    def begin_file_split (self, split_index, index):
        "" initialisieren Sie eine geteilte Datei, indem Sie einen Index öffnen und hinzufügen.
        : param split_index: der geteilte Index, auf dem wir uns gerade befinden, um die Datei zu benennen.
        : param index: Der der Datei zugewiesene Index.
        "" "
        file_split = open (settings.get_input_split_file (split_index-1), "w +")
        file_split.write (str (index) + "\ n")
        return file_split
    def is_on_split_position (self, character, index, split_size, current_split):
        "" "Prüfen Sie, ob es der richtige Zeitpunkt ist, sich zu trennen.
        Das heißt: Zeichen ist ein Leerzeichen und das Limit wurde erreicht.
        : param Charakter: Der Charakter, an dem wir gerade arbeiten.
        : param index: Der Index, auf dem wir gerade sind.
        : param split_size: Die Größe jedes einzelnen Split.
        : param current_split: Der Split, an dem wir gerade arbeiten.
        "" "
        Rückgabeindex> split_size * current_split + 1 und character.isspace ()
    def split_file (self, number_of_splits):
        "" eine Datei in mehrere Dateien aufteilen.
        Hinweis: Dies wurde nicht optimiert, um Overhead zu vermeiden.
        : param number_of_splits: die Anzahl der Chunks bis
        Teilen Sie die Datei in.
        "" "
        file_size = os.path.getsize (self.input_file_path)
        unit_size = file_size / number_of_splits + 1
        original_file = open (self.input_file_path, "r")
        file_content = original_file.read ()
        original_file.close ()
        (index, current_split_index) = (1, 1)
        current_split_unit = self.begin_file_split (current_split_index, index)
        für Zeichen in file_content:
            current_split_unit.write (Zeichen)
            wenn self.is_on_split_position (zeichen, index, unit_size, current_split_index):
                current_split_unit.close ()
                current_split_index + = 1
                current_split_unit = self.begin_file_split (current_split_index, index)
            Index + = 1
        current_split_unit.close ()

Jetzt können wir unsere Methoden run_mapper und run_reducer wie folgt vervollständigen:

def run_mapper (self, index):
    "" Führt den implementierten Mapper aus
    : param index: Der Index des Threads, auf dem ausgeführt werden soll
    "" "
    input_split_file = open (settings.get_input_split_file (index), "r")
    key = input_split_file.readline ()
    value = input_split_file.read ()
    input_split_file.close ()
    if (self.clean):
        os.unlink (settings.get_input_split_file (index))
    mapper_result = self.mapper (Schlüssel, Wert)
    für reducer_index in range (self.n_reducers):
        temp_map_file = open (settings.get_temp_map_file (index, reducer_index), "w +")
        json.dump ([(Schlüssel, Wert) für (Schlüssel, Wert) in mapper_result
                                    if self.check_position (key, reducer_index)]
                    , temp_map_file)
        temp_map_file.close ()
    
def run_reducer (self, index):
    "" "Führt den implementierten Reduzierer aus
    : param index: Der Index des Threads, auf dem ausgeführt werden soll
    "" "
    key_values_map = {}
    für mapper_index in range (self.n_mappers):
        temp_map_file = open (settings.get_temp_map_file (mapper_index, index), "r")
        mapper_results = json.load (temp_map_file)
        für (Schlüssel, Wert) in mapper_results:
            Wenn nicht (key_values_map eingeben):
                key_values_map [key] = []
            Versuchen:
                key_values_map [key] .append (value)
            außer Ausnahme, e:
                print "Ausnahme beim Einfügen des Schlüssels:" + str (e)
        temp_map_file.close ()
        wenn self.clean:
            os.unlink (settings.get_temp_map_file (mapper_index, index))
    key_value_list = []
    für key in key_values_map:
        key_value_list.append (self.reducer (key, key_values_map [key])
    output_file = open (settings.get_output_file (index), "w +")
    json.dump (key_value_list, output_file)
    output_file.close ()

Zuletzt haben wir die Methode run leicht modifiziert, damit der Benutzer angeben kann, ob die Ausgaben verknüpft werden sollen oder nicht. Die run Methode wird:

def run (self, join = False):
    "" "Führt die Map aus und reduziert Operationen
    : param join: True, wenn die Ausgaben verknüpft werden sollen, standardmäßig False.
    "" "
    # Mapper-Liste initialisieren
    map_workers = []
    # Reduziererliste initialisieren
    rdc_workers = []
    # Führen Sie den Kartenschritt aus
    für thread_id in range (self.n_mappers):
        p = Prozess (target = self.run_mapper, args = (thread_id,))
        p.start ()
        map_workers.append (p)
    [t.join () für t in map_workers]
    # Führen Sie den Reduktionsschritt aus
    für thread_id in range (self.n_reducers):
        p = Prozess (target = self.run_reducer, args = (thread_id,))
        p.start ()
        map_workers.append (p)
    [t.join () für t in rdc_workers]
    wenn mitmachen:
        self.join_outputs ()

Der endgültige Code befindet sich im github MapCakes-Repository unter folgendem Link: https://github.com/nidhog/mapcakes

Das Modul "Einstellungen"

Dieses Modul enthält die Standardeinstellungen und Hilfsprogrammfunktionen zum Generieren der Pfadnamen für die Eingabe-, Ausgabe- und temporären Dateien. Diese Dienstprogrammmethoden werden in den Kommentaren des folgenden Codeausschnitts beschrieben:

# Standardverzeichnis für die Eingabedateien festlegen
default_input_dir = "Eingabedateien"
# Standardverzeichnis für die temporären Kartendateien festlegen
default_map_dir = "temp_map_files"
# Standardverzeichnis für die Ausgabedateien festlegen
default_output_dir = "Ausgabedateien"
# Standardnummer für die Map setzen und Threads reduzieren
default_n_mappers = 4
default_n_reducers = 4
# gibt den Namen der Eingabedatei zurück, die in Blöcke aufgeteilt werden soll
def get_input_file (input_dir = None, extension = ".ext"):
    wenn nicht (input_dir ist None):
        return input_dir + "/ file" + extension
    return default_input_dir + "/ file" + extension
    
    
# gibt den Namen der aktuellen geteilten Datei zurück, die dem angegebenen Index entspricht
def get_input_split_file (index, input_dir = None, extension = ".ext"):
    wenn nicht (input_dir ist None):
        return input_dir + "/ file _" + str (index) + extension
    return default_input_dir + "/ file_" + str (index) + extension
        
        
# gibt den Namen der temporären Map-Datei zurück, die dem angegebenen Index entspricht
def get_temp_map_file (index, reducer, output_dir = None, extension = ".ext"):
    wenn nicht (output_dir ist None):
        return output_dir + "/ map_file_" + str (index) + "-" + str (reduzierer) + extension
    return default_output_dir + "/ map_file_" + str (index) + "-" + str (reduzierer) + extension
# gibt den Namen der Ausgabedatei mit dem entsprechenden Index zurück
def get_output_file (index, output_dir = None, extension = ".out"):
    wenn nicht (output_dir ist None):
        return output_dir + "/ reduction_file _" + str (index) + extension
    return default_output_dir + "/ reduction_file_" + str (index) + extension
        
# gibt den Namen der Ausgabedatei zurück
def get_output_join_file (output_dir = None, extension = ".out"):
    wenn nicht (output_dir ist None):
        return output_dir + "/ output" + extension
    return default_output_dir + "/ output" + extension

Beispiel für die Wortzahl

In diesem Beispiel wird davon ausgegangen, dass wir ein Dokument haben und die Anzahl der Vorkommen jedes Wortes im Dokument zählen möchten. Dazu müssen wir unsere Map definieren und Operationen reduzieren, damit wir die Mapper- und Reducer-Methoden der MapReduce-Klasse implementieren können. Die Lösung für die Wortzahl ist ziemlich einfach:

  • map: Wir teilen den Text auf, nehmen die Wörter, die nur ASCII-Zeichen enthalten, und setzen die Wörter in Kleinbuchstaben. Dann senden wir jedes Wort als Schlüssel mit der Zählung 1.
  • Verkleinern: Wir addieren einfach alle vorherigen Werte für jedes Wort.

Daher implementieren wir die MapReduce-Klasse wie folgt:

aus MapReduce importieren MapReduce
Import sys
Klasse WordCount (MapReduce):
    def __init __ (self, input_dir, output_dir, n_mappers, n_reducers):
        MapReduce .__ init __ (self, input_dir, output_dir, n_mappers, n_reducers)
    def mapper (self, key, value):
        "" "Kartenfunktion für das Beispiel der Wortzählung
        Hinweis: Jede Zeile muss in Wörter und jedes Wort getrennt werden
        muss in Kleinbuchstaben umgewandelt werden.
        "" "
        Ergebnisse = []
        default_count = 1
        # Zeile in Worte trennen
        für Wort in value.split ():
            if self.is_valid_word (word):
                # Kleinbuchstaben

Das war's Leute! : D

Danke fürs Lesen! :) Wenn du es genossen hast, drücke diesen Herzknopf unten. Würde mir viel bedeuten und es hilft anderen Leuten, die Geschichte zu sehen.

Lieben Sie Python? Hier ist ein Tutorial, wie Sie in weniger als einer Stunde einen Chatbot erstellen können.