Skip to content

feat(pipeline): context manager for pre/post operation on pipeline.run()#3677

Open
rudolfix wants to merge 2 commits intodevelfrom
feat/customize-run-pipeline
Open

feat(pipeline): context manager for pre/post operation on pipeline.run()#3677
rudolfix wants to merge 2 commits intodevelfrom
feat/customize-run-pipeline

Conversation

@rudolfix
Copy link
Copy Markdown
Collaborator

Description

This PR enables and demonstrates customization of the run method: it injects additional data to be loaded in the same trace transaction in an idempotent (can be retried) way.

See the included test for PoC

@rudolfix rudolfix requested a review from zilto February 24, 2026 20:57
@rudolfix rudolfix self-assigned this Feb 24, 2026
@cloudflare-workers-and-pages
Copy link
Copy Markdown

cloudflare-workers-and-pages Bot commented Feb 24, 2026

Deploying with  Cloudflare Workers  Cloudflare Workers

The latest updates on your project. Learn more about integrating Git with Workers.

Status Name Latest Commit Preview URL Updated (UTC)
✅ Deployment successful!
View logs
docs fbdc0d6 Commit Preview URL

Branch Preview URL
Feb 25 2026, 04:09 PM

@zilto zilto changed the title (feat) allows to customize pipeline.run method feat(pipeline: context manager for pre/post operation on pipeline.run() Feb 25, 2026
@zilto zilto changed the title feat(pipeline: context manager for pre/post operation on pipeline.run() feat(pipeline): context manager for pre/post operation on pipeline.run() Feb 25, 2026
Copy link
Copy Markdown
Collaborator

@zilto zilto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My summary of the design constraints:

  • Pipeline.run() is a public interface; it is what people call in their code.
  • Pipeline.run() decorated with a bunch of context manager and has a lot of built-in assumptions for transactions, which makes patching it hard.
  • Pipeline._run_once() is a new method that is private and include the whole logic of what Pipeline.run() does. It can be patched while preserving all of the decorators, context manager, public interface

Suggestion

  • rename Pipeline._run_once() to Pipeline._run()

  • move the logic of ._run_once() to an internal function called _run_pipeline()

    def _run_pipeline(
       pipeline: Pipeline,
        data: Any,
        *,
        ...,
    ) -> LoadInfo:
        pipeline.extract(...)
        pipeline.normalize()
        return pipeline.load(...)
    
    class Pipeline:
       def _run(self, ...):
           return _run_pipeline(...)
    
        def run(self, ...):
            return self._run(...)

    This reduces clunkiness of patching by replacing base class invocation Pipeline._run(...) with functional pattern _run_pipeline()

    class SidecarPipeline(Pipeline):
        def _run(self, data: Any, **kwargs: Any) -> LoadInfo:
            load_info = Pipeline._run(self, data, **kwargs)
            return load_info

    Becomes

    class SidecarPipeline(Pipeline):
        def _run(self, data: Any, **kwargs: Any) -> LoadInfo:
            load_info = _run_pipeline(self, data, **kwargs)
            return load_info

Comment on lines +4930 to +4942
class SidecarPipeline(Pipeline):
def _run_once(self, data: Any, **kwargs: Any) -> LoadInfo:
load_info = Pipeline._run_once(self, data, **kwargs)
# guard idempotency via local state
try:
self.get_local_state_val(SIDECAR_LOADED_KEY)
except KeyError:
self.set_local_state_val(SIDECAR_LOADED_KEY, True)
Pipeline._run_once(self, sidecar_source(), **kwargs)
return load_info

def __reduce__(self):
return (Pipeline.__new__, (Pipeline,), Pipeline.__getstate__(self))
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code really deserve some comments and explanation.

Class vs instance

AFAIU, the key trick is to use the class object instead of the instance

# via class, passing `self` explicitly
Pipeline._run_once(self, data, **kwargs)

# instead of via instance
self._run_once(data, **kwargs) 

I understand that SidecarPipeline overrides _run_once which is called by the user's code pipeline.run(). So SidecarPipeline._run_once() references Pipeline._run_once() to avoid recursion

State key

How / why is the local state key set? It seems to not be used anywhere. I imagine that on a retry, you can tell if you need to retry main pipeline, sidecar pipeline, or both

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants