From 0c2f4d428b8a743414e3b354d2a619bfa7957ee3 Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 20:22:40 -0400 Subject: [PATCH 1/6] updated functions and new scraper script --- functions/src/bills/BillProcessor.ts | 18 +- functions/src/bills/updateBillReferences.ts | 3 + .../src/committees/updateCommitteeRosters.ts | 42 ++-- functions/src/events/scrapeEvents.ts | 15 +- .../src/members/createMemberSearchIndex.ts | 40 ++-- functions/src/scraper.ts | 86 ++++--- scripts/firebase-admin/runScrapers.ts | 217 +++++++++++++++++- 7 files changed, 334 insertions(+), 87 deletions(-) diff --git a/functions/src/bills/BillProcessor.ts b/functions/src/bills/BillProcessor.ts index 88af95fdf..e6626dfee 100644 --- a/functions/src/bills/BillProcessor.ts +++ b/functions/src/bills/BillProcessor.ts @@ -12,12 +12,22 @@ export type BillUpdates = Map> /** Base class for jobs that need to process all bills. */ export default abstract class BillProcessor { + protected court: number | string = currentGeneralCourt protected bills!: any[] protected billIds!: string[] protected committees!: Committee[] protected members!: Member[] protected cities!: City[] + static async runForCourt( + ProcessorClass: { new (): BillProcessor }, + court: number | string + ): Promise { + const p = new ProcessorClass() + p.court = court + await p.run() + } + static pubsub( Processor: { new (args?: any): BillProcessor }, topic: string, @@ -56,7 +66,7 @@ export default abstract class BillProcessor { abstract process(): Promise billPath(id?: string) { - return `/generalCourts/${currentGeneralCourt}/bills${id ? `/${id}` : ""}` + return `/generalCourts/${this.court}/bills${id ? `/${id}` : ""}` } protected async writeBills(updates: BillUpdates) { @@ -77,15 +87,15 @@ export default abstract class BillProcessor { .then(snap => snap.docs.map(d => d.data())) this.billIds = this.bills.map(b => b.id) this.cities = await db - .collection(`/generalCourts/${currentGeneralCourt}/cities`) + .collection(`/generalCourts/${this.court}/cities`) .get() .then(this.load(City)) this.committees = await db - .collection(`/generalCourts/${currentGeneralCourt}/committees`) + .collection(`/generalCourts/${this.court}/committees`) .get() .then(this.load(Committee)) this.members = await db - .collection(`/generalCourts/${currentGeneralCourt}/members`) + .collection(`/generalCourts/${this.court}/members`) .get() .then(this.load(Member)) } diff --git a/functions/src/bills/updateBillReferences.ts b/functions/src/bills/updateBillReferences.ts index 4d0594fed..861d92791 100644 --- a/functions/src/bills/updateBillReferences.ts +++ b/functions/src/bills/updateBillReferences.ts @@ -218,5 +218,8 @@ class UpdateBillReferences extends BillProcessor { } } +export const runUpdateBillReferences = (court: number | string) => + BillProcessor.runForCourt(UpdateBillReferences, court) + export const updateBillReferences = BillProcessor.scheduled(UpdateBillReferences) diff --git a/functions/src/committees/updateCommitteeRosters.ts b/functions/src/committees/updateCommitteeRosters.ts index 2720b7c1f..ddfdbea69 100644 --- a/functions/src/committees/updateCommitteeRosters.ts +++ b/functions/src/committees/updateCommitteeRosters.ts @@ -5,29 +5,31 @@ import { Member } from "../members/types" import { Committee } from "./types" import { currentGeneralCourt } from "../shared" +export async function runUpdateCommitteeRosters(court: number | string) { + const members = await db + .collection(`/generalCourts/${court}/members`) + .get() + .then(c => c.docs.map(d => d.data()).filter(Member.guard)) + const rosters = computeRosters(members) + + const writer = db.bulkWriter() + rosters.forEach((roster, id) => { + const update: DocUpdate = { + members: roster.map(m => ({ id: m.id, name: m.content.Name })) + } + writer.set( + db.doc(`/generalCourts/${court}/committees/${id}`), + update, + { merge: true } + ) + }) + await writer.close() +} + /** Updates the list of members in each committee. */ export const updateCommitteeRosters = runWith({ timeoutSeconds: 120 }) .pubsub.schedule("every 24 hours") - .onRun(async () => { - const members = await db - .collection(`/generalCourts/${currentGeneralCourt}/members`) - .get() - .then(c => c.docs.map(d => d.data()).filter(Member.guard)) - const rosters = computeRosters(members) - - const writer = db.bulkWriter() - rosters.forEach((roster, id) => { - const update: DocUpdate = { - members: roster.map(m => ({ id: m.id, name: m.content.Name })) - } - writer.set( - db.doc(`/generalCourts/${currentGeneralCourt}/committees/${id}`), - update, - { merge: true } - ) - }) - await writer.close() - }) + .onRun(() => runUpdateCommitteeRosters(currentGeneralCourt)) function computeRosters(members: Member[]) { const rosters = new Map() diff --git a/functions/src/events/scrapeEvents.ts b/functions/src/events/scrapeEvents.ts index 419bd505b..834b190ca 100644 --- a/functions/src/events/scrapeEvents.ts +++ b/functions/src/events/scrapeEvents.ts @@ -54,7 +54,7 @@ abstract class EventScraper { abstract listEvents(): Promise abstract getEvent(item: ListItem): Promise - private async run() { + private async run(skipCutoff = false) { const list = await this.listEvents().catch(logFetchError("event list")) if (!list) return @@ -67,7 +67,7 @@ abstract class EventScraper { event = await this.getEvent(item).catch(logFetchError("event", id)) if (!event) continue - if (event.startsAt.toMillis() < upcomingOrRecentCutoff.toMillis()) break + if (!skipCutoff && event.startsAt.toMillis() < upcomingOrRecentCutoff.toMillis()) break writer.set(db.doc(`/events/${event.id}`), event, { merge: true }) @@ -77,6 +77,10 @@ abstract class EventScraper { await writer.close() } + async runForBackfill() { + return this.run(true) + } + /** Parse the event start time in the time zone of the API. */ getEventStart(content: { EventDate: string; StartTime: string }) { const { year, month, day } = DateTime.fromISO(content.EventDate, { @@ -128,10 +132,11 @@ class SpecialEventsScraper extends EventScraper< } class SessionScraper extends EventScraper { - private court = currentGeneralCourt + private court: number - constructor() { + constructor(court: number = currentGeneralCourt) { super("every 60 minutes", 120) + this.court = court } async listEvents() { @@ -517,4 +522,6 @@ export const scrapeSingleHearingv2 = onCall( export const scrapeSpecialEvents = new SpecialEventsScraper().function export const scrapeSessions = new SessionScraper().function +export const scrapeSessionsForCourt = (court: number) => + new SessionScraper(court).runForBackfill() export const scrapeHearings = new HearingScraper().function diff --git a/functions/src/members/createMemberSearchIndex.ts b/functions/src/members/createMemberSearchIndex.ts index 289a6e804..cefe4a1af 100644 --- a/functions/src/members/createMemberSearchIndex.ts +++ b/functions/src/members/createMemberSearchIndex.ts @@ -2,26 +2,28 @@ import { pubsub } from "firebase-functions" import { db } from "../firebase" import { currentGeneralCourt } from "../shared" +export async function runCreateMemberSearchIndex(court: number | string) { + const members = await db + .collection(`/generalCourts/${court}/members`) + .get() + + const index = members.docs + .map(d => (d.exists && d.data().content) ?? {}) + .filter(Boolean) + // Strip out sponsored and cosponsored bills for size + .map(({ SponsoredBills, CoSponsoredBills, ...member }) => member) + .sort((m1, m2) => (m1.Name < m2.Name ? -1 : 1)) + + await db + .doc(`/generalCourts/${court}/indexes/memberSearch`) + .set({ + representatives: index.filter(d => d.Branch === "House"), + senators: index.filter(d => d.Branch === "Senate") + }) +} + /** Create a document that aggregates all legislative members for easier * searching on the client. */ export const createMemberSearchIndex = pubsub .schedule("every 24 hours") - .onRun(async () => { - const members = await db - .collection(`/generalCourts/${currentGeneralCourt}/members`) - .get() - - const index = members.docs - .map(d => (d.exists && d.data().content) ?? {}) - .filter(Boolean) - // Strip out sponsored and cosponsored bills for size - .map(({ SponsoredBills, CoSponsoredBills, ...member }) => member) - .sort((m1, m2) => (m1.Name < m2.Name ? -1 : 1)) - - await db - .doc(`/generalCourts/${currentGeneralCourt}/indexes/memberSearch`) - .set({ - representatives: index.filter(d => d.Branch === "House"), - senators: index.filter(d => d.Branch === "Senate") - }) - }) + .onRun(() => runCreateMemberSearchIndex(currentGeneralCourt)) diff --git a/functions/src/scraper.ts b/functions/src/scraper.ts index deda99093..bc4037d26 100644 --- a/functions/src/scraper.ts +++ b/functions/src/scraper.ts @@ -117,41 +117,67 @@ export function createScraper({ const fetchBatch = runWith({ timeoutSeconds: fetchBatchTimeout }) .firestore.document(`/scrapers/${resourceName}/batches/{batchId}`) .onCreate(async snap => { - const batch = snap.data() as Batch, - court = batch.court, - writer = db.bulkWriter() - - for (const id of batch.ids) { - try { - const path = `/generalCourts/${court}/${resourceName}/${id}` - const current = await db.doc(path).get() - const resource = await fetchResource(court, id, current.data()) - - writer.set( - db.doc(path), - { - ...resource, - fetchedAt: Timestamp.now(), - lastFetch: FieldValue.delete(), - id, - court - }, - { merge: true } - ) - } catch (e) { - if (axios.isAxiosError(e)) { - if (!missingResource(e)) { - logger.warn( - `Could not fetch resource ${resourceName}/${id}: ${e.message}` + try { + const batch = snap.data() as Batch, + court = batch.court, + writer = db.bulkWriter() + + for (const id of batch.ids) { + try { + const path = `/generalCourts/${court}/${resourceName}/${id}` + const current = await db.doc(path).get() + const resource = await fetchResource(court, id, current.data()) + + writer.set( + db.doc(path), + { + ...resource, + fetchedAt: Timestamp.now(), + lastFetch: FieldValue.delete(), + id, + court + }, + { merge: true } + ) + } catch (e) { + if (axios.isAxiosError(e)) { + if (!missingResource(e)) { + logger.warn( + `Could not fetch resource ${resourceName}/${id}: ${e.message}` + ) + } + } else { + logger.error( + `Unexpected error fetching ${resourceName}/${id}`, + e ) } - } else { - throw e } } - } - await writer.close() + try { + await writer.close() + } catch (e) { + logger.error(`bulkWriter.close failed for ${resourceName} batch`, e) + } + } finally { + logger.info( + `Attempting to delete ${resourceName} batch doc ${snap.ref.path}` + ) + await snap.ref + .delete() + .then(() => + logger.info( + `Deleted ${resourceName} batch doc ${snap.ref.path}` + ) + ) + .catch(e => + logger.warn( + `Failed to delete ${resourceName} batch doc ${snap.ref.path}`, + e + ) + ) + } }) return { startBatches, fetchBatch } diff --git a/scripts/firebase-admin/runScrapers.ts b/scripts/firebase-admin/runScrapers.ts index 10ac5d639..a37c9db0b 100644 --- a/scripts/firebase-admin/runScrapers.ts +++ b/scripts/firebase-admin/runScrapers.ts @@ -1,9 +1,42 @@ +/** + * Runs scrapers locally against the Firebase emulators. + * + * CLI options: + * --targets Comma-separated scraper names to run, in order. + * Defaults to every key in `scrapers` below. + * Valid names: startBillBatches, startCityBatches, + * startCommitteeBatches, startMemberBatches, scrapeSessions, + * scrapeSpecialEvents, updateCommitteeRosters, + * createMemberSearchIndex, updateBillReferences. + * --interval Minimum seconds between dispatches. The pause runs + * after each target in the default mode, and after each + * per-court invocation or batch queue in --allCourts mode + * (fallback pubsub scrapers still pause once per target). + * Default 5. + * --concurrency Rolling window of in-flight batch documents when + * dispatching batched scrapers in --allCourts mode. + * Default 3. + * --allCourts When set, fan each target out across every court in + * `supportedGeneralCourts`. Court-runnable scrapers are + * invoked in-process per court; batchable scrapers list + * their IDs, chunk them, and dispatch via `processQueue`. + * Targets that are neither (e.g. `scrapeSpecialEvents`) + * are skipped with a warning. + * Default false (single pubsub trigger per target, + * matching the original behavior). + */ import axios from "axios" import { FunctionName } from "functions/src" -import { uniq } from "lodash" +import { scrapeSessionsForCourt } from "functions/src/events/scrapeEvents" +import { runUpdateBillReferences } from "functions/src/bills/updateBillReferences" +import { runUpdateCommitteeRosters } from "functions/src/committees/updateCommitteeRosters" +import { runCreateMemberSearchIndex } from "functions/src/members/createMemberSearchIndex" +import { chunk, uniq } from "lodash" import { z } from "zod" import { Script } from "./types" import { performance } from "perf_hooks" +import { supportedGeneralCourts } from "functions/src/shared/constants" +import * as api from "functions/src/malegislature" /** All the scrapers that can be run. Scrapers are run in insertion order. */ const scrapers: { @@ -13,7 +46,7 @@ const scrapers: { startCityBatches: "startCityBatches", startCommitteeBatches: "startCommitteeBatches", startMemberBatches: "startMemberBatches", - scrapeHearings: "scrapeHearings", + // scrapeHearings: "scrapeHearings", scrapeSessions: "scrapeSessions", scrapeSpecialEvents: "scrapeSpecialEvents", updateCommitteeRosters: "updateCommitteeRosters", @@ -21,8 +54,57 @@ const scrapers: { updateBillReferences: "updateBillReferences" } +type BatchableScraper = { + resourceName: string + resourcesPerBatch: number + listIds: (court: number) => Promise<(string | null | undefined)[]> +} + +type CourtRunnableFn = (court: number) => Promise + +/** Non-batchable scrapers that are court-specific and can be run for any court. + * Typed with a plain string key rather than `FunctionName` because resolving + * `FunctionName` against the inferred type of `updateBillReferences` (a + * `BillProcessor.scheduled(...)` value re-exported from the same module as + * `runUpdateBillReferences`) trips a circular-inference error. */ +const courtRunnableScrapers: { [k: string]: CourtRunnableFn } = { + scrapeSessions: scrapeSessionsForCourt, + updateCommitteeRosters: runUpdateCommitteeRosters, + createMemberSearchIndex: runCreateMemberSearchIndex, + updateBillReferences: runUpdateBillReferences +} + +/** Scrapers that use the batched framework and can be run for any court. */ +const batchableScrapers: { [k: string]: BatchableScraper } = { + startBillBatches: { + resourceName: "bills", + resourcesPerBatch: 150, + listIds: court => + api.listDocuments({ court }).then(docs => docs.map(d => d.BillNumber)) + }, + startMemberBatches: { + resourceName: "members", + resourcesPerBatch: 5, + listIds: court => + api.listMembers({ court }).then(members => members.map(m => m.MemberCode)) + }, + startCityBatches: { + resourceName: "cities", + resourcesPerBatch: 200, + listIds: court => api.listCities(court) + }, + startCommitteeBatches: { + resourceName: "committees", + resourcesPerBatch: 200, + listIds: court => + api.listCommittees(court).then(cs => cs.map(c => c.CommitteeCode)) + } +} + const Args = z.object({ interval: z.number().default(5), + concurrency: z.number().default(3), + allCourts: z.boolean().default(false), targets: z .string() .transform(s => { @@ -43,22 +125,137 @@ const Args = z.object({ > }) -export const script: Script = async ({ env, args }) => { - if (env !== "local") throw Error("only local supported") +/** Dispatches batch documents with a rolling concurrency window. Relies on + * fetchBatch deleting its batch doc on completion to signal a free slot. + * + * Only counts removals of docs this run dispatched (tracked in `ours`), so + * pre-existing or manually-deleted batch docs in the collection don't trip + * the listener and break the concurrency cap. */ +async function processQueue( + batches: FirebaseFirestore.CollectionReference, + court: number, + queue: string[][], + concurrency: number +): Promise { + let inFlight = 0 + let completed = 0 + const total = queue.length + const ours = new Set() - const { interval, targets } = Args.parse(args) + const dispatchNext = async () => { + if (queue.length === 0) return + const ids = queue.shift()! + inFlight++ + // Pre-allocate the ref so the ID is in `ours` before the write lands — + // otherwise fetchBatch could finish and delete the doc before we record it. + const ref = batches.doc() + ours.add(ref.id) + await ref.set({ court, ids }) + } + + return new Promise((resolve, reject) => { + let settled = false + const finish = (err?: unknown) => { + if (settled) return + settled = true + unsubscribe() + if (err) reject(err) + else resolve() + } + + const unsubscribe = batches.onSnapshot( + async snap => { + for (const change of snap.docChanges()) { + if (change.type !== "removed") continue + if (!ours.delete(change.doc.id)) continue + inFlight-- + completed++ + if (completed % 10 === 0 || completed === total) { + console.log(` ${completed}/${total} batches complete`) + } + if (queue.length > 0) { + try { + await dispatchNext() + } catch (e) { + finish(e) + return + } + } else if (inFlight === 0) { + finish() + } + } + }, + err => finish(err) + ) + + ;(async () => { + try { + const initial = Math.min(concurrency, queue.length) + for (let i = 0; i < initial; i++) await dispatchNext() + if (queue.length === 0 && inFlight === 0) finish() + } catch (e) { + finish(e) + } + })() + }) +} + +export const script: Script = async ({ args, db }) => { + const { interval, concurrency, targets, allCourts } = Args.parse(args) const intervalMs = interval * 1e3 + const waitForInterval = async (start: number) => { + const remaining = Math.max(0, intervalMs - (performance.now() - start)) + if (remaining) { + console.log(`pausing ${(remaining * 1e-3).toFixed(1)} s`) + await new Promise(r => setTimeout(r, remaining)) + } + } + + if (allCourts) { + for (const target of targets) { + const courtRunnable = courtRunnableScrapers[target] + if (courtRunnable) { + for (const court of supportedGeneralCourts) { + console.log(`Running ${target} for court ${court}`) + const start = performance.now() + await courtRunnable(court) + await waitForInterval(start) + } + continue + } + + const batchable = batchableScrapers[target] + if (!batchable) { + console.warn( + `Skipping ${target}: not court-runnable or batchable in --allCourts mode` + ) + continue + } + const { resourceName, resourcesPerBatch, listIds } = batchable + for (const court of supportedGeneralCourts) { + console.log(`Dispatching ${target} batches for court ${court}`) + const start = performance.now() + const ids = (await listIds(court)).filter(Boolean) as string[] + const queue = chunk(ids, resourcesPerBatch) + const total = queue.length + console.log(` ${total} batches queued for ${ids.length} IDs (concurrency=${concurrency})`) + + const batches = db.collection(`scrapers/${resourceName}/batches`) + await processQueue(batches, court, queue, concurrency) + console.log(` Done — all ${total} batches processed`) + await waitForInterval(start) + } + } + return + } + for (const target of targets) { console.log("Running", target) const start = performance.now() await axios.get( `http://localhost:5001/demo-dtp/us-central1/triggerPubsubFunction?scheduled=${target}` ) - const remaining = Math.max(0, intervalMs - (performance.now() - start)) - if (remaining) { - console.log(`pausing ${(remaining * 1e-3).toFixed(1)} s`) - await new Promise(r => setTimeout(r, remaining)) - } + await waitForInterval(start) } } From 89d4053201f5422dc8f8aff7497025bfdc86dc0e Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 20:50:54 -0400 Subject: [PATCH 2/6] split into two scripts --- scripts/README.md | 4 + scripts/firebase-admin/runScrapers.ts | 256 ++++--------------- scripts/firebase-admin/runScrapersByCourt.ts | 225 ++++++++++++++++ 3 files changed, 273 insertions(+), 212 deletions(-) create mode 100644 scripts/firebase-admin/runScrapersByCourt.ts diff --git a/scripts/README.md b/scripts/README.md index f5a786554..e9e3c886d 100644 --- a/scripts/README.md +++ b/scripts/README.md @@ -251,6 +251,10 @@ yarn firebase-admin run-script backfillBallotQuestionTestimonyCounts --env prod +#### `runScrapersByCourt` + + + #### `seedActiveTopicSubscriptions` diff --git a/scripts/firebase-admin/runScrapers.ts b/scripts/firebase-admin/runScrapers.ts index a37c9db0b..22d4d2f18 100644 --- a/scripts/firebase-admin/runScrapers.ts +++ b/scripts/firebase-admin/runScrapers.ts @@ -1,5 +1,13 @@ /** - * Runs scrapers locally against the Firebase emulators. + * Runs scrapers locally against the Firebase emulators by HTTP-triggering + * the emulator's `triggerPubsubFunction` endpoint once per target. Each + * scheduled function runs with whatever court default it picks (typically + * `currentGeneralCourt`). + * + * For per-court control (single court or fan-out across every supported + * court), see `runScrapersByCourt.ts`, which bypasses pubsub and either + * invokes scraper functions in-process or writes batch docs directly to + * Firestore. * * CLI options: * --targets Comma-separated scraper names to run, in order. @@ -8,38 +16,17 @@ * startCommitteeBatches, startMemberBatches, scrapeSessions, * scrapeSpecialEvents, updateCommitteeRosters, * createMemberSearchIndex, updateBillReferences. - * --interval Minimum seconds between dispatches. The pause runs - * after each target in the default mode, and after each - * per-court invocation or batch queue in --allCourts mode - * (fallback pubsub scrapers still pause once per target). - * Default 5. - * --concurrency Rolling window of in-flight batch documents when - * dispatching batched scrapers in --allCourts mode. - * Default 3. - * --allCourts When set, fan each target out across every court in - * `supportedGeneralCourts`. Court-runnable scrapers are - * invoked in-process per court; batchable scrapers list - * their IDs, chunk them, and dispatch via `processQueue`. - * Targets that are neither (e.g. `scrapeSpecialEvents`) - * are skipped with a warning. - * Default false (single pubsub trigger per target, - * matching the original behavior). + * --interval Minimum seconds between dispatches. Default 5. */ import axios from "axios" import { FunctionName } from "functions/src" -import { scrapeSessionsForCourt } from "functions/src/events/scrapeEvents" -import { runUpdateBillReferences } from "functions/src/bills/updateBillReferences" -import { runUpdateCommitteeRosters } from "functions/src/committees/updateCommitteeRosters" -import { runCreateMemberSearchIndex } from "functions/src/members/createMemberSearchIndex" -import { chunk, uniq } from "lodash" +import { uniq } from "lodash" import { z } from "zod" import { Script } from "./types" import { performance } from "perf_hooks" -import { supportedGeneralCourts } from "functions/src/shared/constants" -import * as api from "functions/src/malegislature" /** All the scrapers that can be run. Scrapers are run in insertion order. */ -const scrapers: { +export const scrapers: { [K in FunctionName]?: K } = { startBillBatches: "startBillBatches", @@ -54,208 +41,53 @@ const scrapers: { updateBillReferences: "updateBillReferences" } -type BatchableScraper = { - resourceName: string - resourcesPerBatch: number - listIds: (court: number) => Promise<(string | null | undefined)[]> -} - -type CourtRunnableFn = (court: number) => Promise - -/** Non-batchable scrapers that are court-specific and can be run for any court. - * Typed with a plain string key rather than `FunctionName` because resolving - * `FunctionName` against the inferred type of `updateBillReferences` (a - * `BillProcessor.scheduled(...)` value re-exported from the same module as - * `runUpdateBillReferences`) trips a circular-inference error. */ -const courtRunnableScrapers: { [k: string]: CourtRunnableFn } = { - scrapeSessions: scrapeSessionsForCourt, - updateCommitteeRosters: runUpdateCommitteeRosters, - createMemberSearchIndex: runCreateMemberSearchIndex, - updateBillReferences: runUpdateBillReferences -} - -/** Scrapers that use the batched framework and can be run for any court. */ -const batchableScrapers: { [k: string]: BatchableScraper } = { - startBillBatches: { - resourceName: "bills", - resourcesPerBatch: 150, - listIds: court => - api.listDocuments({ court }).then(docs => docs.map(d => d.BillNumber)) - }, - startMemberBatches: { - resourceName: "members", - resourcesPerBatch: 5, - listIds: court => - api.listMembers({ court }).then(members => members.map(m => m.MemberCode)) - }, - startCityBatches: { - resourceName: "cities", - resourcesPerBatch: 200, - listIds: court => api.listCities(court) - }, - startCommitteeBatches: { - resourceName: "committees", - resourcesPerBatch: 200, - listIds: court => - api.listCommittees(court).then(cs => cs.map(c => c.CommitteeCode)) +/** Shared Zod parser for the `--targets` CLI option. Splits on commas, + * validates each name against `scrapers`, and dedupes. */ +export const Targets = z + .string() + .transform(s => { + const t = s + .split(",") + .map(s => s.trim()) + .map(name => { + const s = scrapers[name as FunctionName] + if (!s) throw Error(`Invalid scraper ${name}`) + return s + }) + return uniq(t) + }) + .default(Object.keys(scrapers).join(",")) as z.ZodType< + Array, + any, + string +> + +/** Sleeps until at least `intervalMs` has elapsed since `start`. */ +export async function waitForInterval(start: number, intervalMs: number) { + const remaining = Math.max(0, intervalMs - (performance.now() - start)) + if (remaining) { + console.log(`pausing ${(remaining * 1e-3).toFixed(1)} s`) + await new Promise(r => setTimeout(r, remaining)) } } const Args = z.object({ interval: z.number().default(5), - concurrency: z.number().default(3), - allCourts: z.boolean().default(false), - targets: z - .string() - .transform(s => { - const t = s - .split(",") - .map(s => s.trim()) - .map(name => { - const s = scrapers[name as FunctionName] - if (!s) throw Error(`Invalid scraper ${name}`) - return s - }) - return uniq(t) - }) - .default(Object.keys(scrapers).join(",")) as z.ZodType< - Array, - any, - string - > + targets: Targets }) -/** Dispatches batch documents with a rolling concurrency window. Relies on - * fetchBatch deleting its batch doc on completion to signal a free slot. - * - * Only counts removals of docs this run dispatched (tracked in `ours`), so - * pre-existing or manually-deleted batch docs in the collection don't trip - * the listener and break the concurrency cap. */ -async function processQueue( - batches: FirebaseFirestore.CollectionReference, - court: number, - queue: string[][], - concurrency: number -): Promise { - let inFlight = 0 - let completed = 0 - const total = queue.length - const ours = new Set() - - const dispatchNext = async () => { - if (queue.length === 0) return - const ids = queue.shift()! - inFlight++ - // Pre-allocate the ref so the ID is in `ours` before the write lands — - // otherwise fetchBatch could finish and delete the doc before we record it. - const ref = batches.doc() - ours.add(ref.id) - await ref.set({ court, ids }) - } - - return new Promise((resolve, reject) => { - let settled = false - const finish = (err?: unknown) => { - if (settled) return - settled = true - unsubscribe() - if (err) reject(err) - else resolve() - } +export const script: Script = async ({ env, args }) => { + if (env !== "local") throw Error("only local supported") - const unsubscribe = batches.onSnapshot( - async snap => { - for (const change of snap.docChanges()) { - if (change.type !== "removed") continue - if (!ours.delete(change.doc.id)) continue - inFlight-- - completed++ - if (completed % 10 === 0 || completed === total) { - console.log(` ${completed}/${total} batches complete`) - } - if (queue.length > 0) { - try { - await dispatchNext() - } catch (e) { - finish(e) - return - } - } else if (inFlight === 0) { - finish() - } - } - }, - err => finish(err) - ) - - ;(async () => { - try { - const initial = Math.min(concurrency, queue.length) - for (let i = 0; i < initial; i++) await dispatchNext() - if (queue.length === 0 && inFlight === 0) finish() - } catch (e) { - finish(e) - } - })() - }) -} - -export const script: Script = async ({ args, db }) => { - const { interval, concurrency, targets, allCourts } = Args.parse(args) + const { interval, targets } = Args.parse(args) const intervalMs = interval * 1e3 - const waitForInterval = async (start: number) => { - const remaining = Math.max(0, intervalMs - (performance.now() - start)) - if (remaining) { - console.log(`pausing ${(remaining * 1e-3).toFixed(1)} s`) - await new Promise(r => setTimeout(r, remaining)) - } - } - - if (allCourts) { - for (const target of targets) { - const courtRunnable = courtRunnableScrapers[target] - if (courtRunnable) { - for (const court of supportedGeneralCourts) { - console.log(`Running ${target} for court ${court}`) - const start = performance.now() - await courtRunnable(court) - await waitForInterval(start) - } - continue - } - - const batchable = batchableScrapers[target] - if (!batchable) { - console.warn( - `Skipping ${target}: not court-runnable or batchable in --allCourts mode` - ) - continue - } - const { resourceName, resourcesPerBatch, listIds } = batchable - for (const court of supportedGeneralCourts) { - console.log(`Dispatching ${target} batches for court ${court}`) - const start = performance.now() - const ids = (await listIds(court)).filter(Boolean) as string[] - const queue = chunk(ids, resourcesPerBatch) - const total = queue.length - console.log(` ${total} batches queued for ${ids.length} IDs (concurrency=${concurrency})`) - - const batches = db.collection(`scrapers/${resourceName}/batches`) - await processQueue(batches, court, queue, concurrency) - console.log(` Done — all ${total} batches processed`) - await waitForInterval(start) - } - } - return - } - for (const target of targets) { console.log("Running", target) const start = performance.now() await axios.get( `http://localhost:5001/demo-dtp/us-central1/triggerPubsubFunction?scheduled=${target}` ) - await waitForInterval(start) + await waitForInterval(start, intervalMs) } } diff --git a/scripts/firebase-admin/runScrapersByCourt.ts b/scripts/firebase-admin/runScrapersByCourt.ts new file mode 100644 index 000000000..ad6a5ad50 --- /dev/null +++ b/scripts/firebase-admin/runScrapersByCourt.ts @@ -0,0 +1,225 @@ +/** + * Runs scrapers per-court against the Firebase emulators. Unlike + * `runScrapers.ts` (which HTTP-triggers a single pubsub message per + * target), this script bypasses pubsub and either: + * - invokes court-runnable scraper functions in-process per court, or + * - lists IDs itself, chunks them, and writes batch docs directly to + * `scrapers//batches`, which the `fetchBatch` Firestore + * trigger consumes (see `processQueue`). + * + * CLI options: + * --targets Comma-separated scraper names to run, in order. + * Defaults to every key in `scrapers` (see runScrapers.ts). + * Targets that are neither court-runnable nor batchable + * (e.g. `scrapeSpecialEvents`) are skipped with a warning. + * --interval Minimum seconds between dispatches. The pause runs + * after each per-court invocation or batch queue. + * Default 5. + * --concurrency Rolling window of in-flight batch documents when + * dispatching batched scrapers. Default 3. + * --court A single court number (e.g. 193) to run targets + * against. Must be in `supportedGeneralCourts`. Cannot + * be combined with --allCourts. + * --allCourts When set, fan each target out across every court in + * `supportedGeneralCourts`. Cannot be combined with + * --court. + * + * Exactly one of `--court` or `--allCourts` must be specified. + */ +import { scrapeSessionsForCourt } from "functions/src/events/scrapeEvents" +import { runUpdateBillReferences } from "functions/src/bills/updateBillReferences" +import { runUpdateCommitteeRosters } from "functions/src/committees/updateCommitteeRosters" +import { runCreateMemberSearchIndex } from "functions/src/members/createMemberSearchIndex" +import { chunk } from "lodash" +import { z } from "zod" +import { Script } from "./types" +import { performance } from "perf_hooks" +import { supportedGeneralCourts } from "functions/src/shared/constants" +import * as api from "functions/src/malegislature" +import { Targets, waitForInterval } from "./runScrapers" + +type BatchableScraper = { + resourceName: string + resourcesPerBatch: number + listIds: (court: number) => Promise<(string | null | undefined)[]> +} + +type CourtRunnableFn = (court: number) => Promise + +/** Non-batchable scrapers that are court-specific and can be run for any court. + * Typed with a plain string key rather than `FunctionName` because resolving + * `FunctionName` against the inferred type of `updateBillReferences` (a + * `BillProcessor.scheduled(...)` value re-exported from the same module as + * `runUpdateBillReferences`) trips a circular-inference error. */ +const courtRunnableScrapers: { [k: string]: CourtRunnableFn } = { + scrapeSessions: scrapeSessionsForCourt, + updateCommitteeRosters: runUpdateCommitteeRosters, + createMemberSearchIndex: runCreateMemberSearchIndex, + updateBillReferences: runUpdateBillReferences +} + +/** Scrapers that use the batched framework and can be run for any court. */ +const batchableScrapers: { [k: string]: BatchableScraper } = { + startBillBatches: { + resourceName: "bills", + resourcesPerBatch: 150, + listIds: court => + api.listDocuments({ court }).then(docs => docs.map(d => d.BillNumber)) + }, + startMemberBatches: { + resourceName: "members", + resourcesPerBatch: 5, + listIds: court => + api.listMembers({ court }).then(members => members.map(m => m.MemberCode)) + }, + startCityBatches: { + resourceName: "cities", + resourcesPerBatch: 200, + listIds: court => api.listCities(court) + }, + startCommitteeBatches: { + resourceName: "committees", + resourcesPerBatch: 200, + listIds: court => + api.listCommittees(court).then(cs => cs.map(c => c.CommitteeCode)) + } +} + +const Args = z.object({ + interval: z.number().default(5), + concurrency: z.number().default(3), + allCourts: z.boolean().default(false), + court: z + .number() + .optional() + .refine(c => c === undefined || supportedGeneralCourts.includes(c), { + message: `--court must be one of ${supportedGeneralCourts.join(", ")}` + }), + targets: Targets +}) + +/** Dispatches batch documents with a rolling concurrency window. Relies on + * fetchBatch deleting its batch doc on completion to signal a free slot. + * + * Only counts removals of docs this run dispatched (tracked in `ours`), so + * pre-existing or manually-deleted batch docs in the collection don't trip + * the listener and break the concurrency cap. */ +async function processQueue( + batches: FirebaseFirestore.CollectionReference, + court: number, + queue: string[][], + concurrency: number +): Promise { + let inFlight = 0 + let completed = 0 + const total = queue.length + const ours = new Set() + + const dispatchNext = async () => { + if (queue.length === 0) return + const ids = queue.shift()! + inFlight++ + // Pre-allocate the ref so the ID is in `ours` before the write lands — + // otherwise fetchBatch could finish and delete the doc before we record it. + const ref = batches.doc() + ours.add(ref.id) + await ref.set({ court, ids }) + } + + return new Promise((resolve, reject) => { + let settled = false + const finish = (err?: unknown) => { + if (settled) return + settled = true + unsubscribe() + if (err) reject(err) + else resolve() + } + + const unsubscribe = batches.onSnapshot( + async snap => { + for (const change of snap.docChanges()) { + if (change.type !== "removed") continue + if (!ours.delete(change.doc.id)) continue + inFlight-- + completed++ + if (completed % 10 === 0 || completed === total) { + console.log(` ${completed}/${total} batches complete`) + } + if (queue.length > 0) { + try { + await dispatchNext() + } catch (e) { + finish(e) + return + } + } else if (inFlight === 0) { + finish() + } + } + }, + err => finish(err) + ) + + ;(async () => { + try { + const initial = Math.min(concurrency, queue.length) + for (let i = 0; i < initial; i++) await dispatchNext() + if (queue.length === 0 && inFlight === 0) finish() + } catch (e) { + finish(e) + } + })() + }) +} + +export const script: Script = async ({ args, db }) => { + const { interval, concurrency, targets, allCourts, court } = Args.parse(args) + const intervalMs = interval * 1e3 + + if (allCourts && court !== undefined) { + throw Error("--allCourts and --court cannot be combined") + } + if (!allCourts && court === undefined) { + throw Error("Specify --court or --allCourts") + } + + const courts = court !== undefined ? [court] : supportedGeneralCourts + + for (const target of targets) { + const courtRunnable = courtRunnableScrapers[target] + if (courtRunnable) { + for (const c of courts) { + console.log(`Running ${target} for court ${c}`) + const start = performance.now() + await courtRunnable(c) + await waitForInterval(start, intervalMs) + } + continue + } + + const batchable = batchableScrapers[target] + if (!batchable) { + console.warn( + `Skipping ${target}: not court-runnable or batchable in per-court mode` + ) + continue + } + const { resourceName, resourcesPerBatch, listIds } = batchable + for (const c of courts) { + console.log(`Dispatching ${target} batches for court ${c}`) + const start = performance.now() + const ids = (await listIds(c)).filter(Boolean) as string[] + const queue = chunk(ids, resourcesPerBatch) + const total = queue.length + console.log( + ` ${total} batches queued for ${ids.length} IDs (concurrency=${concurrency})` + ) + + const batches = db.collection(`scrapers/${resourceName}/batches`) + await processQueue(batches, c, queue, concurrency) + console.log(` Done — all ${total} batches processed`) + await waitForInterval(start, intervalMs) + } + } +} From 96d2be38ad56781fdc846e2005644a00854d2df8 Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 21:05:48 -0400 Subject: [PATCH 3/6] prettier --- functions/src/committees/updateCommitteeRosters.ts | 8 +++----- functions/src/events/scrapeEvents.ts | 6 +++++- functions/src/members/createMemberSearchIndex.ts | 14 +++++--------- functions/src/scraper.ts | 9 ++------- 4 files changed, 15 insertions(+), 22 deletions(-) diff --git a/functions/src/committees/updateCommitteeRosters.ts b/functions/src/committees/updateCommitteeRosters.ts index ddfdbea69..1708268e0 100644 --- a/functions/src/committees/updateCommitteeRosters.ts +++ b/functions/src/committees/updateCommitteeRosters.ts @@ -17,11 +17,9 @@ export async function runUpdateCommitteeRosters(court: number | string) { const update: DocUpdate = { members: roster.map(m => ({ id: m.id, name: m.content.Name })) } - writer.set( - db.doc(`/generalCourts/${court}/committees/${id}`), - update, - { merge: true } - ) + writer.set(db.doc(`/generalCourts/${court}/committees/${id}`), update, { + merge: true + }) }) await writer.close() } diff --git a/functions/src/events/scrapeEvents.ts b/functions/src/events/scrapeEvents.ts index 834b190ca..5939e52bd 100644 --- a/functions/src/events/scrapeEvents.ts +++ b/functions/src/events/scrapeEvents.ts @@ -67,7 +67,11 @@ abstract class EventScraper { event = await this.getEvent(item).catch(logFetchError("event", id)) if (!event) continue - if (!skipCutoff && event.startsAt.toMillis() < upcomingOrRecentCutoff.toMillis()) break + if ( + !skipCutoff && + event.startsAt.toMillis() < upcomingOrRecentCutoff.toMillis() + ) + break writer.set(db.doc(`/events/${event.id}`), event, { merge: true }) diff --git a/functions/src/members/createMemberSearchIndex.ts b/functions/src/members/createMemberSearchIndex.ts index cefe4a1af..279fa8171 100644 --- a/functions/src/members/createMemberSearchIndex.ts +++ b/functions/src/members/createMemberSearchIndex.ts @@ -3,9 +3,7 @@ import { db } from "../firebase" import { currentGeneralCourt } from "../shared" export async function runCreateMemberSearchIndex(court: number | string) { - const members = await db - .collection(`/generalCourts/${court}/members`) - .get() + const members = await db.collection(`/generalCourts/${court}/members`).get() const index = members.docs .map(d => (d.exists && d.data().content) ?? {}) @@ -14,12 +12,10 @@ export async function runCreateMemberSearchIndex(court: number | string) { .map(({ SponsoredBills, CoSponsoredBills, ...member }) => member) .sort((m1, m2) => (m1.Name < m2.Name ? -1 : 1)) - await db - .doc(`/generalCourts/${court}/indexes/memberSearch`) - .set({ - representatives: index.filter(d => d.Branch === "House"), - senators: index.filter(d => d.Branch === "Senate") - }) + await db.doc(`/generalCourts/${court}/indexes/memberSearch`).set({ + representatives: index.filter(d => d.Branch === "House"), + senators: index.filter(d => d.Branch === "Senate") + }) } /** Create a document that aggregates all legislative members for easier diff --git a/functions/src/scraper.ts b/functions/src/scraper.ts index bc4037d26..700ef0997 100644 --- a/functions/src/scraper.ts +++ b/functions/src/scraper.ts @@ -147,10 +147,7 @@ export function createScraper({ ) } } else { - logger.error( - `Unexpected error fetching ${resourceName}/${id}`, - e - ) + logger.error(`Unexpected error fetching ${resourceName}/${id}`, e) } } } @@ -167,9 +164,7 @@ export function createScraper({ await snap.ref .delete() .then(() => - logger.info( - `Deleted ${resourceName} batch doc ${snap.ref.path}` - ) + logger.info(`Deleted ${resourceName} batch doc ${snap.ref.path}`) ) .catch(e => logger.warn( From 9bc36d6f8802af9323d75f456b211f40c1152c37 Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 21:21:14 -0400 Subject: [PATCH 4/6] reduce batch sizes to avoid timeouts --- scripts/firebase-admin/runScrapersByCourt.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/scripts/firebase-admin/runScrapersByCourt.ts b/scripts/firebase-admin/runScrapersByCourt.ts index ad6a5ad50..8eb5ea32e 100644 --- a/scripts/firebase-admin/runScrapersByCourt.ts +++ b/scripts/firebase-admin/runScrapersByCourt.ts @@ -62,7 +62,7 @@ const courtRunnableScrapers: { [k: string]: CourtRunnableFn } = { const batchableScrapers: { [k: string]: BatchableScraper } = { startBillBatches: { resourceName: "bills", - resourcesPerBatch: 150, + resourcesPerBatch: 75, listIds: court => api.listDocuments({ court }).then(docs => docs.map(d => d.BillNumber)) }, @@ -74,12 +74,12 @@ const batchableScrapers: { [k: string]: BatchableScraper } = { }, startCityBatches: { resourceName: "cities", - resourcesPerBatch: 200, + resourcesPerBatch: 100, listIds: court => api.listCities(court) }, startCommitteeBatches: { resourceName: "committees", - resourcesPerBatch: 200, + resourcesPerBatch: 100, listIds: court => api.listCommittees(court).then(cs => cs.map(c => c.CommitteeCode)) } From 46b446306dfd9aadd918ea82ddd4d529b45e4ac8 Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 21:41:16 -0400 Subject: [PATCH 5/6] update supported courts constant --- functions/src/shared/constants.ts | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/functions/src/shared/constants.ts b/functions/src/shared/constants.ts index ff375ec22..1857947cf 100644 --- a/functions/src/shared/constants.ts +++ b/functions/src/shared/constants.ts @@ -23,6 +23,24 @@ export const generalCourts: Record = { Number: 192, FirstYear: 2021, SecondYear: 2022 + }, + 191: { + Name: "191st (2019 - 2020)", + Number: 191, + FirstYear: 2019, + SecondYear: 2020 + }, + 190: { + Name: "190th (2017 - 2018)", + Number: 190, + FirstYear: 2017, + SecondYear: 2018 + }, + 189: { + Name: "189th (2015 - 2016)", + Number: 189, + FirstYear: 2015, + SecondYear: 2016 } } From 23086e8b89d95ba114909472de6999974cc1933a Mon Sep 17 00:00:00 2001 From: Andre Coullard Date: Tue, 19 May 2026 21:53:04 -0400 Subject: [PATCH 6/6] fix accidentally swallowing errors for non back fill scraper runs --- functions/src/scraper.ts | 9 ++++++++- scripts/firebase-admin/runScrapersByCourt.ts | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/functions/src/scraper.ts b/functions/src/scraper.ts index 700ef0997..4b4249063 100644 --- a/functions/src/scraper.ts +++ b/functions/src/scraper.ts @@ -8,6 +8,11 @@ import { currentGeneralCourt } from "./shared" type Batch = { court: number ids: string[] + /** When true, unexpected (non-Axios) errors are logged and the batch + * continues with the next id instead of failing the whole invocation. + * Used by the per-court backfill script so one bad id doesn't sink a + * historical-court run. Defaults to false for the scheduled prod path. */ + resilient?: boolean } /** List all ids of the resources to scrape. Falsey values will be filtered out. @@ -146,8 +151,10 @@ export function createScraper({ `Could not fetch resource ${resourceName}/${id}: ${e.message}` ) } - } else { + } else if (batch.resilient) { logger.error(`Unexpected error fetching ${resourceName}/${id}`, e) + } else { + throw e } } } diff --git a/scripts/firebase-admin/runScrapersByCourt.ts b/scripts/firebase-admin/runScrapersByCourt.ts index 8eb5ea32e..a147554b2 100644 --- a/scripts/firebase-admin/runScrapersByCourt.ts +++ b/scripts/firebase-admin/runScrapersByCourt.ts @@ -123,7 +123,7 @@ async function processQueue( // otherwise fetchBatch could finish and delete the doc before we record it. const ref = batches.doc() ours.add(ref.id) - await ref.set({ court, ids }) + await ref.set({ court, ids, resilient: true }) } return new Promise((resolve, reject) => {