-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path mhpf.py
More file actions
322 lines (253 loc) · 12.5 KB
/
mhpf.py
File metadata and controls
322 lines (253 loc) · 12.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
#!/usr/bin/python3
from collections import namedtuple
from contextlib import contextmanager
import os
import struct
import sys
HEADER_SIZE = 52
SECTOR_SIZE = 2048
DEFAULT_HASH_PRIME = 31
class BadMHPFFileError(ValueError):
    """Raised when the input does not conform to the MHPF archive format."""
@contextmanager
def _fopen(file, *args, **kwargs):
if isinstance(file, (str, bytes, os.PathLike)):
with open(file, *args, **kwargs) as f:
yield f
elif hasattr(file, 'read') or hasattr(file, 'write'):
yield file
else:
raise TypeError('file must be a str or bytes object, or a file')
def _isHidden(file_path):
if os.name == 'nt':
import stat
return bool(os.stat(file_path).st_file_attributes & stat.FILE_ATTRIBUTE_HIDDEN)
return file_path.startswith('.')
def _parseMHPFHeader(file):
    """Parse and validate the 52-byte MHPF header from `file`.

    Detects endianness from the magic (b'MHPF' = little endian,
    b'FPHM' = big endian), verifies that the declared total size matches the
    actual file size, and leaves the file positioned just past the header.

    Returns a tuple:
        (endianness, (version1, version2), total_size, num_resources,
         hash_prime, res_table, files_table, name_offsets_table, names_table)
    where each *_table is a TableEntry(offset, size).

    Raises BadMHPFFileError if the header is malformed or truncated.
    """
    TableEntry = namedtuple('TableEntry', ['offset', 'size'])
    try:
        magic, = struct.unpack('<4s', file.read(4))
        if magic == b'MHPF':
            endianness = '<'
        elif magic == b'FPHM':
            endianness = '>'
        else:
            raise BadMHPFFileError(f'Not a valid MHPF file ({magic!r})')
        fields = struct.unpack(f'{endianness}HHIII2I2I2I2I',
                               file.read(HEADER_SIZE - 4))
        (version1, version2, total_size, num_resources, hash_prime,
         res_offset, res_size, files_offset, files_size,
         name_offsets_offset, name_offsets_size,
         names_offset, names_size) = fields
        # The declared total size must match the real size of the file.
        file.seek(0, 2)
        actual_size = file.tell()
        file.seek(HEADER_SIZE, 0)
        if total_size != actual_size:
            raise BadMHPFFileError('Not a valid MHPF file')
        return (endianness, (version1, version2), total_size, num_resources,
                hash_prime,
                TableEntry(res_offset, res_size),
                TableEntry(files_offset, files_size),
                TableEntry(name_offsets_offset, name_offsets_size),
                TableEntry(names_offset, names_size))
    except struct.error as ex:
        # Short reads make struct.unpack fail — report as a corrupt archive.
        raise BadMHPFFileError('Not a valid MHPF file') from ex
def _getResourcesTable(file, endianness, num_resources, res_table_attr):
try:
file.seek(res_table_attr.offset, 0)
# TODO: Validate table size in strict mode
ResEntry = namedtuple('ResEntry', ['hash', 'offset', 'size'])
return [ResEntry(*res) for res in struct.iter_unpack(f'{endianness}3I', file.read(12 * num_resources))]
except struct.error as ex:
raise BadMHPFFileError('Not a valid MHPF file') from ex
def _getNamesTable(file, endianness, num_resources, names_table_attr, names_block_attr):
file.seek(names_table_attr.offset, 0)
# TODO: Validate table size in strict mode
name_offsets = [res[0] for res in struct.iter_unpack(
f'{endianness}I', file.read(4 * num_resources))]
def readString(file):
name = bytearray()
while True:
ch = file.read(1)
if ch == b'' or ch == b'\0':
return name.decode('ascii')
name.extend(ch)
names = []
for offset in name_offsets:
file.seek(offset + names_block_attr.offset, 0)
names.append(readString(file))
return names
def _gatherFiles(directory, hash_prime):
    """Recursively collect every non-hidden file under `directory`.

    Returns a list of FileEntry(path, name, hash, size) sorted by hash,
    where `name` is the in-archive path: relative to `directory`,
    forward-slashed, lowercase ASCII bytes.
    """
    FileEntry = namedtuple('FileEntry', ['path', 'name', 'hash', 'size'])
    entries = []
    for root, _, filenames in os.walk(directory):
        for filename in filenames:
            full_path = os.path.join(root, filename)
            if _isHidden(full_path):
                continue
            # Normalize to the archive's path convention before hashing.
            rel_path = os.path.relpath(full_path, directory)
            game_path = os.path.normpath(rel_path).replace(
                '\\', '/').lower().encode('ascii')
            entries.append(FileEntry(
                path=full_path,
                name=game_path,
                hash=pathHash(game_path, hash_prime),
                size=os.stat(full_path).st_size,
            ))
    # The archive layout expects resources ordered by hash.
    entries.sort(key=lambda entry: entry.hash)
    return entries
def unpack(file, output_dir='.'):
    """Extract every resource from an MHPF archive into output_dir.

    file -- path to the .PCK archive, or an already-open binary file object.
    output_dir -- target directory; subdirectories are created as needed.

    Extraction is best-effort: a file that fails with OSError is reported on
    stderr and the remaining files are still extracted.
    Raises BadMHPFFileError if the archive header is invalid.
    """
    with _fopen(file, 'rb') as f:
        # Read and parse the file header
        (endianness, _, _, num_resources, _, res_table, _,
         name_offsets_table, names_block_table) = _parseMHPFHeader(f)
        # Read the resources table and names
        resources = _getResourcesTable(f, endianness, num_resources, res_table)
        names = _getNamesTable(f, endianness, num_resources,
                               name_offsets_table, names_block_table)
        # Only now extract the files
        for res, name in zip(resources, names):
            try:
                print(f'Unpacking {name}...')
                # Keep the name in uppercase so it matches the files on disc
                full_path = os.path.join(output_dir, name.upper())
                os.makedirs(os.path.dirname(full_path), exist_ok=True)
                f.seek(res.offset, 0)
                with open(full_path, 'wb') as out:
                    BUF_SIZE = 64 * 1024 * 1024  # 64MB buffer
                    # Copy in bounded chunks so huge resources don't have to
                    # fit in memory all at once.
                    size_to_read = res.size
                    while size_to_read > 0:
                        chunk_size = min(size_to_read, BUF_SIZE)
                        out.write(f.read(chunk_size))
                        size_to_read -= chunk_size
            except OSError:
                # Deliberate best-effort: report and continue with the rest.
                print(f'Failed to unpack file {name}!', file=sys.stderr)
def pack(directory, output, *, hash_prime=DEFAULT_HASH_PRIME, big_endian=False):
    """Create an MHPF archive at `output` from all non-hidden files under `directory`.

    directory -- source directory tree to pack.
    output -- path of the .PCK file to create, or an open writable binary file.
    hash_prime -- prime multiplier used for file name hashes.
    big_endian -- write a big endian archive (magic b'FPHM' instead of b'MHPF').

    Raises ValueError if `directory` does not exist.
    """
    if not os.path.isdir(directory):
        raise ValueError(f'{directory} does not exist')
    files = _gatherFiles(directory, hash_prime)
    endianness = '<' if not big_endian else '>'
    TableEntry = namedtuple('TableEntry', ['offset', 'size'])
    # Start building the internal structures, all at once

    def alignToSector(offset):
        # Round offset up to the next multiple of SECTOR_SIZE.
        return (offset + SECTOR_SIZE - 1) & ~(SECTOR_SIZE - 1)
    # res_table and files_table should be aligned to SECTOR_SIZE, name_offsets_table and names_table don't have to be
    res_table = TableEntry(offset=alignToSector(
        HEADER_SIZE), size=12*len(files))
    resources = bytearray()
    name_offsets = bytearray()
    names = bytearray()
    cur_files_offset = alignToSector(res_table.offset + res_table.size)
    files_size = 0
    for file in files:
        # One (hash, offset, size) record per file; each file's data is
        # placed at a sector-aligned offset.
        resources.extend(struct.pack(
            f'{endianness}3I', file.hash, cur_files_offset, file.size))
        # Name offsets are relative to the start of the names block.
        name_offsets.extend(struct.pack(f'{endianness}I', len(names)))
        names.extend(file.name + b'\0')
        size_aligned = alignToSector(file.size)
        files_size += size_aligned
        cur_files_offset += size_aligned
    files_table = TableEntry(offset=alignToSector(
        res_table.offset + res_table.size), size=files_size)
    name_offsets_table = TableEntry(
        offset=files_table.offset + files_table.size, size=4*len(files))
    names_table = TableEntry(
        offset=name_offsets_table.offset + name_offsets_table.size, size=len(names))
    with _fopen(output, 'wb') as f:
        # Prepare the header
        # total_size must equal the final file size — _parseMHPFHeader
        # validates this on read. files_table.size is already a sum of
        # sector-aligned sizes, so aligning it again is a no-op.
        total_size = alignToSector(HEADER_SIZE) + alignToSector(res_table.size) + alignToSector(
            files_table.size) + name_offsets_table.size + names_table.size
        # Header fields: magic, version (1, 0), total size, file count,
        # hash prime, then (offset, size) for each of the four tables.
        f.write(struct.pack(f'{endianness}4sHHIII2I2I2I2I', b'MHPF' if not big_endian else b'FPHM', 1, 0,
                            total_size, len(files), hash_prime,
                            res_table.offset, res_table.size, files_table.offset, files_table.size,
                            name_offsets_table.offset, name_offsets_table.size, names_table.offset, names_table.size).ljust(alignToSector(HEADER_SIZE), b'\x00'))
        f.write(resources.ljust(alignToSector(res_table.size), b'\x00'))
        for file in files:
            print(f'Packing {file.name.decode("ascii")}...')
            with open(file.path, 'rb') as in_file:
                BUF_SIZE = 64 * 1024 * 1024  # 64MB buffer
                # Copy in chunks; the final chunk is zero-padded up to the
                # sector boundary so the next file starts sector-aligned.
                size_to_read = file.size
                while size_to_read > 0:
                    chunk_size = alignToSector(min(size_to_read, BUF_SIZE))
                    f.write(in_file.read(chunk_size).ljust(
                        chunk_size, b'\x00'))
                    size_to_read -= chunk_size
        f.write(name_offsets)
        f.write(names)
def scan(file, list_files=False):
    """Print the attributes of an MHPF archive; with list_files, also list its contents."""
    def displaySize(num):
        # Render a byte count, adding an IEC-prefixed form for larger values.
        def sizeofFmt(num, suffix='B'):
            for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
                if abs(num) < 1024.0:
                    return f"{num:3.1f}{unit}{suffix}"
                num /= 1024.0
            return f'{num:.1f}Yi{suffix}'
        if num <= 1024:
            return f'{num} bytes'
        return f'{num} bytes ({sizeofFmt(num)})'

    with _fopen(file, 'rb') as f:
        # Read and parse the file header
        (endianness, version, total_size, num_resources, hash_prime, res_table,
         _, name_offsets_table, names_block_table) = _parseMHPFHeader(f)
        header_desc = 'MHPF (little endian)' if endianness == '<' else 'FPHM (big endian)'
        print('MHPF file attributes:')
        print(f'\tHeader: {header_desc}')
        print(f'\tVersion: {version}')
        print(f'\tTotal size: {displaySize(total_size)}')
        print(f'\tNumber of files: {num_resources}')
        print(f'\tString hash prime: {hash_prime}')
        if not list_files:
            return
        print()
        print('Files:')
        resources = _getResourcesTable(
            f, endianness, num_resources, res_table)
        names = _getNamesTable(f, endianness, num_resources,
                               name_offsets_table, names_block_table)
        # Sort the files by name for better readability
        for res, name in sorted(zip(resources, names), key=lambda pair: pair[1]):
            print(f'{name}, {displaySize(res.size)}')
def pathHash(path, prime=DEFAULT_HASH_PRIME):
    """Hash an archive path the way the MHPF format expects.

    path -- the path to hash, as ASCII bytes or str.
    prime -- the multiplier for the rolling hash.

    Separators are normalized before hashing: leading separators are skipped,
    backslashes count as '/', consecutive separators collapse into one, and
    letters are lowercased. Returns a 32-bit unsigned hash.

    Bug fix: the original iterated `bytes`, yielding ints, and compared them
    against the str literals '/' and '\\' — always False — so the separator
    normalization was dead code (and a str argument crashed on chr(c)). Each
    byte is now decoded to a one-character string first, and str input is
    accepted by encoding it up front. Hashes of already-normalized paths
    (what _gatherFiles produces) are unchanged.
    """
    if isinstance(path, str):
        path = path.encode('ascii')
    result = 0
    separator = True  # True skips leading separators
    for byte in path:
        ch = chr(byte)
        if ch in ('/', '\\'):
            if separator:
                # Collapse runs of separators into a single '/'
                continue
            ch = '/'
            separator = True
        else:
            separator = False
        result = (result * prime) + ord(ch.lower())
    return result & 0xffffffff
if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser(
        description="A package tool for unpacking and repacking MHPF (Melbourne House Pack File) .PCK files in Test Drive Unlimited PS2/PSP.")
    subparsers = parser.add_subparsers(required=True, help='sub-command')

    # 'unpack' sub-command: extract an archive into a directory.
    parser_unpack = subparsers.add_parser(
        'unpack', help='Unpack the MHPF archive to a specified directory')  # was misspelt 'MPHF'
    parser_unpack.add_argument(
        'file', metavar='PCK', type=str, help='path to the input PCK file')
    parser_unpack.add_argument('-o', '--output', dest='output_dir',
                               type=str, default='.', help='path to the target directory')
    parser_unpack.set_defaults(func=unpack)

    # 'pack' sub-command: build an archive from a directory tree.
    parser_pack = subparsers.add_parser(
        'pack', help='Create a MHPF archive from the files from a specified directory')
    parser_pack.add_argument('directory', metavar='DIR',
                             type=str, help='path to the input directory')
    parser_pack.add_argument('-o', '--output', dest='output',
                             type=str, help='path to the PCK file to create')
    parser_pack.add_argument('-hp', '--hash-prime', dest='hash_prime', type=int,
                             default=DEFAULT_HASH_PRIME, help='a custom prime for file name hashes')
    parser_pack.add_argument('-be', '--big-endian', dest='big_endian', action='store_true',
                             help='build a big endian archive (do NOT use for TDU, currently not known if big endian archives were ever used)')
    parser_pack.set_defaults(func=pack)

    # 'scan' sub-command: show archive attributes and optionally its listing.
    parser_scan = subparsers.add_parser(
        'scan', help='List the contents of the MHPF file')
    parser_scan.add_argument('file', metavar='PCK',
                             type=str, help='path to the input PCK file')
    parser_scan.add_argument('-l', '--list', dest='list_files', action='store_true',
                             help='list the archive contents on top of listing the attributes')
    parser_scan.set_defaults(func=scan)

    arguments = parser.parse_args()
    func = arguments.func
    # for pack, unspecified output directory should be set to a file of the name of the input directory
    if func is pack:
        if arguments.output is None:
            # normpath strips a trailing separator, so 'dir/' still yields
            # 'dir.PCK' rather than the hidden file 'dir/.PCK'.
            arguments.output = os.path.abspath(
                os.path.normpath(arguments.directory) + '.PCK')
    # Dispatch by passing the parsed namespace (minus the callback itself)
    # as keyword arguments to the selected sub-command function.
    args_var = vars(arguments)
    del args_var['func']
    func(**args_var)
    if func is scan:
        if not arguments.list_files:
            print()
            print('-l/--list was not specified. If you need to list the archive contents, re-run the script with this argument added.')