開篇依然是那三個(gè)問題:
- redigo 是否能夠用于 codis ?
- 如果不經(jīng)過任何加工, 直接用 redigo 去訪問 codis, 會(huì)出現(xiàn)什么樣的問題 ?
- codis 的 golang 客戶端如何實(shí)現(xiàn) ?
先貼出來, 我之前直接用 Redigo 接入 codis 的代碼
// Redis global redis connection pool
var Redis *redis.Pool
var RedisInitErr = errors.New("init redis error")
Redis = &redis.Pool{
MaxIdle: 10,
Dial: func() (conn redis.Conn, e error) {
addrs, err := getHosts()
if err != nil {
panic("init redis panic")
}
rand.Seed(time.Now().UnixNano())
rand.Shuffle(len(addrs), func(i, j int) {
addrs[i], addrs[j] = addrs[j], addrs[i]
})
var handler redis.Conn
for _, v := range addrs {
var err error
handler, err = redis.Dial("tcp", v)
if err != nil || handler == nil {
continue
}
res, err := handler.Do("PING")
if pong, err := redis.String(res, err); err != nil && pong != "PONG" {
_ = handler.Close()
continue
}
}
if handler != nil && handler.Err() == nil {
return handler, nil
}
return nil, RedisInitErr
},
Wait: true,
}
這代碼里 getHosts()
函數(shù)是從服務(wù)發(fā)現(xiàn)里面取到 Codis Proxy 的所有的 IP + Port. 還使用洗牌算法
, 保證是隨機(jī)從所有的 Proxy 中拿到一個(gè) IP + Port.
func TestRedis(t *testing.T) {
res, err := Redis.Get().Do("ping")
pong, err := redis.String(res, err)
assert.NoError(t, err)
assert.Equal(t, pong, "PONG")
}
我還寫了一系列的單元測試, 表面上看也能設(shè)置/獲取到數(shù)據(jù), 似乎一切都很完美, Perfect!
但是上完線后, OP 告訴我訪問 Codis 訪問不均勻. 我當(dāng)時(shí)就納悶了, 啥叫訪問不均勻 (咱啥也不知道, 啥也不敢問呀!)
說到這里, 我們不得不說說 Codis 的架構(gòu)圖
請注意, Zookeeper, 這里也就是告訴客戶端想要獲取到 codis-proxy, 是需要通過服務(wù)注冊發(fā)現(xiàn)的方式的. 但是我的程序里面也有這個(gè)呀, 為啥就訪問不均勻了呢?
想這樣一個(gè)問題, 假如有 10 個(gè) codis-proxy, 如果因?yàn)槭请S機(jī)從 codis-proxy 中取值的, 如果說剛好從 4 個(gè) proxy 中取到的連接數(shù)就能滿足所有的請求數(shù). 由于這 4 個(gè)連接一直在 redigo pool 中保持活躍, 而且 pool 參數(shù)里面沒有設(shè)置 MaxConnLifetime
, 最終的結(jié)果就導(dǎo)致了所有的請求全都分配到了這 4 個(gè) proxy 上.
這個(gè)現(xiàn)象依然適用于很大的 QPS 時(shí), 當(dāng)很大的 QPS 請求時(shí)取到的連接, 很可能大部分集中在某幾個(gè) codis-proxy上, 也就出現(xiàn)了上面訪問不均勻的截圖, 真相就是這樣
結(jié)論: 直接使用 redigo 訪問 codis 是有問題的. 當(dāng)然, 如果你的服務(wù) QPS 很低, 這個(gè)問題倒不是很大, 但也要特別注意這個(gè)問題
那么問題來了, Go 如何訪問 Codis 呢?
Google 了一下也沒有發(fā)現(xiàn)開源的 golang codis 客戶端. 我們還是去看看官方爸爸的 codis java 版本客戶端 jodis 是如何實(shí)現(xiàn)的?
使用起來很簡單, 如下:
JedisResourcePool jedisPool = RoundRobinJedisPool.create()
.curatorClient("zkserver:2181", 30000).zkProxyDir("/jodis/xxx").build();
try (Jedis jedis = jedisPool.getResource()) {
jedis.set("foo", "bar");
String value = jedis.get("foo");
System.out.println(value);
}
主要關(guān)注create()
, build()
, getResource()
是如何實(shí)現(xiàn)的即可
1.create()
public static Builder create() {
return new Builder();
}
2.build()
public RoundRobinJedisPool build() {
validate();
return new RoundRobinJedisPool(curatorClient, closeCurator, zkProxyDir, poolConfig,
connectionTimeoutMs, soTimeoutMs, password, database, clientName);
}
build() 返回一個(gè) RoundRobinJedisPool
對象. 從名字也能看出來, Codis Pool 是個(gè)輪詢池
3.getResource()
@Override
public Jedis getResource() {
ImmutableList<PooledObject> pools = this.pools;
if (pools.isEmpty()) {
throw new JedisException("Proxy list empty");
}
for (;;) {
int current = nextIdx.get();
int next = current >= pools.size() - 1 ? 0 : current + 1;
if (nextIdx.compareAndSet(current, next)) {
return pools.get(next).getResource();
}
}
}
要先弄明白 pools 是什么?
private void resetPools() {
ImmutableList<PooledObject> pools = this.pools;
Map<String, PooledObject> addr2Pool = Maps.newHashMapWithExpectedSize(pools.size());
for (PooledObject pool: pools) {
addr2Pool.put(pool.addr, pool);
}
ImmutableList.Builder<PooledObject> builder = ImmutableList.builder();
for (ChildData childData : watcher.getCurrentData()) {
try {
CodisProxyInfo proxyInfo = MAPPER.readValue(childData.getData(), CodisProxyInfo.class);
if (!CODIS_PROXY_STATE_ONLINE.equals(proxyInfo.getState())) {
continue;
}
String addr = proxyInfo.getAddr();
PooledObject pool = addr2Pool.remove(addr);
if (pool == null) {
String[] hostAndPort = addr.split(":");
String host = hostAndPort[0];
int port = Integer.parseInt(hostAndPort[1]);
pool = new PooledObject(addr,
new JedisPool(poolConfig, host, port, connectionTimeoutMs, soTimeoutMs,
password, database, clientName, false, null, null, null));
LOG.info("Add new proxy: " + addr);
}
builder.add(pool);
} catch (Exception e) {
LOG.warn("parse " + childData.getPath() + " failed", e);
}
}
this.pools = builder.build();
...
}
這個(gè) pools 其實(shí)是 redis pool 的集合, 具體的操作流程:
- 從 Zookeeper 中獲取到 codis proxy 的信息. 這個(gè)其實(shí)不重要, 我們可以把這個(gè)換成 etcd
- 為所有的 codis proxy 都建立一個(gè) redis pool, 當(dāng)客戶端從某個(gè) codis proxy上取連接的時(shí)候, 其實(shí)是中這個(gè) codis proxy 的 redis pool 中去取連接
- 查看 codis pools 中是否有所有的 proxy 的 redis pool, 如果沒有的話, 就創(chuàng)建一個(gè)放到 codis pools 中
再回到 getResource()
函數(shù)
- 通過 RoundRobinJedisPool 類的原子變量 nextIdx 獲取上一次從哪個(gè) codis-proxy 的 redis pool 中獲取的redis 連接
- 按照輪詢的方式, 計(jì)算獲取下一次應(yīng)該去哪個(gè) codis-proxy 的 redis pool 中去獲取連接. 這里使用的 compareAndSet lockfree 方法來處理的
看完 Jodis 的實(shí)現(xiàn), 我們?nèi)绾问褂?Redigo 來實(shí)現(xiàn)一個(gè) Golang 版本的 codis pool 呢 ?
看到這里其實(shí)我們心里應(yīng)該有個(gè)大致的思路了, 總結(jié)一下:
1. 設(shè)計(jì)一個(gè)類 CodisPool
type CodisPool struct {
pools []*redigo.Pool
mux sync.Mutex
index int
}
2. 從 Zk 或者 etcd 中獲取所有的 codis proxys 信息
3. 為每一個(gè) codis-proxy 都建立一個(gè) redis.Pool 放入 pools 中
for _, proxy := range proxys {
p, err := NewRedisPool(
url.Host,
url.Port,
url.MaxIdle,
url.IdleTimeout,
url.ConnectTimeout,
url.ReadTimeout,
url.WriteTimeout,
url.PoolSize)
if err != nil {
log.Printf("%s %d connected failed %s", url.Host, url.Port, err.Error())
continue
}
c.pools = append(c.pools, p)
}
4. 從 codis pool 獲取連接時(shí), 通過輪詢計(jì)算該從哪個(gè) codis-proxy Pool 上獲取連接
func (c *CodisPool) Pool() {
...
c.mux.Lock()
defer c.mux.Lock()
c.index++
if c.LastPoolIdx >= len(c.pools) {
c.index = 0
}
p := c.pools[c.index]
...
}
具體代碼目前還不能放出來, 所以提供一個(gè)大致思路.