
@iambriccardo (Contributor) commented Aug 4, 2025

This PR implements a revised schema handling mechanism with the following changes:

  • TableSchema is no longer stored in the destination, which makes destinations much easier to implement and gives us more control over schema management.
  • The Destination interface is cleaner, since it no longer needs the awkward inject method.
  • A new SchemaStore trait has been added to expose schema storage capabilities. It is kept separate from StateStore to follow the single responsibility principle and to let components explicitly declare that they depend only on SchemaStore.
  • BigQueryDestination has been cleaned up, simplified, and slightly improved (further improvements will follow).
  • A new Postgres implementation of SchemaStore has been added, which stores table schema data in the same Postgres instance used by the Postgres StateStore.
  • New integration tests have been added for PostgresStore to catch regressions.
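To illustrate the trait split described in the list above, here is a minimal, synchronous sketch using only the standard library. SchemaStore, StateStore, TableId, TableSchema and get_table_schema come from this PR; everything else (InMemoryStore, store_table_schema, get_table_state, column_count, and TableId as a u32) is hypothetical. The real traits are async, returning `impl Future<...> + Send`, which this sketch omits for brevity.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

// Hypothetical stand-ins for the crate's TableId and TableSchema types.
type TableId = u32;

#[derive(Debug, PartialEq)]
struct TableSchema {
    name: String,
    columns: Vec<String>,
}

// Schema storage only (hypothetical, synchronous version of the trait).
trait SchemaStore {
    fn get_table_schema(&self, table_id: &TableId) -> Option<Arc<TableSchema>>;
    fn store_table_schema(&self, table_id: TableId, schema: TableSchema);
}

// Replication state storage, kept as a separate trait (hypothetical method).
trait StateStore {
    fn get_table_state(&self, table_id: &TableId) -> Option<String>;
}

// One backing store can implement both traits, similar to how the Postgres
// store keeps schemas and state in the same database instance.
#[derive(Default)]
struct InMemoryStore {
    schemas: RwLock<HashMap<TableId, Arc<TableSchema>>>,
    states: RwLock<HashMap<TableId, String>>,
}

impl SchemaStore for InMemoryStore {
    fn get_table_schema(&self, table_id: &TableId) -> Option<Arc<TableSchema>> {
        self.schemas.read().unwrap().get(table_id).cloned()
    }

    fn store_table_schema(&self, table_id: TableId, schema: TableSchema) {
        self.schemas.write().unwrap().insert(table_id, Arc::new(schema));
    }
}

impl StateStore for InMemoryStore {
    fn get_table_state(&self, table_id: &TableId) -> Option<String> {
        self.states.read().unwrap().get(table_id).cloned()
    }
}

// A component that only needs schemas says so in its trait bound.
fn column_count<S: SchemaStore>(store: &S, table_id: TableId) -> Option<usize> {
    store.get_table_schema(&table_id).map(|schema| schema.columns.len())
}

fn main() {
    let store = InMemoryStore::default();
    store.store_table_schema(
        1,
        TableSchema {
            name: "users".to_string(),
            columns: vec!["id".to_string(), "email".to_string()],
        },
    );
    let schema = store.get_table_schema(&1).unwrap();
    // prints "users has 2 columns"
    println!("{} has {} columns", schema.name, column_count(&store, 1).unwrap());
    // prints "state: None"
    println!("state: {:?}", store.get_table_state(&1));
}
```

Because column_count is bounded only by SchemaStore, it cannot accidentally depend on replication state, which is the kind of explicit dependency the split is meant to make visible.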

Comment on lines 28 to 36
fn generate_sequence_number(start_lsn: PgLsn, commit_lsn: PgLsn) -> String {
    let start_lsn = u64::from(start_lsn);
    let commit_lsn = u64::from(commit_lsn);

    // Fixed-width, zero-padded hex keeps string order equal to numeric order.
    format!("{commit_lsn:016x}/{start_lsn:016x}")
}

/// Returns the BigQuery table id as [`String`] for a supplied [`TableName`].
pub fn table_name_to_bigquery_table_id(table_name: &TableName) -> String {
Contributor Author

Lifted both functions out of BigQueryDestination so they are more ergonomic to use.
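To show why the generate_sequence_number format quoted in this comment uses fixed-width hex, here is a minimal std-only sketch. It takes plain u64 values instead of PgLsn (the quoted code converts PgLsn with u64::from before formatting), so it is a simplification rather than the crate's code. The resulting strings sort by commit LSN first, then by start LSN, when compared lexicographically.

```rust
// Simplified sketch: u64 inputs stand in for PgLsn values.
fn generate_sequence_number(start_lsn: u64, commit_lsn: u64) -> String {
    // 16 hex digits, zero-padded, so every segment has the same width.
    format!("{commit_lsn:016x}/{start_lsn:016x}")
}

fn main() {
    let earlier = generate_sequence_number(0xff, 0x10);
    let later = generate_sequence_number(0x01, 0x100);

    // prints "0000000000000010/00000000000000ff"
    println!("{earlier}");
    // prints "0000000000000100/0000000000000001"
    println!("{later}");

    // Commit LSN dominates the ordering, regardless of start LSN.
    assert!(earlier < later);

    // Without padding, string order would disagree with numeric order:
    // 0xff < 0x100 numerically, but "ff" > "100" as strings.
    assert!(format!("{:x}", 0xffu64) > format!("{:x}", 0x100u64));
}
```

The padding is what makes the string comparison safe: without it, a shorter hex value with a larger leading digit would sort after a longer, numerically larger one.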

Comment on lines -76 to -80
// We load the table schemas and check that they are correctly fetched.
let mut table_schemas = destination.load_table_schemas().await.unwrap();
table_schemas.sort();
assert_eq!(table_schemas[0], database_schema.orders_schema());
assert_eq!(table_schemas[1], database_schema.users_schema());
Contributor Author

There is no need to check schemas here: they are now stored in the state store, which is already tested in pipeline_test.

inner
    .table_schemas
    .iter()
    .map(|(id, schema)| (*id, Arc::as_ref(schema).clone()))
Contributor Author

Decided to expose TableSchema instead of Arc<TableSchema>, since this is used in tests and owned values make those tests more ergonomic.
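A minimal sketch of the conversion in the snippet above, assuming a hypothetical Inner struct that holds schemas as `Arc<TableSchema>` and a hypothetical get_table_schemas helper; TableId is stood in by u32 and TableSchema is reduced to a single field. The helper clones each schema out of its Arc so tests can compare plain values directly.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-ins for the crate's types.
type TableId = u32;

#[derive(Debug, Clone, PartialEq)]
struct TableSchema {
    name: String,
}

// Hypothetical inner state of a test store: schemas are shared via Arc.
struct Inner {
    table_schemas: HashMap<TableId, Arc<TableSchema>>,
}

// Returns owned TableSchema copies instead of Arc handles.
fn get_table_schemas(inner: &Inner) -> HashMap<TableId, TableSchema> {
    inner
        .table_schemas
        .iter()
        .map(|(id, schema)| (*id, Arc::as_ref(schema).clone()))
        .collect()
}

fn main() {
    let mut table_schemas = HashMap::new();
    table_schemas.insert(1, Arc::new(TableSchema { name: "users".to_string() }));
    let inner = Inner { table_schemas };

    let schemas = get_table_schemas(&inner);
    // Tests can compare against an owned value without unwrapping an Arc.
    assert_eq!(schemas[&1], TableSchema { name: "users".to_string() });
    // prints "TableSchema { name: \"users\" }"
    println!("{:?}", schemas[&1]);
}
```

The trade-off is one clone per schema on each call, which is acceptable in test code where readability of assertions matters more than allocation cost.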

fn get_table_schema(
    &self,
    table_id: &TableId,
) -> impl Future<Output = EtlResult<Option<Arc<TableSchema>>>> + Send;
Contributor Author

Since TableSchema is immutable, we can wrap it in an Arc and share it instead of cloning it on every lookup.
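A minimal sketch of what returning `Arc<TableSchema>` buys, assuming a hypothetical SchemaCache backed by a HashMap with u32 keys standing in for TableId. It drops the async `impl Future` wrapper from the real signature to stay std-only. Each lookup clones the Arc (a pointer copy plus a reference-count increment), never the schema itself.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Reduced stand-in for the crate's TableSchema.
struct TableSchema {
    name: String,
    columns: Vec<String>,
}

// Hypothetical cache; u32 keys stand in for TableId.
struct SchemaCache {
    schemas: HashMap<u32, Arc<TableSchema>>,
}

impl SchemaCache {
    // Cloning the Arc bumps a reference count; the schema and its
    // column vector are not copied.
    fn get_table_schema(&self, table_id: &u32) -> Option<Arc<TableSchema>> {
        self.schemas.get(table_id).cloned()
    }
}

fn main() {
    let mut schemas = HashMap::new();
    schemas.insert(
        7,
        Arc::new(TableSchema {
            name: "orders".to_string(),
            columns: vec!["id".to_string(), "total".to_string()],
        }),
    );
    let cache = SchemaCache { schemas };

    let first = cache.get_table_schema(&7).unwrap();
    let second = cache.get_table_schema(&7).unwrap();
    // Both handles point at the same allocation.
    assert!(Arc::ptr_eq(&first, &second));
    // One reference held by the cache plus the two handles above.
    // prints "orders (2 columns) shared by 3 handles"
    println!(
        "{} ({} columns) shared by {} handles",
        first.name,
        first.columns.len(),
        Arc::strong_count(&first)
    );
}
```

Sharing is only safe here because the schema is never mutated through these handles; if schemas needed in-place updates, an Arc alone would not be enough.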

@iambriccardo marked this pull request as ready for review August 5, 2025 12:44
@iambriccardo requested a review from a team as a code owner August 5, 2025 12:44
@iambriccardo merged commit 6713472 into main Aug 5, 2025
3 checks passed
@iambriccardo deleted the riccardobusetti/etl-225-move-table-schemas-into-the-state-store branch August 5, 2025 13:05