diff --git a/src/main/java/org/swyp/linkit/domain/chat/controller/ChatStompController.java b/src/main/java/org/swyp/linkit/domain/chat/controller/ChatStompController.java index 020d3393..0e64e6f0 100644 --- a/src/main/java/org/swyp/linkit/domain/chat/controller/ChatStompController.java +++ b/src/main/java/org/swyp/linkit/domain/chat/controller/ChatStompController.java @@ -7,7 +7,6 @@ import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Controller; import org.swyp.linkit.domain.chat.dto.request.ChatSendRequestDto; -import org.swyp.linkit.domain.chat.entity.ChatMessage; import org.swyp.linkit.domain.chat.service.ChatService; import java.security.Principal; @@ -40,11 +39,9 @@ public void send(@Payload ChatSendRequestDto dto, Principal principal) { // 권한 체크 (room 참여자 여부) chatService.assertParticipant(senderId, roomId); - ChatMessage saved = chatService.saveMessage( + chatService.saveMessage( roomId, senderId, dto.getText(), dto.getMessageType(), dto.getImageUrl()); - - chatService.publishToRedis(saved); } /** @@ -94,11 +91,10 @@ public void markAsRead(@DestinationVariable Long roomId, Principal principal) { } /** - * 읽음 처리 공통 로직 (권한 체크 + 읽음 처리 + Redis 이벤트 발행) + * 읽음 처리 공통 로직 (권한 체크 + 읽음 처리 + Redis 이벤트 발행은 afterCommit에서 자동 처리) */ private void processReadAndNotify(Long roomId, Long userId) { chatService.assertParticipant(userId, roomId); chatService.markAsRead(roomId, userId); - chatService.publishReadEvent(roomId, userId, null); } } diff --git a/src/main/java/org/swyp/linkit/domain/chat/service/ChatService.java b/src/main/java/org/swyp/linkit/domain/chat/service/ChatService.java index 4f1672b6..34c69041 100644 --- a/src/main/java/org/swyp/linkit/domain/chat/service/ChatService.java +++ b/src/main/java/org/swyp/linkit/domain/chat/service/ChatService.java @@ -7,6 +7,8 @@ import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.swyp.linkit.domain.chat.dto.ChatMessageDto; import org.swyp.linkit.domain.chat.dto.response.ChatPayloadResponseDto; import org.swyp.linkit.domain.chat.entity.*; @@ -18,7 +20,11 @@ import org.swyp.linkit.domain.notification.service.NotificationService; import org.swyp.linkit.domain.user.entity.User; import org.swyp.linkit.domain.user.repository.UserRepository; -import org.swyp.linkit.global.error.exception.*; +import org.swyp.linkit.global.error.exception.ChatInvalidMessageException; +import org.swyp.linkit.global.error.exception.ChatMessageNotFoundException; +import org.swyp.linkit.global.error.exception.ChatNotParticipantException; +import org.swyp.linkit.global.error.exception.ChatRoomNotFoundException; +import org.swyp.linkit.global.error.exception.UserNotFoundException; import java.time.ZoneOffset; import java.util.List; @@ -82,6 +88,7 @@ public ChatMessage saveMessage(Long roomId, Long senderId, String content, ChatMessage message = ChatMessage.create(room, sender, senderRole, content, messageType, fileUrl); ChatMessage saved = chatMessageRepository.save(message); + chatMessageRepository.flush(); room.updateLastMessage(saved.getId(), saved.getCreatedAt()); @@ -89,6 +96,30 @@ public ChatMessage saveMessage(Long roomId, Long senderId, String content, Long receiverId = senderRole == SenderRole.MENTOR ? room.getMenteeId() : room.getMentorId(); notificationService.createNotification(receiverId, senderId, NotificationType.CHAT_MESSAGE, roomId); + // 트랜잭션 커밋 후 Redis 발행 (Transactional Outbox 패턴) + ChatPayloadResponseDto payload = ChatPayloadResponseDto.builder() + .roomId(roomId) + .messageId(saved.getId()) + .senderId(saved.getSenderId()) + .senderRole(saved.getSenderRole().name()) + .text(saved.getContent()) + .messageType(saved.getMessageType().name()) + .imageUrl(saved.getFileUrl()) + .sentAtEpochMs(saved.getCreatedAt().toInstant(ZoneOffset.UTC).toEpochMilli()) + .system(false) + .build(); + Long savedMessageId = saved.getId(); + if (TransactionSynchronizationManager.isActualTransactionActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + doPublishToRedis(roomId, savedMessageId, payload); + } + }); + } else { + doPublishToRedis(roomId, savedMessageId, payload); + } + log.info("메시지 저장: roomId={}, senderId={}, messageId={}, type={}", roomId, senderId, saved.getId(), messageType); return saved; } @@ -154,6 +185,19 @@ public void markAsRead(Long roomId, Long userId) { // Notification 기반 미읽음 알림 읽음 처리 notificationService.markChatRoomAsRead(userId, roomId); + // 트랜잭션 커밋 후 읽음 이벤트 Redis 발행 (Transactional Outbox 패턴) + Long lastReadId = lastMessage.getId(); + if (TransactionSynchronizationManager.isActualTransactionActive()) { + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + doPublishReadEvent(roomId, userId, lastReadId); + } + }); + } else { + doPublishReadEvent(roomId, userId, lastReadId); + } + log.info("메시지 읽음 처리: roomId={}, userId={}, lastReadMessageId={}", roomId, userId, lastMessage.getId()); } @@ -185,38 +229,28 @@ public void deleteMessages(Long roomId, Long userId, List messageIds) { log.info("메시지 삭제: roomId={}, userId={}, count={}", roomId, userId, messageIds.size()); } + // === Private Helper Methods === + /** - * Redis Pub/Sub을 통해 메시지 발행 + * Redis Pub/Sub 메시지 발행 (afterCommit 내부 전용) + * afterCommit에서 발생하는 예외는 Spring이 억제하므로 로그로 대체 */ - public void publishToRedis(ChatMessage message) { - Long roomId = message.getChatRoom().getId(); - ChatPayloadResponseDto payload = ChatPayloadResponseDto.builder() - .roomId(roomId) - .messageId(message.getId()) - .senderId(message.getSenderId()) - .senderRole(message.getSenderRole().name()) - .text(message.getContent()) - .messageType(message.getMessageType().name()) - .imageUrl(message.getFileUrl()) - .sentAtEpochMs(message.getCreatedAt().toInstant(ZoneOffset.UTC).toEpochMilli()) - .system(false) - .build(); - + private void doPublishToRedis(Long roomId, Long messageId, ChatPayloadResponseDto payload) { try { String json = objectMapper.writeValueAsString(payload); String channel = CHAT_CHANNEL_PREFIX + roomId; redisTemplate.convertAndSend(channel, json); - log.info("Redis 메시지 발행: channel={}, messageId={}", channel, message.getId()); + log.info("Redis 메시지 발행: channel={}, messageId={}", channel, messageId); } catch (JsonProcessingException e) { - log.error("채팅 메시지 직렬화 실패: roomId={}, messageId={}", roomId, message.getId(), e); - throw new ChatPublishFailedException(roomId); + log.error("채팅 메시지 직렬화 실패: roomId={}, messageId={}", roomId, messageId, e); } } /** - * 읽음 처리 이벤트 Redis 발행 + * 읽음 이벤트 Redis 발행 (afterCommit 내부 전용) + * afterCommit에서 발생하는 예외는 Spring이 억제하므로 로그로 대체 */ - public void publishReadEvent(Long roomId, Long userId, Long lastReadMessageId) { + private void doPublishReadEvent(Long roomId, Long userId, Long lastReadMessageId) { ChatPayloadResponseDto payload = ChatPayloadResponseDto.builder() .roomId(roomId) .readerId(userId) @@ -231,12 +265,9 @@ public void publishReadEvent(Long roomId, Long userId, Long lastReadMessageId) { log.info("읽음 이벤트 발행: channel={}, readerId={}", channel, userId); } catch (JsonProcessingException e) { log.error("읽음 이벤트 직렬화 실패: roomId={}, userId={}", roomId, userId, e); - throw new ChatPublishFailedException(roomId); } } - // === Private Helper Methods === - private User findUserById(Long userId) { return userRepository.findById(userId) .orElseThrow(() -> new UserNotFoundException("User not found: " + userId)); diff --git a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeExpireProcessor.java b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeExpireProcessor.java index d3f63e40..af5d002e 100644 --- a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeExpireProcessor.java +++ b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeExpireProcessor.java @@ -9,6 +9,8 @@ import org.swyp.linkit.domain.credit.service.CreditService; import org.swyp.linkit.domain.exchange.entity.SkillExchange; import org.swyp.linkit.domain.exchange.repository.SkillExchangeRepository; +import org.swyp.linkit.domain.notification.entity.NotificationType; +import org.swyp.linkit.domain.notification.service.NotificationService; import org.swyp.linkit.global.error.exception.ExchangeNotFoundException; @Component @@ -18,6 +20,7 @@ public class SkillExchangeExpireProcessor { private final SkillExchangeRepository skillExchangeRepository; private final CreditService creditService; + private final NotificationService notificationService; // 새로운 트랜잭션 적용 @Transactional(propagation = Propagation.REQUIRES_NEW) @@ -36,6 +39,14 @@ public void expireSingleSkillExchange(Long skillExchangeId){ // 크레딧 환불 처리 creditService.refundCreditForExchange(skillExchange, HistoryType.EXCHANGE_EXPIRED); + + // 알림 생성 (requester, receiver 모두에게 시스템 알림 — afterCommit 시 Redis 발행) + notificationService.createSystemNotification( + skillExchange.getRequester().getId(), + NotificationType.REQUEST_STATUS_CHANGED, skillExchange.getId()); + notificationService.createSystemNotification( + skillExchange.getReceiver().getId(), + NotificationType.REQUEST_STATUS_CHANGED, skillExchange.getId()); log.debug("거래 만료 처리 완료. skillExchangeId: {}", skillExchange.getId()); } } diff --git a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeRequestProcessor.java b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeRequestProcessor.java index 83a54c2a..dae4059e 100644 --- a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeRequestProcessor.java +++ b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeRequestProcessor.java @@ -9,6 +9,8 @@ import org.swyp.linkit.domain.exchange.entity.ExchangeStatus; import org.swyp.linkit.domain.exchange.entity.SkillExchange; import org.swyp.linkit.domain.exchange.repository.SkillExchangeRepository; +import org.swyp.linkit.domain.notification.entity.NotificationType; +import org.swyp.linkit.domain.notification.service.NotificationService; import org.swyp.linkit.domain.user.entity.User; import org.swyp.linkit.domain.user.entity.UserSkill; import org.swyp.linkit.domain.user.service.UserService; @@ -32,6 +34,7 @@ public class SkillExchangeRequestProcessor { private final CreditService creditService; private final UserService userService; private final UserSkillService userSkillService; + private final NotificationService notificationService; /** * 처리 순서: @@ -78,6 +81,14 @@ public SkillExchangeResponseDto executeWithLock(Long requesterId, // 6. 크레딧 차감 및 사용 내역 생성 creditService.useCreditForExchangeRequest(saved); + // 7. 알림 생성 (멘토: REQUEST_RECEIVED, 멘티: REQUEST_SENT) — afterCommit 시 Redis 발행 + notificationService.createNotification( + saved.getReceiver().getId(), saved.getRequester().getId(), + NotificationType.REQUEST_RECEIVED, saved.getId()); + notificationService.createNotification( + saved.getRequester().getId(), saved.getReceiver().getId(), + NotificationType.REQUEST_SENT, saved.getId()); + // TX 커밋 → 락 해제 return SkillExchangeResponseDto.from(saved); } diff --git a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeServiceImpl.java b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeServiceImpl.java index 5046fd58..68100ec1 100644 --- a/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeServiceImpl.java +++ b/src/main/java/org/swyp/linkit/domain/exchange/service/SkillExchangeServiceImpl.java @@ -11,6 +11,8 @@ import org.swyp.linkit.domain.credit.service.CreditService; import org.swyp.linkit.domain.exchange.dto.SkillExchangeDto; import org.swyp.linkit.domain.exchange.dto.response.*; +import org.swyp.linkit.domain.notification.entity.NotificationType; +import org.swyp.linkit.domain.notification.service.NotificationService; import org.swyp.linkit.domain.exchange.dto.response.ReceivedExchangeDetailsResponseDto; import org.swyp.linkit.domain.exchange.dto.response.SentExchangeDetailsResponseDto; import org.swyp.linkit.domain.exchange.entity.ExchangeStatus; @@ -50,6 +52,7 @@ public class SkillExchangeServiceImpl implements SkillExchangeService { private final SkillExchangeExpireProcessor exchangeExpireProcessor; private final SkillExchangeRequestProcessor exchangeRequestProcessor; private final SkillExchangePreValidator exchangePreValidator; + private final NotificationService notificationService; /** * 멘토의 거래 가능 스킬 목록 조회 @@ -228,6 +231,10 @@ public SentExchangeDetailsResponseDto getSentRequests(Long userId, Long cursorId // 4. bulkUpdate (isRequesterRead = false -> true) exchangeRepository.bulkUpdateRequesterReadStatus(userId); + + // 5. Notification 도메인 읽음 처리 (REQUEST_SENT + REQUEST_STATUS_CHANGED) + notificationService.markSentRequestAsRead(userId); + return responseDto; } @@ -247,8 +254,12 @@ public ReceivedExchangeDetailsResponseDto getReceivedRequests(Long userId, Long // 3. 응답 Dto 변환 ReceivedExchangeDetailsResponseDto responseDto = ReceivedExchangeDetailsResponseDto.from(slice); - // 4.bulkUpdate (isReceiverRead = false -> true) + // 4. bulkUpdate (isReceiverRead = false -> true) exchangeRepository.bulkUpdateReceiverReadStatus(userId); + + // 5. Notification 도메인 읽음 처리 (REQUEST_RECEIVED) + notificationService.markReceivedRequestAsRead(userId); + return responseDto; } @@ -271,7 +282,12 @@ public SkillExchangeResponseDto acceptSkillExchange(Long receiverId, Long skillE // 4. requester 에게 변경 사항 표시 skillExchange.updateRequesterReadToFalse(); - // 5. Settlement 생성 + // 5. 알림 생성 (requester에게 REQUEST_STATUS_CHANGED) + notificationService.createNotification( + skillExchange.getRequester().getId(), receiverId, + NotificationType.REQUEST_STATUS_CHANGED, skillExchangeId); + + // 6. Settlement 생성 settlementService.createSettlement(skillExchange); // 5. 응답 Dto 변환 @@ -297,7 +313,12 @@ public SkillExchangeResponseDto rejectSkillExchange(Long receiverId, Long skillE // 4. requester 에게 변경 사항 표시 skillExchange.updateRequesterReadToFalse(); - // 5. requester 크레딧 환불 -> NotFoundCreditException, InvalidCreditAmountException + // 5. 알림 생성 (requester에게 REQUEST_STATUS_CHANGED) + notificationService.createNotification( + skillExchange.getRequester().getId(), receiverId, + NotificationType.REQUEST_STATUS_CHANGED, skillExchangeId); + + // 6. requester 크레딧 환불 -> NotFoundCreditException, InvalidCreditAmountException creditService.refundCreditForExchange(skillExchange, HistoryType.EXCHANGE_REJECTED); // 5. 응답 Dto 변환 @@ -407,6 +428,10 @@ private void processParticipantCancel(Long userId, SkillExchange skillExchange) settlementService.cancelSettlement(skillExchange.getId()); } skillExchange.updateReceiverReadToFalse(); + // 알림 생성 (receiver에게 REQUEST_STATUS_CHANGED) + notificationService.createNotification( + skillExchange.getReceiver().getId(), userId, + NotificationType.REQUEST_STATUS_CHANGED, skillExchange.getId()); } else{ // receiver -> ACCEPTED일 때만 취소 가능 if (currentStatus != ExchangeStatus.ACCEPTED) { @@ -414,6 +439,10 @@ private void processParticipantCancel(Long userId, SkillExchange skillExchange) } settlementService.cancelSettlement(skillExchange.getId()); skillExchange.updateRequesterReadToFalse(); + // 알림 생성 (requester에게 REQUEST_STATUS_CHANGED) + notificationService.createNotification( + skillExchange.getRequester().getId(), userId, + NotificationType.REQUEST_STATUS_CHANGED, skillExchange.getId()); } } diff --git a/src/test/java/org/swyp/linkit/domain/chat/service/ChatServiceTest.java b/src/test/java/org/swyp/linkit/domain/chat/service/ChatServiceTest.java index 437a2168..c85c721e 100644 --- a/src/test/java/org/swyp/linkit/domain/chat/service/ChatServiceTest.java +++ b/src/test/java/org/swyp/linkit/domain/chat/service/ChatServiceTest.java @@ -1,5 +1,6 @@ package org.swyp.linkit.domain.chat.service; +import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; @@ -64,6 +65,9 @@ class ChatServiceTest { @Mock private NotificationService notificationService; + @Mock + private ObjectMapper objectMapper; + private ChatRoom chatRoom; private Long mentorId; private Long menteeId;