
Do I have to restart the Spark streaming job for a new schema to take effect? #309

@akshayar

I was trying the Abris library to consume CDC records generated by Debezium.

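// Imports the snippet needs (spark-shell already provides `spark`)
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.Trigger
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.{AbrisConfig, FromAvroConfig}

val abrisConfig: FromAvroConfig = (AbrisConfig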
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy(topicName)
  .usingSchemaRegistry(schemaRegistryURL))

val df=(spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load())

val deserializedAvro = (df
  .select(from_avro(col("value"), abrisConfig)
          .as("data"))
  .select(col("data.after.*")))
deserializedAvro.printSchema()

val query = (deserializedAvro
  .writeStream
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", s"s3://$bucketName/checkpoints/$tableName")
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start())

I added a column while the streaming job was running and expected the console output to include it, but it did not. Doesn't Abris dynamically refresh the schema based on the schema version information in the payload?
Do I have to restart the Spark streaming job to process or view new columns?
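For context on "the version information in the payload": with Confluent-compatible serializers, each Kafka value starts with a 5-byte header (a zero magic byte followed by a 4-byte big-endian schema ID) and then the Avro-encoded record. That ID is a registry-wide schema ID, not the subject's version number. The sketch below is plain Scala with no Spark or Abris dependency and only decodes that header; the `WireFormat` object and the sample bytes are hypothetical and are not part of Abris's API. It shows that the payload identifies the writer schema, whereas the columns produced by `from_avro` come from the reader schema configured in `abrisConfig` (assumed here to be resolved once, when the config is built; not confirmed in this thread).

```scala
import java.nio.ByteBuffer

// Hypothetical helper, not part of Abris: decodes the Confluent wire-format header.
// Byte 0 is the magic byte (0), bytes 1-4 hold the schema ID as a big-endian Int,
// and the Avro-encoded record follows.
object WireFormat {
  val MagicByte: Byte = 0
  val HeaderLength: Int = 5

  /** Returns the schema ID embedded in a Confluent-framed Kafka value. */
  def schemaId(value: Array[Byte]): Int = {
    require(value.length >= HeaderLength, "payload too short for Confluent framing")
    val buf = ByteBuffer.wrap(value) // ByteBuffer reads big-endian by default
    val magic = buf.get()
    require(magic == MagicByte, s"unknown magic byte: $magic")
    buf.getInt()
  }

  /** Returns the Avro-encoded body that follows the 5-byte header. */
  def avroBody(value: Array[Byte]): Array[Byte] = value.drop(HeaderLength)
}

object WireFormatDemo extends App {
  // Hypothetical framed payload: magic byte, schema ID 42, then two body bytes.
  val framed = Array[Byte](0, 0, 0, 0, 42, 2, 4)
  println(WireFormat.schemaId(framed))        // prints 42
  println(WireFormat.avroBody(framed).length) // prints 2
}
```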
