-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request #9 from lotteon2/develop
Develop
- Loading branch information
Showing
36 changed files
with
1,031 additions
and
176 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
35 changes: 35 additions & 0 deletions
35
src/main/java/com/dailyon/auctionservice/chat/WebSocketSessionsManager.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
package com.dailyon.auctionservice.chat; | ||
|
||
import org.springframework.stereotype.Component; | ||
import org.springframework.web.reactive.socket.WebSocketSession; | ||
import reactor.core.publisher.Flux; | ||
import reactor.core.publisher.Mono; | ||
import reactor.core.publisher.Sinks; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
@Component | ||
public class WebSocketSessionsManager { | ||
private final Map<String, WebSocketSession> sessions = new ConcurrentHashMap<>(); | ||
private final Sinks.Many<WebSocketSession> sessionsSink = | ||
Sinks.many().multicast().onBackpressureBuffer(); | ||
|
||
public void addSession(String userId, WebSocketSession session) { | ||
// 세션을 저장소에 추가 | ||
sessions.put(userId, session); | ||
// Sink에 세션 추가 | ||
sessionsSink.tryEmitNext(session); | ||
} | ||
|
||
public void removeSession(String userId) { | ||
// 세션을 저장소에서 제거 | ||
sessions.remove(userId); | ||
} | ||
|
||
public Mono<Void> broadcast(String message) { | ||
return Flux.fromIterable(sessions.values()) | ||
.flatMap(session -> session.send(Mono.just(session.textMessage(message)))) | ||
.then(); // 모든 세션에 대한 send 작업이 완료될 때까지 기다림 | ||
} | ||
} |
43 changes: 43 additions & 0 deletions
43
src/main/java/com/dailyon/auctionservice/chat/messaging/RedisChatMessageListener.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
package com.dailyon.auctionservice.chat.messaging; | ||
|
||
import com.dailyon.auctionservice.controller.ChatHandler; | ||
import com.dailyon.auctionservice.dto.request.Message; | ||
import com.dailyon.auctionservice.util.ObjectStringConverter; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.context.annotation.Profile; | ||
import org.springframework.data.redis.connection.ReactiveSubscription; | ||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; | ||
import org.springframework.data.redis.listener.PatternTopic; | ||
import org.springframework.stereotype.Component; | ||
import reactor.core.publisher.Mono; | ||
|
||
import static com.dailyon.auctionservice.config.ChatConstants.MESSAGE_TOPIC; | ||
|
||
@Slf4j | ||
@Component | ||
@Profile({"!test"}) | ||
public class RedisChatMessageListener { | ||
|
||
private final ReactiveStringRedisTemplate reactiveStringRedisTemplate; | ||
private final ChatHandler chatWebSocketHandler; | ||
private final ObjectStringConverter objectStringConverter; | ||
|
||
public RedisChatMessageListener( | ||
ReactiveStringRedisTemplate reactiveStringRedisTemplate, | ||
ChatHandler chatWebSocketHandler, | ||
ObjectStringConverter objectStringConverter) { | ||
this.reactiveStringRedisTemplate = reactiveStringRedisTemplate; | ||
this.chatWebSocketHandler = chatWebSocketHandler; | ||
this.objectStringConverter = objectStringConverter; | ||
} | ||
|
||
public Mono<Void> subscribeMessageChannelAndPublishOnWebSocket() { | ||
return reactiveStringRedisTemplate | ||
.listenTo(new PatternTopic(MESSAGE_TOPIC)) | ||
.map(ReactiveSubscription.Message::getMessage) | ||
.flatMap(message -> objectStringConverter.stringToObject(message, Message.class)) | ||
.filter(chatMessage -> !chatMessage.getMessage().isEmpty()) | ||
.flatMap(chatWebSocketHandler::sendMessage) | ||
.then(); | ||
} | ||
} |
69 changes: 69 additions & 0 deletions
69
src/main/java/com/dailyon/auctionservice/chat/messaging/RedisChatMessagePublisher.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
package com.dailyon.auctionservice.chat.messaging; | ||
|
||
import com.dailyon.auctionservice.dto.request.Message; | ||
import com.fasterxml.jackson.core.JsonProcessingException; | ||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import lombok.RequiredArgsConstructor; | ||
import lombok.extern.slf4j.Slf4j; | ||
import org.springframework.context.annotation.Profile; | ||
import org.springframework.data.redis.core.ReactiveStringRedisTemplate; | ||
import org.springframework.data.redis.support.atomic.RedisAtomicInteger; | ||
import org.springframework.data.redis.support.atomic.RedisAtomicLong; | ||
import org.springframework.stereotype.Component; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.net.InetAddress; | ||
import java.net.UnknownHostException; | ||
|
||
import static com.dailyon.auctionservice.config.ChatConstants.MESSAGE_TOPIC; | ||
|
||
@Slf4j | ||
@Component | ||
@Profile({"!test"}) | ||
@RequiredArgsConstructor | ||
public class RedisChatMessagePublisher { | ||
|
||
private final ReactiveStringRedisTemplate reactiveStringRedisTemplate; | ||
private final RedisAtomicInteger chatMessageCounter; | ||
private final RedisAtomicLong activeUserCounter; | ||
private final ObjectMapper objectMapper; | ||
|
||
public Mono<Long> publishChatMessage(String message) { | ||
Integer totalChatMessage = chatMessageCounter.incrementAndGet(); | ||
return Mono.fromCallable( | ||
() -> { | ||
try { | ||
return InetAddress.getLocalHost().getHostName(); | ||
} catch (UnknownHostException e) { | ||
log.error("Error getting hostname.", e); | ||
} | ||
log.info("inetAddress.getLocalHost().getHostName() {} ", InetAddress.getLocalHost().getHostName()); | ||
return "localhost"; | ||
}) | ||
.map( | ||
hostName -> { | ||
log.info("message -> {}", message); | ||
String chatString = "EMPTY_MESSAGE"; | ||
try { | ||
Message chatMessage = objectMapper.readValue(message, Message.class); | ||
chatMessage.setActiveUserCount(activeUserCounter.get()); | ||
chatString = objectMapper.writeValueAsString(chatMessage); | ||
} catch (JsonProcessingException e) { | ||
log.error("Error converting ChatMessage {} into string", message, e); | ||
log.error("Error converting ChatMessage {} into string", "chatMessage", e); | ||
} | ||
return chatString; | ||
}) | ||
.flatMap( | ||
chatString -> { | ||
// Publish Message to Redis Channels | ||
return reactiveStringRedisTemplate | ||
.convertAndSend(MESSAGE_TOPIC, chatString) | ||
.doOnSuccess( | ||
aLong -> | ||
log.debug( | ||
"Total of {} Messages published to Redis Topic.", totalChatMessage)) | ||
.doOnError(throwable -> log.error("Error publishing message.", throwable)); | ||
}); | ||
} | ||
} |
26 changes: 0 additions & 26 deletions
26
src/main/java/com/dailyon/auctionservice/common/feign/client/ProductFeignClient.java
This file was deleted.
Oops, something went wrong.
45 changes: 45 additions & 0 deletions
45
src/main/java/com/dailyon/auctionservice/common/webclient/client/ProductClient.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package com.dailyon.auctionservice.common.webclient.client; | ||
|
||
|
||
import com.dailyon.auctionservice.common.webclient.response.CreateProductResponse; | ||
import com.dailyon.auctionservice.dto.request.CreateAuctionRequest; | ||
import com.dailyon.auctionservice.dto.response.ReadAuctionDetailResponse; | ||
import lombok.RequiredArgsConstructor; | ||
import org.springframework.stereotype.Component; | ||
import org.springframework.web.reactive.function.client.WebClient; | ||
import reactor.core.publisher.Mono; | ||
|
||
import java.util.List; | ||
import java.util.stream.Collectors; | ||
|
||
@Component | ||
@RequiredArgsConstructor | ||
public class ProductClient { | ||
private final WebClient webClient; | ||
public Mono<ReadAuctionDetailResponse.ReadProductDetailResponse> readProductDetail(Long productId) { | ||
return webClient.get() | ||
.uri(uriBuilder -> uriBuilder.path("/products/id/{productId}").build(productId)) | ||
.retrieve() | ||
.bodyToMono(ReadAuctionDetailResponse.ReadProductDetailResponse.class); | ||
} | ||
|
||
public void deleteProducts(String memberId, String role, List<Long> ids) { | ||
String joinedIds = ids.stream().map(String::valueOf).collect(Collectors.joining(",")); | ||
webClient.delete() | ||
.uri(uriBuilder -> uriBuilder.path("/admin/products?ids={joinedIds}").build(joinedIds)) | ||
.header("memberId", memberId) | ||
.header("role", role) | ||
.retrieve(); | ||
} | ||
|
||
public Mono<CreateProductResponse> createProduct(String memberId, String role, | ||
CreateAuctionRequest.CreateProductRequest request) { | ||
return webClient.post() | ||
.uri(uriBuilder -> uriBuilder.path("/admin/products").build()) | ||
.header("memberId", memberId) | ||
.header("role", role) | ||
.body(Mono.just(request), CreateAuctionRequest.CreateProductRequest.class) | ||
.retrieve() | ||
.bodyToMono(CreateProductResponse.class); | ||
} | ||
} |
2 changes: 1 addition & 1 deletion
2
...feign/response/CreateProductResponse.java → ...lient/response/CreateProductResponse.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
8 changes: 8 additions & 0 deletions
8
src/main/java/com/dailyon/auctionservice/config/ChatConstants.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,8 @@ | ||
package com.dailyon.auctionservice.config; | ||
|
||
public class ChatConstants { | ||
public static final String MESSAGE_TOPIC = "MESSAGE"; | ||
public static final String MESSAGE_COUNTER_KEY = "TOTAL_MESSAGE_COUNT"; | ||
public static final String ACTIVE_USER_KEY = "ACTIVE_USER_COUNT"; | ||
public static final String WEBSOCKET_MESSAGE_MAPPING = "ws/chat/**"; | ||
} |
Oops, something went wrong.