Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
Thank you for developing DQX and for its comprehensive ambition on data quality in Databricks.
I have implemented the DQX framework successfully in Lakeflow pipelines (data quality controls and quarantine work flawlessly), but I do not seem to succeed in materializing the DQ summary metrics via the observer and streaming listener pattern described in the documentation (see https://databrickslabs.github.io/dqx/docs/guide/summary_metrics/#writing-metrics-to-a-table-with-streaming). Screenshot of the working pipeline implementation:
Tested with both serverless compute and a defined job cluster.
The relevant pipeline code for the bronze->silver transformation is attached below; I hope I have implemented the observer/listener metrics pattern correctly:
# Import modules
from datetime import datetime
from pyspark import pipelines as dp
from pyspark.sql.types import StructType, TimestampType
from databricks.labs.dqx.config import InputConfig, OutputConfig
from utilities import conf, utils
# parameters
catalog_name = spark.conf.get(
    "catalog_name", default="XXX"
)
metrics_table = "monitoring.dqx_summary_metrics"
# Define silver tables
def define_dq_view(table_name: str, schema: StructType, data_quality):
    source_table = f"bronze.{table_name}"
    dq_view = f"bronze.{table_name}_dq"
    select_cols = [field.name for field in schema.fields]

    @dp.table(
        name=dq_view,
        temporary=True,
        comment=f"Data Quality (DQX) view for table `{source_table}`"
    )
    def _dq_view():
        df = spark.readStream.table(source_table).select(select_cols)
        # Extract timestamp field names
        timestamp_fields = (field.name
                            for field in schema.fields
                            if isinstance(field.dataType, TimestampType))
        for field_name in timestamp_fields:
            if field_name in df.columns:
                df = df.withColumn(field_name, utils.safe_to_timestamp(field_name))
        df, observer = conf.dq_engine.apply_checks_by_metadata(df, data_quality)
        return df

    return _dq_view
def define_silver(table_name: str, table_comment: str, schema: StructType):
    source_table = f"bronze.{table_name}"
    dq_view = f"bronze.{table_name}_dq"

    @dp.table(
        name=f"silver.{table_name}",
        comment=table_comment,
        schema=schema
    )
    def _silver():
        df = spark.readStream.table(dq_view)
        # get rows without errors or warnings, and drop auxiliary columns
        return conf.dq_engine.get_valid(df)

    @dp.table(
        name=f"silver.{table_name}_quarantine",
        comment=f"Data Quality (DQX) quarantine for table `{source_table}`"
    )
    def _silver_quarantine():
        df = spark.readStream.table(dq_view)
        # get only rows with errors or warnings
        return conf.dq_engine.get_invalid(df)

    return _silver, _silver_quarantine
# Create pipeline
for data_entity in conf.get_all_data_entities():
    data_entity_conf = conf.get_data_entity_conf(data_entity)
    name = data_entity_conf["metadata"]["table"]["name"]

    globals()[f"silver_{name}_dq"] = define_dq_view(
        table_name=name,
        schema=data_entity_conf["schemas"]["silver"],
        data_quality=data_entity_conf["data_quality"]
    )
    globals()[f"silver_{name}"], globals()[f"silver_{name}_quarantine"] = define_silver(
        table_name=name,
        table_comment=data_entity_conf["metadata"]["table"]["comment"],
        schema=data_entity_conf["schemas"]["silver"],
    )

    # Write per-microbatch metrics to a Delta table
    listener = conf.dq_engine.get_streaming_metrics_listener(
        input_config=InputConfig(location=f"bronze.{name}", is_streaming=True),
        output_config=OutputConfig(location=f"silver.{name}"),
        quarantine_config=OutputConfig(location=f"silver.{name}_quarantine"),
        metrics_config=OutputConfig(location=metrics_table, mode="append"),
    )
    # add once per session; safe to call if not already present
    spark.streams.addListener(listener)

The DQ engine is initialized as follows in a separate config file:
observer = DQMetricsObserver(name="dq_metrics_ingestion")
dq_engine = DQEngine(WorkspaceClient(), observer=observer)

Expected Behavior
A metrics table is materialized in the configured destination monitoring.dqx_summary_metrics by the end of the pipeline run.
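Concretely, after a successful run I would expect a check like the following (illustrative only; the table name comes from the pipeline parameters above) to show per-microbatch metric rows:

# Illustrative verification only, not part of the pipeline:
# inspect the configured metrics destination after the run finishes.
spark.table("monitoring.dqx_summary_metrics").show(truncate=False)
# In my runs this either fails with "table not found" or shows no rows for the run.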
Steps To Reproduce
- Define a data streaming pipeline via the Lakeflow APIs
- Configure metrics materialization (observer plus streaming metrics listener; a reduced sketch of the same pattern is shown after this list)
- Run the pipeline and wait until it completes successfully
- Check the metrics table for persisted metrics (none found)
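For reference, this is the reduced, pipeline-free shape of what I am trying to reproduce, based on the linked summary-metrics documentation and the pipeline code above. It is only a sketch: table names and the checkpoint path are placeholders, and dq_engine / checks are the engine instance (DQEngine with a DQMetricsObserver) and the metadata checks from my config module.

from databricks.labs.dqx.config import InputConfig, OutputConfig

# dq_engine: DQEngine created with a DQMetricsObserver, as in the config file above
# checks: DQX metadata checks, same shape as `data_quality` above
df = spark.readStream.table("bronze.some_table")  # placeholder source table
checked_df, observation = dq_engine.apply_checks_by_metadata(df, checks)

# Register the per-microbatch metrics listener before starting the query
listener = dq_engine.get_streaming_metrics_listener(
    input_config=InputConfig(location="bronze.some_table", is_streaming=True),
    output_config=OutputConfig(location="silver.some_table"),
    quarantine_config=OutputConfig(location="silver.some_table_quarantine"),
    metrics_config=OutputConfig(location="monitoring.dqx_summary_metrics", mode="append"),
)
spark.streams.addListener(listener)

# Write valid rows, wait for the batch to finish, then check the metrics table
query = (
    dq_engine.get_valid(checked_df)
    .writeStream
    .option("checkpointLocation", "/tmp/dqx_repro/_checkpoint")  # placeholder path
    .trigger(availableNow=True)
    .toTable("silver.some_table")
)
query.awaitTermination()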
Cloud
Azure
Operating System
Windows