Skip to content

Streaming to Delta Sink, Sharp Increase in Batch Time after ~36h Using Delta-1.0.0 #859

@mlecuyer-nd

Description

@mlecuyer-nd

Background

Application is using Structured Streaming to do the following every batch:

  • Read from Kafka Topic and decode json data using a provided Schema
  • repartition to reduce number of files needed later on during compaction
  • Write in append mode to a Delta Table in S3
  • In a separate (python) thread, every 15 minutes compact the data by reading in the desired amount of data from Delta Table, invoke repartition() and writing out with dataChange=False flag set

Input Rate: ~2-6k records/second depending on time of day over 25 Kafka Partitions

When Moving over to Delta-1.0.0, a "degraded state" was noticed after about 48 hours of run-time, (and since shortened to shortened to ~36 hours.). In this degraded state, batch times significantly increase, but the job continues to run. This has an almost harmonic effect, as when the batch time increases, more data has to be read, which leads to longer batch times, which leads to more backlog of data, ad infinitum.

Expected Behavior

  • Batch times will be relatively consistent
  • Behavior will be approximately the same between Delta 0.8.0 and Delta 1.0.0

Observed Behavior

  • Batch times spike up from ~15-20 Seconds up to ~5 minutes when using Delta 1.0.0 (and even higher on batches the require Delta Log Compaction/Checkpointing)
  • On job restart, batch time instantly recovers and is good for ~36 more hours
  • A Spark job with 10k tasks at $anonfun$gc$1 at DatabricksLogging.scala:77 shows up only when using Delta 1.0.0 that takes upwards of 4 minutes.
    More complete stack trace:
org.apache.spark.sql.delta.commands.VacuumCommand$.$anonfun$gc$1(VacuumCommand.scala:239)
com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:106)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordDeltaOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.commands.VacuumCommand$.gc(VacuumCommand.scala:101)
io.delta.tables.execution.VacuumTableCommand.run(VacuumTableCommand.scala:69)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)

Current Debugging Done

  • Running Concurrent jobs in Functioning and Non Functioning Environment for comparison
  • Adjusting Dynamic Allocation on/off
  • Adjusting YARN resource calculator in case it was resource-starvation

Debugging yet-to-be-done

  • Run example writing to parquet instead of delta to rule out EMR/Spark versioning Issues
  • Move Compaction and Vacuuming to another Driver to rule out memory/GC issues due to competing jobs

Environment and Configs

Node Count/Size

  • Master: 1x i3.4xLarge
  • Core: 2-3x i3.4xLarge
  • Task: 0

Storage

  • Aws S3, no VPC Endpoint

Functioning Environment

  • EMR 6.1 (Spark 3.0.0)
  • Delta 0.8.0, scala 12
  • Additional Packages:
     log4j:apache-log4j-extras:1.2.17,
     org.apache.kafka:kafka-clients:2.6.0,
     org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0

Non-Functioning Environment

  • EMR 6.3 (Spark 3.1.0)
  • Delta 1.0.0, Scala 12
  • Additional Packages:
    log4j:apache-log4j-extras:1.2.17
    io.delta:delta-core_2.12:1.0.0
    com.qubole.spark/spark-sql-kinesis_2.12/1.2.0_spark-3.0 
    org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2

Spark Configs

Application Configs

  • Run using:
spark-submit --deploy-mode cluster --master yarn --py-files MY_HELPERS --packages MY_PACKAGES --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.parquet.outputTimestampType=INT96 s3://PATH_TO_MY_SCRIPT

        .set('spark.scheduler.mode', 'FAIR') \
        .set("spark.executor.cores", CORE_VALUE) \
        .set("spark.executor.memory", MEMORY_VALUE)\
        .set('spark.dynamicAllocation.enabled', 'true')\
        .set('spark.sql.files.maxPartitionBytes', '1073741824') \
        .set('spark.driver.maxResultSize', 0) \
        .set('spark.dynamicAllocation.minExecutors','3')\
        .set('spark.executor.heartbeatInterval', '25000') \
        .set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
        .set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
        .set('spark.databricks.delta.checkpoint.partSize', '1000000') \
        .set('spark.databricks.delta.snapshotPartitions', '150')

Output Configs

                .option('maxRecordsPerFile', 3000000) \
                .option('mergeSchema', 'true') \
                .option('checkpointLocation', output_location + table_name + f'/_checkpoints/{config["source_name"]}') \
                .partitionBy('HOURLY_TIMESTAMP_FIELD') \
                .start(output_location + table_name) \
                .awaitTermination()

Delta Table Configs

Non-Functioning Environment

            .property('delta.deletedFileRetentionDuration', '6 HOURS') \
            .property('delta.logRetentionDuration', '96 HOURS')

Functioning Environment

  • Default Settings

cc: @dennyglee following slack conversation

Metadata

Metadata

Assignees

Labels

bugSomething isn't working

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions