Skip to content

Commit 98db67e

Browse files
authored
Merge pull request Swind#47 from JeffLIrion/async-pull-request
Implement async functionality
2 parents 40b1ae9 + a512af5 commit 98db67e

13 files changed

Lines changed: 683 additions & 0 deletions

File tree

ppadb/client_async.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from ppadb.command.host_async import HostAsync
2+
from ppadb.connection_async import ConnectionAsync
3+
4+
5+
class ClientAsync(HostAsync):
6+
def __init__(self, host='127.0.0.1', port=5037):
7+
self.host = host
8+
self.port = port
9+
10+
async def create_connection(self, timeout=None):
11+
conn = ConnectionAsync(self.host, self.port, timeout)
12+
return await conn.connect()
13+
14+
async def device(self, serial):
15+
devices = await self.devices()
16+
17+
for device in devices:
18+
if device.serial == serial:
19+
return device
20+
21+
return None
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
from ppadb.device_async import DeviceAsync
2+
3+
4+
class HostAsync:
5+
CONNECT_RESULT_PATTERN = "(connected to|already connected)"
6+
7+
OFFLINE = "offline"
8+
DEVICE = "device"
9+
BOOTLOADER = "bootloader"
10+
11+
async def _execute_cmd(self, cmd):
12+
async with await self.create_connection() as conn:
13+
await conn.send(cmd)
14+
return await conn.receive()
15+
16+
async def devices(self):
17+
cmd = "host:devices"
18+
result = await self._execute_cmd(cmd)
19+
20+
devices = []
21+
22+
for line in result.split('\n'):
23+
if not line:
24+
break
25+
26+
devices.append(DeviceAsync(self, line.split()[0]))
27+
28+
return devices
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import logging
2+
import re
3+
import time
4+
5+
6+
class TransportAsync:
7+
async def transport(self, connection):
8+
cmd = "host:transport:{}".format(self.serial)
9+
await connection.send(cmd)
10+
11+
return connection
12+
13+
async def shell(self, cmd, timeout=None):
14+
conn = await self.create_connection(timeout=timeout)
15+
16+
cmd = "shell:{}".format(cmd)
17+
await conn.send(cmd)
18+
19+
result = await conn.read_all()
20+
await conn.close()
21+
return result.decode('utf-8')
22+
23+
async def sync(self):
24+
conn = await self.create_connection()
25+
26+
cmd = "sync:"
27+
await conn.send(cmd)
28+
29+
return conn
30+
31+
async def screencap(self):
32+
async with await self.create_connection() as conn:
33+
cmd = "shell:/system/bin/screencap -p"
34+
await conn.send(cmd)
35+
result = await conn.read_all()
36+
37+
if result and len(result) > 5 and result[5] == 0x0d:
38+
return result.replace(b'\r\n', b'\n')
39+
else:
40+
return result

ppadb/connection_async.py

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
import asyncio
2+
import struct
3+
import socket
4+
5+
from ppadb.protocol import Protocol
6+
from ppadb.utils.logger import AdbLogging
7+
8+
logger = AdbLogging.get_logger(__name__)
9+
10+
11+
class ConnectionAsync:
12+
def __init__(self, host='localhost', port=5037, timeout=None):
13+
self.host = host
14+
self.port = int(port)
15+
self.timeout = timeout
16+
17+
self.reader = None
18+
self.writer = None
19+
20+
async def __aenter__(self):
21+
return self
22+
23+
async def __aexit__(self, type, value, traceback):
24+
await self.close()
25+
26+
async def connect(self):
27+
logger.debug("Connect to ADB server - %s:%d", self.host, self.port)
28+
29+
try:
30+
if self.timeout:
31+
self.reader, self.writer = await asyncio.wait_for(asyncio.open_connection(self.host, self.port), self.timeout)
32+
else:
33+
self.reader, self.writer = await asyncio.open_connection(self.host, self.port)
34+
35+
except (OSError, asyncio.TimeoutError) as e:
36+
raise RuntimeError("ERROR: connecting to {}:{} {}.\nIs adb running on your computer?".format(self.host, self.port, e))
37+
38+
return self
39+
40+
async def close(self):
41+
logger.debug("Connection closed...")
42+
43+
if self.writer:
44+
try:
45+
self.writer.close()
46+
await self.writer.wait_closed()
47+
except OSError:
48+
pass
49+
50+
self.reader = None
51+
self.writer = None
52+
53+
##############################################################################################################
54+
#
55+
# Send command & Receive command result
56+
#
57+
##############################################################################################################
58+
async def _recv(self, length):
59+
return await asyncio.wait_for(self.reader.read(length), self.timeout)
60+
61+
async def _send(self, data):
62+
self.writer.write(data)
63+
await asyncio.wait_for(self.writer.drain(), self.timeout)
64+
65+
async def receive(self):
66+
nob = int((await self._recv(4)).decode('utf-8'), 16)
67+
return (await self._recv(nob)).decode('utf-8')
68+
69+
async def send(self, msg):
70+
msg = Protocol.encode_data(msg)
71+
logger.debug(msg)
72+
await self._send(msg)
73+
return await self._check_status()
74+
75+
async def _check_status(self):
76+
recv = (await self._recv(4)).decode('utf-8')
77+
if recv != Protocol.OKAY:
78+
error = (await self._recv(1024)).decode('utf-8')
79+
raise RuntimeError("ERROR: {} {}".format(repr(recv), error))
80+
81+
return True
82+
83+
##############################################################################################################
84+
#
85+
# Socket read/write
86+
#
87+
##############################################################################################################
88+
async def read_all(self):
89+
data = bytearray()
90+
91+
while True:
92+
recv = await self._recv(4096)
93+
if not recv:
94+
break
95+
data += recv
96+
97+
return data
98+
99+
async def read(self, length=0):
100+
data = await self._recv(length)
101+
return data
102+
103+
async def write(self, data):
104+
await self._send(data)

ppadb/device_async.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
try:
2+
from asyncio import get_running_loop
3+
except ImportError: # pragma: no cover
4+
from asyncio import get_event_loop as get_running_loop # Python 3.6 compatibility
5+
6+
import re
7+
import os
8+
9+
from ppadb.command.transport_async import TransportAsync
10+
from ppadb.sync_async import SyncAsync
11+
12+
13+
def _get_src_info(src):
14+
exists = os.path.exists(src)
15+
isfile = os.path.isfile(src)
16+
isdir = os.path.isdir(src)
17+
basename = os.path.basename(src)
18+
walk = None if not isdir else list(os.walk(src))
19+
20+
return exists, isfile, isdir, basename, walk
21+
22+
23+
class DeviceAsync(TransportAsync):
24+
INSTALL_RESULT_PATTERN = "(Success|Failure|Error)\s?(.*)"
25+
UNINSTALL_RESULT_PATTERN = "(Success|Failure.*|.*Unknown package:.*)"
26+
27+
def __init__(self, client, serial):
28+
self.client = client
29+
self.serial = serial
30+
31+
async def create_connection(self, set_transport=True, timeout=None):
32+
conn = await self.client.create_connection(timeout=timeout)
33+
34+
if set_transport:
35+
await self.transport(conn)
36+
37+
return conn
38+
39+
async def _push(self, src, dest, mode, progress):
40+
# Create a new connection for file transfer
41+
sync_conn = await self.sync()
42+
sync = SyncAsync(sync_conn)
43+
44+
async with sync_conn:
45+
await sync.push(src, dest, mode, progress)
46+
47+
async def push(self, src, dest, mode=0o644, progress=None):
48+
exists, isfile, isdir, basename, walk = await get_running_loop().run_in_executor(None, _get_src_info, src)
49+
if not exists:
50+
raise FileNotFoundError("Cannot find {}".format(src))
51+
52+
if isfile:
53+
await self._push(src, dest, mode, progress)
54+
55+
elif isdir:
56+
for root, dirs, files in walk:
57+
root_dir_path = os.path.join(basename, root.replace(src, ""))
58+
59+
await self.shell("mkdir -p {}/{}".format(dest, root_dir_path))
60+
61+
for item in files:
62+
await self._push(os.path.join(root, item), os.path.join(dest, root_dir_path, item), mode, progress)
63+
64+
async def pull(self, src, dest):
65+
sync_conn = await self.sync()
66+
sync = SyncAsync(sync_conn)
67+
68+
async with sync_conn:
69+
return await sync.pull(src, dest)

ppadb/sync_async/__init__.py

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
try:
2+
from asyncio import get_running_loop
3+
except ImportError: # pragma: no cover
4+
from asyncio import get_event_loop as get_running_loop # Python 3.6 compatibility
5+
6+
import struct
7+
import os
8+
9+
from ppadb.protocol import Protocol
10+
from ppadb.sync.stats import S_IFREG
11+
from ppadb.utils.logger import AdbLogging
12+
13+
import aiofiles
14+
15+
logger = AdbLogging.get_logger(__name__)
16+
17+
18+
def _get_src_info(src):
19+
exists = os.path.exists(src)
20+
if not exists:
21+
return exists, None, None
22+
23+
timestamp = os.stat(src).st_mtime
24+
total_size = os.path.getsize(src)
25+
26+
return exists, timestamp, total_size
27+
28+
29+
class SyncAsync:
30+
DATA_MAX_LENGTH = 65536
31+
32+
def __init__(self, connection):
33+
self.connection = connection
34+
35+
async def push(self, src, dest, mode, progress=None):
36+
"""Push from local path |src| to |dest| on device.
37+
:param progress: callback, called with (filename, total_size, sent_size)
38+
"""
39+
exists, timestamp, total_size = await get_running_loop().run_in_executor(None, _get_src_info, src)
40+
41+
if not exists:
42+
raise FileNotFoundError("Can't find the source file {}".format(src))
43+
44+
sent_size = 0
45+
46+
# SEND
47+
mode = mode | S_IFREG
48+
args = "{dest},{mode}".format(dest=dest, mode=mode)
49+
await self._send_str(Protocol.SEND, args)
50+
51+
# DATA
52+
async with aiofiles.open(src, 'rb') as stream:
53+
while True:
54+
chunk = await stream.read(self.DATA_MAX_LENGTH)
55+
if not chunk:
56+
break
57+
58+
sent_size += len(chunk)
59+
await self._send_length(Protocol.DATA, len(chunk))
60+
await self.connection.write(chunk)
61+
62+
if progress is not None:
63+
progress(src, total_size, sent_size)
64+
65+
# DONE
66+
await self._send_length(Protocol.DONE, timestamp)
67+
await self.connection._check_status()
68+
69+
async def pull(self, src, dest):
70+
# RECV
71+
await self._send_str(Protocol.RECV, src)
72+
73+
# DATA
74+
async with aiofiles.open(dest, 'wb') as stream:
75+
while True:
76+
flag = (await self.connection.read(4)).decode('utf-8')
77+
78+
if flag == Protocol.DATA:
79+
data = await self._read_data()
80+
await stream.write(data)
81+
continue
82+
83+
if flag == Protocol.DONE:
84+
await self.connection.read(4)
85+
return
86+
87+
if flag == Protocol.FAIL:
88+
return (await self._read_data()).decode('utf-8')
89+
90+
@staticmethod
91+
def _integer(little_endian):
92+
return struct.unpack("<I", little_endian)
93+
94+
@staticmethod
95+
def _little_endian(n):
96+
return struct.pack('<I', n)
97+
98+
async def _read_data(self):
99+
length = self._integer(await self.connection.read(4))[0]
100+
data = bytearray()
101+
while len(data) < length:
102+
data += await self.connection.read(length - len(data))
103+
return data
104+
105+
async def _send_length(self, cmd, length):
106+
le_len = self._little_endian(length)
107+
data = cmd.encode() + le_len
108+
109+
logger.debug("Send length: {}".format(data))
110+
await self.connection.write(data)
111+
112+
async def _send_str(self, cmd, args):
113+
"""
114+
Format:
115+
{Command}{args length(little endian)}{str}
116+
Length:
117+
{4}{4}{str length}
118+
"""
119+
logger.debug("{} {}".format(cmd, args))
120+
args = args.encode('utf-8')
121+
122+
le_args_len = self._little_endian(len(args))
123+
data = cmd.encode() + le_args_len + args
124+
logger.debug("Send string: {}".format(data))
125+
await self.connection.write(data)

0 commit comments

Comments
 (0)