|
| 1 | +import csv |
| 2 | +import logging |
| 3 | +import time |
| 4 | + |
| 5 | +from django.core.management.base import BaseCommand |
| 6 | +from django.db.models import Exists |
| 7 | +from django.db.models import FilteredRelation |
| 8 | +from django.db.models import OuterRef |
| 9 | +from django.db.models import Q |
| 10 | +from django.db.models.expressions import F |
| 11 | +from django_cte import With |
| 12 | + |
| 13 | +from contentcuration.models import Channel |
| 14 | +from contentcuration.models import ContentNode |
| 15 | + |
| 16 | + |
| 17 | +logger = logging.getLogger(__name__) |
| 18 | + |
| 19 | + |
class Command(BaseCommand):
    """
    Audits nodes that have imported content from public channels and whether the imported content
    has a missing source node.

    A node is reported when the public channel it was imported from is marked deleted, or when no
    node with its ``original_source_node_id`` exists in that public channel's main tree. One row
    per reported node is written to ``fix_missing_import_sources.csv`` in the current working
    directory.

    TODO: this does not yet FIX them
    """

    help = (
        "Audits private channels for nodes imported from public channels whose source node "
        "is missing, writing the results to fix_missing_import_sources.csv"
    )

    def handle(self, *args, **options):
        """
        Find private channels containing imports from public channels and audit each one.

        Writes the CSV report and logs the total number of affected nodes and channels,
        followed by the elapsed time.
        """
        # monotonic clock, so the elapsed time is immune to system clock adjustments
        start = time.monotonic()

        public_cte = self.get_public_cte()

        # preliminary filter on channels to those private and non-deleted, which have content
        # lft=1 is always true for root nodes, so rght>2 means it actually has children
        # NOTE(review): channels with an empty main tree presumably get a NULL tree_id from the
        # filtered relation and therefore never match the Exists() below -- confirm
        private_channels_cte = With(
            Channel.objects.filter(
                public=False,
                deleted=False,
            )
            .annotate(
                non_empty_main_tree=FilteredRelation(
                    "main_tree", condition=Q(main_tree__rght__gt=2)
                ),
            )
            .annotate(
                tree_id=F("non_empty_main_tree__tree_id"),
            )
            .values("id", "name", "tree_id"),
            name="dest_channel_cte",
        )

        # reduce the list of private channels to those that have an imported node
        # from a public channel
        destination_channels = (
            private_channels_cte.queryset()
            .with_cte(public_cte)
            .with_cte(private_channels_cte)
            .filter(
                Exists(
                    public_cte.join(
                        ContentNode.objects.filter(
                            tree_id=OuterRef("tree_id"),
                        ),
                        original_channel_id=public_cte.col.id,
                    )
                )
            )
            .values("id", "name", "tree_id")
            .order_by("id")
        )

        logger.info("=== Iterating over private destination channels. ===")
        channel_count = 0
        total_node_count = 0

        # explicit UTF-8: channel names and node titles may contain non-ASCII characters, and
        # the platform default encoding is not guaranteed to be able to represent them
        with open(
            "fix_missing_import_sources.csv", "w", newline="", encoding="utf-8"
        ) as csv_file:
            csv_writer = csv.DictWriter(
                csv_file,
                fieldnames=[
                    "channel_id",
                    "channel_name",
                    "contentnode_id",
                    "contentnode_title",
                    "public_channel_id",
                    "public_channel_name",
                    "public_channel_deleted",
                ],
            )
            csv_writer.writeheader()

            for channel in destination_channels.iterator():
                node_count = self.handle_channel(csv_writer, channel)

                # only channels that actually have affected nodes are counted
                if node_count > 0:
                    total_node_count += node_count
                    channel_count += 1

        logger.info("=== Done iterating over private destination channels. ===")
        logger.info(
            "Found %s nodes across %s channels.", total_node_count, channel_count
        )
        logger.info("Finished in %s", time.monotonic() - start)

    def get_public_cte(self) -> With:
        """
        Build a CTE named ``public_cte`` over all public channels.

        :return: a CTE exposing each public channel's ``id``, ``name``, ``deleted`` flag and the
            ``tree_id`` of its main tree
        """
        # This CTE gets all public channels with their main tree info
        return With(
            Channel.objects.filter(public=True)
            .annotate(
                tree_id=F("main_tree__tree_id"),
            )
            .values("id", "name", "deleted", "tree_id"),
            name="public_cte",
        )

    def handle_channel(self, csv_writer: csv.DictWriter, channel: dict) -> int:
        """
        Audit a single destination channel and write its affected nodes to the CSV.

        :param csv_writer: writer the rows are appended to; its fieldnames must cover the
            channel, node and public channel columns produced here
        :param channel: dict with the ``id``, ``name`` and ``tree_id`` of the channel to audit
        :return: the number of nodes in the channel with a missing source node
        """
        public_cte = self.get_public_cte()
        channel_id = channel["id"]
        channel_name = channel["name"]
        tree_id = channel["tree_id"]

        # nodes in this channel's tree that were imported from a public channel, restricted to
        # those whose source channel is deleted or whose source node can't be found in the
        # public channel's main tree
        missing_source_nodes = (
            public_cte.join(
                ContentNode.objects.filter(tree_id=tree_id),
                original_channel_id=public_cte.col.id,
            )
            .with_cte(public_cte)
            .annotate(
                public_channel_id=public_cte.col.id,
                public_channel_name=public_cte.col.name,
                public_channel_deleted=public_cte.col.deleted,
            )
            .filter(
                Q(public_channel_deleted=True)
                | ~Exists(
                    ContentNode.objects.filter(
                        tree_id=public_cte.col.tree_id,
                        node_id=OuterRef("original_source_node_id"),
                    )
                )
            )
            .values(
                "public_channel_id",
                "public_channel_name",
                "public_channel_deleted",
                contentnode_id=F("id"),
                contentnode_title=F("title"),
            )
        )

        # TODO: this will be replaced with logic to correct the missing source nodes
        # Stream the rows and count them in the same pass, so the query runs once per channel
        # instead of once for a COUNT and again for the rows themselves
        node_count = 0
        for node_dict in missing_source_nodes.iterator():
            csv_writer.writerow(
                {
                    "channel_id": channel_id,
                    "channel_name": channel_name,
                    **node_dict,
                }
            )
            node_count += 1

        if node_count > 0:
            logger.info(
                "%s:%s\t%s node(s) with missing source nodes.",
                channel_id,
                channel_name,
                node_count,
            )

        return node_count
0 commit comments