-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcode.py
More file actions
310 lines (254 loc) · 12.6 KB
/
code.py
File metadata and controls
310 lines (254 loc) · 12.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
import pandas as pd
import json
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import os
from collections import Counter, defaultdict
from datetime import datetime
from mlxtend.frequent_patterns import apriori, association_rules
from mlxtend.preprocessing import TransactionEncoder
import matplotlib.font_manager as fm
# --------------------------- General Utility Functions ---------------------------
def configure_chinese_font(font_name='Microsoft YaHei'):
    """Configure matplotlib so CJK text and the minus sign render correctly.

    Args:
        font_name: Name of an installed font with CJK glyph coverage.
            Defaults to 'Microsoft YaHei' (generalized from the previous
            hard-coded value; the default preserves existing behavior).
    """
    plt.rcParams['font.sans-serif'] = [font_name]
    # Without this, matplotlib draws the Unicode minus as a missing-glyph box
    # when a CJK font is active.
    plt.rcParams['axes.unicode_minus'] = False
    plt.rcParams['font.family'] = font_name
def generate_output_directory(task_name):
    """Ensure the per-task results directory exists and return its path.

    The directory is created under ``results/`` if missing; an existing
    directory is left untouched.
    """
    target = 'results/' + task_name
    os.makedirs(target, exist_ok=True)
    return target
# --------------------------- Task1 Functions ---------------------------
def perform_association_rule_mining():
    """Task1: Product Category Association Rule Mining.

    Loads transaction baskets from processed_data/transactions.json, mines
    frequent itemsets with Apriori, derives association rules, analyses the
    electronics category, and writes CSVs, plots, and a markdown report to
    results/task1. Prints progress to stdout; returns None.
    """
    print("\n=== Starting Task 1: Product Category Association Rule Mining ===")
    configure_chinese_font()
    output_dir = generate_output_directory('task1')

    # Data loading with error handling
    try:
        with open('processed_data/transactions.json', 'r', encoding='utf-8') as f:
            transactions = json.load(f)
    except FileNotFoundError:
        print("Error: Transaction data file not found. Ensure processed_data/transactions.json exists")
        return
    except Exception as e:
        print(f"Error loading transaction data: {e}")
        return
    if not transactions:
        # Guard: an empty basket list would divide by zero below.
        print("Error: transaction data is empty")
        return

    # Data exploration
    category_freq = Counter(cat for trans in transactions for cat in trans)
    print(f"\nTotal unique categories: {len(category_freq)}")
    print("\nTop 5 frequent categories:")
    for cat, cnt in category_freq.most_common(5):
        print(f"- {cat}: {cnt} ({cnt / len(transactions) * 100:.2f}%)")

    # Data transformation: one-hot encode baskets for mlxtend
    te = TransactionEncoder()
    encoded_data = pd.DataFrame(te.fit_transform(transactions), columns=te.columns_)
    encoded_data.to_csv(f"{output_dir}/encoded_transactions.csv", index=False)

    # Frequent itemset mining
    frequent_itemsets = apriori(encoded_data, min_support=0.02, use_colnames=True, max_len=3)
    print(f"\nFound {len(frequent_itemsets)} frequent itemsets (support >= 0.02)")
    frequent_itemsets.to_csv(f"{output_dir}/frequent_itemsets.csv", index=False)

    # Rule generation with confidence threshold
    rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.5)
    print(f"Generated {len(rules)} association rules (confidence >= 0.5)")
    if not rules.empty:
        # BUGFIX: association_rules() does not produce a 'rule' column, so the
        # report below raised KeyError. Build a readable
        # "antecedents -> consequents" string explicitly.
        rules['rule'] = rules.apply(
            lambda r: f"{', '.join(sorted(r['antecedents']))} -> {', '.join(sorted(r['consequents']))}",
            axis=1)
    rules.to_csv(f"{output_dir}/association_rules.csv", index=False)

    # Target category analysis (Electronics)
    target_category = "电子产品"
    if rules.empty:
        # apply(..., axis=1) on an empty frame yields an empty DataFrame, not
        # a boolean mask — skip filtering instead.
        electronics_rules = rules
    else:
        electronics_rules = rules[
            rules.apply(lambda x: target_category in x['antecedents'] or target_category in x['consequents'], axis=1)
        ]
    print(f"\nElectronics-related rules: {len(electronics_rules)}")
    electronics_rules.nlargest(5, 'lift').to_csv(f"{output_dir}/top_electronics_rules.csv", index=False)

    # Visualizations
    plt.figure(figsize=(12, 6))
    # BUGFIX: pass concrete lists — seaborn does not accept dict_keys for y.
    sns.barplot(x=list(category_freq.values()), y=list(category_freq.keys()), palette="viridis")
    plt.title("Product Category Frequency Distribution")
    plt.savefig(f"{output_dir}/category_frequency.png", dpi=300)
    if not rules.empty:
        plt.figure(figsize=(10, 6))
        sns.scatterplot(x='support', y='confidence', data=rules, size='lift', hue='lift', palette="viridis")
        plt.title("Association Rules: Support vs Confidence")
        plt.savefig(f"{output_dir}/rules_scatter.png", dpi=300)

    # Detailed report generation
    with open(f"{output_dir}/report.md", "w", encoding="utf-8") as f:
        f.write("# Task1 Analysis Report\n")
        f.write(f"## Data Summary\n- Transactions: {len(transactions)}\n- Categories: {len(category_freq)}\n")
        top_cat, top_cnt = category_freq.most_common(1)[0]
        # BUGFIX: the raw count was previously printed with a '%' suffix;
        # report the actual share of transactions instead.
        f.write(f"\n## Key Findings\n- Top category: {top_cat} ({top_cnt / len(transactions) * 100:.2f}%)")
        if not rules.empty:
            f.write(f"\n- Strongest rule: {rules.nlargest(1, 'lift')['rule'].values[0]}")
    print(f"Task1 completed. Results saved to {output_dir}")
# --------------------------- Task2 Functions ---------------------------
def analyze_payment_category_correlations():
    """Task2: Payment Method & Category Correlation Analysis.

    Loads processed_data/transaction_details.json, analyses which payment
    methods dominate high-value (>5000) purchases, mines payment↔category
    association rules, and writes plots plus a markdown report to
    results/task2. Prints progress to stdout; returns None.
    """
    print("\n=== Starting Task 2: Payment Method & Category Correlation Analysis ===")
    configure_chinese_font()
    output_dir = generate_output_directory('task2')

    # Load and process transaction details
    try:
        with open('processed_data/transaction_details.json', 'r', encoding='utf-8') as f:
            transactions = json.load(f)
    except Exception as e:
        print(f"Data load error: {e}")
        return

    # Extract payment-category pairs; deduplicate categories per transaction
    payment_data = []
    high_value_payments = []
    for trans in transactions:
        payment = trans.get("payment_method", "Unknown")
        categories = trans.get("categories", [])
        amount = trans.get("total_amount", 0)
        for cat in set(categories):
            payment_data.append([f"Payment_{payment}", f"Category_{cat}"])
        if amount > 5000:
            high_value_payments.append(payment)

    # High-value payment analysis
    hv_dist = Counter(high_value_payments)
    print(f"\nHigh-value transactions: {len(high_value_payments)}")
    if high_value_payments:
        # BUGFIX: guard — the percentage below divided by zero when no
        # transaction exceeded the threshold.
        print("Top payment methods for high-value purchases:")
        for method, cnt in hv_dist.most_common(3):
            print(f"- {method}: {cnt} ({cnt / len(high_value_payments) * 100:.2f}%)")

    # Apriori for payment-category rules
    te = TransactionEncoder()
    encoded_data = pd.DataFrame(te.fit_transform(payment_data), columns=te.columns_)
    frequent_itemsets = apriori(encoded_data, min_support=0.01, use_colnames=True)
    rules = association_rules(frequent_itemsets, metric="confidence", min_threshold=0.1)
    if not rules.empty:
        # BUGFIX: association_rules() has no 'rule' column; the bar chart and
        # report below referenced it and raised KeyError.
        rules['rule'] = rules.apply(
            lambda r: f"{', '.join(sorted(r['antecedents']))} -> {', '.join(sorted(r['consequents']))}",
            axis=1)

    # Filter rule directions
    payment_to_cat = rules[
        rules['antecedents'].apply(lambda x: any(item.startswith('Payment_') for item in x))
    ]
    cat_to_payment = rules[
        rules['consequents'].apply(lambda x: any(item.startswith('Payment_') for item in x))
    ]

    # Visualizations
    if high_value_payments:
        plt.figure(figsize=(12, 6))
        sns.countplot(x=high_value_payments, palette="viridis")
        plt.title("High-Value Payment Method Distribution")
        plt.savefig(f"{output_dir}/hv_payment_dist.png", dpi=300)
    if not payment_to_cat.empty:
        plt.figure(figsize=(10, 6))
        sns.barplot(x='lift', y='rule', data=payment_to_cat.nlargest(5, 'lift'), palette="viridis")
        plt.title("Top Payment-to-Category Rules by Lift")
        plt.savefig(f"{output_dir}/top_payment_rules.png", dpi=300)

    # Report generation
    with open(f"{output_dir}/report.md", "w", encoding="utf-8") as f:
        f.write("# Payment-Category Correlation Report\n")
        if high_value_payments:
            top_method, top_cnt = hv_dist.most_common(1)[0]
            # BUGFIX: the raw count was previously reported as a percentage;
            # compute the actual share of high-value transactions.
            share = top_cnt / len(high_value_payments) * 100
            f.write(
                f"## High-Value Insights\n- {top_method} used in {share:.2f}% of high-value transactions")
        if not rules.empty:
            f.write(f"\n## Strongest Rule: {rules.nlargest(1, 'lift')['rule'].values[0]}")
    print(f"Task2 completed. Results saved to {output_dir}")
# --------------------------- Task3 Functions ---------------------------
def analyze_time_series_patterns():
    """Task3: Time Series Pattern Mining.

    Loads processed_data/time_series_data.json, parses dates, prints
    quarterly/monthly/weekday distributions, tracks top-category sales per
    month, and writes plots plus a markdown report to results/task3.
    Prints progress to stdout; returns None.
    """
    print("\n=== Starting Task 3: Time Series Pattern Mining ===")
    configure_chinese_font()
    output_dir = generate_output_directory('task3')

    # Load and parse time series data
    try:
        with open('processed_data/time_series_data.json', 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        print(f"Data load error: {e}")
        return

    # Date parsing with error handling
    valid_data = []
    date_errors = 0
    for item in data:
        try:
            item['date_obj'] = datetime.strptime(item['date'], "%Y-%m-%d")
            valid_data.append(item)
        except (KeyError, TypeError, ValueError):
            # Narrowed from a bare except so real bugs (e.g. NameError) surface.
            date_errors += 1
    print(f"Parsed {len(valid_data)} records, {date_errors} date parsing errors")
    if not valid_data:
        # Guard: the percentage math below divides by len(valid_data).
        print("No valid records to analyze")
        return

    # Seasonal analysis
    def get_seasonal_distribution(key, name, mapping=None):
        """Print and return the Counter of item[key] over valid_data.

        mapping optionally translates raw keys (e.g. weekday ints) to
        display labels.
        """
        # BUGFIX: this helper previously had no `mapping` parameter although
        # the weekday call below passes one, which raised TypeError.
        dist = Counter(item[key] for item in valid_data)
        print(f"\n{name} Distribution:")
        for k, v in sorted(dist.items()):
            label = mapping.get(k, k) if mapping else k
            print(f"- {label}: {v} ({v / len(valid_data) * 100:.2f}%)")
        return dist

    quarterly_dist = get_seasonal_distribution('quarter', "Quarterly Sales")
    monthly_dist = get_seasonal_distribution('month', "Monthly Sales")
    weekday_dist = get_seasonal_distribution('day_of_week', "Weekday Sales",
                                             mapping={0: "Mon", 1: "Tue", 2: "Wed", 3: "Thu",
                                                      4: "Fri", 5: "Sat", 6: "Sun"})

    # Category-time analysis
    category_timing = defaultdict(lambda: defaultdict(int))
    for item in valid_data:
        for cat in item['categories']:
            category_timing[cat][item['month']] += 1
    top_categories = [cat for cat, _ in
                      Counter(cat for item in valid_data for cat in item['categories']).most_common(5)]

    # Visualizations
    plt.figure(figsize=(12, 6))
    # BUGFIX: seaborn's data= expects a DataFrame, not a list of dicts.
    sns.countplot(x='quarter', data=pd.DataFrame(valid_data), palette="viridis")
    plt.title("Quarterly Transaction Distribution")
    plt.savefig(f"{output_dir}/quarterly_sales.png", dpi=300)
    plt.figure(figsize=(15, 8))
    for cat in top_categories:
        months = range(1, 13)
        counts = [category_timing[cat].get(m, 0) for m in months]
        plt.plot(months, counts, label=cat)
    plt.title("Monthly Sales by Top Categories")
    plt.legend()
    plt.savefig(f"{output_dir}/monthly_categories.png", dpi=300)

    # Report generation
    with open(f"{output_dir}/report.md", "w", encoding="utf-8") as f:
        f.write("# Time Series Analysis Report\n")
        f.write(f"## Data Quality\n- Valid records: {len(valid_data)}\n- Invalid dates: {date_errors}")
        peak_quarter = max(quarterly_dist, key=quarterly_dist.get)
        f.write(
            f"\n## Peak Season\n- Q{peak_quarter} has {quarterly_dist[peak_quarter]} transactions")
    print(f"Task3 completed. Results saved to {output_dir}")
# --------------------------- Task4 Functions ---------------------------
def analyze_refund_patterns():
    """Task4: Refund Pattern Analysis.

    Loads processed_data/refund_data.json, summarises refund reasons and
    per-category refund amounts, and writes plots plus a markdown report to
    results/task4. Prints progress to stdout; returns None.
    """
    print("\n=== Starting Task 4: Refund Pattern Analysis ===")
    configure_chinese_font()
    output_dir = generate_output_directory('task4')

    # BUGFIX: `refund_data` was referenced without ever being defined, so this
    # task always crashed with NameError. Load it from processed_data with the
    # same error handling the other tasks use.
    # NOTE(review): file name inferred from the other tasks' naming scheme —
    # confirm against the preprocessing step that produces processed_data/.
    try:
        with open('processed_data/refund_data.json', 'r', encoding='utf-8') as f:
            refund_data = json.load(f)
    except Exception as e:
        print(f"Data load error: {e}")
        return

    # Data analysis
    df = pd.DataFrame(refund_data)
    if df.empty:
        print("No refund records found")
        return
    print("\nRefund data summary:")
    print(df[["refund_amount", "refund_reason"]].describe())

    # Refund reason distribution (normalize=True yields proportions, not counts)
    reason_dist = df['refund_reason'].value_counts(normalize=True)
    print("\nTop refund reasons:")
    for reason, prop in reason_dist.items():
        print(f"- {reason}: {prop * 100:.2f}%")

    # Category-wise refund analysis
    category_refund = df.groupby('category')['refund_amount'].agg(['mean', 'count'])
    print("\nCategory refund statistics:")
    print(category_refund)

    # Visualizations
    plt.figure(figsize=(10, 6))
    sns.countplot(x='refund_reason', data=df, palette="viridis")
    plt.title("Refund Reason Distribution")
    plt.savefig(f"{output_dir}/refund_reasons.png", dpi=300)
    plt.figure(figsize=(12, 4))
    sns.boxplot(x='category', y='refund_amount', data=df, palette="viridis")
    plt.title("Refund Amount by Category")
    plt.savefig(f"{output_dir}/category_refund_boxplot.png", dpi=300)

    # Report generation
    with open(f"{output_dir}/report.md", "w", encoding="utf-8") as f:
        f.write("# Refund Pattern Analysis Report\n")
        f.write(f"## Key Insights\n- {reason_dist.idxmax()} is the most common reason ({reason_dist.max() * 100:.2f}%)")
        f.write(
            f"\n- {category_refund['mean'].idxmax()} has the highest average refund amount ({category_refund['mean'].max():.2f}¥)")
    print(f"Task4 completed. Results saved to {output_dir}")
# --------------------------- Main Execution ---------------------------
def main():
    """Run every analysis task in order.

    A failure in one task is reported and the remaining tasks still run.
    """
    for task in (
        perform_association_rule_mining,
        analyze_payment_category_correlations,
        analyze_time_series_patterns,
        analyze_refund_patterns,
    ):
        try:
            task()
        except Exception as exc:
            print(f"\nTask failed: {exc}")
    print("\nAll tasks completed. Results saved in 'results' directory")


if __name__ == "__main__":
    main()