본문 바로가기
카테고리 없음

Redis를 통한 Polling 구조 개선 : wait/notify 와 Lettuce Event Bus 활용기

by 공덕 Bro 2025. 5. 28.

Docker Image, Container 원격 배포 프로젝트를 진행하며 Redis와 Java의 wait/notify를 활용한 경험을 공유하려 합니다.

( 해당 글에선 Lettuce나 Redis 사용방법이 없습니다. )

 

컨테이너를 활용하는 시스템이라면 배포나 컨테이너 관리시에 Kubernetes나 Docker Swarm과 같은 오케스트레이션 툴이 대표적인 선택입니다. 하지만, 네트워크의 제약이 있는 상황이라면 이를 활용하기는 쉽지 않습니다.

 

제가 진행했던 프로젝트는 Out Bound-Only 네트워크 구조로, 일반적인 오케스트레이션 툴을 온전히 활용할 수 없는 상황이었습니다. 이로 인해 직접 배포 시스템을 설계 및 구현해야 했습니다.

 

이런 문제점을 해결하기 위해 가장 단순하게 떠올린 방식은 Polling 구조였습니다. 배포 받을 Image가 존재하는지 꾸준히 요청을 보내는 것이었습니다. 그러나, 각 Edge-Server들의 개수도 500여개가 넘고 Image 요청 뿐만 아니라 서버 별 container health 정보도 중앙 서버로 보내주기에 결국 부하가 발생하여 장애로 이어지는 상황이 일어날 수 밖에 없는 구조였습니다.

 

Redis의 도입

흔히, Polling 방식의 단점으로 뽑는 것이 불필요한 요청을 통한 자원 낭비와 실시간성 저하 입니다. 그럼에도 네트워크 환경의 제약으로 인해 Polling 방식 외에는 다른 방법이 떠오르지 않던 그때, NHN FORWARD 컨퍼런스의 Redis 야무지게 사용하기 (출처 : https://www.youtube.com/watch?v=92NizoBL4uA)를 보게 됐습니다.

 

Redis를 캐시로만 사용한다는 얄팍한 주니어의 관점을 깨는 BRPOP에 대한 내용이 있었습니다.

Redis의 BRPOP은 데이터가 들어올 때까지 특정 키에 대해 Block 상태로 대기하도록 합니다. 기존 Polling 방식처럼 주기적으로 요청을 보내는 대신, 필요한 데이터가 들어오면 즉시 처리하도록 설계할 수 있습니다.

 

Polling 방식을 없애 DB 조회와 네트워크 부하를 없애는 것이기 때문에 Redis를 도입하기에 충분한 이유가 됐습니다.

 

Lettuce 활용

Lettuce의 rightPop()을 활용하면 BRPOP의 커맨드를 사용할 수 있습니다. rightPop()에는 timeout 값을 지정할 수 있기 때문에 이를 주기적 Polling 보다 더 긴 timeout으로 설정하여 진행했습니다. 전 2시간 55분 으로 설정했고 lettuce의 기본 client timeout은 3시간으로 설정해놨습니다.

 

spring:
  data:
    redis:
      timeout: 3h

 

반드시 rightPop의 timeout은 Lettuce client timeout보다 길어야 합니다.

rightPop의 timeout시간이 지났음에도 받아올 데이터가 없다면 Lettuce는 null 값을 반환하게 됩니다. 

이 반환값을 토대로 Lettuce client의 timeout은 다시 초기화됩니다.

 

Redis와의 Connection이 끊기면?

BRPOP 커맨드는 blocking 상태로 Connection은 유지되나 IDLE 상태를 유지하고 있습니다.

그렇기 때문에 리소스를 적게 소모하게 되는 장점이 있습니다만 중간에 Connection이 끊길 수 있다는 단점이 존재합니다.

 

이 문제를 해결하기 위해선 Lettuce의 재연결 로직을 살펴볼 필요가 있었습니다.

 

Lettuce는 기본적으로 connection retry 정책으로 지수 백오프 전략을 취하고 있습니다.

지수 백오프 전략은 DefaultClientResources에 들어가면 확인해 볼 수 있습니다.

    /**
     * Default delay {@link Supplier} for {@link Delay#exponential()} delay.
     */
    public static final Supplier<Delay> DEFAULT_RECONNECT_DELAY = Delay::exponential;

 

또한 reconnectDelay의 최대 MAX는 30Sec으로 제한되어 있습니다.

 

/**
 * Sets the stateless reconnect {@link Delay} to delay reconnect attempts. Defaults to binary exponential delay capped
 * at {@literal 30 SECONDS}. {@code reconnectDelay} must be a stateless {@link Delay}.
 *
 * @param reconnectDelay the reconnect delay, must not be {@code null}.
 * @return this
 * @since 4.3
 */
@Override
public Builder reconnectDelay(Delay reconnectDelay) {

    LettuceAssert.notNull(reconnectDelay, "Delay must not be null");
    LettuceAssert.isTrue(!(reconnectDelay instanceof StatefulDelay), "Delay must be a stateless instance.");

    return reconnectDelay(() -> reconnectDelay);
}

 

우선, 제 경우에는 delay max time을 30초보다 훨씬 더 길게 설정해도 무방했고 그래야만 했습니다. 새로운 버전의 배포가 잦은 이벤트는 아니기 때문입니다. 그렇기에 Delay를 커스텀하게 설정해야 했습니다.

 

@Slf4j
public class RedisCustomExponentialDelay extends Delay {

    private final int initialDelayMinutes;
    private final int maxDelayMinutes;
    private final int multiplier;


    public RedisCustomExponentialDelay(int initialDelayMinutes, int maxDelayMinutes, int multiplier) {
        this.initialDelayMinutes = initialDelayMinutes;
        this.maxDelayMinutes = maxDelayMinutes;
        this.multiplier = multiplier;
    }

    @Override
    public Duration createDelay(long attempt) {
        log.info("now attemp is {}", attempt);

        int delay = (int) Math.min(maxDelayMinutes, initialDelayMinutes * Math.pow(multiplier, attempt - 1));
         return Duration.ofMinutes(Math.min(maxDelayMinutes, delay));
    }
}

 

Delay를 확장하여 지수 백오프 전략을 커스텀화 시켰습니다.

 

이후 spring-redis-data 에서 위의 백오프 전략을 사용할 수 있도록

 

    @Bean
    public ClientResources lettuceClientResources() {
        return DefaultClientResources.builder()
                .reconnectDelay(new RedisCustomExponentialDelay(1, 180, 2))
                .build();
    }

 

다음과 같이 lettuce client를 빈으로 등록했습니다.

 

이제 retry 전략 로그를 찍은 것을 확인해보면 정상적으로 1분 -> 2분 ... -> 16분 ... 순으로 지수 백오프가 동작하게 됩니다.

 

Lettuce의 rightPop()

이제 본격적으로 BRPOP 커맨드를 Lettuce의 rightPop()을 통해 구현해보겠습니다.

 

redis와 connection을 맺는 스레드는 비동기로 동작하고 있어야 합니다. Main 스레드를 block 할 순 없으니까요

 

1차 rightPop()

    public void startListener() {
        Executors.newSingleThreadExecutor().submit(() -> {
            log.info("Redis listener started");
            while (true) {
                try {
                    String result = stringRedisTemplate.opsForList().rightPop(createQueueName(), Duration.ofSeconds(10));
                    log.info("Redis message received {}", result);
                    businessLogic();
                } catch (Exception rcfe) {
                    log.error("Redis listener error", rcfe);
                    
                }
            }
        });
    }

 

초기 단계에선 다음과 같이 설계 개발 했습니다. while 루프를 통해서 rightPop의 timeout이 지난 이후에 다시 blocking 상태로 connection을 맺는걸 지속하는 것이였습니다.

 

다만 위 코드의 문제점은 중간에 redis와 connection이 끊겼을 경우 위 스레드가 그대로 죽어버린다는 것입니다.

 

연결하는 스레드는 반드시 application이 shutdown 될 때 까지 유효한 상태에 존재해야 했기 때문에 Thread sleep으로 2차 개발을 진행했습니다.

 

2차 rightPop()

Executors.newSingleThreadExecutor().submit(() -> {
    log.info("Redis listener started");
    while (true) {
        try {
            String result = stringRedisTemplate.opsForList().rightPop(createQueueName(), Duration.ofSeconds(10));
            log.info("Redis message received {}", result);
        } catch (Exception rcfe) {
            log.error("Redis listener error", rcfe);
            try {
            	Thread.sleep(millis);
            } catch (Exception e) {}
        }
    }
});

 

1차와 비슷하지만 Thread.sleep을 사용했습니다. millis의 경우 custom하게 만든 Delay retry 정책의 지수 백오프 값을 참조하여 개발 했습니다.

 

완벽할 줄 알았지만 여전히 문제점은 존재했습니다.

sleep 이 끝난 이후 해당 스레드는 결국 while문을 돌아 다시 redis와 connection을 시도합니다. reconnection이 맺어졌는지 맺어지지 않았는지 확실하지도 않은데 말이죠

 

이것 또한 불필요한 과정이자 로직인 셈이였습니다.

 

Lettuce EventBus

2차 rightPop()을 개선하기 위해 찾고 찾은 방안은 EventBus였습니다.

 

Lettuce에는 EventBus를 통해서 Connection events, Metrics events, Cluster topology events를 제공합니다 (출처 : https://github.com/redis/lettuce/wiki/Connection-Events)

 

redis와 connection 이 맺어지면

@Override
    public void onRedisConnected(RedisChannelHandler<?, ?> connection)
    {

 

위와 같은 메서드를 통해서 원하는 로직을 오버라이딩 할 수 있습니다.

 

그래서 생각해낸 방식이 Java의 wait() / notify()를 활용 해야겠다 였습니다.

 

connection이 끊어지면 rightPop() 메서드를 wait() 시킨 후

onRedisConnected() 메서드에서 reconnection을 감지하면 rightPop()을 실행하는 스레드를 notify() 해주는 방식인 것입니다.

 

3차 rightPop()

@Slf4j
@Component
public class RedisReconnectNotifier {
    private final Object reconnectLock = new Object();

    public Object getLock() {
        return reconnectLock;
    }

    @Autowired
    public RedisReconnectNotifier(LettuceConnectionFactory lettuceConnectionFactory) {
        ClientResources clientResources = lettuceConnectionFactory.getClientResources();
        EventBus eventBus = clientResources.eventBus();

        eventBus.get().subscribe(e -> {
            if (e instanceof ConnectionActivatedEvent) {
                log.info("Redis Connect Success");
                synchronized (reconnectLock) {
                    reconnectLock.notifyAll();
                }
            }
        });
    }
}

wait / notify를 활용하기 위해서 lock object를 가진 객체를 notifier라 선언하고 eventBus를 subscribe 하도록 지정했습니다

 

@PostConstruct
public void startListener() {
    Executors.newSingleThreadExecutor().submit(() -> {
        log.info("Redis listener started");
        while (true) {
            try {
                String result = stringRedisTemplate.opsForList().rightPop(createQueueName(), Duration.ofSeconds(10));
                log.info("Redis message received {}", result);
            } catch (Exception rcfe) {
                log.error("Redis listener error", rcfe);
                synchronized (redisReconnectNotifier.getLock()) {
                    redisReconnectNotifier.getLock().wait();
                    log.info("Thread wake up");
                }
            }
        }
    });
}

 

 

이후에 위와 같은 구성을 통해서 wait / notify를 구성했습니다.

 

reconnect 가 성공하면 eventBus에서 event를 받아 notify를 통해 wait 중인 rightPop() 메서드를 깨우는 방식입니다.

 

위 방식을 활용하니 확실히 Lettuce와 Redis가 연결 됐을 경우에만 thread가 running 할 수 있는 구조로 만들 수 있었습니다.

 

마무리

Redis를 캐시로만 써오고 MessageQueue처럼 사용할 수 있다는 것은 말로만 들어봤지만 직접 실무를 경험하며 Redis와 더 친해질 수 있었던 계기가 됐던 것 같습니다. 혹시 저와 같이 Polling 구조를 없애야 하는 과정에 계시다면 Redis를 활용해보시는 건 어떨까요

 

혹여나 코드를 보며 지적해주시거나 더 좋은 방법이 있다면 의견 공유해주시면 감사하겠습니다.