Skip to content

Commit 311aa40

Browse files
authored
Move file I/O out of the event loop (#1)
* Use aiofiles for file I/O * Move all I/O out of the event loop * Delete commented out code
1 parent 1b7cfc0 commit 311aa40

5 files changed

Lines changed: 87 additions & 21 deletions

File tree

ppadb/device_async.py

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,25 @@
1+
try:
2+
from asyncio import get_running_loop
3+
except ImportError: # pragma: no cover
4+
from asyncio import get_event_loop as get_running_loop # Python 3.6 compatibility
5+
16
import re
27
import os
38

49
from ppadb.command.transport_async import TransportAsync
510
from ppadb.sync_async import SyncAsync
611

712

13+
def _get_src_info(src):
14+
exists = os.path.exists(src)
15+
isfile = os.path.isfile(src)
16+
isdir = os.path.isdir(src)
17+
basename = os.path.basename(src)
18+
walk = None if not isdir else list(os.walk(src))
19+
20+
return exists, isfile, isdir, basename, walk
21+
22+
823
class DeviceAsync(TransportAsync):
924
INSTALL_RESULT_PATTERN = "(Success|Failure|Error)\s?(.*)"
1025
UNINSTALL_RESULT_PATTERN = "(Success|Failure.*|.*Unknown package:.*)"
@@ -30,16 +45,15 @@ async def _push(self, src, dest, mode, progress):
3045
await sync.push(src, dest, mode, progress)
3146

3247
async def push(self, src, dest, mode=0o644, progress=None):
33-
if not os.path.exists(src):
48+
exists, isfile, isdir, basename, walk = await get_running_loop().run_in_executor(None, _get_src_info, src)
49+
if not exists:
3450
raise FileNotFoundError("Cannot find {}".format(src))
3551

36-
if os.path.isfile(src):
52+
if isfile:
3753
await self._push(src, dest, mode, progress)
3854

39-
elif os.path.isdir(src):
40-
basename = os.path.basename(src)
41-
42-
for root, dirs, files in os.walk(src):
55+
elif isdir:
56+
for root, dirs, files in walk:
4357
root_dir_path = os.path.join(basename, root.replace(src, ""))
4458

4559
await self.shell("mkdir -p {}/{}".format(dest, root_dir_path))

ppadb/sync_async/__init__.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,31 @@
1+
try:
2+
from asyncio import get_running_loop
3+
except ImportError: # pragma: no cover
4+
from asyncio import get_event_loop as get_running_loop # Python 3.6 compatibility
5+
16
import struct
27
import os
38

49
from ppadb.protocol import Protocol
510
from ppadb.sync.stats import S_IFREG
611
from ppadb.utils.logger import AdbLogging
712

13+
import aiofiles
14+
815
logger = AdbLogging.get_logger(__name__)
916

1017

18+
def _get_src_info(src):
19+
exists = os.path.exists(src)
20+
if not exists:
21+
return exists, None, None
22+
23+
timestamp = os.stat(src).st_mtime
24+
total_size = os.path.getsize(src)
25+
26+
return exists, timestamp, total_size
27+
28+
1129
class SyncAsync:
1230
DATA_MAX_LENGTH = 65536
1331

@@ -18,14 +36,11 @@ async def push(self, src, dest, mode, progress=None):
1836
"""Push from local path |src| to |dest| on device.
1937
:param progress: callback, called with (filename, total_size, sent_size)
2038
"""
21-
if not os.path.exists(src):
22-
raise FileNotFoundError("Can't find the source file {}".format(src))
39+
exists, timestamp, total_size = await get_running_loop().run_in_executor(None, _get_src_info, src)
2340

24-
stat = os.stat(src)
25-
26-
timestamp = int(stat.st_mtime)
41+
if not exists:
42+
raise FileNotFoundError("Can't find the source file {}".format(src))
2743

28-
total_size = os.path.getsize(src)
2944
sent_size = 0
3045

3146
# SEND
@@ -34,9 +49,9 @@ async def push(self, src, dest, mode, progress=None):
3449
await self._send_str(Protocol.SEND, args)
3550

3651
# DATA
37-
with open(src, 'rb') as stream:
52+
async with aiofiles.open(src, 'rb') as stream:
3853
while True:
39-
chunk = stream.read(self.DATA_MAX_LENGTH)
54+
chunk = await stream.read(self.DATA_MAX_LENGTH)
4055
if not chunk:
4156
break
4257

@@ -56,13 +71,13 @@ async def pull(self, src, dest):
5671
await self._send_str(Protocol.RECV, src)
5772

5873
# DATA
59-
with open(dest, 'wb') as stream:
74+
async with aiofiles.open(dest, 'wb') as stream:
6075
while True:
6176
flag = (await self.connection.read(4)).decode('utf-8')
6277

6378
if flag == Protocol.DATA:
6479
data = await self._read_data()
65-
stream.write(data)
80+
await stream.write(data)
6681
continue
6782

6883
if flag == Protocol.DONE:

setup.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
license='MIT license',
3232
packages=find_packages(exclude=["*.test", "*.test.*", "test.*", "test"]),
3333
install_requires=[],
34+
extras_require={"async": ["aiofiles>=0.4.0"]},
3435
keywords="adb",
3536
classifiers=classifiers,
3637
)

test_async/patchers.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
"""Patches for async socket functionality."""
22

3-
from contextlib import contextmanager
3+
from contextlib import asynccontextmanager
44
from unittest.mock import patch
55

66
try:
@@ -13,6 +13,39 @@ async def __call__(self, *args, **kwargs):
1313
return super(AsyncMock, self).__call__(*args, **kwargs)
1414

1515

16+
def async_mock_open(read_data=""):
17+
class AsyncMockFile:
18+
def __init__(self, read_data):
19+
self.read_data = read_data
20+
_async_mock_open.written = read_data[:0]
21+
22+
async def read(self, size=-1):
23+
if size == -1:
24+
ret = self.read_data
25+
self.read_data = self.read_data[:0]
26+
return ret
27+
28+
n = min(size, len(self.read_data))
29+
ret = self.read_data[:n]
30+
self.read_data = self.read_data[n:]
31+
return ret
32+
33+
async def write(self, b):
34+
if _async_mock_open.written:
35+
_async_mock_open.written += b
36+
else:
37+
_async_mock_open.written = b
38+
39+
@asynccontextmanager
40+
async def _async_mock_open(*args, **kwargs):
41+
try:
42+
yield AsyncMockFile(read_data)
43+
finally:
44+
pass
45+
46+
return _async_mock_open
47+
48+
1649
class FakeStreamWriter:
1750
def close(self):
1851
pass

test_async/test_device_async.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55

66
import asyncio
7+
from contextlib import asynccontextmanager
78
import os
89
import sys
910
import unittest
@@ -16,13 +17,15 @@
1617
from ppadb.sync_async import SyncAsync
1718

1819
from .async_wrapper import awaiter
19-
from .patchers import FakeStreamReader, FakeStreamWriter, async_patch
20+
from .patchers import FakeStreamReader, FakeStreamWriter, async_mock_open, async_patch
2021

2122

2223
PNG_IMAGE = b'\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\n\x00\x00\x00\n\x08\x06\x00\x00\x00\x8d2\xcf\xbd\x00\x00\x00\x04sBIT\x08\x08\x08\x08|\x08d\x88\x00\x00\x00\tpHYs\x00\x00\x0fa\x00\x00\x0fa\x01\xa8?\xa7i\x00\x00\x00\x0eIDAT\x18\x95c`\x18\x05\x83\x13\x00\x00\x01\x9a\x00\x01\x16\xca\xd3i\x00\x00\x00\x00IEND\xaeB`\x82'
2324

2425
PNG_IMAGE_NEEDS_REPLACING = PNG_IMAGE[:5] + b'\r' + PNG_IMAGE[5:]
2526

27+
FILEDATA = b'Ohayou sekai.\nGood morning world!'
28+
2629

2730
class TestDevice(unittest.TestCase):
2831
@awaiter
@@ -72,7 +75,7 @@ def progress(*args, **kwargs):
7275
pass
7376

7477
filedata = b'Ohayou sekai.\nGood morning world!'
75-
with patch('os.path.exists', return_value=True), patch('os.path.isfile', return_value=True), patch('ppadb.device_async.open', mock_open(read_data=filedata)), patch('os.stat', return_value=os.stat_result((123,) * 10)), patch('ppadb.sync_async.open', mock_open(read_data=filedata)):
78+
with patch('os.path.exists', return_value=True), patch('os.path.isfile', return_value=True), patch('os.stat', return_value=os.stat_result((123,) * 10)), patch('ppadb.sync_async.aiofiles.open', async_mock_open(FILEDATA)):
7679
with async_patch('asyncio.open_connection', return_value=(FakeStreamReader(), FakeStreamWriter())):
7780
with async_patch('{}.FakeStreamReader.read'.format(__name__), side_effect=[b'OKAY', b'OKAY', b'OKAY', PNG_IMAGE_NEEDS_REPLACING, b'', b'OKAY']):
7881
await self.device.push('src', 'dest', progress=progress)
@@ -87,14 +90,14 @@ async def test_push_dir(self):
8790
async def test_pull(self):
8891
with async_patch('asyncio.open_connection', return_value=(FakeStreamReader(), FakeStreamWriter())):
8992
with async_patch('{}.FakeStreamReader.read'.format(__name__), side_effect=[b'OKAY', b'OKAY', b'DATA', SyncAsync._little_endian(4), b'TEST', b'DONE', b'OKAY']):
90-
with patch('ppadb.sync_async.open', mock_open()):
93+
with patch('ppadb.sync_async.aiofiles.open', async_mock_open(FILEDATA)):
9194
await self.device.pull('src', 'dest')
9295

9396
@awaiter
9497
async def test_pull_fail(self):
9598
with async_patch('asyncio.open_connection', return_value=(FakeStreamReader(), FakeStreamWriter())):
9699
with async_patch('{}.FakeStreamReader.read'.format(__name__), side_effect=[b'OKAY', b'OKAY', b'FAIL', SyncAsync._little_endian(4), b'TEST', b'DONE', b'OKAY']):
97-
with patch('ppadb.sync_async.open', mock_open()):
100+
with patch('ppadb.sync_async.aiofiles.open', async_mock_open(FILEDATA)):
98101
await self.device.pull('src', 'dest')
99102

100103

0 commit comments

Comments
 (0)