SNOW-3484790: initialize aggregation functions list during SCOS init#4217
the updated system function list is fetched with query: |
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #4217      +/-   ##
==========================================
+ Coverage   95.27%   95.40%   +0.13%
==========================================
  Files         171      171
  Lines       44158    44205      +47
  Branches     7535     7548      +13
==========================================
+ Hits        42071    42174     +103
+ Misses       1295     1247      -48
+ Partials      792      784       -8
| self._client_telemetry = EventTableTelemetry(session=self)
| self._agg_function_prefetch_job: Optional[AsyncJob] = None
| # Guards the one-time atomic claim of _agg_function_prefetch_job.
| self._agg_function_prefetch_lock = Lock()
I don't think there's a way for the same thread to attempt to acquire this lock multiple times, but I think we should make this an RLock instead (which is already imported in this file) to be safe.
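The difference behind this suggestion can be sketched with the standard library alone. The helper name `reenter` is hypothetical and is not part of the PR; it only shows that a second acquire from the same thread fails on a plain `Lock` but succeeds on an `RLock`.

```python
import threading


def reenter(lock) -> bool:
    """Hold the lock, then try to take it again from the same thread.

    Returns True if the second (non-blocking) acquire succeeded.
    """
    with lock:
        # Non-blocking so that a plain Lock reports failure instead of deadlocking.
        acquired_again = lock.acquire(blocking=False)
        if acquired_again:
            lock.release()
        return acquired_again


plain_reentry_ok = reenter(threading.Lock())   # a plain Lock is not reentrant
rlock_reentry_ok = reenter(threading.RLock())  # an RLock may be re-acquired by its owner
```

With a blocking acquire, the plain `Lock` case would hang forever, which is the failure mode the reviewer wants to rule out.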
| with context._aggregation_function_set_lock:
|     context._aggregation_function_set.update(retrieved_set)
| def _submit_internal_async_prefetch_query(self, query: str) -> Optional[AsyncJob]:
nit: Can we inline this method since it's only called once, and pretty short?
| ctx._is_snowpark_connect_compatible_mode = True
| ctx._snowpark_connect_flatten_select_after_sort = True
| ctx._aggregation_function_set = set()
Why do we need this instead of mocking these fields like in the other test?
| ctx._aggregation_function_set = orig[2]
| def test_retrieve_agg_concurrent_waiters_see_result_not_sync_query():
How is this different from test_concurrent_retrieve_agg_waiters_no_sync_query in the other test file?
| set()
| ) # lower cased names of aggregation functions, used in sql simplification
| _aggregation_function_set_lock = threading.RLock()
| _aggregation_function_prefetch_state: dict[str, Any] = {
Does this need to be a dict? Can we just use 3 variables or a singleton class instead?
Yes, we definitely can. I thought using a dictionary made it clearer that this is for the agg function prefetch.
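One way the class-based alternative could look is sketched below. The class name `AggFunctionPrefetchState` and its fields (`job`, `claimed`, `lock`) are assumptions made for illustration; the actual keys of the dictionary in the PR are not shown in this conversation.

```python
import threading
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class AggFunctionPrefetchState:
    """Hypothetical grouping of the prefetch state into one typed object."""

    job: Optional[Any] = None   # stand-in for the async job handle
    claimed: bool = False       # whether some caller already claimed the job
    lock: Any = field(default_factory=threading.RLock, repr=False)

    def try_claim(self) -> bool:
        """Return True for exactly one caller, False for everyone after."""
        with self.lock:
            if self.claimed:
                return False
            self.claimed = True
            return True


# A single module-level instance plays the role of the dictionary.
prefetch_state = AggFunctionPrefetchState()
```

Compared with a `dict[str, Any]`, attribute access gives type checkers something to verify, and the class name still signals that the fields belong to the agg function prefetch.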
Which Jira issue is this PR addressing? Make sure that there is an accompanying issue to your PR.
Fixes SNOW-NNNNNNN
Fill out the following pre-review checklist:
Please describe how your code solves the related issue.
_retrieve_aggregation_function_list fires two blocking Snowflake queries on the first filter() call. The function was added in a previous PR to fix a bug in SCOS. This PR instead submits the query asynchronously at the beginning of Snowpark session init, so the first filter() call no longer has to issue it synchronously.
This change also removes the redundant select from information_schema.functions query, since UDAFs are currently not supported in SCOS and its result would not be used.
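The prefetch pattern described above can be sketched as follows. This is not the Snowpark implementation: `PrefetchingSession`, `run_query`, and the placeholder query string are hypothetical, and a `concurrent.futures.Future` stands in for Snowpark's `AsyncJob`.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Callable, Set


class PrefetchingSession:
    """Toy session that starts the function-list query during init."""

    # Placeholder only; the real query text is not shown in this PR page.
    PREFETCH_QUERY = "<system function list query>"

    def __init__(self, run_query: Callable[[str], Set[str]]) -> None:
        self._executor = ThreadPoolExecutor(max_workers=1)
        # Submit the query at init time instead of on the first filter() call.
        self._prefetch: Future = self._executor.submit(run_query, self.PREFETCH_QUERY)
        self._agg_functions: Set[str] = set()
        self._lock = threading.RLock()

    def retrieve_aggregation_functions(self) -> Set[str]:
        # Every caller waits on the same job; no caller issues a second query.
        names = self._prefetch.result()
        with self._lock:
            self._agg_functions.update(names)
            return set(self._agg_functions)

    def close(self) -> None:
        self._executor.shutdown(wait=True)
```

In this sketch, concurrent callers all block on the same future, which mirrors the behavior the concurrent-waiter tests in the PR are named after.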