ref(schema): Move schema into the schema store and remove schema cache #231
fn generate_sequence_number(start_lsn: PgLsn, commit_lsn: PgLsn) -> String {
    let start_lsn = u64::from(start_lsn);
    let commit_lsn = u64::from(commit_lsn);

    format!("{commit_lsn:016x}/{start_lsn:016x}")
}

/// Returns the BigQuery table id as [`String`] for a supplied [`TableName`].
pub fn table_name_to_bigquery_table_id(table_name: &TableName) -> String {
Lifted both functions out of `BigQueryDestination` for better ergonomics.
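To illustrate what the lifted `generate_sequence_number` produces, here is a minimal sketch. It takes raw `u64` values instead of `PgLsn`, which is an assumption made only to keep the example free of external crates. Because both halves are zero-padded to a fixed width of 16 hex digits, comparing two sequence numbers as plain strings gives the same order as comparing the `(commit_lsn, start_lsn)` pairs numerically.

```rust
/// Stand-in for the PR's `generate_sequence_number`. It accepts raw `u64`
/// LSNs instead of `PgLsn` (an assumption, to keep the sketch std-only).
fn generate_sequence_number(start_lsn: u64, commit_lsn: u64) -> String {
    // Fixed-width, zero-padded lowercase hex: the commit LSN comes first,
    // so it dominates the string ordering; the start LSN breaks ties.
    format!("{commit_lsn:016x}/{start_lsn:016x}")
}

/// Hypothetical helper: true when sequence number `a` sorts strictly
/// before `b` under plain string comparison.
fn sorts_before(a: &str, b: &str) -> bool {
    a < b
}
```

With this sketch, a start LSN of `0x10` and a commit LSN of `0x2a` yield `000000000000002a/0000000000000010`.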
// We load the table schemas and check that they are correctly fetched.
let mut table_schemas = destination.load_table_schemas().await.unwrap();
table_schemas.sort();
assert_eq!(table_schemas[0], database_schema.orders_schema());
assert_eq!(table_schemas[1], database_schema.users_schema());
No need to check the schemas here, since they are now stored in the state store, which is already tested in pipeline_test.
inner
    .table_schemas
    .iter()
    .map(|(id, schema)| (*id, Arc::as_ref(schema).clone()))
Decided to just expose TableSchema instead of Arc<TableSchema> since this is used in tests and it will make them more ergonomic.
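A rough sketch of what this conversion does, using simplified stand-in types (the `TableId` alias, the `TableSchema` fields, and the `owned_table_schemas` name are hypothetical, not taken from the PR): the map keeps `Arc<TableSchema>` internally, and the accessor hands out owned `TableSchema` values that tests can compare directly.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical, simplified stand-ins for the PR's types.
type TableId = u32;

#[derive(Debug, Clone, PartialEq)]
struct TableSchema {
    name: String,
    columns: Vec<String>,
}

/// Copies the shared schemas out of the map as owned values.
fn owned_table_schemas(
    table_schemas: &HashMap<TableId, Arc<TableSchema>>,
) -> HashMap<TableId, TableSchema> {
    table_schemas
        .iter()
        // `Arc::as_ref` borrows the inner schema; `clone` then copies the
        // schema itself rather than bumping the reference count.
        .map(|(id, schema)| (*id, Arc::as_ref(schema).clone()))
        .collect()
}
```

The trade-off is a deep copy per schema, which seems acceptable for a test-only accessor.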
fn get_table_schema(
    &self,
    table_id: &TableId,
) -> impl Future<Output = EtlResult<Option<Arc<TableSchema>>>> + Send;
Since TableSchema is immutable, we can just Arc it to avoid unnecessary clones.
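As a small illustration of the point about avoiding clones, here is a synchronous, std-only sketch. The `InMemorySchemas` type and the simplified `TableId` and `TableSchema` definitions are hypothetical, and the real method is async and returns an `EtlResult`. Each lookup clones only the `Arc` pointer, so every caller shares the same allocation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical, simplified stand-ins for the PR's types.
type TableId = u32;

#[derive(Debug, PartialEq)]
struct TableSchema {
    name: String,
}

/// Hypothetical in-memory holder of shared, immutable schemas.
#[derive(Default)]
struct InMemorySchemas {
    inner: Mutex<HashMap<TableId, Arc<TableSchema>>>,
}

impl InMemorySchemas {
    /// Wraps the schema in an `Arc` once, at insertion time.
    fn store_table_schema(&self, id: TableId, schema: TableSchema) {
        self.inner.lock().unwrap().insert(id, Arc::new(schema));
    }

    /// Returns a new handle to the stored schema. Only the reference
    /// count changes; the schema data is not copied.
    fn get_table_schema(&self, id: &TableId) -> Option<Arc<TableSchema>> {
        self.inner.lock().unwrap().get(id).cloned()
    }
}
```

Two successive lookups of the same table id return handles for which `Arc::ptr_eq` is true.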
This PR implements a revised schema handling mechanism with the following changes:
- `TableSchema` is no longer stored in the destination, which makes implementing a destination much easier and gives us more control over schema management.
- The `Destination` interface is much cleaner without the need for the ugly `inject` method.
- A `SchemaStore` trait has been added which exposes schema storage capabilities. This trait is not used in conjunction with `StateStore`. The rationale for splitting them up is to follow the single responsibility principle and also to allow components to explicitly declare their interest in the `SchemaStore` only.
- `BigQueryDestination` has been cleaned up, simplified and slightly improved (additional improvements will follow).
- A `SchemaStore` for Postgres has been added, which stores the table schema data in the same instance as the one used by the Postgres `StateStore`.
- `PostgresStore` to make sure there are no regressions.
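The trait split described above could look roughly like the following sketch. Everything here is an assumption for illustration: the method names, the `TableState` enum, the `MemoryStore` type, and the synchronous signatures (the PR's traits are async and return `EtlResult`). The point is only that one backing type can implement both traits, while a component that needs schemas asks for `SchemaStore` alone.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

// Hypothetical, simplified stand-ins for the PR's types.
type TableId = u32;

#[derive(Debug, Clone, PartialEq)]
struct TableSchema {
    name: String,
}

#[derive(Debug, Clone, Copy, PartialEq)]
enum TableState {
    Init,
    Ready,
}

/// Replication state only (hypothetical, synchronous signatures).
trait StateStore {
    fn get_table_state(&self, id: TableId) -> Option<TableState>;
    fn update_table_state(&self, id: TableId, state: TableState);
}

/// Schema storage only (hypothetical, synchronous signatures).
trait SchemaStore {
    fn get_table_schema(&self, id: TableId) -> Option<Arc<TableSchema>>;
    fn store_table_schema(&self, id: TableId, schema: TableSchema);
}

/// One backing store implementing both traits.
#[derive(Default)]
struct MemoryStore {
    states: Mutex<HashMap<TableId, TableState>>,
    schemas: Mutex<HashMap<TableId, Arc<TableSchema>>>,
}

impl StateStore for MemoryStore {
    fn get_table_state(&self, id: TableId) -> Option<TableState> {
        self.states.lock().unwrap().get(&id).copied()
    }
    fn update_table_state(&self, id: TableId, state: TableState) {
        self.states.lock().unwrap().insert(id, state);
    }
}

impl SchemaStore for MemoryStore {
    fn get_table_schema(&self, id: TableId) -> Option<Arc<TableSchema>> {
        self.schemas.lock().unwrap().get(&id).cloned()
    }
    fn store_table_schema(&self, id: TableId, schema: TableSchema) {
        self.schemas.lock().unwrap().insert(id, Arc::new(schema));
    }
}

/// A component that declares interest in schemas only: its bound does not
/// mention `StateStore`, so it cannot touch replication state.
fn describe_table<S: SchemaStore>(store: &S, id: TableId) -> String {
    match store.get_table_schema(id) {
        Some(schema) => format!("table {id}: {}", schema.name),
        None => format!("table {id}: unknown"),
    }
}
```

Passing a `MemoryStore` to `describe_table` works because it implements `SchemaStore`; a type implementing only `StateStore` would be rejected at compile time.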