Skip to content

Commit 3d215d1

Browse files
committed
Add emulator support
- Add Emulator object for emulating bpod hardware behavior and communicating outputs to emulator GUI server - Modify bpod classes to accommodate emulator functionality - Add emulator examples to showcase its usage
1 parent 4c9ced5 commit 3d215d1

28 files changed

Lines changed: 1815 additions & 155 deletions
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from pybpodapi.protocol import Bpod, StateMachine
2+
my_bpod = Bpod(emulator_mode=True)
3+
sma = StateMachine(my_bpod)
4+
for i in range(10):
5+
sma.add_state(
6+
state_name='State{}'.format(i),
7+
state_timer=1,
8+
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
9+
output_actions=[])
10+
sma.add_state(
11+
state_name='State10',
12+
state_timer=1,
13+
state_change_conditions={Bpod.Events.Tup: 'exit'},
14+
output_actions=[])
15+
my_bpod.send_state_machine(sma)
16+
my_bpod.run_state_machine(sma)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from pybpodapi.protocol import Bpod, StateMachine
2+
my_bpod = Bpod(emulator_mode=True)
3+
sma = StateMachine(my_bpod)
4+
# Set global timer 1 for 3 seconds, following a 1.5 second onset delay after
5+
# trigger. Link to LED of port 2.
6+
sma.set_global_timer(timer_id=1, timer_duration=0, on_set_delay=1.5,
7+
channel=Bpod.OutputChannels.PWM2, on_message=255)
8+
for i in range(10):
9+
sma.add_state(
10+
state_name='State{}'.format(i),
11+
state_timer=1,
12+
state_change_conditions={Bpod.Events.Tup: 'State{}'.format(i + 1)},
13+
output_actions=[])
14+
sma.add_state(
15+
state_name='State10',
16+
state_timer=1,
17+
state_change_conditions={Bpod.Events.Tup: 'exit'},
18+
output_actions=[])
19+
my_bpod.send_state_machine(sma)
20+
my_bpod.run_state_machine(sma)
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from pybpodapi.protocol import Bpod, StateMachine
2+
my_bpod = Bpod(emulator_mode=True)
3+
sma = StateMachine(my_bpod)
4+
sma.add_state(
5+
state_name='myState',
6+
state_timer=1,
7+
state_change_conditions={Bpod.Events.Tup: 'exit'},
8+
output_actions=[])
9+
my_bpod.send_state_machine(sma)
10+
my_bpod.run_state_machine(sma)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
# !/usr/bin/python3
2+
# -*- coding: utf-8 -*-
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
import random
2+
import time
3+
from pybpodapi.protocol import Bpod, StateMachine
4+
from concurrent.futures import ThreadPoolExecutor
5+
my_bpod = Bpod(emulator_mode=True)
6+
nTrials = 5
7+
graceTime = 5
8+
port_numbers = [1, 2, 3]
9+
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
10+
for i in range(nTrials): # Main loop
11+
print('Trial: ', i + 1)
12+
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
13+
if thisTrialType == 1:
14+
# set stimulus channel for trial type 1
15+
stimulus = Bpod.OutputChannels.PWM1
16+
leftAction = 'Reward'
17+
rightAction = 'Punish'
18+
rewardValve = 1
19+
elif thisTrialType == 2:
20+
# set stimulus channel for trial type 1
21+
stimulus = Bpod.OutputChannels.PWM3
22+
leftAction = 'Punish'
23+
rightAction = 'Reward'
24+
rewardValve = 3
25+
sma = StateMachine(my_bpod)
26+
sma.set_global_timer_legacy(
27+
timer_id=1, timer_duration=graceTime) # Set timeout
28+
sma.add_state(
29+
state_name='WaitForPort2Poke',
30+
state_timer=1,
31+
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
32+
output_actions=[('PWM2', 255)])
33+
sma.add_state(
34+
state_name='FlashStimulus',
35+
state_timer=0.1,
36+
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
37+
output_actions=[(stimulus, 255),
38+
(Bpod.OutputChannels.GlobalTimerTrig, 1)])
39+
sma.add_state(
40+
state_name='WaitForResponse',
41+
state_timer=1,
42+
state_change_conditions={Bpod.Events.Port1In: leftAction,
43+
Bpod.Events.Port3In: rightAction,
44+
Bpod.Events.Port2In: 'Warning',
45+
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
46+
output_actions=[])
47+
sma.add_state(
48+
state_name='Warning',
49+
state_timer=0.1,
50+
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse',
51+
Bpod.Events.GlobalTimer1_End: 'MiniPunish'},
52+
output_actions=[(Bpod.OutputChannels.LED, 1),
53+
(Bpod.OutputChannels.LED, 2),
54+
(Bpod.OutputChannels.LED, 3)]) # Reward correct choice
55+
sma.add_state(
56+
state_name='Reward',
57+
state_timer=0.1,
58+
state_change_conditions={Bpod.Events.Tup: 'exit'},
59+
# Reward correct choice
60+
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
61+
sma.add_state(
62+
state_name='Punish',
63+
state_timer=3,
64+
state_change_conditions={Bpod.Events.Tup: 'exit'},
65+
# Signal incorrect choice
66+
output_actions=[(Bpod.OutputChannels.LED, 1),
67+
(Bpod.OutputChannels.LED, 2),
68+
(Bpod.OutputChannels.LED, 3)])
69+
sma.add_state(
70+
state_name='MiniPunish',
71+
state_timer=1,
72+
state_change_conditions={Bpod.Events.Tup: 'exit'},
73+
# Signal incorrect choice
74+
output_actions=[(Bpod.OutputChannels.LED, 1),
75+
(Bpod.OutputChannels.LED, 2),
76+
(Bpod.OutputChannels.LED, 3)])
77+
# Send state machine description to Bpod device
78+
my_bpod.send_state_machine(sma)
79+
print("Waiting for poke. Reward: ",
80+
'left' if thisTrialType == 1 else 'right')
81+
82+
def mouse(data):
83+
time.sleep(3)
84+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
85+
channel_number=2,
86+
value=12)
87+
time.sleep(2)
88+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
89+
channel_number=random.choice(port_numbers),
90+
value=12)
91+
92+
executor = ThreadPoolExecutor(max_workers=1)
93+
executor.submit(mouse, None)
94+
95+
my_bpod.run_state_machine(sma) # Run state machine
96+
print("Current trial info: {0}".format(my_bpod.session.current_trial))
97+
my_bpod.close() # Disconnect Bpod
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import random
2+
import time
3+
from pybpodapi.protocol import Bpod, StateMachine
4+
from concurrent.futures import ThreadPoolExecutor
5+
my_bpod = Bpod(emulator_mode=True)
6+
nTrials = 5
7+
port_numbers = [1, 3]
8+
trialTypes = [1, 2] # 1 (rewarded left) or 2 (rewarded right)
9+
for i in range(nTrials): # Main loop
10+
print('Trial: ', i + 1)
11+
thisTrialType = random.choice(trialTypes) # Randomly choose trial type
12+
if thisTrialType == 1:
13+
# set stimulus channel for trial type 1
14+
stimulus = Bpod.OutputChannels.PWM1
15+
leftAction = 'Reward'
16+
rightAction = 'Punish'
17+
rewardValve = 1
18+
elif thisTrialType == 2:
19+
# set stimulus channel for trial type 1
20+
stimulus = Bpod.OutputChannels.PWM3
21+
leftAction = 'Punish'
22+
rightAction = 'Reward'
23+
rewardValve = 3
24+
sma = StateMachine(my_bpod)
25+
sma.add_state(
26+
state_name='WaitForPort2Poke',
27+
state_timer=1,
28+
state_change_conditions={Bpod.Events.Port2In: 'FlashStimulus'},
29+
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
30+
sma.add_state(
31+
state_name='FlashStimulus',
32+
state_timer=0.1,
33+
state_change_conditions={Bpod.Events.Tup: 'WaitForResponse'},
34+
output_actions=[(stimulus, 255)])
35+
sma.add_state(
36+
state_name='WaitForResponse',
37+
state_timer=1,
38+
state_change_conditions={
39+
Bpod.Events.Port1In: leftAction, Bpod.Events.Port3In: rightAction},
40+
output_actions=[])
41+
sma.add_state(
42+
state_name='Reward',
43+
state_timer=0.1,
44+
state_change_conditions={Bpod.Events.Tup: 'exit'},
45+
# Reward correct choice
46+
output_actions=[(Bpod.OutputChannels.Valve, rewardValve)])
47+
sma.add_state(
48+
state_name='Punish',
49+
state_timer=3,
50+
state_change_conditions={Bpod.Events.Tup: 'exit'},
51+
# Signal incorrect choice
52+
output_actions=[(Bpod.OutputChannels.LED, 1),
53+
(Bpod.OutputChannels.LED, 2),
54+
(Bpod.OutputChannels.LED, 3)])
55+
# Send state machine description to Bpod device
56+
my_bpod.send_state_machine(sma)
57+
print("Waiting for poke. Reward: ",
58+
'left' if thisTrialType == 1 else 'right')
59+
60+
def mouse(data):
61+
time.sleep(3)
62+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
63+
channel_number=2, value=12)
64+
time.sleep(2)
65+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
66+
channel_number=random.choice(port_numbers),
67+
value=12)
68+
69+
executor = ThreadPoolExecutor(max_workers=1)
70+
executor.submit(mouse, None)
71+
my_bpod.run_state_machine(sma) # Run state machine
72+
print("Current trial info: {0}".format(my_bpod.session.current_trial))
73+
my_bpod.close() # Disconnect Bpod
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
from pybpodapi.protocol import Bpod
2+
from confapp import conf
3+
my_bpod = Bpod(emulator_mode=True)
4+
my_bpod.close()
5+
print("Target Bpod firmware version: ", conf.TARGET_BPOD_FIRMWARE_VERSION)
6+
print("Firmware version (read from device): ",
7+
my_bpod.hardware.firmware_version)
8+
print("Machine type version (read from device): ",
9+
my_bpod.hardware.machine_type)
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
from pybpodapi.protocol import Bpod, StateMachine
2+
my_bpod = Bpod(emulator_mode=True)
3+
sma = StateMachine(my_bpod)
4+
sma.set_condition(condition_number=1,
5+
condition_channel='Port2', channel_value=1)
6+
sma.add_state(
7+
state_name='Port1Light',
8+
state_timer=1,
9+
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
10+
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
11+
sma.add_state(
12+
state_name='Port2Light',
13+
state_timer=1,
14+
state_change_conditions={
15+
Bpod.Events.Tup: 'Port3Light', Bpod.Events.Condition1: 'Port3Light'},
16+
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
17+
sma.add_state(
18+
state_name='Port3Light',
19+
state_timer=1,
20+
state_change_conditions={Bpod.Events.Tup: 'exit'},
21+
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
22+
my_bpod.send_state_machine(sma)
23+
my_bpod.run_state_machine(sma)
24+
print("Current trial info: {0}".format(my_bpod.session.current_trial))
25+
my_bpod.close()
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import time
2+
from pybpodapi.protocol import Bpod, StateMachine
3+
from concurrent.futures import ThreadPoolExecutor
4+
my_bpod = Bpod(emulator_mode=True)
5+
sma = StateMachine(my_bpod)
6+
sma.set_condition(condition_number=1,
7+
condition_channel='Port2', channel_value=1)
8+
sma.add_state(
9+
state_name='Port1Light',
10+
state_timer=1,
11+
state_change_conditions={Bpod.Events.Tup: 'Port2Light'},
12+
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
13+
sma.add_state(
14+
state_name='Port2Light',
15+
state_timer=1,
16+
state_change_conditions={
17+
Bpod.Events.Tup: 'Port1Light', Bpod.Events.Condition1: 'Port3Light'},
18+
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
19+
sma.add_state(
20+
state_name='Port3Light',
21+
state_timer=1,
22+
state_change_conditions={Bpod.Events.Tup: 'exit'},
23+
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
24+
my_bpod.send_state_machine(sma)
25+
26+
27+
def mouse(data):
28+
time.sleep(5)
29+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
30+
channel_number=2, value=12)
31+
32+
33+
executor = ThreadPoolExecutor(max_workers=1)
34+
executor.submit(mouse, None)
35+
36+
my_bpod.run_state_machine(sma)
37+
print("Current trial info: {0}".format(my_bpod.session.current_trial))
38+
my_bpod.close()
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
import time
2+
from pybpodapi.protocol import Bpod, StateMachine
3+
from concurrent.futures import ThreadPoolExecutor
4+
my_bpod = Bpod(emulator_mode=True)
5+
sma = StateMachine(my_bpod)
6+
sma.set_global_counter(counter_number=1, target_event='Port1In', threshold=5)
7+
sma.add_state(
8+
state_name='InitialDelay',
9+
state_timer=2,
10+
state_change_conditions={Bpod.Events.Tup: 'ResetGlobalCounter1'},
11+
output_actions=[(Bpod.OutputChannels.PWM2, 255)])
12+
sma.add_state(
13+
state_name='ResetGlobalCounter1',
14+
state_timer=0,
15+
state_change_conditions={Bpod.Events.Tup: 'Port1Lit'},
16+
output_actions=[(Bpod.OutputChannels.GlobalCounterReset, 1)])
17+
sma.add_state(
18+
# Infinite loop (with next state). Only a global counter can save us.
19+
state_name='Port1Lit',
20+
state_timer=.25,
21+
state_change_conditions={
22+
Bpod.Events.Tup: 'Port3Lit', 'GlobalCounter1_End': 'exit'},
23+
output_actions=[(Bpod.OutputChannels.PWM1, 255)])
24+
sma.add_state(
25+
state_name='Port3Lit',
26+
state_timer=.25,
27+
state_change_conditions={
28+
Bpod.Events.Tup: 'Port1Lit', 'GlobalCounter1_End': 'exit'},
29+
output_actions=[(Bpod.OutputChannels.PWM3, 255)])
30+
my_bpod.send_state_machine(sma)
31+
32+
33+
def mouse(data):
34+
time.sleep(1)
35+
for _ in range(5):
36+
time.sleep(1)
37+
my_bpod.manual_override(Bpod.ChannelTypes.INPUT, 'Port',
38+
channel_number=1, value=12)
39+
40+
41+
executor = ThreadPoolExecutor(max_workers=1)
42+
executor.submit(mouse, None)
43+
44+
my_bpod.run_state_machine(sma)
45+
print("Current trial info: {0}".format(my_bpod.session.current_trial))
46+
my_bpod.close()

0 commit comments

Comments
 (0)