-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathddp.py
More file actions
137 lines (106 loc) · 3.47 KB
/
ddp.py
File metadata and controls
137 lines (106 loc) · 3.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
# -*- coding: utf-8 -*-
from __future__ import print_function
import re
import socket
import logging
UDP_IP = ''
UDP_PORT = 0
DDP_PORT = 987
DDP_VERSION = '00020020'
_LOGGER = logging.getLogger(__name__)
def get_ddp_message(msg_type, data=None):
"""Get DDP message."""
msg = u'{} * HTTP/1.1\n'.format(msg_type)
if data:
for key, value in data.items():
msg += '{}:{}\n'.format(key, value)
msg += 'device-discovery-protocol-version:{}\n'.format(DDP_VERSION)
return msg
def parse_ddp_response(rsp):
"""Parse the response."""
data = {}
for line in rsp.splitlines():
re_status = re.compile(r'HTTP/1.1 (?P<code>\d+) (?P<status>.*)')
line = line.strip()
# skip empty lines
if not line:
continue
elif re_status.match(line):
data[u'status_code'] = int(re_status.match(line).group('code'))
data[u'status'] = re_status.match(line).group('status')
else:
values = line.split(':')
data[values[0]] = values[1]
return data
def get_ddp_search_message():
"""Get DDP search message."""
return get_ddp_message('SRCH')
def get_ddp_wake_message(credential):
"""Get DDP wake message."""
data = {
'user-credential': credential,
'client-type': 'a',
'auth-type': 'C',
}
return get_ddp_message('WAKEUP', data)
def get_ddp_launch_message(credential):
"""Get DDP launch message."""
data = {
'user-credential': credential,
'client-type': 'a',
'auth-type': 'C',
}
return get_ddp_message('LAUNCH', data)
def _send_recv_msg(host, broadcast, msg, receive=True):
"""Send a ddp message and receive the response."""
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
except OSError as error:
_LOGGER.error('failed to create socket, %s', error)
return [ None, host ]
try:
sock.bind((UDP_IP, UDP_PORT))
sock.settimeout(3.0)
except OSError as error:
_LOGGER.error('failed to bind socket %s:%s, %s', UDP_IP, UDP_PORT, error)
sock.close()
return [ None, host ]
try:
if broadcast:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
host = '255.255.255.255'
_LOGGER.debug('send_recv_msg %s [%s]', host, msg)
sock.sendto(msg.encode('utf-8'), (host, DDP_PORT))
if receive:
return sock.recvfrom(1024)
sock.close()
except OSError as error:
_LOGGER.error('failed to send data using socket, %s', error)
sock.close()
return [ None, host ]
def _send_msg(host, broadcast, msg):
"""Send a ddp message."""
_send_recv_msg(host, broadcast, msg, receive=False)
def search(host=None, broadcast=True):
"""Discover PS4s."""
ps_list = []
msg = get_ddp_search_message()
data, addr = _send_recv_msg(host, broadcast, msg)
if data is not None:
data = parse_ddp_response(data.decode('utf-8'))
data[u'host-ip'] = addr[0]
ps_list.append(data)
return ps_list
def get_status(host):
"""Get status."""
for ps_list in search(host=host):
return ps_list
return {}
def wakeup(host, credential, broadcast=None):
"""Wakeup PS4s."""
msg = get_ddp_wake_message(credential)
_send_msg(host, broadcast, msg)
def launch(host, credential, broadcast=None):
"""Launch."""
msg = get_ddp_launch_message(credential)
_send_msg(host, broadcast, msg)