|
1 | 1 | """Classes to manage credential revocation.""" |
2 | 2 |
|
| 3 | +import asyncio |
3 | 4 | import json |
4 | 5 | import logging |
5 | 6 | from typing import Mapping, Optional, Sequence, Text, Tuple |
6 | 7 |
|
| 8 | +from ..cache.base import BaseCache |
7 | 9 | from ..connections.models.conn_record import ConnRecord |
8 | 10 | from ..core.error import BaseError |
9 | | -from ..core.profile import Profile |
| 11 | +from ..core.profile import Profile, ProfileSession |
| 12 | +from ..indy.credx.issuer import CATEGORY_REV_REG |
10 | 13 | from ..indy.issuer import IndyIssuer |
| 14 | +from ..ledger.base import BaseLedger |
| 15 | +from ..messaging.responder import BaseResponder |
| 16 | +from ..protocols.endorse_transaction.v1_0.manager import ( |
| 17 | + TransactionManager, |
| 18 | + TransactionManagerError, |
| 19 | +) |
| 20 | +from ..protocols.endorse_transaction.v1_0.util import ( |
| 21 | + get_endorser_connection_id, |
| 22 | +) |
11 | 23 | from ..protocols.issue_credential.v1_0.models.credential_exchange import ( |
12 | 24 | V10CredentialExchange, |
13 | 25 | ) |
|
17 | 29 | from ..protocols.revocation_notification.v1_0.models.rev_notification_record import ( |
18 | 30 | RevNotificationRecord, |
19 | 31 | ) |
20 | | -from ..storage.error import StorageNotFoundError |
| 32 | +from ..storage.error import StorageError, StorageNotFoundError |
21 | 33 | from .indy import IndyRevocation |
22 | 34 | from .models.issuer_cred_rev_record import IssuerCredRevRecord |
23 | 35 | from .models.issuer_rev_reg_record import IssuerRevRegRecord |
24 | 36 | from .util import notify_pending_cleared_event, notify_revocation_published_event |
25 | 37 |
|
| 38 | +LOGGER = logging.getLogger(__name__) |
| 39 | + |
26 | 40 |
|
27 | 41 | class RevocationManagerError(BaseError): |
28 | 42 | """Revocation manager error.""" |
@@ -410,3 +424,140 @@ async def set_cred_revoked_state( |
410 | 424 | await txn.commit() |
411 | 425 | except StorageNotFoundError: |
412 | 426 | pass |
| 427 | + |
| 428 | + async def _get_endorser_info(self) -> Tuple[Optional[str], Optional[ConnRecord]]: |
| 429 | + connection_id = await get_endorser_connection_id(self._profile) |
| 430 | + |
| 431 | + endorser_did = None |
| 432 | + async with self._profile.session() as session: |
| 433 | + connection_record = await ConnRecord.retrieve_by_id(session, connection_id) |
| 434 | + endorser_info = await connection_record.metadata_get(session, "endorser_info") |
| 435 | + endorser_did = endorser_info.get("endorser_did") |
| 436 | + |
| 437 | + return endorser_did, connection_record |
| 438 | + |
| 439 | + async def fix_and_publish_from_invalid_accum_err(self, err_msg: str): |
| 440 | + """Fix and publish revocation registry entries from invalid accumulator error.""" |
| 441 | + cache = self._profile.inject_or(BaseCache) |
| 442 | + |
| 443 | + async def check_retry(accum): |
| 444 | + """Used to manage retries for fixing revocation registry entries.""" |
| 445 | + retry_value = await cache.get(accum) |
| 446 | + if not retry_value: |
| 447 | + await cache.set(accum, 5) |
| 448 | + else: |
| 449 | + if retry_value > 0: |
| 450 | + await cache.set(accum, retry_value - 1) |
| 451 | + else: |
| 452 | + LOGGER.error( |
| 453 | + f"Revocation registry entry transaction failed for {accum}" |
| 454 | + ) |
| 455 | + |
| 456 | + def get_genesis_transactions(): |
| 457 | + """Get the genesis transactions needed for fixing broken accum.""" |
| 458 | + genesis_transactions = self._profile.context.settings.get( |
| 459 | + "ledger.genesis_transactions" |
| 460 | + ) |
| 461 | + if not genesis_transactions: |
| 462 | + write_ledger = self._profile.context.injector.inject(BaseLedger) |
| 463 | + pool = write_ledger.pool |
| 464 | + genesis_transactions = pool.genesis_txns |
| 465 | + return genesis_transactions |
| 466 | + |
| 467 | + async def sync_accumulator(session: ProfileSession): |
| 468 | + """Sync the local accumulator with the ledger and create recovery txn.""" |
| 469 | + rev_reg_record = await IssuerRevRegRecord.retrieve_by_id( |
| 470 | + session, rev_reg_entry.name |
| 471 | + ) |
| 472 | + |
| 473 | + # Fix and get the recovery transaction |
| 474 | + ( |
| 475 | + rev_reg_delta, |
| 476 | + recovery_txn, |
| 477 | + applied_txn, |
| 478 | + ) = await rev_reg_record.fix_ledger_entry( |
| 479 | + self._profile, False, genesis_transactions |
| 480 | + ) |
| 481 | + |
| 482 | + # Update locally assuming ledger write will succeed |
| 483 | + rev_reg = await session.handle.fetch( |
| 484 | + CATEGORY_REV_REG, |
| 485 | + rev_reg_entry.value_json["revoc_reg_id"], |
| 486 | + for_update=True, |
| 487 | + ) |
| 488 | + new_value_json = rev_reg.value_json |
| 489 | + new_value_json["value"]["accum"] = recovery_txn["value"]["accum"] |
| 490 | + await session.handle.replace( |
| 491 | + CATEGORY_REV_REG, |
| 492 | + rev_reg.name, |
| 493 | + json.dumps(new_value_json), |
| 494 | + rev_reg.tags, |
| 495 | + ) |
| 496 | + |
| 497 | + return rev_reg_record, recovery_txn |
| 498 | + |
| 499 | + async def create_and_send_endorser_txn(): |
| 500 | + """Create and send the endorser transaction again.""" |
| 501 | + async with ledger: |
| 502 | + # Create the revocation registry entry |
| 503 | + rev_entry_res = await ledger.send_revoc_reg_entry( |
| 504 | + rev_reg_entry.value_json["revoc_reg_id"], |
| 505 | + "CL_ACCUM", |
| 506 | + recovery_txn, |
| 507 | + rev_reg_record.issuer_did, |
| 508 | + write_ledger=False, |
| 509 | + endorser_did=endorser_did, |
| 510 | + ) |
| 511 | + |
| 512 | + # Send the transaction to the endorser again with recovery txn |
| 513 | + transaction_manager = TransactionManager(self._profile) |
| 514 | + try: |
| 515 | + revo_transaction = await transaction_manager.create_record( |
| 516 | + messages_attach=rev_entry_res["result"], |
| 517 | + connection_id=connection.connection_id, |
| 518 | + ) |
| 519 | + ( |
| 520 | + revo_transaction, |
| 521 | + revo_transaction_request, |
| 522 | + ) = await transaction_manager.create_request(transaction=revo_transaction) |
| 523 | + except (StorageError, TransactionManagerError) as err: |
| 524 | + raise RevocationManagerError(err.roll_up) from err |
| 525 | + |
| 526 | + responder = self._profile.inject_or(BaseResponder) |
| 527 | + if not responder: |
| 528 | + raise RevocationManagerError( |
| 529 | + "No responder found. Unable to send transaction request" |
| 530 | + ) |
| 531 | + await responder.send( |
| 532 | + revo_transaction_request, |
| 533 | + connection_id=connection.connection_id, |
| 534 | + ) |
| 535 | + |
| 536 | + async with self._profile.session() as session: |
| 537 | + rev_reg_records = await session.handle.fetch_all( |
| 538 | + IssuerRevRegRecord.RECORD_TYPE |
| 539 | + ) |
| 540 | + # Cycle through all rev_rev_def records to find the offending accumulator |
| 541 | + for rev_reg_entry in rev_reg_records: |
| 542 | + ledger = session.inject_or(BaseLedger) |
| 543 | + # Get the value from the ledger |
| 544 | + async with ledger: |
| 545 | + (accum_response, _) = await ledger.get_revoc_reg_delta( |
| 546 | + rev_reg_entry.value_json["revoc_reg_id"] |
| 547 | + ) |
| 548 | + accum = accum_response.get("value", {}).get("accum") |
| 549 | + |
| 550 | + # If the accum from the ledger matches the error message, fix it |
| 551 | + if accum and accum in err_msg: |
| 552 | + await check_retry(accum) |
| 553 | + |
| 554 | + # Get the genesis transactions needed for fix |
| 555 | + genesis_transactions = get_genesis_transactions() |
| 556 | + |
| 557 | + # We know this needs endorsement |
| 558 | + endorser_did, connection = await self._get_endorser_info() |
| 559 | + rev_reg_record, recovery_txn = await sync_accumulator(session=session) |
| 560 | + await create_and_send_endorser_txn() |
| 561 | + |
| 562 | + # Some time in between re-tries |
| 563 | + await asyncio.sleep(1) |
0 commit comments