Skip to content

Commit 712d4fb

Browse files
committed
added maker example
1 parent bfb7d7a commit 712d4fb

1 file changed

Lines changed: 313 additions & 0 deletions

File tree

tests/test_maker.py

Lines changed: 313 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,313 @@
1+
import itertools
2+
import logging
3+
import random
4+
import re
5+
from collections import Counter
6+
from typing import Optional
7+
8+
from openai import OpenAI
9+
from pydantic.dataclasses import dataclass
10+
11+
from effectful.handlers import futures
12+
from effectful.handlers.futures import Executor, ThreadPoolFuturesInterpretation
13+
from effectful.handlers.llm import Template
14+
from effectful.handlers.llm.providers import LLMLoggingHandler, OpenAIAPIProvider
15+
from effectful.ops.semantics import handler
16+
17+
18+
@dataclass
19+
class Step:
20+
start: int
21+
end: int
22+
23+
24+
@dataclass(frozen=True)
25+
class GameState:
26+
"""State of a game of towers of Hanoi where the initial state is a
27+
set of towers. We use higher numbers to represesnt smaller
28+
disks. So [1,2,3] is a valid tower. The towers are all stacked at
29+
the left at the start (self.towers[0]), and the goal is to move
30+
them to the rightmost tower (self.towers[-1])."""
31+
32+
size: int
33+
towers: tuple[tuple[int, ...], ...]
34+
35+
@classmethod
36+
def new(cls, size: int) -> "GameState":
37+
towers = [[] for _ in range(size)]
38+
towers[0] = list(reversed(range(size)))
39+
towers = tuple(tuple(tower) for tower in towers)
40+
return cls(size, towers)
41+
42+
def visualise_text(self):
43+
max_disk = self.size
44+
width = max_disk * 2 + 3
45+
for i, tower in enumerate(self.towers):
46+
print(f"\nTower {i}:")
47+
for disk in reversed(tower):
48+
disk_width = (disk + 1) * 2 - 1
49+
padding = (max_disk - disk_width) // 2
50+
print(" " * padding + "=" * disk_width + " " * padding)
51+
print("=" * width)
52+
print()
53+
54+
def visualise_image(self):
55+
"Uses python graphics libraries to visualise the state of the hanoi game."
56+
try:
57+
from PIL import Image, ImageDraw
58+
except ImportError:
59+
return None
60+
# Pillow-based visualization
61+
tower_width = 150
62+
disk_height = 30
63+
base_height = 20
64+
pole_width = 10
65+
img_width = tower_width * len(self.towers)
66+
img_height = disk_height * (self.size + 1) + base_height + 50
67+
68+
img = Image.new("RGB", (img_width, img_height), "white")
69+
draw = ImageDraw.Draw(img)
70+
71+
for tower_idx, tower in enumerate(self.towers):
72+
# Draw pole
73+
pole_x = tower_idx * tower_width + tower_width // 2
74+
pole_top = 40
75+
pole_bottom = img_height - base_height - 10
76+
draw.rectangle(
77+
[
78+
pole_x - pole_width // 2,
79+
pole_top,
80+
pole_x + pole_width // 2,
81+
pole_bottom,
82+
],
83+
fill="brown",
84+
)
85+
86+
# Draw base
87+
base_y = img_height - base_height - 10
88+
draw.rectangle(
89+
[
90+
tower_idx * tower_width + 20,
91+
base_y,
92+
(tower_idx + 1) * tower_width - 20,
93+
base_y + base_height,
94+
],
95+
fill="gray",
96+
)
97+
98+
# Draw disks
99+
for disk_idx, disk in enumerate(tower):
100+
disk_width_px = 30 + disk * 15
101+
disk_y = pole_bottom - (disk_idx + 1) * disk_height
102+
disk_x1 = pole_x - disk_width_px // 2
103+
disk_x2 = pole_x + disk_width_px // 2
104+
105+
# Color gradient based on disk size
106+
color_intensity = int(255 * (disk / self.size))
107+
color = (color_intensity, 100, 255 - color_intensity)
108+
draw.rectangle(
109+
[disk_x1, disk_y, disk_x2, disk_y + disk_height - 5],
110+
fill=color,
111+
outline="black",
112+
width=2,
113+
)
114+
return img
115+
116+
def visualise(self):
117+
img = self.visualise_image()
118+
if img:
119+
img.show()
120+
else:
121+
self.visualise_text()
122+
123+
def apply(self, step: Step) -> Optional["GameState"]:
124+
"""
125+
Given a tower `start` and a target tower `end` moves the topmost disk to the end tower.
126+
"""
127+
start, end = step.start, step.end
128+
129+
if not (0 <= start < len(self.towers) and 0 <= end < len(self.towers)):
130+
return None
131+
132+
# start tower is non empty
133+
if len(self.towers[start]) == 0:
134+
return None
135+
136+
# end tower is a valid target
137+
if len(self.towers[end]) > 0 and self.towers[start][-1] > self.towers[end][-1]:
138+
return None
139+
140+
# create state with the move applied
141+
new_towers = [list(tower) for tower in self.towers]
142+
disk = new_towers[start].pop()
143+
new_towers[end].append(disk)
144+
145+
#
146+
new_state = GameState(
147+
size=self.size, towers=tuple(tuple(tower) for tower in new_towers)
148+
)
149+
return new_state
150+
151+
def steps_to_complete(self) -> int:
152+
"""Compute the number of steps to complete the towers of hanoi from a given configuration if using the optimal algorithm."""
153+
# Count disks on each tower
154+
total_moves = 0
155+
156+
# For each tower that's not the destination, we need to move all its disks
157+
for tower_idx, tower in enumerate(self.towers):
158+
if tower_idx == self.size - 1:
159+
continue
160+
161+
# Number of disks on this tower
162+
n_disks = len(tower)
163+
164+
if n_disks > 0:
165+
# Moving n disks from one peg to another requires 2^n - 1 moves
166+
total_moves += (2**n_disks) - 1
167+
168+
return total_moves
169+
170+
def is_done(self) -> bool:
171+
return all(len(tower) == 0 for tower in self.towers[:-1]) and all(
172+
self.towers[-1][i] > self.towers[-1][i + 1]
173+
for i in range(len(self.towers[-1]) - 1)
174+
)
175+
176+
def valid_steps(self) -> list[Step]:
177+
steps = []
178+
for i, tower_i in enumerate(self.towers):
179+
for j, tower_j in enumerate(self.towers):
180+
if i == j:
181+
continue
182+
if len(tower_i) == 0:
183+
continue
184+
# if tower_i's disk is smaller than tower_j's topmost, then it is valid to move from tower i to j
185+
if len(tower_j) == 0 or tower_i[-1] < tower_j[-1]:
186+
steps.append(Step(i, j))
187+
return steps
188+
189+
190+
class MicroAgent:
191+
"""Micro agent (based on MAKERS paper) responsible for predicting a single next step."""
192+
193+
game_state: GameState
194+
195+
def __init__(self, state: GameState):
196+
self.game_state = state
197+
198+
@Template.define
199+
def predict_next_step(self) -> str:
200+
"""
201+
Given the state of the game of towers of Hanoi as follows:
202+
203+
{self.game_state}
204+
205+
Predict the next step to complete the game (moving all disks to the rightmost tower).
206+
207+
Give a reasoning for your prediction, and return the step following the format:
208+
209+
<step>start,end</step>
210+
211+
where start and end are zero-based indices for the towers to move. Be concise and avoid wordy answers.
212+
"""
213+
pass
214+
215+
def parse_response(self, response: str) -> Step | None:
216+
"Parse the predicted step from an LLM response."
217+
pattern = r"<step>\s*(\d+)\s*,\s*(\d+)\s*</step>"
218+
m = re.search(pattern, response)
219+
if not m:
220+
raise ValueError(
221+
f"No valid <step>start,end</step> tag found in: {response!r}"
222+
)
223+
return Step(int(m.group(1)), int(m.group(2)))
224+
225+
def has_no_red_flags(self, response: str) -> Step | None:
226+
"""Returns the underlying step if the provided step has no red flags."""
227+
if len(response) > 450.0: # based on a sample
228+
return None
229+
230+
step = self.parse_response(response)
231+
if not step:
232+
return None
233+
if not (
234+
0 <= step.start < len(self.game_state.towers)
235+
and 0 <= step.end < len(self.game_state.towers)
236+
):
237+
return None
238+
if step not in self.game_state.valid_steps():
239+
return None
240+
return step
241+
242+
def get_vote(self): # algorithm 3
243+
while True:
244+
resp = self.predict_next_step()
245+
if step := self.has_no_red_flags(resp):
246+
return step
247+
248+
249+
class FirstToAheadMoveSelector:
250+
k: int
251+
game_state: GameState
252+
agents: list[MicroAgent]
253+
votes: Counter[Step]
254+
255+
def __init__(self, state: GameState, no_agents=6, k=3):
256+
self.k = k
257+
self.game_state = state
258+
self.agents = [MicroAgent(self.game_state) for _ in range(no_agents)]
259+
self.votes = Counter()
260+
261+
def do_voting(self) -> Step: # algorithm 2
262+
# run n in parallel repeatedly until k come out in top
263+
while True:
264+
# submit a batch of votes
265+
for vote in futures.as_completed(
266+
Executor.submit(agent.get_vote) for agent in self.agents
267+
):
268+
self.votes[vote] += 1
269+
max_other_votes = max(
270+
self.votes[o_vote] for o_vote in self.votes if o_vote != vote
271+
)
272+
if self.votes[vote] >= max_other_votes + self.k:
273+
return vote
274+
275+
276+
def calculate_average_sample_size():
277+
"""Function I used to calculate the number 450. in the above code."""
278+
sizes = []
279+
samples = []
280+
281+
with handler(OpenAIAPIProvider(OpenAI())):
282+
for _ in range(10):
283+
s = GameState.new(random.randint(3, 6))
284+
for i in range(100):
285+
step = random.choice(s.valid_steps())
286+
s = s.apply(step) or s
287+
resp = MicroAgent(s).predict_next_step()
288+
samples.append(resp)
289+
sizes.append(len(resp))
290+
return sum(sizes) / len(sizes)
291+
292+
293+
def solve_hanoi(state: GameState):
294+
log = []
295+
296+
for i in itertools.count():
297+
print(f"step {i} - {state}")
298+
step = FirstToAheadMoveSelector(state).do_voting()
299+
# track the step at each point
300+
log.append((state, step))
301+
302+
state = state.apply(step)
303+
state.visualise()
304+
305+
306+
logging.basicConfig()
307+
308+
with (
309+
handler(ThreadPoolFuturesInterpretation(max_workers=3)),
310+
handler(LLMLoggingHandler()),
311+
handler(OpenAIAPIProvider(OpenAI())),
312+
):
313+
solve_hanoi(state=GameState.new(3))

0 commit comments

Comments
 (0)