11import itertools
22import logging
3- import random
4- import re
53import sys
6- from collections import Counter
4+ from abc import ABC , abstractmethod
75from typing import Optional
86
9- from openai import OpenAI
7+ import pydantic
8+ from litellm import ConfigDict
9+ from PIL import Image , ImageDraw
1010from pydantic .dataclasses import dataclass
1111
12- from effectful .handlers import futures
13- from effectful .handlers .futures import Executor , ThreadPoolFuturesInterpretation
1412from effectful .handlers .llm import Template
15- from effectful .handlers .llm .providers import LLMLoggingHandler , OpenAIAPIProvider
13+ from effectful .handlers .llm .providers import (
14+ LiteLLMProvider ,
15+ RetryLLMHandler ,
16+ )
17+ from effectful .handlers .llm .sampling import KAheadSampler
1618from effectful .ops .semantics import handler
19+ from effectful .ops .types import NotHandled
1720
18-
19- @dataclass (frozen = True )
20- class Step :
21- start : int
22- end : int
21+ type Step = tuple [int , int ]
2322
2423
2524@dataclass (frozen = True )
@@ -40,25 +39,8 @@ def new(cls, size: int) -> "GameState":
4039 towers = tuple (tuple (tower ) for tower in towers )
4140 return cls (size , towers )
4241
43- def visualise_text (self ):
44- max_disk = self .size
45- width = max_disk * 2 + 3
46- for i , tower in enumerate (self .towers ):
47- print (f"\n Tower { i } :" )
48- for disk in reversed (tower ):
49- disk_width = (disk + 1 ) * 2 - 1
50- padding = (max_disk - disk_width ) // 2
51- print (" " * padding + "=" * disk_width + " " * padding )
52- print ("=" * width )
53- print ()
54-
55- def visualise_image (self ):
42+ def visualise_image (self ) -> Image :
5643 "Uses python graphics libraries to visualise the state of the hanoi game."
57- try :
58- from PIL import Image , ImageDraw
59- except ImportError :
60- return None
61- # Pillow-based visualization
6244 tower_width = 150
6345 disk_height = 30
6446 base_height = 20
@@ -116,16 +98,13 @@ def visualise_image(self):
11698
11799 def visualise (self ):  # Render this state with visualise_image() and display the result via its show() method.
118100 img = self .visualise_image ()
119- if img :
120- img .show ()
121- else :
122- self .visualise_text ()
101+ img .show ()
123102
124103 def apply (self , step : Step ) -> Optional ["GameState" ]:
125104 """
126105 Given a tower `start` and a target tower `end` moves the topmost disk to the end tower.
127106 """
128- start , end = step . start , step . end
107+ start , end = step
129108
130109 if not (0 <= start < len (self .towers ) and 0 <= end < len (self .towers )):
131110 return None
@@ -149,25 +128,6 @@ def apply(self, step: Step) -> Optional["GameState"]:
149128 )
150129 return new_state
151130
152- def steps_to_complete (self ) -> int :
153- """Compute the number of steps to complete the towers of hanoi from a given configuration if using the optimal algorithm."""
154- # Count disks on each tower
155- total_moves = 0
156-
157- # For each tower that's not the destination, we need to move all its disks
158- for tower_idx , tower in enumerate (self .towers ):
159- if tower_idx == self .size - 1 :
160- continue
161-
162- # Number of disks on this tower
163- n_disks = len (tower )
164-
165- if n_disks > 0 :
166- # Moving n disks from one peg to another requires 2^n - 1 moves
167- total_moves += (2 ** n_disks ) - 1
168-
169- return total_moves
170-
171131 def is_done (self ) -> bool :
172132 return all (len (tower ) == 0 for tower in self .towers [:- 1 ]) and all (
173133 self .towers [- 1 ][i ] > self .towers [- 1 ][i + 1 ]
@@ -184,24 +144,62 @@ def valid_steps(self) -> list[Step]:
184144 continue
185145 # if tower_i's disk is smaller than tower_j's topmost, then it is valid to move from tower i to j
186146 if len (tower_j ) == 0 or tower_i [- 1 ] < tower_j [- 1 ]:
187- steps .append (Step (i , j ))
147+ steps .append ((i , j ))
188148 return steps
189149
190150
191- class MicroAgent :
192- """Micro agent (based on MAKERS paper) responsible for predicting a single next step."""
151+ class Step (ABC ):  # Abstract interface for a move exposing read-only `start` and `end` tower indices. NOTE(review): this rebinds the module-level name `Step` that the `type Step = tuple[int, int]` alias also defines - confirm which one is intended.
152+ @property
153+ @abstractmethod
154+ def start (self ) -> int :  # index of the tower the disk is moved from
155+ raise NotImplementedError
156+
157+ @property
158+ @abstractmethod
159+ def end (self ) -> int :  # index of the tower the disk is moved to
160+ raise NotImplementedError
162+
163+ def build_validated_model (game_state : GameState ) -> type [Step ]:  # Build and return a frozen pydantic dataclass (the class, not an instance) whose instances are only accepted when (start, end) is a valid move for game_state.
164+ valid_steps = game_state .valid_steps ()  # computed once; the validators below close over this list
165+
166+ @pydantic .dataclasses .dataclass (frozen = True )
167+ class StepModel :
168+ start : int
169+ end : int
170+ explanation : str = ""
171+ model_config = ConfigDict (extra = "forbid" )  # NOTE(review): pydantic dataclasses take their config from the decorator's `config=` argument or `__pydantic_config__`; confirm this attribute actually takes effect
172+
173+ @pydantic .field_validator ("start" , "end" , mode = "before" )  # before-mode: receives the raw input value for each of the two fields
174+ def validate_indices (cls , v , info ):
175+ if isinstance (v , int ):
176+ if not (0 <= v < len (game_state .towers )):
177+ raise ValueError (f"{ info .field_name } { v } out of range" )
178+ else :
179+ raise TypeError ("start/end must both be int" )  # NOTE(review): pydantic v2 does not wrap a TypeError raised inside a validator into ValidationError; confirm callers expect the raw TypeError
180+ return v
193181
194- game_state : GameState
182+ @pydantic .model_validator (mode = "after" )  # runs on the constructed instance, so both fields are available
183+ def validate_step (self ):
184+ if (self .start , self .end ) not in valid_steps :
185+ raise ValueError (f"step ({self.start}, {self.end}) is not in {valid_steps}" )  # report the rejected move and the allowed moves (was a non-f-string naming a missing attribute)
186+ return self
195187
196- def __init__ (self , state : GameState ):
197- self .game_state = state
188+ def __hash__ (self ):  # hash uses only (start, end); `explanation` is not included
189+ return hash ((self .start , self .end ))
190+
191+ return StepModel
192+
193+
194+ def predict_next_step (game_state : GameState ) -> Step :
195+ ValidStep = build_validated_model (game_state )
198196
199197 @Template .define
200- def predict_next_step ( self ) -> str :
198+ def predict_next_step_inner ( game_state ) -> ValidStep :
201199 """
202200 Given the state of the game of towers of Hanoi as follows:
203201
204- {self. game_state}
202+ {game_state}
205203
206204 Predict the next step to complete the game (moving all disks to the rightmost tower).
207205
@@ -211,96 +209,24 @@ def predict_next_step(self) -> str:
211209
212210 where start and end are zero-based indices for the towers to move. Be concise and avoid wordy answers.
213211 """
214- pass
215-
216- def parse_response (self , response : str ) -> Step | None :
217- "Parse the predicted step from an LLM response."
218- pattern = r"<step>\s*(\d+)\s*,\s*(\d+)\s*</step>"
219- m = re .search (pattern , response )
220- if not m :
221- return None
222- return Step (int (m .group (1 )), int (m .group (2 )))
223-
224- def has_no_red_flags (self , response : str ) -> Step | None :
225- """Returns the underlying step if the provided step has no red flags."""
226- if len (response ) > 450.0 : # based on a sample
227- return None
228-
229- step = self .parse_response (response )
230- if not step :
231- return None
232- if not (
233- 0 <= step .start < len (self .game_state .towers )
234- and 0 <= step .end < len (self .game_state .towers )
235- ):
236- return None
237- if step not in self .game_state .valid_steps ():
238- return None
239- return step
240-
241- def get_vote (self ): # algorithm 3
242- while True :
243- resp = self .predict_next_step ()
244- if step := self .has_no_red_flags (resp ):
245- return step
246-
247-
248- class FirstToAheadMoveSelector :
249- k : int
250- game_state : GameState
251- agents : list [MicroAgent ]
252- votes : Counter [Step ]
253-
254- def __init__ (self , state : GameState , no_agents = 6 , k = 3 ):
255- self .k = k
256- self .game_state = state
257- self .agents = [MicroAgent (self .game_state ) for _ in range (no_agents )]
258- self .votes = Counter ()
259-
260- def do_voting (self ) -> Step : # algorithm 2
261- # run n in parallel repeatedly until k come out in top
262- while True :
263- # submit a batch of votes
264- for vote in futures .as_completed (
265- [Executor .submit (agent .get_vote ) for agent in self .agents ]
266- ):
267- vote = vote .result ()
268- self .votes [vote ] += 1
269- max_other_votes = max (
270- (self .votes [o_vote ] for o_vote in self .votes if o_vote != vote ),
271- default = 0 ,
272- )
273- if self .votes [vote ] >= max_other_votes + self .k :
274- return vote
275-
276-
277- def calculate_average_sample_size ():
278- """Function I used to calculate the number 450. in the above code."""
279- sizes = []
280- samples = []
212+ raise NotHandled
281213
282- with handler (OpenAIAPIProvider (OpenAI ())):
283- for _ in range (10 ):
284- s = GameState .new (random .randint (3 , 6 ))
285- for i in range (100 ):
286- step = random .choice (s .valid_steps ())
287- s = s .apply (step ) or s
288- resp = MicroAgent (s ).predict_next_step ()
289- samples .append (resp )
290- sizes .append (len (resp ))
291- return sum (sizes ) / len (sizes )
214+ s = predict_next_step_inner (game_state )
215+ return (s .start , s .end )
292216
293217
294218def solve_hanoi (state : GameState ):
295219 log = []
296220
297221 for i in itertools .count ():
298222 print (f"step { i } - { state } " )
299- step = FirstToAheadMoveSelector (state ).do_voting ()
223+ with handler (KAheadSampler ()), handler (RetryLLMHandler ()):
224+ step = predict_next_step (state )
300225 # track the step at each point
301- log .append ((state , step ))
226+ if new_state := state .apply (step ):
227+ log .append ((state , step ))
302228
303- state = state . apply ( step )
229+ state = new_state or state
304230 state .visualise ()
305231 if state .is_done ():
306232 break
@@ -313,8 +239,6 @@ def solve_hanoi(state: GameState):
313239)
314240
315241with (  # Script entry: install the LiteLLM provider handler, then solve the game created by GameState.new(3).
316- handler (ThreadPoolFuturesInterpretation ()),
317- handler (OpenAIAPIProvider (OpenAI ())),
318- handler (LLMLoggingHandler ()),
242+ handler (LiteLLMProvider (model_name = "gpt-4o-mini" )),
319243):
320244 solve_hanoi (state = GameState .new (3 ))
0 commit comments