
[BUG]: Materialize metrics in data streaming lakeflow pipelines #947

@flaprimo

Description

Is there an existing issue for this?

  • I have searched the existing issues

Current Behavior

Thank you for developing DQX and for its comprehensive ambition for data quality on Databricks.
I have implemented the DQX framework successfully in Lakeflow pipelines (e.g., data quality checks and quarantine work flawlessly), but I can't seem to materialize the DQ results via the "observer" and listener approach described in the documentation (see https://databrickslabs.github.io/dqx/docs/guide/summary_metrics/#writing-metrics-to-a-table-with-streaming). Screenshot of the working pipeline implementation:

[Image: working pipeline implementation]

Tested with both serverless and a defined job cluster.
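
For reference, here is the minimal observer/listener pattern as I understand it from the linked guide, condensed from my own code further below (the import paths for DQEngine and DQMetricsObserver are my assumption; they are not shown in my snippets):

# Minimal sketch of the documented pattern, condensed from the pipeline code below;
# import paths for DQEngine and DQMetricsObserver are assumed, not taken from my snippets.
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver
from databricks.labs.dqx.config import InputConfig, OutputConfig

observer = DQMetricsObserver(name="dq_metrics_ingestion")
dq_engine = DQEngine(WorkspaceClient(), observer=observer)

# DQX checks defined as metadata (list of dicts); contents omitted here
checks = [...]

# apply the checks to a streaming DataFrame
df = spark.readStream.table("bronze.some_table")  # hypothetical source table
checked_df, observer = dq_engine.apply_checks_by_metadata(df, checks)

# register a listener that should persist per-microbatch summary metrics to a Delta table
listener = dq_engine.get_streaming_metrics_listener(
    input_config=InputConfig(location="bronze.some_table", is_streaming=True),
    output_config=OutputConfig(location="silver.some_table"),
    metrics_config=OutputConfig(location="monitoring.dqx_summary_metrics", mode="append"),
)
spark.streams.addListener(listener)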

The relevant pipeline code for the bronze->silver transformation is attached below; I hope I have implemented the observer/listener metrics pattern correctly:

# Import modules
from datetime import datetime
from pyspark import pipelines as dp
from pyspark.sql.types import StructType, TimestampType
from databricks.labs.dqx.config import InputConfig, OutputConfig
from utilities import conf, utils

# parameters
catalog_name = spark.conf.get(
    "catalog_name", default="XXX"
)
metrics_table = "monitoring.dqx_summary_metrics"

# Define silver tables
def define_dq_view(table_name: str, schema: StructType, data_quality):
    source_table = f"bronze.{table_name}"
    dq_view = f"bronze.{table_name}_dq"
    select_cols = [field.name for field in schema.fields]

    @dp.table(
        name=dq_view,
        temporary=True,
        comment=f"Data Quality (DQX) view for table `{source_table}`"
    )
    def _dq_view():
        df = spark.readStream.table(source_table).select(select_cols)

        # Extract timestamp field names
        timestamp_fields = (field.name
                            for field in schema.fields
                            if isinstance(field.dataType, TimestampType))
        for field_name in timestamp_fields:
            if field_name in df.columns:
                df = df.withColumn(field_name, utils.safe_to_timestamp(field_name))

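        # apply DQX checks defined as metadata; the returned observer handle is not used further in this view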
        df, observer = conf.dq_engine.apply_checks_by_metadata(df, data_quality)

        return df
    
    return _dq_view

def define_silver(table_name: str, table_comment: str, schema: StructType):
    source_table = f"bronze.{table_name}"
    dq_view = f"bronze.{table_name}_dq"

    @dp.table(
        name=f"silver.{table_name}",
        comment=table_comment,
        schema=schema
    )
    def _silver():
        df = (
            spark.readStream.table(dq_view)
        )
        # get rows without errors or warnings, and drop auxiliary columns
        return conf.dq_engine.get_valid(df)

    @dp.table(
        name=f"silver.{table_name}_quarantine",
        comment=f"Data Quality (DQX) quarantine for table `{source_table}`"
    )
    def _silver_quarantine():
        df = (
            spark.readStream.table(dq_view)
        )
        # get only rows with errors or warnings
        return conf.dq_engine.get_invalid(df)

    return _silver, _silver_quarantine

# Create pipeline
for data_entity in conf.get_all_data_entities():
  data_entity_conf = conf.get_data_entity_conf(data_entity)
  name = data_entity_conf["metadata"]["table"]["name"]

  globals()[f"silver_{name}_dq"] = define_dq_view(
    table_name=name,
    schema=data_entity_conf["schemas"]["silver"],
    data_quality = data_entity_conf["data_quality"]
  )
    
  globals()[f"silver_{name}"], globals()[f"silver_{name}_quarantine"] = define_silver(
    table_name=name,
    table_comment=data_entity_conf["metadata"]["table"]["comment"],
    schema=data_entity_conf["schemas"]["silver"],
  )
  
  # Write per-microbatch metrics to a Delta table  
  listener = conf.dq_engine.get_streaming_metrics_listener(
    input_config=InputConfig(location=f"bronze.{name}", is_streaming=True),
    output_config=OutputConfig(location=f"silver.{name}"),
    quarantine_config=OutputConfig(location=f"silver.{name}_quarantine"),
    metrics_config=OutputConfig(location=metrics_table, mode="append"),
  )
  
  # add once per session; safe to call if not already present
  spark.streams.addListener(listener)

The DQ engine is initialized as follows in a separate config file:

# (imports were not shown in the original snippet; paths below are assumed from the DQX docs)
from databricks.sdk import WorkspaceClient
from databricks.labs.dqx.engine import DQEngine
from databricks.labs.dqx.metrics_observer import DQMetricsObserver

observer = DQMetricsObserver(name="dq_metrics_ingestion")
dq_engine = DQEngine(WorkspaceClient(), observer=observer)

Expected Behavior

The metrics table is materialized in the configured destination monitoring.dqx_summary_metrics by the end of the pipeline run.

Steps To Reproduce

  1. Define a data streaming pipeline via the Lakeflow APIs
  2. Configure metrics materialization
  3. Run the pipeline and wait for it to complete successfully
  4. Check for persisted metrics in the table (none found; see the check below)
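
The check in step 4 was along these lines (a hypothetical verification snippet, not part of the pipeline code):

# hypothetical check: read the configured metrics table after the pipeline run
spark.table("monitoring.dqx_summary_metrics").show()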

Cloud

Azure

Operating System

Windows

Relevant log output
