-
Notifications
You must be signed in to change notification settings - Fork 78
Transactional sitrep insert #10133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Transactional sitrep insert #10133
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -478,32 +478,10 @@ impl DataStore { | |
|
|
||
| let sitrep_id = sitrep.id(); | ||
|
|
||
| // Create the sitrep metadata record. | ||
| // | ||
| // NOTE: we must insert this record before anything else, because it's | ||
| // how orphaned sitreps are found when performing garbage collection. | ||
| // Were we to first insert some other records and insert the metadata | ||
| // record *last*, we could die when we have inserted some sitrep data | ||
| // but have yet to create the metadata record. If this occurs, those | ||
| // records could not be easily found by the garbage collection task. | ||
| // Those (unused) records would then be permanently leaked without | ||
| // manual human intervention to delete them. | ||
| diesel::insert_into(sitrep_dsl::fm_sitrep) | ||
| .values(model::SitrepMetadata::from(sitrep.metadata)) | ||
| .execute_async(&*conn) | ||
| .await | ||
| .map_err(|e| { | ||
| public_error_from_diesel(e, ErrorHandler::Server) | ||
| .internal_context("failed to insert sitrep metadata record") | ||
| })?; | ||
|
|
||
| // Create case records. | ||
| // | ||
| // We do this by collecting all the records for cases into big `Vec`s | ||
| // and inserting each category of case records in one big INSERT query, | ||
| // rather than doing smaller ones for each case in the sitrep. This uses | ||
| // more memory in Nexus but reduces the number of small db queries we | ||
| // perform. | ||
| // Collect case records into big `Vec`s so we can insert each category | ||
| // in one big INSERT query, rather than doing smaller ones for each case | ||
| // in the sitrep. This uses more memory in Nexus but reduces the number | ||
| // of small db queries we perform. | ||
| let mut cases = Vec::with_capacity(sitrep.cases.len()); | ||
| let mut alerts_requested = Vec::new(); | ||
| let mut case_ereports = Vec::new(); | ||
|
|
@@ -522,65 +500,136 @@ impl DataStore { | |
| )); | ||
| } | ||
|
|
||
| if !case_ereports.is_empty() { | ||
| diesel::insert_into(case_ereport_dsl::fm_ereport_in_case) | ||
| .values(case_ereports) | ||
| .execute_async(&*conn) | ||
| .await | ||
| .map_err(|e| { | ||
| public_error_from_diesel(e, ErrorHandler::Server) | ||
| .internal_context( | ||
| "failed to insert case ereport assignments", | ||
| // Sitrep insertion is transactional. This ensures that if Nexus | ||
| // crashes mid-insert, the transaction rolls back and no partial | ||
| // sitrep data is left behind. It also prevents the sitrep GC task | ||
| // from racing with insertion: without a transaction, GC could | ||
| // delete the metadata row while child rows are still being | ||
| // inserted, permanently leaking those children. | ||
| // | ||
| // See https://github.com/oxidecomputer/omicron/issues/10131 for | ||
| // details. | ||
| let metadata = model::SitrepMetadata::from(sitrep.metadata); | ||
| let err = OptionalError::new(); | ||
| self.transaction_retry_wrapper("fm_sitrep_insert") | ||
| .transaction(&conn, |conn| { | ||
| let err = err.clone(); | ||
| let metadata = metadata.clone(); | ||
| let case_ereports = case_ereports.clone(); | ||
| let alerts_requested = alerts_requested.clone(); | ||
| let cases = cases.clone(); | ||
| async move { | ||
| // Insert the sitrep metadata record. | ||
| diesel::insert_into(sitrep_dsl::fm_sitrep) | ||
| .values(metadata) | ||
| .execute_async(&conn) | ||
| .await | ||
| .map_err(|e| { | ||
| err.bail(InsertSitrepError::Other( | ||
| public_error_from_diesel( | ||
| e, | ||
| ErrorHandler::Server, | ||
| ) | ||
| .internal_context( | ||
| "failed to insert sitrep metadata record", | ||
| ), | ||
| )) | ||
| })?; | ||
|
|
||
| // Insert case ereport assignments. | ||
| if !case_ereports.is_empty() { | ||
| diesel::insert_into( | ||
| case_ereport_dsl::fm_ereport_in_case, | ||
| ) | ||
| })?; | ||
| } | ||
| .values(case_ereports) | ||
| .execute_async(&conn) | ||
| .await | ||
| .map_err(|e| { | ||
| err.bail(InsertSitrepError::Other( | ||
| public_error_from_diesel( | ||
| e, | ||
| ErrorHandler::Server, | ||
| ) | ||
| .internal_context( | ||
| "failed to insert case ereport assignments", | ||
| ), | ||
| )) | ||
| })?; | ||
| } | ||
|
|
||
| if !alerts_requested.is_empty() { | ||
| diesel::insert_into(alert_req_dsl::fm_alert_request) | ||
| .values(alerts_requested) | ||
| .execute_async(&*conn) | ||
| .await | ||
| .map_err(|e| { | ||
| public_error_from_diesel(e, ErrorHandler::Server) | ||
| .internal_context("failed to insert alert requests") | ||
| })?; | ||
|
Comment on lines
-543
to
-546
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was going to leave a comment saying that it's a bit of a shame to lose the context around which of these operations failed, but...I suppose that now, a lot of these failures aregetting retried, so logging them is a lot less important now... |
||
| } | ||
| // Insert alert requests. | ||
| if !alerts_requested.is_empty() { | ||
| diesel::insert_into(alert_req_dsl::fm_alert_request) | ||
| .values(alerts_requested) | ||
| .execute_async(&conn) | ||
| .await | ||
| .map_err(|e| { | ||
| err.bail(InsertSitrepError::Other( | ||
| public_error_from_diesel( | ||
| e, | ||
| ErrorHandler::Server, | ||
| ) | ||
| .internal_context( | ||
| "failed to insert alert requests", | ||
| ), | ||
| )) | ||
| })?; | ||
| } | ||
|
|
||
| if !cases.is_empty() { | ||
| diesel::insert_into(case_dsl::fm_case) | ||
| .values(cases) | ||
| .execute_async(&*conn) | ||
| .await | ||
| .map_err(|e| { | ||
| public_error_from_diesel(e, ErrorHandler::Server) | ||
| .internal_context("failed to insert case records") | ||
| })?; | ||
| } | ||
| // Insert case metadata records. | ||
| if !cases.is_empty() { | ||
| diesel::insert_into(case_dsl::fm_case) | ||
| .values(cases) | ||
| .execute_async(&conn) | ||
| .await | ||
| .map_err(|e| { | ||
| err.bail(InsertSitrepError::Other( | ||
| public_error_from_diesel( | ||
| e, | ||
| ErrorHandler::Server, | ||
| ) | ||
| .internal_context( | ||
| "failed to insert case records", | ||
| ), | ||
| )) | ||
| })?; | ||
| } | ||
|
|
||
| // Now, try to make the sitrep current. | ||
| let query = Self::insert_sitrep_version_query(sitrep_id); | ||
| query | ||
| .execute_async(&*conn) | ||
| .await | ||
| .map_err(|err| match err { | ||
| DieselError::DatabaseError( | ||
| DatabaseErrorKind::Unknown, | ||
| info, | ||
| ) if info.message() | ||
| == Self::PARENT_NOT_CURRENT_ERROR_MESSAGE => | ||
| { | ||
| InsertSitrepError::ParentNotCurrent(sitrep_id) | ||
| } | ||
| err => { | ||
| let err = | ||
| public_error_from_diesel(err, ErrorHandler::Server) | ||
| // Now, try to make the sitrep current. | ||
| let query = Self::insert_sitrep_version_query(sitrep_id); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One thing that occurred to me whilst thinking about this is that, IF AND ONLY IF we intend to always make the sitrep current as part of the same transaction as the insert, we could get rid of the CTE and just express the "make sitrep current" logic in Rust as part of the transaction code. I think it's probably wise to not do that as part of the same change as making it a txn, but it seemed worth noting. |
||
| query.execute_async(&conn).await.map_err(|e| match e { | ||
| DieselError::DatabaseError( | ||
| DatabaseErrorKind::Unknown, | ||
| ref info, | ||
| ) if info.message() | ||
| == Self::PARENT_NOT_CURRENT_ERROR_MESSAGE => | ||
| { | ||
| err.bail(InsertSitrepError::ParentNotCurrent( | ||
| sitrep_id, | ||
| )) | ||
| } | ||
| other => err.bail(InsertSitrepError::Other( | ||
| public_error_from_diesel( | ||
| other, | ||
| ErrorHandler::Server, | ||
| ) | ||
| .internal_context( | ||
| "failed to insert new sitrep version", | ||
| ); | ||
| InsertSitrepError::Other(err) | ||
| ), | ||
| )), | ||
| })?; | ||
|
|
||
| Ok(()) | ||
| } | ||
| }) | ||
| .map(|_| ()) | ||
| .await | ||
| .map_err(|e| match err.take() { | ||
| Some(err) => err, | ||
| None => InsertSitrepError::Other(public_error_from_diesel( | ||
| e, | ||
| ErrorHandler::Server, | ||
| )), | ||
| }) | ||
| } | ||
|
|
||
| // Uncastable sentinel used to detect we attempt to make a sitrep current when | ||
|
|
@@ -770,6 +819,10 @@ impl DataStore { | |
| /// sitreps will never be read, since sitreps are only read when they are | ||
| /// current,[^1] so they can safely be deleted at any time. | ||
| /// | ||
| /// Because sitrep insertion is transactional (see [`Self::fm_sitrep_insert`]), | ||
| /// orphaned sitreps are always fully formed: all child records (cases, | ||
| /// ereports, alert requests) are present. | ||
| /// | ||
| /// This query is used by the `fm_sitrep_gc` background task, which is | ||
| /// responsible for deleting orphaned sitreps. | ||
| /// | ||
|
|
@@ -913,7 +966,9 @@ impl DataStore { | |
| } = self | ||
| // Sitrep deletion is transactional to prevent a sitrep from being | ||
| // left in a partially-deleted state should the Nexus instance | ||
| // attempting the delete operation die suddenly. | ||
| // attempting the delete operation die suddenly. Sitrep insertion | ||
| // is also transactional (see `fm_sitrep_insert`), so the GC | ||
| // task cannot race with a partially-inserted sitrep. | ||
| .transaction_retry_wrapper("fm_sitrep_delete_all") | ||
| .transaction(&conn, |conn| { | ||
| let ids = ids.clone(); | ||
|
|
@@ -1654,6 +1709,13 @@ mod tests { | |
| Ok(listed_orphans) | ||
| } | ||
|
|
||
| /// Creates an orphaned sitrep by directly inserting a metadata row | ||
| /// into `fm_sitrep`, bypassing `fm_sitrep_insert`. | ||
| /// | ||
| /// Since `fm_sitrep_insert` is transactional, a failed CAS rolls | ||
| /// back the entire transaction (including the metadata row), so it | ||
| /// can no longer be used to create orphans in tests. Instead, we | ||
| /// insert the metadata row directly. | ||
|
Comment on lines
+1715
to
+1718
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Alternatively, we could have the test insert a sitrep (including the current sitrep table entry) and then go back and delete the entry in Not saying you have to do it that way, the current thing is also fine. |
||
| async fn insert_orphan( | ||
| datastore: &DataStore, | ||
| opctx: &OpContext, | ||
|
|
@@ -1662,31 +1724,27 @@ mod tests { | |
| v: usize, | ||
| i: usize, | ||
| ) { | ||
| let sitrep = fm::Sitrep { | ||
| metadata: fm::SitrepMetadata { | ||
| id: SitrepUuid::new_v4(), | ||
| inv_collection_id: CollectionUuid::new_v4(), | ||
| creator_id: OmicronZoneUuid::new_v4(), | ||
| comment: format!("test sitrep v{i}; orphan {i}"), | ||
| time_created: Utc::now(), | ||
| parent_sitrep_id, | ||
| }, | ||
| cases: Default::default(), | ||
| }; | ||
| match datastore.fm_sitrep_insert(&opctx, sitrep).await { | ||
| Ok(_) => { | ||
| panic!("inserting sitrep v{v} orphan {i} should not succeed") | ||
| } | ||
| Err(InsertSitrepError::ParentNotCurrent(id)) => { | ||
| orphans.insert(id); | ||
| } | ||
| Err(InsertSitrepError::Other(e)) => { | ||
| panic!( | ||
| "expected inserting sitrep v{v} orphan {i} to fail because \ | ||
| its parent is out of date, but saw an unexpected error: {e}" | ||
| ); | ||
| } | ||
| } | ||
| use nexus_db_schema::schema::fm_sitrep::dsl; | ||
|
|
||
| let id = SitrepUuid::new_v4(); | ||
| let metadata = model::SitrepMetadata::from(fm::SitrepMetadata { | ||
| id, | ||
| inv_collection_id: CollectionUuid::new_v4(), | ||
| creator_id: OmicronZoneUuid::new_v4(), | ||
| comment: format!("test sitrep v{v}; orphan {i}"), | ||
| time_created: Utc::now(), | ||
| parent_sitrep_id, | ||
| }); | ||
| let conn = datastore | ||
| .pool_connection_authorized(opctx) | ||
| .await | ||
| .expect("failed to get connection"); | ||
| diesel::insert_into(dsl::fm_sitrep) | ||
| .values(metadata) | ||
| .execute_async(&*conn) | ||
| .await | ||
| .expect("failed to insert orphan sitrep metadata"); | ||
| orphans.insert(id); | ||
| } | ||
|
|
||
| async fn make_sitrep_with_cases( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hrm, this brings up another performance concern about transactional insert besides the DB locking: this is potentially a lotta
memcpy, if the sitrep is big. This is probably not such a big deal to be worth worrying about, but I do keep wondering if there might be a nicer way to cajole the transaction retry stuff into working withArced or borrowed data so we don't have to bytewise copy it...NOT A BLOCKER FOR THIS PR MIND YOU