Skip to content

Commit 100db22

Browse files
authored
Merge pull request #25 from jamesleesaunders/fake-serial
Improved Fake Serial device used by unit tests
2 parents 059fe9f + e3a933e commit 100db22

4 files changed

Lines changed: 176 additions & 97 deletions

File tree

xbee/tests/Fake.py

Lines changed: 107 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -4,53 +4,123 @@
44
55
By Paul Malmsten, 2010
66
pmalmsten@gmail.com
7+
Updated by James Saunders, 2016
8+
Inspired by code written by D. Thiebaut http://cs.smith.edu/dftwiki/index.php/PySerial_Simulator
79
810
Provides fake device objects for other unit tests.
911
"""
1012
import sys
1113

12-
class FakeDevice(object):
13-
"""
14-
Represents a fake serial port for testing purposes
15-
"""
16-
def __init__(self):
17-
self.data = b''
18-
19-
def write(self, data):
14+
class Serial(object):
15+
def __init__( self, port='/dev/null', baudrate = 19200, timeout=1,
16+
bytesize = 8, parity = 'N', stopbits = 1, xonxoff=0,
17+
rtscts = 0 ):
2018
"""
21-
Writes data to the fake port for later evaluation
19+
Init constructor, setup standard serial variables with default values.
2220
"""
23-
self.data = data
21+
self.name = port
22+
self.port = port
23+
self.timeout = timeout
24+
self.parity = parity
25+
self.baudrate = baudrate
26+
self.bytesize = bytesize
27+
self.stopbits = stopbits
28+
self.xonxoff = xonxoff
29+
self.rtscts = rtscts
30+
self._is_open = True
2431

25-
class FakeReadDevice(object):
26-
"""
27-
Represents a fake serial port which can be read from in a similar
28-
fashion to the real thing
29-
"""
30-
31-
def __init__(self, data, silent_on_empty=False):
32-
self.data = data
33-
self.read_index = 0
34-
self.silent_on_empty = silent_on_empty
35-
36-
def read(self, length=1):
32+
self._data_written = ""
33+
self._read_data = ""
34+
35+
def isOpen( self ):
3736
"""
38-
Read the indicated number of bytes from the port
37+
Returns True if the serial port is open, otherwise False.
3938
"""
40-
# If too many bytes would be read, raise exception
41-
if self.read_index + length > len(self.data):
42-
if self.silent_on_empty:
43-
sys.exit(0)
44-
else:
45-
raise ValueError("Not enough bytes exist!")
46-
47-
read_data = self.data[self.read_index:self.read_index + length]
48-
self.read_index += length
49-
50-
return read_data
39+
return self._isOpen
40+
41+
def open( self ):
42+
"""
43+
Open the serial port.
44+
"""
45+
self._is_open = True
46+
47+
def close( self ):
48+
"""
49+
Close the serial port.
50+
"""
51+
self._is_open = False
52+
53+
def write( self, data ):
54+
"""
55+
Write a string of characters to the serial port.
56+
"""
57+
self._data_written = data
58+
59+
def read( self, len=1 ):
60+
"""
61+
Read the indicated number of bytes from the port.
62+
"""
63+
data = self._read_data[0:len]
64+
self._read_data = self._read_data[len:]
65+
return data
66+
67+
def readline( self ):
68+
"""
69+
Read characters from the port until a '\n' (newline) is found.
70+
"""
71+
returnIndex = self._read_data.index( "\n" )
72+
if returnIndex != -1:
73+
data = self._read_data[0:returnIndex+1]
74+
self._read_data = self._read_data[returnIndex+1:]
75+
return data
76+
else:
77+
return ""
78+
79+
def inWaiting( self ):
80+
"""
81+
Returns the number of bytes available to be read.
82+
"""
83+
return len(self._read_data)
84+
85+
def getSettingsDict( self ):
86+
""""
87+
Get a dictionary with port settings.
88+
"""
89+
settings = {
90+
'timeout' : self.timeout,
91+
'parity' : self.parity,
92+
'baudrate' : self.baudrate,
93+
'bytesize' : self.bytesize,
94+
'stopbits' : self.stopbits,
95+
'xonxoff' : self.xonxoff,
96+
'rtscts' : self.rtscts
97+
}
98+
return settings
99+
100+
def set_read_data( self, data ):
101+
"""
102+
Set fake data to be be returned by the read() and readline() functions.
103+
"""
104+
self._read_data = data
105+
106+
def get_data_written( self ):
107+
"""
108+
Return record of data sent via the write command.
109+
"""
110+
return(self._data_written)
111+
112+
def set_silent_on_empty( self, flag ):
113+
"""
114+
Set silent on error flag. If True do not error.
115+
"""
116+
self._silent_on_empty = flag
51117

52-
def inWaiting(self):
118+
def __str__( self ):
53119
"""
54-
Returns the number of bytes available to be read
120+
Returns a string representation of the serial class.
55121
"""
56-
return len(self.data) - self.read_index
122+
return "Serial<id=0xa81c10, open=%s>( port='%s', baudrate=%d," \
123+
% ( str(self._is_open), self.port, self.baudrate ) \
124+
+ " bytesize=%d, parity='%s', stopbits=%d, xonxoff=%d, rtscts=%d)"\
125+
% ( self.bytesize, self.parity, self.stopbits, self.xonxoff,
126+
self.rtscts )

xbee/tests/test_base.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
"""
1010
import unittest
1111
from xbee.base import XBeeBase
12-
from xbee.tests.Fake import FakeDevice, FakeReadDevice
12+
from xbee.tests.Fake import Serial
1313

1414
class TestWriteToDevice(unittest.TestCase):
1515
"""
@@ -22,42 +22,45 @@ def test_write(self):
2222
_write method should write the expected data to the serial
2323
device
2424
"""
25-
device = FakeDevice()
25+
device = Serial()
2626

2727
xbee = XBeeBase(device)
2828
xbee._write(b'\x00')
2929

3030
# Check resuting state of fake device
31+
result_frame = device.get_data_written()
3132
expected_frame = b'\x7E\x00\x01\x00\xFF'
32-
self.assertEqual(device.data, expected_frame)
33+
self.assertEqual(result_frame, expected_frame)
3334

3435
def test_write_again(self):
3536
"""
3637
_write method should write the expected data to the serial
3738
device
3839
"""
39-
device = FakeDevice()
40+
device = Serial()
4041

4142
xbee = XBeeBase(device)
4243
xbee._write(b'\x00\x01\x02')
4344

4445
# Check resuting state of fake device
4546
expected_frame = b'\x7E\x00\x03\x00\x01\x02\xFC'
46-
self.assertEqual(device.data, expected_frame)
47+
result_frame = device.get_data_written()
48+
self.assertEqual(result_frame, expected_frame)
4749

4850
def test_write_escaped(self):
4951
"""
5052
_write method should write the expected data to the serial
5153
device
5254
"""
53-
device = FakeDevice()
55+
device = Serial()
5456

5557
xbee = XBeeBase(device,escaped=True)
5658
xbee._write(b'\x7E\x01\x7D\x11\x13')
5759

5860
# Check resuting state of fake device
5961
expected_frame = b'\x7E\x00\x05\x7D\x5E\x01\x7D\x5D\x7D\x31\x7D\x33\xDF'
60-
self.assertEqual(device.data, expected_frame)
62+
result_frame = device.get_data_written()
63+
self.assertEqual(result_frame, expected_frame)
6164

6265
class TestReadFromDevice(unittest.TestCase):
6366
"""
@@ -68,7 +71,8 @@ def test_read(self):
6871
"""
6972
_wait_for_frame should properly read a frame of data
7073
"""
71-
device = FakeReadDevice(b'\x7E\x00\x01\x00\xFF')
74+
device = Serial()
75+
device.set_read_data(b'\x7E\x00\x01\x00\xFF')
7276
xbee = XBeeBase(device)
7377

7478
frame = xbee._wait_for_frame()
@@ -78,8 +82,8 @@ def test_read_invalid_followed_by_valid(self):
7882
"""
7983
_wait_for_frame should skip invalid data
8084
"""
81-
device = FakeReadDevice(
82-
b'\x7E\x00\x01\x00\xFA' + b'\x7E\x00\x01\x05\xFA')
85+
device = Serial()
86+
device.set_read_data(b'\x7E\x00\x01\x00\xFA' + b'\x7E\x00\x01\x05\xFA')
8387
xbee = XBeeBase(device)
8488

8589
frame = xbee._wait_for_frame()
@@ -90,7 +94,8 @@ def test_read_escaped(self):
9094
_wait_for_frame should properly read a frame of data
9195
Verify that API mode 2 escaped bytes are read correctly
9296
"""
93-
device = FakeReadDevice(b'\x7E\x00\x04\x7D\x5E\x7D\x5D\x7D\x31\x7D\x33\xE0')
97+
device = Serial()
98+
device.set_read_data(b'\x7E\x00\x04\x7D\x5E\x7D\x5D\x7D\x31\x7D\x33\xE0')
9499

95100
xbee = XBeeBase(device,escaped=True)
96101

@@ -150,7 +155,7 @@ class TestAsyncCallback(unittest.TestCase):
150155

151156
def setUp(self):
152157
self.xbee = None
153-
self.serial = FakeReadDevice([], silent_on_empty=True)
158+
self.serial = Serial()
154159
self.callback = lambda data: None
155160
self.error_callback = lambda data: None
156161

xbee/tests/test_fake.py

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,23 @@
88
Tests fake device objects for proper functionality.
99
"""
1010
import unittest
11-
from xbee.tests.Fake import FakeReadDevice
11+
from xbee.tests.Fake import Serial
1212

13-
class TestFakeReadDevice(unittest.TestCase):
13+
class TestFakeSerialRead(unittest.TestCase):
1414
"""
15-
FakeReadDevice class should work as intended to emluate a serial
16-
port
15+
Fake Serial class should work as intended to emluate reading from a serial port.
1716
"""
17+
1818
def setUp(self):
1919
"""
20-
Create a fake read device for each test
20+
Create a fake read device for each test.
2121
"""
22-
self.device = FakeReadDevice("test")
23-
22+
self.device = Serial()
23+
self.device.set_read_data("test")
24+
2425
def test_read_single_byte(self):
2526
"""
26-
reading one byte at a time should work as expected
27+
Reading one byte at a time should work as expected.
2728
"""
2829
self.assertEqual(self.device.read(), 't')
2930
self.assertEqual(self.device.read(), 'e')
@@ -32,13 +33,14 @@ def test_read_single_byte(self):
3233

3334
def test_read_multiple_bytes(self):
3435
"""
35-
reading multiple bytes at a time should work as expected
36+
Reading multiple bytes at a time should work as expected.
3637
"""
3738
self.assertEqual(self.device.read(3), 'tes')
3839
self.assertEqual(self.device.read(), 't')
3940

40-
def test_read_too_many(self):
41+
def test_write(self):
4142
"""
42-
attempting to read too many bytes should raise an exception
43+
Test serial write function.
4344
"""
44-
self.assertRaises(ValueError, self.device.read, 5)
45+
self.device.write("Hello World")
46+
self.assertEqual(self.device.get_data_written(), "Hello World")

0 commit comments

Comments
 (0)