Zum Hauptinhalt springen
Neoinsights

Spark Structured Streaming in Databricks: Performance-Probleme lösen

Kleine Dateien, wachsender State, langsamer werdende Batches: warum Spark Structured Streaming Pipelines mit der Zeit degradieren und wie man es behebt.

Mory KabaMory Kaba16 Min. Lesezeit

Daten in Echtzeit oder nahezu Echtzeit fließen zu sehen hat seinen eigenen Reiz und Charme. Der Großteil der Anwendungsfälle lassen sich noch immer mit traditioneller Batch-Verarbeitung oder Micro-Batch-Verarbeitung (häufiger ausgeführte Pipelines) lösen, aber es gibt einige legitime Fälle für Echtzeit-Datenanalyse, darunter:

  • Predictive Maintenance mit IoT-Sensordaten
  • Betrugserkennung mit Transaktionsdaten
  • Clickstream-Daten für Produkt-Analytics

Glücklicherweise erleichtern Plattformen wie Databricks die Implementierung von Streaming-Pipelines mit Spark Structured Streaming immer mehr. Databricks übernimmt dabei viele aufwändige Aufgaben im Hintergrund und erlaubt es, sich hauptsächlich auf das Design und die Implementierung der Streaming-Pipeline zu konzentrieren.

Mit dem Aufkommen von Lakeflow Spark Declarative Pipelines (ein deklarativer Ansatz zur Implementierung von Pipelines für Echtzeit oder Batch) wird dies noch einfacher. Darüber hinaus besitzt Spark Structured Streaming fast die gleiche API wie die DataFrame API, sodass kein komplett neues API erlernt werden muss, um Streaming-Daten zu transformieren.

Wenn die Implementierung immer einfacher wird, welche Herausforderungen bleiben dann bei Streaming-Pipelines in Databricks? Performance. Ich habe noch keine Streaming-Pipeline gesehen, deren Lese- oder Schreib-Performance mit der Zeit nicht abnimmt. In diesem Artikel untersuchen wir, warum das so ist und wie man es behebt.

Wichtigste Erkenntnisse

  • Spark Structured Streaming verarbeitet Daten in Micro-Batches, nicht in echter Echtzeit; die Latenz ist eine Funktion des Trigger-Intervalls
  • Stateful Operationen (Windowed Aggregations, Deduplizierung, Stream-Stream-Joins) benötigen über Batches hinweg gespeicherten State; immer ein Watermark setzen
  • Jeder Micro-Batch-Schreibvorgang in einen Delta Sink erstellt kleine Parquet-Dateien; ohne OPTIMIZE degradiert die Lese-Performance mit der Zeit
  • Die Anzahl der Shuffle-Partitionen ist beim Query-Start festgelegt, da der State Store nach Partitions-ID indiziert ist; eine Änderung erfordert einen Neustart
  • Das RocksDB State Store Backend ist für stateful Workloads deutlich stabiler als das standardmäßige In-Memory-Backend

Eine Structured Streaming Pipeline in Databricks implementieren

Jede Spark Structured Streaming Pipeline besteht aus drei Kernkomponenten: einer Quelle, einer Transformationsschicht und einem Sink. In Databricks sieht der kanonische Aufbau folgendermaßen aus: Daten werden von einem Message Broker wie Apache Kafka oder Azure Event Hubs gelesen, mit der vertrauten DataFrame API transformiert, und die Ergebnisse werden in eine Delta-Tabelle geschrieben, in der Regel als Teil einer Medallion Architecture.

Um das konkreter zu machen, nehmen wir einen typischen IoT-Anwendungsfall: Tausende von Sensoren, die jede Sekunde Temperatur- und Druckmessungen über Azure Event Hubs senden.

Lesen aus der Quelle

raw_stream = (
    spark.readStream
    .format("eventhubs")
    .options(**event_hubs_conf)
    .load()
)

Der readStream-Aufruf gibt einen Streaming DataFrame zurück. Ab diesem Punkt ist die DataFrame API nahezu identisch mit der, die in einem Batch-Kontext verwendet wird. Es können select, filter, withColumn, Joins gegen statische Lookup-Tabellen und so weiter angewendet werden.

Transformationen anwenden

from pyspark.sql.functions import col, from_json, to_timestamp, avg, window
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
 
schema = StructType([
    StructField("sensor_id", StringType()),
    StructField("temperature", DoubleType()),
    StructField("pressure", DoubleType()),
    StructField("event_time", StringType()),
])
 
parsed = (
    raw_stream
    .select(from_json(col("body").cast("string"), schema).alias("data"))
    .select("data.*")
    .withColumn("event_time", to_timestamp("event_time"))
    .withWatermark("event_time", "2 minutes")
    .groupBy(window("event_time", "5 minutes"), "sensor_id")
    .agg(avg("temperature").alias("avg_temp"), avg("pressure").alias("avg_pressure"))
)

Beachte das Watermark hier. Wir haben es mit einer windowed Aggregation zu tun, die eine stateful Operation ist; daher müssen wir Spark mitteilen, wann es sicher ist, Zustände zu verwerfen.

Schreiben in den Sink

query = (
    parsed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "abfss://checkpoints@datalake.dfs.core.windows.net/iot-agg/")
    .trigger(processingTime="10 seconds")
    .toTable("silver.sensor_aggregates")
)

Databricks übernimmt den Rest. Die Streaming-Query läuft kontinuierlich, löst alle 10 Sekunden einen Micro-Batch aus, checkpointet den Fortschritt in ADLS und hängt die aggregierten Ergebnisse an die Delta-Tabelle an. Einfach aufzusetzen. Die Probleme kommen später.


Wichtige Konzepte in Structured Streaming

Bevor wir in die Details der Performance-Optimierung einer Spark Structured Streaming Pipeline eintauchen, schauen wir uns zunächst einige grundlegende Konzepte an.

Was sind Micro-Batches in Spark Structured Streaming?

Im Kontext von Spark Structured Streaming ist „Echtzeit-Verarbeitung” etwas irreführend. Spark Structured Streaming ist nicht im wörtlichen Sinne Echtzeit. Die Engine verarbeitet Daten vielmehr in einer Abfolge von Micro-Batches. Jeder Micro-Batch liest eine begrenzte Menge neuer Datensätze, verarbeitet sie, schreibt die Ausgabe und committet einen Checkpoint. Die Engine plant dann den nächsten Batch. Alles andere im System ergibt sich aus diesem Modell.

Ein Continuous Processing Mode wurde im Open-Source Spark eingeführt, wird jedoch in Databricks nicht unterstützt.

Trigger-Modi

Der Trigger-Modus steuert, wann ein Micro-Batch ausgelöst wird:

TriggerVerhalten
processingTime("10 seconds")Festes Intervall. Wartet auch, wenn keine neuen Daten vorhanden sind
onceFührt einen Batch aus, dann Stop (veraltet seit DBR 11.3 LTS / Spark 3.4)
availableNowVerarbeitet alle verfügbaren Daten in mehreren Batches, dann Stop. Der moderne Ersatz für once
realTime("5 minutes")Databricks-nativer Sub-Sekunden-Latenz-Modus; verarbeitet Daten kontinuierlich in langläufigen Batches. Das Argument ist das Checkpoint-Intervall. Erfordert DBR 16.4 LTS+, nur update-Modus
continuous("1 second")Echtes Continuous Processing (in Databricks nicht unterstützt)

Was ist der Unterschied zwischen stateless und stateful Processing in Spark?

Bei der stateless Verarbeitung kann die Query-Engine die Daten jedes Micro-Batches verarbeiten, ohne sich über zuvor verarbeitete Daten im Klaren sein zu müssen. Operationen wie select, filter oder Joins auf eine statische Tabelle sind einfach zu skalieren und benötigen kein „Gedächtnis” zuvor verarbeiteter Daten.

# All stateless - each record stands alone
df.filter(col("country") == "DE")
df.select("user_id", "event_type", upper(col("name")))
df.withColumn("amount_eur", col("amount_usd") * 0.92)
df.join(static_lookup_table, "product_id")

Bei der stateful Verarbeitung muss die Engine Informationen aus vorherigen Micro-Batches verfolgen, da das Ergebnis von diesen Informationen abhängt.

# All stateful - require state across batches
df.groupBy("user_id").count()                           # aggregation
df.groupBy(window("event_time", "5 min")).sum("amount") # windowed aggregation
df.dropDuplicates(["event_id"])                         # deduplication
stream_a.join(stream_b, "session_id")                   # stream-stream join
df.flatMapGroupsWithState(...)                          # arbitrary stateful logic

Alle Operationen im obigen Beispiel erfordern, dass die Engine Informationen über Micro-Batches hinweg in einem Zwischenzustand speichert. Mehr dazu in Kürze.

State Store

Der Mechanismus zur Verfolgung von Informationen über Micro-Batches hinweg ist ein In-Memory-Arbeitsspeicher, der entweder im Executor-Speicher lebt oder einen Off-Heap-Mechanismus namens RocksDB verwendet. Der State Store enthält die Zwischenergebnisse, die stateful Operatoren benötigen, um korrekte Ausgaben zu erzeugen.

Checkpointing

Der Checkpointing-Mechanismus ermöglicht es der Streaming-Query, fehlertolerant zu sein. Er verfolgt, welche Daten bisher verarbeitet wurden. Falls der Job aus irgendeinem Grund abstürzt, kann die Query den letzten Checkpoint nutzen, um nur Daten zu verarbeiten, die noch nicht verarbeitet wurden.

Der Checkpoint-Mechanismus wird durch periodische Schreibvorgänge in ein Checkpoint-Verzeichnis auf Objektspeicher (Azure Data Lake Storage oder AWS S3) implementiert:

df.writeStream \
  .option("checkpointLocation", "s3://bucket/checkpoints/my-query/") \
  .start()

Watermarking

Wie bereits erwähnt, erfordern stateful Operationen, dass Zwischeninformationen regelmäßig gespeichert werden, damit die Streaming-Query korrekte Ergebnisse liefert. Dieser Zustand kann mit der Zeit sehr groß werden und die Streaming-Query verlangsamen. Watermarking setzt einen Schwellenwert, den Spark nutzt, um zu bestimmen, wann es erlaubt ist, periodisch Zwischenzustände zu verwerfen.

windowed = (
    clicks
    .withWatermark("event_time", "2 minutes")   # evict state where event_time < max(event_time) - delay
    .groupBy(window("event_time", "10 minutes"), "user_id")
    .count()
)

Output-Modi

Der Output-Modus steuert, wie die Daten jedes Micro-Batches in den Sink geschrieben werden:

ModusSchreibtErfordert
appendNur neue Zeilen, die sich nie ändern werdenStateless ops oder Windowed Aggregations mit Watermark
updateZeilen, die sich seit dem letzten Batch geändert habenBeliebige Op (stateful oder stateless)
completeDie gesamte Ergebnistabelle bei jedem BatchAggregationen (aufwändig, bei großen Datenmengen vermeiden)

Das folgende Diagramm zeigt, wie sich jeder Output-Modus auf das auswirkt, was über drei aufeinanderfolgende Micro-Batches in den Sink geschrieben wird. Die Query berechnet eine Windowed-Zählung pro Nutzer mit einem 10-minütigen Tumbling Window und einem 5-minütigen Watermark. Im append-Modus wird ein Window erst emittiert, wenn der Watermark über sein Ende hinaus vorgerückt ist; daher schreibt Batch 1 nichts: W1 ist noch offen.

events \
    .withWatermark("event_time", "5 minutes") \
    .groupBy(window("event_time", "10 minutes"), "user") \
    .count()
WINDOWED AGGREGATION: SINK STATE ACROSS MICRO-BATCHES10-min tumbling window · 5-min watermark; append only emits finalized (closed) windowsBATCHINCOMING DATAappendonly new rows, never updatedupdatechanged rows since last batchcompleteentire result table every batchBatch 1t = 0sW1 user_A 3 eventsW1 user_B 2 eventswm=3min, W1 end=10min, still openW1 not yet closednothing writtenW1 user_A cnt=3W1 user_B cnt=2W1 user_A cnt=3W1 user_B cnt=2Batch 2t = 10sW1 user_A 5 eventsW2 user_B 1 eventwm=11min, W1 end=10min, W1 closedW1 closed, emit final countsW1 user_A cnt=8W1 user_B cnt=2only delta writtenW1 user_A cnt=8 (3+5)W2 user_B cnt=1full table rewrittenW1 user_A cnt=8 (3+5)W1 user_B cnt=2W2 user_B cnt=1Batch 3t = 20sW2 user_A 2 eventsW2 user_B 3 eventswm=21min, W2 end=20min, W2 closedW2 closed, emit final countsW2 user_A cnt=2W2 user_B cnt=4only delta writtenW2 user_A cnt=2W2 user_B cnt=4 (1+3)full table rewrittenW1 user_A cnt=8W1 user_B cnt=2W2 user_A cnt=2W2 user_B cnt=4 (1+3)New row written this batchRow updated (aggregate changed)Unchanged, re-written (complete only)Not written (append: window still open)

append ist fast immer bevorzugt, da es die geringste Write-Amplification erzeugt und die Ausgabe unveränderlich ist. Für stateful Windowed Aggregations erfordert es ein Watermark, um zu bestimmen, wann Windows final sind. Für unbegrenzte Aggregationen kann es überhaupt nicht verwendet werden; diese erfordern update oder complete.


Warum dein Streaming-Job langsamer wird

Ein frisch bereitgestellter Streaming-Job läuft tendenziell hervorragend. Die Batch-Latenz ist niedrig, der Throughput ist gesund, und der Cluster läuft reibungslos. Dann, irgendwo zwischen einigen Tagen und ein paar Wochen nach dem Deployment, beginnt die Verlangsamung. Batches, die früher in 2 Sekunden fertig waren, benötigen jetzt 15. Was ist passiert?

Es gibt vier häufige Ursachen: Delta Lake File-Proliferation, State Store Wachstum, Source Backlog Feedback Loops und Shuffle-Partition-Fehlkonfiguration.

Delta Lake File-Proliferation

Jeder Micro-Batch, der in einen Delta Sink schreibt, erstellt eine oder mehrere kleine Parquet-Dateien. Bei einem Trigger-Intervall von 10 Sekunden werden bis zu 8.640 Schreiboperationen pro Tag generiert, von denen jede kleine Dateien im Storage-Pfad der Tabelle ablegt. Dies führt mit der Zeit zu zwei Problemen.

Erstens Read Amplification: Jede nachgelagerte Query, die die Tabelle scannt, muss jetzt Tausende von winzigen Dateien öffnen und verarbeiten, anstatt einer Handvoll gut dimensionierter. Der Overhead pro Datei (Metadaten-Reads, S3/ADLS API-Aufrufe, Parquet Footer Parsing) dominiert, und der Throughput bricht ein.

Zweitens Delta Transaction Log Wachstum: Delta Lake pflegt ein _delta_log-Verzeichnis mit einem JSON-Eintrag für jede committete Transaktion. Wenn dieses Log wächst, muss der Streaming-Job selbst zu Beginn jedes Batches mehr Log-Einträge lesen, um festzustellen, welche Daten neu sind. Das fügt Latenz hinzu, bevor auch nur ein einziger Datensatz verarbeitet wurde.

State Store Wachstum

Für stateful Operationen akkumuliert Spark über Micro-Batches hinweg Zwischenzustände im State Store. Ohne Watermark, oder mit einem zu großzügigen Watermark, wächst dieser Zustand unbegrenzt. Je größer der State Store wird, desto länger dauert es, ihn zu lesen und zurückzuschreiben. Beim Standard-In-Memory-Backend führt dies schließlich zu GC-Druck und Executor-Instabilität. Beim RocksDB-Backend degradiert die Performance gradueller, aber sie degradiert dennoch.

Ein häufiger Fehler ist die Verwendung von dropDuplicates auf einem High-Cardinality-Schlüssel ohne Watermark. Spark muss sich jeden Schlüssel merken, den es je gesehen hat, um korrekt zu deduplizieren. Ohne Watermark wächst diese Menge endlos.

Source Backlog Growth

Wenn die Pipeline Ereignisse nicht so schnell verarbeiten kann wie sie eintreffen, nimmt jeder neue Micro-Batch einen größeren Datenschnitt auf. Größere Batches benötigen länger zur Verarbeitung, wodurch der Consumer noch weiter zurückfällt und im nächsten Zyklus noch größere Batches entstehen. Unkontrolliert führt diese Spirale dazu, dass Batches statt Sekunden Minuten laufen, während der Consumer Group Lag auf dem Message Broker unbegrenzt steigt.

Shuffle-Partition-Fehlkonfiguration

Stateful Operationen wie Windowed Aggregations und Stream-Stream-Joins shufflen Records zu den richtigen State-Partitionen. Spark steuert die Anzahl der Partitionen über spark.sql.shuffle.partitions. Der Standardwert von 200 ist für Streaming fast nie geeignet.

Zu wenige Partitionen: Jede Partition enthält einen großen Teil des States, Tasks laufen lange, und ein einzelner langsamer Task blockiert den gesamten Batch. Zu viele: Der Scheduling-Overhead pro Task überwiegt den Rechenaufwand auf einem kleinen Cluster, und es entstehen Tausende winziger Shuffle-Dateien. Im Gegensatz zu Batch-Jobs ist die Partitionsanzahl beim Query-Start festgelegt, da der State Store nach Partitions-ID indiziert ist und eine Änderung den State korrumpieren würde.


Wie man es behebt

Die gute Nachricht ist, dass für alle vier Ursachen bewährte Lösungen existieren.

Wie behebt man Delta Lake File-Proliferation mit OPTIMIZE?

Der OPTIMIZE-Befehl von Delta Lake kompaktiert kleine Dateien zu größeren, gut dimensionierten Parquet-Dateien. Für einen Streaming Sink empfiehlt es sich, dies regelmäßig auszuführen, typischerweise als nächtlicher Databricks Job, der die betroffenen Tabellen ansteuert.

OPTIMIZE silver.sensor_aggregates;

Falls die Tabelle groß ist und die meisten Queries nur aktuelle Daten betreffen, kann OPTIMIZE mit ZORDER auf den selektivsten Query-Spalten kombiniert werden:

OPTIMIZE silver.sensor_aggregates ZORDER BY (sensor_id, event_time);

OPTIMIZE sollte nicht synchron innerhalb des Streaming-Jobs selbst ausgeführt werden. Es blockiert den Schreib-Pfad und führt zu Latenz-Spitzen.

Liquid Clustering als langfristige Lösung

Wer Databricks Runtime 15.2 oder höher verwendet, sollte Liquid Clustering für Tabellen mit hohem Schreibvolumen in Betracht ziehen. Anstatt statischer Partitionierung oder ZORDER (die zum OPTIMIZE-Zeitpunkt angewendet werden und ohne vollständiges Rewrite nicht geändert werden können), verwendet Liquid Clustering einen flexiblen Clustering-Schlüssel, der Daten inkrementell als Teil des OPTIMIZE-Prozesses reorganisiert.

CREATE TABLE silver.sensor_aggregates
CLUSTER BY (sensor_id, event_time);

Der praktische Vorteil für Streaming Sinks besteht darin, dass man aufhört, gegen nicht passende Partition-Schemata anzukämpfen, wenn sich die Query-Muster weiterentwickeln. Das Clustering passt sich an, ohne dass eine Tabellenmigration erforderlich ist.

Partitionierung wohlüberlegt einsetzen

Es ist verlockend, einen Streaming Sink mit hohem Volumen nach Datum oder Stunde zu partitionieren. In der Praxis verschlimmert die Partitionierung einer Tabelle mit kleinen, häufigen Schreibvorgängen das Small-File-Problem oft, da jeder Micro-Batch seine Schreibvorgänge über mehrere Partitionsverzeichnisse verteilt. Sofern die Partitionen nicht groß genug sind, um bedeutsam zu sein (typischerweise mehrere hundert MB pro Partitionsverzeichnis pro Batch), sollte die Partitionierung des Sink-Tables vermieden und stattdessen OPTIMIZE die Dateilayout-Verwaltung überlassen werden.

State Store Wachstum eindämmen

Die Lösung hier ist einfach, wird aber während der initialen Implementierung leicht vergessen: Für stateful Operationen immer ein Watermark setzen. Das Watermark teilt Spark mit, dass jedes Ereignis, das älter als der Schwellenwert ist, aus dem Zustand entfernt werden kann, was den State Store begrenzt hält.

df.withWatermark("event_time", "10 minutes") \
  .groupBy(window("event_time", "5 minutes"), "sensor_id") \
  .count()

Speziell für Deduplizierung sollte das Watermark auf den Ereigniszeitstempel gesetzt und innerhalb dieses Fensters dedupliziert werden:

df.withWatermark("event_time", "1 hour") \
  .dropDuplicatesWithinWatermark(["event_id"])  # Spark 3.5 / DBR 13.3+

Falls der Anwendungsfall großen State erfordert und noch nicht geschehen, sollte das State Store Backend auf RocksDB umgestellt werden. Es lagert State auf Disk aus, anstatt ihn im Executor-Heap zu halten, was das GC-Verhalten weitaus vorhersehbarer macht:

spark.conf.set(
    "spark.sql.streaming.stateStore.providerClass",
    "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)

Source Backlog eindämmen

Rate-Limiting für die Quelle begrenzt die maximal pro Batch verarbeitete Datenmenge:

# Kafka
.option("maxOffsetsPerTrigger", 100_000)
 
# Azure Event Hubs
.option("eventhubs.maxEventsPerTrigger", 10_000)

Consumer Lag und Batch-Dauer sollten gemeinsam überwacht werden. Wenn der Lag wächst, während die Batch-Dauer konstant bleibt, sind mehr Parallelität oder ein größerer Cluster nötig, kein Rate-Limiting. Ein Rate-Cap auf einem zu kleinen Cluster verdeckt das Problem nur.

Die richtige Shuffle-Partitionsanzahl setzen

Ein praktikabler Ausgangspunkt ist das Zwei- bis Dreifache der Gesamtzahl der Cores im Cluster:

spark.conf.set("spark.sql.shuffle.partitions", 32)  # z. B. 4 Workers × 4 Cores

Den Wert beim Cluster-Start setzen, bevor die Streaming-Query beginnt. Da die Partitionsanzahl beim Query-Start festgelegt wird, ist ein Query-Neustart erforderlich, um eine Änderung wirksam zu machen. Den Wert neu bewerten, wenn sich Clustergröße oder Datenvolumen signifikant verändern.


Fazit

Spark Structured Streaming in Databricks ist wirklich gut designed. Die API ist sauber, die Fehlertoleranz ist solide, und eine erste Pipeline zum Laufen zu bringen dauert Stunden, nicht Tage. Aber die Performance über die Zeit ist eine andere Geschichte. Kleine Dateien akkumulieren still, das Transaction Log wächst, und der State Store Overhead schleicht sich ein, bis Batches, die früher in Sekunden liefen, Minuten benötigen.

Die Lösungen sind nicht kompliziert. OPTIMIZE regelmäßig ausführen, Liquid Clustering auf Tabellen mit hohem Schreibvolumen einsetzen, der Versuchung zur Über-Partitionierung widerstehen, Watermarks zu einem nicht verhandelbaren Teil jeder stateful Query machen, die Quelle rate-limitieren und Shuffle-Partitionen explizit setzen. Wer diese Gewohnheiten von Anfang an einbaut, wird feststellen, dass der Streaming-Job noch Monate nach dem Deployment sauber läuft.


Häufig gestellte Fragen

Was verursacht, dass Spark Structured Streaming Pipelines mit der Zeit langsamer werden?

Die vier häufigsten Ursachen sind: Delta Lake File-Proliferation (jeder Micro-Batch schreibt kleine Parquet-Dateien, die sich ansammeln), State Store Wachstum (stateful Operationen ohne Watermark akkumulieren unbegrenzt State), Source Backlog Feedback Loops (wenn Verarbeitungszeit größer ist als die Ankunftsrate, wächst jeder Batch) und Shuffle-Partition-Fehlkonfiguration (der Standardwert von 200 ist für die meisten Streaming-Workloads nie geeignet).

Wie oft sollte OPTIMIZE auf einem Delta Streaming Sink ausgeführt werden?

Für die meisten Produktions-Streaming-Sinks reicht ein nächtlicher OPTIMIZE-Job aus. Bei sehr hohem Schreibvolumen (viele tausend Micro-Batches pro Tag) kann es notwendig sein, ihn häufiger auszuführen. OPTIMIZE sollte niemals synchron innerhalb des Streaming-Jobs selbst ausgeführt werden, da es den Schreib-Pfad blockiert und Latenz-Spitzen erzeugt.

Warum sollte ich bei jeder stateful Operation ein Watermark setzen?

Ohne Watermark muss Spark alle Zwischenzustände unbegrenzt aufbewahren, da es nicht wissen kann, ob verspätete Ereignisse noch eintreffen könnten. Der State Store wächst endlos, was zu GC-Druck, Executor-Instabilität und graduell degradierender Batch-Performance führt. Das Watermark teilt Spark mit, wann es sicher ist, alten State zu verwerfen.

Was ist die richtige Anzahl von Shuffle-Partitionen für einen Streaming-Job?

Ein guter Ausgangspunkt ist das Zwei- bis Dreifache der Gesamtzahl der Cores im Cluster. Beim Standard-Spark-Wert von 200 für kleine Cluster entsteht ein hoher Scheduling-Overhead; zu wenige Partitionen führen zu Skew und langen Tasks. Der Wert muss beim Query-Start gesetzt werden, da der State Store nach Partitions-ID indiziert ist und eine nachträgliche Änderung den State korrumpieren würde.

Wann sollte Liquid Clustering statt ZORDER verwendet werden?

Liquid Clustering ist für alle neuen Tabellen auf Databricks Runtime 15.2+ zu bevorzugen. Es reorganisiert Daten inkrementell als Teil des OPTIMIZE-Prozesses und ermöglicht es, Clustering-Schlüssel zu ändern, ohne eine vollständige Tabellenmigration durchführen zu müssen. ZORDER wird zum OPTIMIZE-Zeitpunkt statisch angewendet und kann ohne vollständiges Rewrite nicht geändert werden.

Woran erkenne ich, dass meine Streaming-Pipeline ein Source-Backlog-Problem hat?

Überwache Consumer Group Lag auf dem Message Broker gemeinsam mit der Batch-Dauer. Wenn der Lag stetig wächst, während die Batch-Dauer konstant bleibt, fehlt die Verarbeitungskapazität. Wenn sowohl der Lag als auch die Batch-Dauer wachsen, liegt ein Feedback-Loop vor. Rate-Limiting verdeckt das Problem in beiden Fällen nur; die eigentliche Lösung ist mehr Parallelität oder ein größerer Cluster.

Mory Kaba

Mory Kaba

Senior Data Platform Engineer und Berater für Data Engineering, KI und Cloud-Architektur im DACH-Raum.