forked from nonolith/pixelpulse
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbuspirate.py
More file actions
122 lines (103 loc) · 3.07 KB
/
buspirate.py
File metadata and controls
122 lines (103 loc) · 3.07 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python2
# BusPirate driver for Pixelpulse
# Distributed under the terms of the BSD License
# (C) 2011 Kevin Mehall (Nonolith Labs) <km@kevinmehall.net>
from optparse import OptionParser
import time
import tornado_serial
import pixelpulse
channel_masks = {
'CS': (1<<0),
'MISO': (1<<1),
'CLK': (1<<2),
'MOSI': (1<<3),
'AUX': (1<<4),
}
class BusPirateDevice(pixelpulse.Device):
def __init__(self, port='/dev/ttyUSB0', colors=None, pullups=True):
opts = dict(
onSet=self.onSet, stateOptions=['input','output']
)
self.digitalChannels = [
pixelpulse.DigitalChannel(name, showGraph=True, color=colors[name], **opts)
for name in ['AUX','MOSI', 'CLK', 'MISO', 'CS']]
self.channels = self.digitalChannels
self.port = port
self.recv_mode = 'init'
self.output_state = (1<<7)|(1<<6)
if pullups:
self.output_state |= (1<<5)
self.output_mode = 0x5f # all pins input
self.initData = ''
def start(self, server):
def onReceive(data):
if self.recv_mode == 'init':
self.initData += data
if 'BBIO1' in self.initData:
print "BusPirate initialized"
time.sleep(0.1)
self.serial.flush()
self.recv_mode = 'input'
server.poll(self.onPoll)
elif self.recv_mode == 'input':
data = ord(data[0])
out = []
for chan in self.digitalChannels:
mask = channel_masks[chan.name]
out.append((chan, bool(data & mask)))
server.data(out)
self.serial = tornado_serial.TornadoSerial(self.port, 115200, onReceive, on_error=server.quit)
self.enter_bbio()
def enter_bbio(self):
self.reset()
self.serial.write('\r\n'*10 + '#')
time.sleep(0.1)
self.serial.write('\0'*20)
self.recv_mode = 'init'
def reset(self):
self.serial.write('\x0f')
time.sleep(0.1)
self.serial.flush()
def onPoll(self):
self.serial.write(chr(self.output_state) + chr(self.output_mode))
def onSet(self, chan, val, state):
mask = channel_masks[chan.name]
self.output_mode &= ~mask
if state == 'output':
self.output_mode &= ~mask
chan.setState('output')
elif state == 'input':
self.output_mode |= mask
chan.setState('input')
if state == 'output' and val is not None:
if val:
self.output_state |= mask
else:
self.output_state &= ~mask
COLORS = {
'sparkfun': {
'CS': 'red',
'MISO': 'brown',
'CLK': '#eeee00',
'MOSI': 'orange',
'AUX': '#00ff00',
},
'seeed': {
'CS': '#bbb',
'MISO': 'black',
'CLK': '#ff00ff',
'MOSI': '#888',
'AUX': 'blue',
}
}
if __name__ == '__main__':
op = OptionParser()
op.add_option("-p", dest="port", help="Use BusPirate attached to port PORT", metavar="PORT", default="auto")
op.add_option("-c", dest="colors", type="choice", choices=COLORS.keys(), default="sparkfun",
help="Wire colors - 'seeed' or 'sparkfun'", metavar="MODEL")
op.add_option("-d", dest="nopullups", action="store_true", help="Disable pullups",)
(options, args) = op.parse_args()
port = tornado_serial.check_port(options.port)
dev = BusPirateDevice(port, COLORS[options.colors], not options.nopullups)
server = pixelpulse.DataServer(dev)
server.start()