@@ -91,6 +91,28 @@ def compute_pref_summary(prefs: pd.Series) -> dict[str, float | int]:
9191 }
9292
9393
94+ def _is_retryable_error (e : Exception ) -> bool :
95+ """Return True if the exception is a transient server error that should be retried.
96+
97+ Handles two formats:
98+ - String representation contains the HTTP code (most providers)
99+ - ValueError raised by langchain-openai with a dict arg: {'message': ..., 'code': 429}
100+ """
101+ # langchain-openai raises ValueError(response_dict.get("error")) where the
102+ # error value is a dict like {'message': '...', 'code': 408}
103+ _RETRYABLE_CODES = {408 , 429 , 502 , 503 , 504 }
104+ if isinstance (e , ValueError ) and e .args :
105+ arg = e .args [0 ]
106+ if isinstance (arg , dict ) and arg .get ("code" ) in _RETRYABLE_CODES :
107+ return True
108+
109+ error_str = str (e )
110+ return (
111+ any (str (code ) in error_str for code in _RETRYABLE_CODES )
112+ or "rate" in error_str .lower ()
113+ )
114+
115+
94116def do_inference (chat_model , inputs , use_tqdm : bool = False ):
95117 # Retries on rate-limit/server errors with exponential backoff.
96118 # Async path retries individual calls; batch path splits into 4^attempt chunks on failure.
@@ -109,8 +131,7 @@ async def process_single(input_item, max_retries=5, base_delay=1.0):
109131 pbar .update (1 )
110132 return result
111133 except Exception as e :
112- is_rate_limit = "429" in str (e ) or "rate" in str (e ).lower ()
113- if attempt == max_retries - 1 or not is_rate_limit :
134+ if attempt == max_retries - 1 or not _is_retryable_error (e ):
114135 raise
115136 delay = base_delay * (2 ** attempt )
116137 print (
@@ -144,14 +165,7 @@ def batch_with_retry(batch_inputs, max_retries=5, base_delay=1.0):
144165 results .extend (chat_model .batch (inputs = chunk , ** invoke_kwargs ))
145166 return results
146167 except Exception as e :
147- is_server_error = (
148- "429" in str (e )
149- or "500" in str (e )
150- or "502" in str (e )
151- or "503" in str (e )
152- or "rate" in str (e ).lower ()
153- )
154- if attempt == max_retries - 1 or not is_server_error :
168+ if attempt == max_retries - 1 or not _is_retryable_error (e ):
155169 raise
156170 delay = base_delay * (2 ** attempt )
157171 next_chunks = 4 ** (attempt + 1 )
@@ -481,80 +495,60 @@ def cache_function_dataframe(
481495 cache_name : str ,
482496 ignore_cache : bool = False ,
483497 cache_path : Path | None = None ,
498+ parquet : bool = False ,
484499) -> pd .DataFrame :
485500 """
486501 :param fun: a function whose dataframe result obtained `fun()` will be cached
487- :param cache_name: the cache of the function result is written into
488- `{cache_path}/{cache_name}.csv.zip`
502+ :param cache_name: the cache of the function result is written into `{cache_path}/{cache_name}.csv.zip`
489503 :param ignore_cache: whether to recompute even if the cache is present
490504 :param cache_path: folder where to write cache files, default to ~/cache-zeroshot/
505+ :param parquet: whether to store the data in parquet, if not specified use csv.zip
491506 :return: result of fun()
492507 """
493508 if cache_path is None :
494509 cache_path = data_root / "cache"
495- cache_file = cache_path / (cache_name + ".csv.zip" )
510+
511+ if parquet :
512+ cache_file = cache_path / (cache_name + ".parquet" )
513+ else :
514+ cache_file = cache_path / (cache_name + ".csv.zip" )
496515 cache_file .parent .mkdir (parents = True , exist_ok = True )
497516 if cache_file .exists () and not ignore_cache :
498517 print (f"Loading cache { cache_file } " )
499- return pd .read_csv (cache_file )
518+ if parquet :
519+ return pd .read_parquet (cache_file )
520+ else :
521+ return pd .read_csv (cache_file )
500522 else :
501523 print (
502524 f"Cache { cache_file } not found or ignore_cache set to True, regenerating the file"
503525 )
504526 with Timeblock ("Evaluate function." ):
505527 df = fun ()
506528 assert isinstance (df , pd .DataFrame )
507- df .to_csv (cache_file , index = False )
508- return pd .read_csv (cache_file )
509-
510-
511- def compute_cohen_kappa (y1 : list [str ], y2 : list [str ]) -> float :
512- """
513- Compute Cohen's kappa coefficient for inter-rater agreement.
514-
515- Args:
516- y1: List of labels from first rater
517- y2: List of labels from second rater
518-
519- Returns:
520- Cohen's kappa coefficient (float between -1 and 1)
521- """
522- if len (y1 ) != len (y2 ):
523- raise ValueError ("Both lists must have the same length" )
524-
525- if len (y1 ) == 0 :
526- raise ValueError ("Lists cannot be empty" )
527-
528- # Get all unique categories
529- categories = sorted (set (y1 ) | set (y2 ))
530- n = len (y1 )
531-
532- # Build confusion matrix
533- matrix = {}
534- for cat1 in categories :
535- matrix [cat1 ] = {cat2 : 0 for cat2 in categories }
536-
537- for label1 , label2 in zip (y1 , y2 , strict = True ):
538- matrix [label1 ][label2 ] += 1
539-
540- # Compute observed agreement (p_o)
541- observed_agreement = sum (matrix [cat ][cat ] for cat in categories ) / n
542-
543- # Compute expected agreement (p_e)
544- expected_agreement = 0
545- for cat in categories :
546- # Marginal probabilities
547- p1 = sum (matrix [cat ][c ] for c in categories ) / n # rater 1
548- p2 = sum (matrix [c ][cat ] for c in categories ) / n # rater 2
549- expected_agreement += p1 * p2
550-
551- # Compute Cohen's kappa
552- if expected_agreement == 1 :
553- return 1.0 if observed_agreement == 1 else 0.0
554-
555- kappa = (observed_agreement - expected_agreement ) / (1 - expected_agreement )
556-
557- return kappa
529+ if parquet :
530+ # object cols cannot be saved easily in parquet; numpy arrays must be
531+ # deep-converted to plain Python so str() produces ast.literal_eval-safe
532+ # repr (no "array([...])" syntax, which breaks literal_eval)
533+ import numpy as np
534+
535+ def _to_python (x ):
536+ """Recursively convert numpy arrays/scalars to Python lists/dicts."""
537+ if isinstance (x , np .ndarray ):
538+ return [_to_python (i ) for i in x ]
539+ if isinstance (x , dict ):
540+ return {k : _to_python (v ) for k , v in x .items ()}
541+ if isinstance (x , list ):
542+ return [_to_python (i ) for i in x ]
543+ return x
544+
545+ for col in df .select_dtypes (include = "object" ).columns :
546+ df [col ] = df [col ].apply (_to_python ).astype (str )
547+ df .to_parquet (cache_file , index = False )
548+ return pd .read_parquet (cache_file )
549+ else :
550+ df .to_csv (cache_file , index = False )
551+ return pd .read_csv (cache_file )
558552
559553
560554if __name__ == "__main__" :
0 commit comments