Skip to content

Commit 2dbb9df

Browse files
committed
feat: Round 10 배치 작업물을 upstream 템플릿에 통합
Round 10에서 작업한 주간/월간 랭킹 배치 Job을 upstream 배치 템플릿 구조에 통합합니다. 변경 사항: - WeeklyRankingJobConfig, MonthlyRankingJobConfig 추가 - ProductMetricsAggregateReader 추가 (집계 쿼리) - RankingScoreProcessor 추가 (점수 계산 로직) - WeeklyRankingWriter, MonthlyRankingWriter 추가 - 도메인 엔티티 추가: WeeklyProductRank, MonthlyProductRank - 리포지토리 추가: WeeklyRankRepository, MonthlyRankRepository - JpaConfig 수정: com.loopers.domain 패키지 스캔 추가 - CommerceBatchApplicationTest 수정: Job 자동 실행 비활성화 Upstream의 Listener 구조(JobListener, StepMonitorListener, ChunkListener)를 유지하면서 Round 10 랭킹 Job을 추가했습니다.
1 parent bce3f0c commit 2dbb9df

14 files changed

Lines changed: 992 additions & 1 deletion

File tree

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package com.loopers.batch.config;
2+
3+
import org.springframework.context.annotation.Configuration;
4+
5+
/**
6+
* Base configuration class for Spring Batch.
7+
*
8+
* <p>Spring Boot 3.x auto-configures Spring Batch, so @EnableBatchProcessing is not needed.
9+
* <p>This class can be used for common batch configuration beans if needed.
10+
*/
11+
@Configuration
12+
public class BatchConfig {
13+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.loopers.batch.config;
2+
3+
import com.loopers.batch.processor.RankingScoreProcessor;
4+
import com.loopers.batch.reader.ProductMetricsAggregateReader;
5+
import com.loopers.batch.writer.MonthlyRankingWriter;
6+
import com.loopers.domain.dto.ProductRankingAggregation;
7+
import com.loopers.domain.rank.MonthlyProductRank;
8+
import com.loopers.domain.rank.MonthlyRankRepository;
9+
import jakarta.persistence.EntityManager;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.springframework.batch.core.Job;
13+
import org.springframework.batch.core.Step;
14+
import org.springframework.batch.core.configuration.annotation.JobScope;
15+
import org.springframework.batch.core.configuration.annotation.StepScope;
16+
import org.springframework.batch.core.job.builder.JobBuilder;
17+
import org.springframework.batch.core.repository.JobRepository;
18+
import org.springframework.batch.core.step.builder.StepBuilder;
19+
import org.springframework.batch.item.ItemProcessor;
20+
import org.springframework.batch.item.ItemReader;
21+
import org.springframework.batch.item.ItemWriter;
22+
import org.springframework.beans.factory.annotation.Value;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.transaction.PlatformTransactionManager;
26+
27+
/**
28+
* Configuration class for monthly ranking aggregation batch job.
29+
*
30+
* <p>This job aggregates product metrics data on a monthly basis and
31+
* stores the top N rankings in the materialized view table.
32+
*
33+
* <p>Job execution example:
34+
* <pre>
35+
* java -jar commerce-batch.jar \
36+
* --job.name=monthlyRankingJob \
37+
* yearMonth=2025-01
38+
* </pre>
39+
*
40+
* <p>Chunk-oriented processing flow:
41+
* <ol>
42+
* <li>Reader: Aggregate product_metrics by month</li>
43+
* <li>Processor: Calculate ranking scores</li>
44+
* <li>Writer: Save to mv_product_rank_monthly</li>
45+
* </ol>
46+
*/
47+
@Slf4j
48+
@Configuration
49+
@RequiredArgsConstructor
50+
public class MonthlyRankingJobConfig {
51+
52+
private static final int CHUNK_SIZE = 100;
53+
private static final int TOP_N = 100;
54+
55+
private final EntityManager entityManager;
56+
private final MonthlyRankRepository monthlyRankRepository;
57+
58+
/**
59+
* Defines the monthly ranking job.
60+
*
61+
* @param jobRepository the Spring Batch job repository
62+
* @param monthlyRankingStep the step to execute
63+
* @return configured Job instance
64+
*/
65+
@Bean
66+
public Job monthlyRankingJob(
67+
JobRepository jobRepository,
68+
Step monthlyRankingStep
69+
) {
70+
return new JobBuilder("monthlyRankingJob", jobRepository)
71+
.start(monthlyRankingStep)
72+
.build();
73+
}
74+
75+
/**
76+
* Defines the monthly ranking step with chunk-oriented processing.
77+
*
78+
* @param jobRepository the Spring Batch job repository
79+
* @param transactionManager the transaction manager
80+
* @param yearMonth the target month (injected from job parameters)
81+
* @return configured Step instance
82+
*/
83+
@Bean
84+
@JobScope
85+
public Step monthlyRankingStep(
86+
JobRepository jobRepository,
87+
PlatformTransactionManager transactionManager,
88+
@Value("#{jobParameters['yearMonth']}") String yearMonth
89+
) {
90+
log.info("Initializing monthly ranking step: yearMonth={}", yearMonth);
91+
92+
return new StepBuilder("monthlyRankingStep", jobRepository)
93+
.<ProductRankingAggregation, MonthlyProductRank>chunk(CHUNK_SIZE, transactionManager)
94+
.reader(monthlyMetricsReader(yearMonth))
95+
.processor(monthlyRankingProcessor(yearMonth))
96+
.writer(monthlyRankingWriter())
97+
.build();
98+
}
99+
100+
/**
101+
* Creates an ItemReader for monthly metrics aggregation.
102+
*
103+
* @param yearMonth the target month
104+
* @return configured ItemReader
105+
*/
106+
@Bean
107+
@StepScope
108+
public ItemReader<ProductRankingAggregation> monthlyMetricsReader(
109+
@Value("#{jobParameters['yearMonth']}") String yearMonth
110+
) {
111+
return new ProductMetricsAggregateReader(entityManager, yearMonth, "MONTHLY", TOP_N);
112+
}
113+
114+
/**
115+
* Creates an ItemProcessor for ranking score calculation.
116+
*
117+
* @param yearMonth the target month
118+
* @return configured ItemProcessor
119+
*/
120+
@Bean
121+
@StepScope
122+
public ItemProcessor<ProductRankingAggregation, MonthlyProductRank> monthlyRankingProcessor(
123+
@Value("#{jobParameters['yearMonth']}") String yearMonth
124+
) {
125+
RankingScoreProcessor processor = new RankingScoreProcessor("MONTHLY", yearMonth);
126+
return item -> (MonthlyProductRank) processor.process(item);
127+
}
128+
129+
/**
130+
* Creates an ItemWriter for persisting monthly rankings.
131+
*
132+
* @return configured ItemWriter
133+
*/
134+
@Bean
135+
@StepScope
136+
public ItemWriter<MonthlyProductRank> monthlyRankingWriter() {
137+
return new MonthlyRankingWriter(monthlyRankRepository);
138+
}
139+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.loopers.batch.config;
2+
3+
import com.loopers.batch.processor.RankingScoreProcessor;
4+
import com.loopers.batch.reader.ProductMetricsAggregateReader;
5+
import com.loopers.batch.writer.WeeklyRankingWriter;
6+
import com.loopers.domain.dto.ProductRankingAggregation;
7+
import com.loopers.domain.rank.WeeklyProductRank;
8+
import com.loopers.domain.rank.WeeklyRankRepository;
9+
import jakarta.persistence.EntityManager;
10+
import lombok.RequiredArgsConstructor;
11+
import lombok.extern.slf4j.Slf4j;
12+
import org.springframework.batch.core.Job;
13+
import org.springframework.batch.core.Step;
14+
import org.springframework.batch.core.configuration.annotation.JobScope;
15+
import org.springframework.batch.core.configuration.annotation.StepScope;
16+
import org.springframework.batch.core.job.builder.JobBuilder;
17+
import org.springframework.batch.core.repository.JobRepository;
18+
import org.springframework.batch.core.step.builder.StepBuilder;
19+
import org.springframework.batch.item.ItemProcessor;
20+
import org.springframework.batch.item.ItemReader;
21+
import org.springframework.batch.item.ItemWriter;
22+
import org.springframework.beans.factory.annotation.Value;
23+
import org.springframework.context.annotation.Bean;
24+
import org.springframework.context.annotation.Configuration;
25+
import org.springframework.transaction.PlatformTransactionManager;
26+
27+
/**
28+
* Configuration class for weekly ranking aggregation batch job.
29+
*
30+
* <p>This job aggregates product metrics data on a weekly basis and
31+
* stores the top N rankings in the materialized view table.
32+
*
33+
* <p>Job execution example:
34+
* <pre>
35+
* java -jar commerce-batch.jar \
36+
* --job.name=weeklyRankingJob \
37+
* yearWeek=2025-W01
38+
* </pre>
39+
*
40+
* <p>Chunk-oriented processing flow:
41+
* <ol>
42+
* <li>Reader: Aggregate product_metrics by week</li>
43+
* <li>Processor: Calculate ranking scores</li>
44+
* <li>Writer: Save to mv_product_rank_weekly</li>
45+
* </ol>
46+
*/
47+
@Slf4j
48+
@Configuration
49+
@RequiredArgsConstructor
50+
public class WeeklyRankingJobConfig {
51+
52+
private static final int CHUNK_SIZE = 100;
53+
private static final int TOP_N = 100;
54+
55+
private final EntityManager entityManager;
56+
private final WeeklyRankRepository weeklyRankRepository;
57+
58+
/**
59+
* Defines the weekly ranking job.
60+
*
61+
* @param jobRepository the Spring Batch job repository
62+
* @param weeklyRankingStep the step to execute
63+
* @return configured Job instance
64+
*/
65+
@Bean
66+
public Job weeklyRankingJob(
67+
JobRepository jobRepository,
68+
Step weeklyRankingStep
69+
) {
70+
return new JobBuilder("weeklyRankingJob", jobRepository)
71+
.start(weeklyRankingStep)
72+
.build();
73+
}
74+
75+
/**
76+
* Defines the weekly ranking step with chunk-oriented processing.
77+
*
78+
* @param jobRepository the Spring Batch job repository
79+
* @param transactionManager the transaction manager
80+
* @param yearWeek the target week in ISO format (injected from job parameters)
81+
* @return configured Step instance
82+
*/
83+
@Bean
84+
@JobScope
85+
public Step weeklyRankingStep(
86+
JobRepository jobRepository,
87+
PlatformTransactionManager transactionManager,
88+
@Value("#{jobParameters['yearWeek']}") String yearWeek
89+
) {
90+
log.info("Initializing weekly ranking step: yearWeek={}", yearWeek);
91+
92+
return new StepBuilder("weeklyRankingStep", jobRepository)
93+
.<ProductRankingAggregation, WeeklyProductRank>chunk(CHUNK_SIZE, transactionManager)
94+
.reader(weeklyMetricsReader(yearWeek))
95+
.processor(weeklyRankingProcessor(yearWeek))
96+
.writer(weeklyRankingWriter())
97+
.build();
98+
}
99+
100+
/**
101+
* Creates an ItemReader for weekly metrics aggregation.
102+
*
103+
* @param yearWeek the target week
104+
* @return configured ItemReader
105+
*/
106+
@Bean
107+
@StepScope
108+
public ItemReader<ProductRankingAggregation> weeklyMetricsReader(
109+
@Value("#{jobParameters['yearWeek']}") String yearWeek
110+
) {
111+
return new ProductMetricsAggregateReader(entityManager, yearWeek, "WEEKLY", TOP_N);
112+
}
113+
114+
/**
115+
* Creates an ItemProcessor for ranking score calculation.
116+
*
117+
* @param yearWeek the target week
118+
* @return configured ItemProcessor
119+
*/
120+
@Bean
121+
@StepScope
122+
public ItemProcessor<ProductRankingAggregation, WeeklyProductRank> weeklyRankingProcessor(
123+
@Value("#{jobParameters['yearWeek']}") String yearWeek
124+
) {
125+
RankingScoreProcessor processor = new RankingScoreProcessor("WEEKLY", yearWeek);
126+
return item -> (WeeklyProductRank) processor.process(item);
127+
}
128+
129+
/**
130+
* Creates an ItemWriter for persisting weekly rankings.
131+
*
132+
* @return configured ItemWriter
133+
*/
134+
@Bean
135+
@StepScope
136+
public ItemWriter<WeeklyProductRank> weeklyRankingWriter() {
137+
return new WeeklyRankingWriter(weeklyRankRepository);
138+
}
139+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package com.loopers.batch.processor;
2+
3+
import com.loopers.domain.dto.ProductRankingAggregation;
4+
import com.loopers.domain.rank.MonthlyProductRank;
5+
import com.loopers.domain.rank.WeeklyProductRank;
6+
import lombok.extern.slf4j.Slf4j;
7+
import org.springframework.batch.item.ItemProcessor;
8+
9+
/**
10+
* ItemProcessor for converting aggregated metrics into ranking entities.
11+
*
12+
* <p>This processor transforms ProductRankingAggregation DTOs into either
13+
* WeeklyProductRank or MonthlyProductRank entities based on the period type.
14+
* The ranking score is calculated using weighted metrics.
15+
*
16+
* <p>Score calculation formula:
17+
* <pre>
18+
* score = (viewCount * WEIGHT_VIEW) +
19+
* (likeCount * WEIGHT_LIKE) +
20+
* (orderCount * WEIGHT_ORDER * log10(salesAmount + 1))
21+
* </pre>
22+
*
23+
* where:
24+
* <ul>
25+
* <li>WEIGHT_VIEW = 0.1</li>
26+
* <li>WEIGHT_LIKE = 0.2</li>
27+
* <li>WEIGHT_ORDER = 0.6</li>
28+
* </ul>
29+
*/
30+
@Slf4j
31+
public class RankingScoreProcessor implements ItemProcessor<ProductRankingAggregation, Object> {
32+
33+
private static final double WEIGHT_VIEW = 0.1;
34+
private static final double WEIGHT_LIKE = 0.2;
35+
private static final double WEIGHT_ORDER = 0.6;
36+
37+
private final String periodType;
38+
private final String period;
39+
40+
/**
41+
* Constructs a new RankingScoreProcessor.
42+
*
43+
* @param periodType the type of period ("WEEKLY" or "MONTHLY")
44+
* @param period the period string (e.g., "2025-W01" or "2025-01")
45+
*/
46+
public RankingScoreProcessor(String periodType, String period) {
47+
this.periodType = periodType;
48+
this.period = period;
49+
}
50+
51+
@Override
52+
public Object process(ProductRankingAggregation item) {
53+
double score = calculateScore(item);
54+
55+
log.debug("Processing ranking: productId={}, rank={}, score={}",
56+
item.getProductId(), item.getRankPosition(), score);
57+
58+
if ("WEEKLY".equals(periodType)) {
59+
return WeeklyProductRank.builder()
60+
.productId(item.getProductId())
61+
.yearWeek(period)
62+
.rankPosition(item.getRankPosition())
63+
.totalScore(score)
64+
.likeCount(item.getLikeCount())
65+
.viewCount(item.getViewCount())
66+
.orderCount(item.getOrderCount())
67+
.salesAmount(item.getSalesAmount())
68+
.build();
69+
} else {
70+
return MonthlyProductRank.builder()
71+
.productId(item.getProductId())
72+
.yearMonth(period)
73+
.rankPosition(item.getRankPosition())
74+
.totalScore(score)
75+
.likeCount(item.getLikeCount())
76+
.viewCount(item.getViewCount())
77+
.orderCount(item.getOrderCount())
78+
.salesAmount(item.getSalesAmount())
79+
.build();
80+
}
81+
}
82+
83+
/**
84+
* Calculates the ranking score based on weighted metrics.
85+
*
86+
* <p>Uses logarithmic normalization for sales amount to prevent
87+
* extreme values from dominating the score.
88+
*
89+
* @param agg the aggregated metrics
90+
* @return the calculated score
91+
*/
92+
private double calculateScore(ProductRankingAggregation agg) {
93+
double salesAmountValue = agg.getSalesAmount() != null ? agg.getSalesAmount().doubleValue() : 0.0;
94+
return (agg.getViewCount() * WEIGHT_VIEW) +
95+
(agg.getLikeCount() * WEIGHT_LIKE) +
96+
(agg.getOrderCount() * WEIGHT_ORDER * Math.log10(salesAmountValue + 1));
97+
}
98+
}

0 commit comments

Comments
 (0)