Blog

REDIS의 WAIT Command

September 09, 2020  Dooyoung Hwang

Synchronous vs Asynchronous Replication

  • Synchronous : Write 시 모든 replication으로부터 ACK을 받고 Client에게 OK전송 - latency 문제
  • Asynchronous : Write 시 replication으로부터 ACK을 받기 전에 Client에게 OK전송 - Redis에서 default로 채택하는 방식
  • Asynchronous 방식 채택 시 master와 replica간의 Consistency 문제는 ? WAIT Command를 통해 해결함

Example

1
2
3
4
5
6
7
8
9
redis 127.0.0.1:9999> set foo bar
OK
redis 127.0.0.1:9999> incr mycounter
(integer) 1

# 5개의 Replica로 Write가 Propagate될 때까지 MAX 100ms동안 기다림
# 현재까지 Write command를 sync완료한 replica의 갯수가 return
redis 127.0.0.1:9999> wait 5 100
(integer) 7

동작 방식

  • global offset : replica로 Command를 propagate할 때마다 증가되는 offset
  • replication offset : 모든 replica는 현재까지 처리완료한 offset을 저장하고 있음
    • 매초마다 replica는 replication offset을 master로 전송(ACK)
  • WAIT command가 호출될 시
    • REPLCONF GETACK command를 Replica로 propagate함
      • Replica는 REPLCONF GETACK을 받자마자 현재까지 처리한 Replication offset을 master로 전송(ACK)
      • WAIT command parameter로 전달된 replication 수만큼 ACK을 받자마자 blocking을 풀고 client로 응답 전송

Client 사용 예시

  • 3개 이상의 Replica가 살아있을 때만 payment_id를 save하는 예제 - wait가 3이상 return할 시에만 confirmed를 저장함
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
    def save_payment(payment_id)
          redis.rpush(payment_id,in progress) # Return false on exception
          if redis.wait(3,1000) >= 3 then
              redis.rpush(payment_id,confirmed) # Return false on exception
              if redis.wait(3,1000) >= 3 then
                  return true
              else
                  redis.rpush(payment_id,cancelled)
                  return false
              end
          else
              return false
      end
    

REDIS Source code in 5.0 version

  • waitCommand 구현 부분 (replication.c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void waitCommand(client *c) {
    mstime_t timeout;
    long numreplicas, ackreplicas;
    // offset 변수는 command를 수신한 시점의 global offset이다.
    // 결국 replica들의 offset이 이 offset변수만큼 sync되는 걸 기다린다고 보면 된다.
    long long offset = c->woff;
    ...
    // global offset만큼 sync된 replica 수가 client에서 질의한 replica수보다 크다면 blocking없이 바로 return한다.
    ackreplicas = replicationCountAcksByOffset(c->woff);
    if (ackreplicas >= numreplicas || c->flags & CLIENT_MULTI) {
        addReplyLongLong(c,ackreplicas);
        return;
    }
    // Blocking된 client list에 추가하고 client block후 replica로부터의 ACK을 기다린다.
    c->bpop.timeout = timeout;
    c->bpop.reploffset = offset;
    c->bpop.numreplicas = numreplicas;
    listAddNodeTail(server.clients_waiting_acks,c);
    blockClient(c,BLOCKED_WAIT);
    // REPLCONF GETACK Command를 replica로 보낸다.
    // (실제로는 replicationRequestAckFromSlaves에서는 바로 REPLCONF GETACK을 보내진 않고 flag설정 후 다음 event loop에서 전송한다. 이는 WAIT요청이 여러 클라이언트로 부터 동시에 이루어져도 한 event loop내에서는 한번만 REPLCONF GETACK을 보내기 위한 처리로 보여진다.)
    replicationRequestAckFromSlaves();
}
  • Replica의 REPLCONF GETACK 처리 부분 (replication.c)
1
2
3
4
5
6
7
8
9
void replconfCommand(client *c) {
    ...
     else if (!strcasecmp(c->argv[j]->ptr,"getack")) {
            // REPLCONF GETACK을 수신한 replica는 즉시 현재 sync offset을 master로 전송한다.
            if (server.masterhost && server.master) replicationSendAck();
            return;
        }
    ...    
}
  • Client를 unblock하는 부분 (server.c)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// wait Command의 다음 event loop 시작점에서 호출됨
void beforeSleep(struct aeEventLoop *eventLoop) {
    ...
    // waitCommand에서 설정한 get_ack_from_slaves flag가 ON일 경우 REPLCONF GETACK을 replica들에게 전송함
    if (server.get_ack_from_slaves) {
        robj *argv[3];

        argv[0] = createStringObject("REPLCONF",8);
        argv[1] = createStringObject("GETACK",6);
        argv[2] = createStringObject("*",1); /* Not used argument. */
        replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3);
        decrRefCount(argv[0]);
        decrRefCount(argv[1]);
        decrRefCount(argv[2]);
        server.get_ack_from_slaves = 0;
    }
     // replica들의 replication offset을 기준으로 WAIT 상태의 client들의 unblock여부를 판단함
    if (listLength(server.clients_waiting_acks))
        processClientsWaitingReplicas();
    ...
}

We provide speed and trust to business.