-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Open
Labels
bugSomething isn't workingSomething isn't working
Description
Background
Application is using Structured Streaming to do the following every batch:
- Read from Kafka Topic and decode json data using a provided Schema
- repartition to reduce number of files needed later on during compaction
- Write in
append
mode to a Delta Table in S3 - In a separate (python) thread, every 15 minutes compact the data by reading in the desired amount of data from Delta Table, invoke
repartition()
and writing out withdataChange=False
flag set
Input Rate: ~2-6k records/second depending on time of day over 25 Kafka Partitions
When Moving over to Delta-1.0.0, a "degraded state" was noticed after about 48 hours of run-time, (and since shortened to shortened to ~36 hours.). In this degraded state, batch times significantly increase, but the job continues to run. This has an almost harmonic effect, as when the batch time increases, more data has to be read, which leads to longer batch times, which leads to more backlog of data, ad infinitum.
Expected Behavior
- Batch times will be relatively consistent
- Behavior will be approximately the same between Delta 0.8.0 and Delta 1.0.0
Observed Behavior
- Batch times spike up from ~15-20 Seconds up to ~5 minutes when using Delta 1.0.0 (and even higher on batches the require Delta Log Compaction/Checkpointing)
- On job restart, batch time instantly recovers and is good for ~36 more hours
- A Spark job with 10k tasks at
$anonfun$gc$1 at DatabricksLogging.scala:77
shows up only when using Delta 1.0.0 that takes upwards of 4 minutes.
More complete stack trace:
org.apache.spark.sql.delta.commands.VacuumCommand$.$anonfun$gc$1(VacuumCommand.scala:239)
com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:77)
com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:67)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:106)
org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:91)
org.apache.spark.sql.delta.commands.VacuumCommand$.recordDeltaOperation(VacuumCommand.scala:49)
org.apache.spark.sql.delta.commands.VacuumCommand$.gc(VacuumCommand.scala:101)
io.delta.tables.execution.VacuumTableCommand.run(VacuumTableCommand.scala:69)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:229)
org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3724)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:110)
org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135)
org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232)
Current Debugging Done
- Running Concurrent jobs in Functioning and Non Functioning Environment for comparison
- Adjusting Dynamic Allocation on/off
- Adjusting YARN resource calculator in case it was resource-starvation
Debugging yet-to-be-done
- Run example writing to parquet instead of delta to rule out EMR/Spark versioning Issues
- Move Compaction and Vacuuming to another Driver to rule out memory/GC issues due to competing jobs
Environment and Configs
Node Count/Size
- Master: 1x i3.4xLarge
- Core: 2-3x i3.4xLarge
- Task: 0
Storage
- Aws S3, no VPC Endpoint
Functioning Environment
- EMR 6.1 (Spark 3.0.0)
- Delta 0.8.0, scala 12
- Additional Packages:
log4j:apache-log4j-extras:1.2.17,
org.apache.kafka:kafka-clients:2.6.0,
org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0
Non-Functioning Environment
- EMR 6.3 (Spark 3.1.0)
- Delta 1.0.0, Scala 12
- Additional Packages:
log4j:apache-log4j-extras:1.2.17
io.delta:delta-core_2.12:1.0.0
com.qubole.spark/spark-sql-kinesis_2.12/1.2.0_spark-3.0
org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2
Spark Configs
Application Configs
- Run using:
spark-submit --deploy-mode cluster --master yarn --py-files MY_HELPERS --packages MY_PACKAGES --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog --conf spark.sql.parquet.outputTimestampType=INT96 s3://PATH_TO_MY_SCRIPT
.set('spark.scheduler.mode', 'FAIR') \
.set("spark.executor.cores", CORE_VALUE) \
.set("spark.executor.memory", MEMORY_VALUE)\
.set('spark.dynamicAllocation.enabled', 'true')\
.set('spark.sql.files.maxPartitionBytes', '1073741824') \
.set('spark.driver.maxResultSize', 0) \
.set('spark.dynamicAllocation.minExecutors','3')\
.set('spark.executor.heartbeatInterval', '25000') \
.set('spark.databricks.delta.vacuum.parallelDelete.enabled', 'true') \
.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false') \
.set('spark.databricks.delta.checkpoint.partSize', '1000000') \
.set('spark.databricks.delta.snapshotPartitions', '150')
Output Configs
.option('maxRecordsPerFile', 3000000) \
.option('mergeSchema', 'true') \
.option('checkpointLocation', output_location + table_name + f'/_checkpoints/{config["source_name"]}') \
.partitionBy('HOURLY_TIMESTAMP_FIELD') \
.start(output_location + table_name) \
.awaitTermination()
Delta Table Configs
Non-Functioning Environment
.property('delta.deletedFileRetentionDuration', '6 HOURS') \
.property('delta.logRetentionDuration', '96 HOURS')
Functioning Environment
- Default Settings
cc: @dennyglee following slack conversation
justinTMcosmincatalin
Metadata
Metadata
Assignees
Labels
bugSomething isn't workingSomething isn't working