Skip to content

Commit 072f563

Browse files
committed
feat: 배치 랭킹 잡 추가
1 parent febeab0 commit 072f563

8 files changed

Lines changed: 338 additions & 0 deletions

File tree

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
package com.loopers.batch.domain.metrics;
2+
3+
import jakarta.persistence.Column;
4+
import jakarta.persistence.Entity;
5+
import jakarta.persistence.Id;
6+
import jakarta.persistence.Table;
7+
import lombok.Getter;
8+
9+
@Getter
10+
@Entity
11+
@Table(name = "product_metrics")
12+
public class ProductMetrics {
13+
14+
@Id
15+
@Column(name = "product_id", nullable = false)
16+
private Long productId;
17+
18+
@Column(name = "like_count", nullable = false)
19+
private long likeCount;
20+
21+
@Column(name = "sales_count", nullable = false)
22+
private long salesCount;
23+
24+
@Column(name = "last_catalog_version")
25+
private Long lastCatalogVersion;
26+
27+
@Column(name = "last_order_version")
28+
private Long lastOrderVersion;
29+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
public record ProductRankAggregate(
4+
Long productId,
5+
long likeCount,
6+
long salesCount,
7+
double score
8+
) {
9+
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
import com.loopers.batch.domain.metrics.ProductMetrics;
4+
import com.loopers.batch.listener.JobListener;
5+
import com.loopers.batch.listener.StepMonitorListener;
6+
import jakarta.persistence.EntityManagerFactory;
7+
import lombok.RequiredArgsConstructor;
8+
import org.springframework.batch.core.Job;
9+
import org.springframework.batch.core.Step;
10+
import org.springframework.batch.core.configuration.annotation.StepScope;
11+
import org.springframework.batch.core.job.builder.JobBuilder;
12+
import org.springframework.batch.core.repository.JobRepository;
13+
import org.springframework.batch.core.step.builder.StepBuilder;
14+
import org.springframework.batch.item.database.JpaPagingItemReader;
15+
import org.springframework.batch.item.database.builder.JpaPagingItemReaderBuilder;
16+
import org.springframework.beans.factory.annotation.Value;
17+
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
18+
import org.springframework.context.annotation.Bean;
19+
import org.springframework.context.annotation.Configuration;
20+
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
21+
import org.springframework.transaction.PlatformTransactionManager;
22+
23+
@Configuration
24+
@RequiredArgsConstructor
25+
@ConditionalOnProperty(name = "spring.batch.job.name", havingValue = ProductRankAggregationJobConfig.JOB_NAME)
26+
public class ProductRankAggregationJobConfig {
27+
28+
public static final String JOB_NAME = "productRankAggregationJob";
29+
private static final String STEP_WEEKLY = "weeklyProductRankingStep";
30+
private static final String STEP_MONTHLY = "monthlyProductRankingStep";
31+
private static final int CHUNK_SIZE = 50;
32+
private static final int MAX_ITEM_COUNT = 100;
33+
34+
private final JobRepository jobRepository;
35+
private final PlatformTransactionManager transactionManager;
36+
private final EntityManagerFactory entityManagerFactory;
37+
private final JobListener jobListener;
38+
private final StepMonitorListener stepMonitorListener;
39+
private final ProductRankItemProcessor itemProcessor;
40+
private final NamedParameterJdbcTemplate jdbcTemplate;
41+
42+
@Bean(JOB_NAME)
43+
public Job productRankAggregationJob(
44+
Step weeklyRankingStep,
45+
Step monthlyRankingStep
46+
) {
47+
return new JobBuilder(JOB_NAME, jobRepository)
48+
.start(weeklyRankingStep)
49+
.next(monthlyRankingStep)
50+
.listener(jobListener)
51+
.build();
52+
}
53+
54+
@Bean(STEP_WEEKLY)
55+
public Step weeklyRankingStep(
56+
JpaPagingItemReader<ProductMetrics> productMetricsReader,
57+
RankingMaterializedViewWriter weeklyRankingWriter
58+
) {
59+
return new StepBuilder(STEP_WEEKLY, jobRepository)
60+
.<ProductMetrics, ProductRankAggregate>chunk(CHUNK_SIZE, transactionManager)
61+
.reader(productMetricsReader)
62+
.processor(itemProcessor)
63+
.writer(weeklyRankingWriter)
64+
.listener(stepMonitorListener)
65+
.build();
66+
}
67+
68+
@Bean(STEP_MONTHLY)
69+
public Step monthlyRankingStep(
70+
JpaPagingItemReader<ProductMetrics> productMetricsReader,
71+
RankingMaterializedViewWriter monthlyRankingWriter
72+
) {
73+
return new StepBuilder(STEP_MONTHLY, jobRepository)
74+
.<ProductMetrics, ProductRankAggregate>chunk(CHUNK_SIZE, transactionManager)
75+
.reader(productMetricsReader)
76+
.processor(itemProcessor)
77+
.writer(monthlyRankingWriter)
78+
.listener(stepMonitorListener)
79+
.build();
80+
}
81+
82+
@Bean
83+
@StepScope
84+
public RankingMaterializedViewWriter weeklyRankingWriter(
85+
@Value("#{jobParameters['requestDate']}") String requestDate
86+
) {
87+
RankingPeriod period = RankingPeriodResolver.weekly(requestDate);
88+
return new RankingMaterializedViewWriter(jdbcTemplate, period, "mv_product_rank_weekly");
89+
}
90+
91+
@Bean
92+
@StepScope
93+
public RankingMaterializedViewWriter monthlyRankingWriter(
94+
@Value("#{jobParameters['requestDate']}") String requestDate
95+
) {
96+
RankingPeriod period = RankingPeriodResolver.monthly(requestDate);
97+
return new RankingMaterializedViewWriter(jdbcTemplate, period, "mv_product_rank_monthly");
98+
}
99+
100+
@Bean
101+
@StepScope
102+
public JpaPagingItemReader<ProductMetrics> productMetricsReader() {
103+
return new JpaPagingItemReaderBuilder<ProductMetrics>()
104+
.name("productMetricsReader")
105+
.entityManagerFactory(entityManagerFactory)
106+
.queryString(
107+
"SELECT p FROM ProductMetrics p ORDER BY (p.likeCount * 0.2d + p.salesCount * 0.8d) DESC"
108+
)
109+
.pageSize(CHUNK_SIZE)
110+
.maxItemCount(MAX_ITEM_COUNT)
111+
.build();
112+
}
113+
}
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
import com.loopers.batch.domain.metrics.ProductMetrics;
4+
import lombok.RequiredArgsConstructor;
5+
import org.springframework.batch.item.ItemProcessor;
6+
import org.springframework.stereotype.Component;
7+
8+
@Component
9+
@RequiredArgsConstructor
10+
public class ProductRankItemProcessor implements ItemProcessor<ProductMetrics, ProductRankAggregate> {
11+
12+
private final ProductRankScorePolicy scorePolicy;
13+
14+
@Override
15+
public ProductRankAggregate process(ProductMetrics item) {
16+
if (item == null || item.getProductId() == null) {
17+
return null;
18+
}
19+
double score = scorePolicy.calculate(item.getLikeCount(), item.getSalesCount());
20+
return new ProductRankAggregate(
21+
item.getProductId(),
22+
item.getLikeCount(),
23+
item.getSalesCount(),
24+
score
25+
);
26+
}
27+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
import org.springframework.stereotype.Component;
4+
5+
@Component
6+
public class ProductRankScorePolicy {
7+
8+
private static final double LIKE_WEIGHT = 0.2d;
9+
private static final double SALES_WEIGHT = 0.8d;
10+
11+
public double calculate(long likeCount, long salesCount) {
12+
return (likeCount * LIKE_WEIGHT) + (salesCount * SALES_WEIGHT);
13+
}
14+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
import java.time.LocalDateTime;
4+
import java.time.ZoneId;
5+
import java.util.ArrayList;
6+
import java.util.List;
7+
import java.util.concurrent.atomic.AtomicInteger;
8+
import org.springframework.batch.item.Chunk;
9+
import org.springframework.batch.item.ExecutionContext;
10+
import org.springframework.batch.item.ItemStreamException;
11+
import org.springframework.batch.item.ItemStreamWriter;
12+
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
13+
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
14+
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
15+
16+
public class RankingMaterializedViewWriter implements ItemStreamWriter<ProductRankAggregate> {
17+
18+
private static final ZoneId ZONE_ID = ZoneId.of("Asia/Seoul");
19+
private static final String PERIOD_COLUMN = "period_key";
20+
21+
private final NamedParameterJdbcTemplate jdbcTemplate;
22+
private final RankingPeriod period;
23+
private final String tableName;
24+
private final AtomicInteger rankCounter = new AtomicInteger(1);
25+
26+
public RankingMaterializedViewWriter(
27+
NamedParameterJdbcTemplate jdbcTemplate,
28+
RankingPeriod period,
29+
String tableName
30+
) {
31+
this.jdbcTemplate = jdbcTemplate;
32+
this.period = period;
33+
this.tableName = tableName;
34+
}
35+
36+
@Override
37+
public void open(ExecutionContext executionContext) throws ItemStreamException {
38+
deleteExistingRows();
39+
rankCounter.set(1);
40+
}
41+
42+
@Override
43+
public void update(ExecutionContext executionContext) throws ItemStreamException {
44+
// no-op
45+
}
46+
47+
@Override
48+
public void close() throws ItemStreamException {
49+
// no-op
50+
}
51+
52+
@Override
53+
public void write(Chunk<? extends ProductRankAggregate> chunk) {
54+
if (chunk == null || chunk.isEmpty()) {
55+
return;
56+
}
57+
LocalDateTime aggregatedAt = LocalDateTime.now(ZONE_ID);
58+
List<SqlParameterSource> params = new ArrayList<>();
59+
for (ProductRankAggregate item : chunk.getItems()) {
60+
if (item == null || item.productId() == null) {
61+
continue;
62+
}
63+
MapSqlParameterSource paramSource = new MapSqlParameterSource()
64+
.addValue("periodKey", period.key())
65+
.addValue("productId", item.productId())
66+
.addValue("likeCount", item.likeCount())
67+
.addValue("salesCount", item.salesCount())
68+
.addValue("score", item.score())
69+
.addValue("rank", rankCounter.getAndIncrement())
70+
.addValue("aggregatedAt", aggregatedAt);
71+
params.add(paramSource);
72+
}
73+
if (!params.isEmpty()) {
74+
jdbcTemplate.batchUpdate(insertSql(), params.toArray(SqlParameterSource[]::new));
75+
}
76+
}
77+
78+
private void deleteExistingRows() {
79+
jdbcTemplate.update(
80+
"DELETE FROM " + tableName + " WHERE " + PERIOD_COLUMN + " = :periodKey",
81+
new MapSqlParameterSource("periodKey", period.key())
82+
);
83+
}
84+
85+
private String insertSql() {
86+
return """
87+
INSERT INTO %s (%s, product_id, like_count, sales_count, score, rank, aggregated_at)
88+
VALUES (:periodKey, :productId, :likeCount, :salesCount, :score, :rank, :aggregatedAt)
89+
""".formatted(tableName, PERIOD_COLUMN);
90+
}
91+
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
public record RankingPeriod(
4+
String key
5+
) {
6+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.loopers.batch.job.ranking;
2+
3+
import java.time.DayOfWeek;
4+
import java.time.LocalDate;
5+
import java.time.ZoneId;
6+
import java.time.format.DateTimeFormatter;
7+
import java.time.temporal.TemporalAdjusters;
8+
import java.time.temporal.WeekFields;
9+
import java.util.Locale;
10+
import org.springframework.util.StringUtils;
11+
12+
public final class RankingPeriodResolver {
13+
14+
private static final ZoneId ZONE_ID = ZoneId.of("Asia/Seoul");
15+
private static final DateTimeFormatter FORMATTER = DateTimeFormatter.BASIC_ISO_DATE;
16+
17+
private RankingPeriodResolver() {
18+
}
19+
20+
public static RankingPeriod weekly(String targetDate) {
21+
LocalDate target = parse(targetDate);
22+
LocalDate weekStart = target.with(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY));
23+
return new RankingPeriod(toYearMonthWeek(weekStart));
24+
}
25+
26+
public static RankingPeriod monthly(String targetDate) {
27+
LocalDate target = parse(targetDate);
28+
LocalDate monthStart = target.withDayOfMonth(1);
29+
return new RankingPeriod(toYearMonth(monthStart));
30+
}
31+
32+
private static LocalDate parse(String targetDate) {
33+
if (!StringUtils.hasText(targetDate)) {
34+
return LocalDate.now(ZONE_ID);
35+
}
36+
return LocalDate.parse(targetDate, FORMATTER);
37+
}
38+
39+
private static String toYearMonthWeek(LocalDate target) {
40+
WeekFields weekFields = WeekFields.of(Locale.KOREA);
41+
int weekBasedYear = target.get(weekFields.weekBasedYear());
42+
int week = target.get(weekFields.weekOfWeekBasedYear());
43+
return String.format("%04d-W%02d", weekBasedYear, week);
44+
}
45+
46+
private static String toYearMonth(LocalDate target) {
47+
return String.format("%04d-%02d", target.getYear(), target.getMonthValue());
48+
}
49+
}

0 commit comments

Comments
 (0)