集群模式
作為緩存數據庫,肯定要考慮緩存服務器穩(wěn)定性相關的保障機制刷钢。
持久化機制就是一種保障方式框往,持久化機制保證了Redis服務器重啟的情況下也不會損失(或少量損失)數據,因為持久化會把內存中的數據保存到硬盤上闯捎,重啟會從硬盤上加載數據
隨著Redis使用場景越來越多椰弊,技術發(fā)展越來越完善许溅,在Redis整體服務上的容錯、擴容秉版、穩(wěn)定各個方面都需要不斷優(yōu)化贤重,因此在Redis的集群模式上也有不同的搭建方式來應對各種需求。
總結來說清焕,Redis集群模式有三種:
- 主從模式
- 哨兵模式
- Cluster集群模式
1并蝗、主從模式
為了Redis服務避免單點故障,通常的做法是將redis的數據復制到多個副本以部署在不同的服務器上秸妥,這樣即使有一臺服務器出現(xiàn)故障滚停,其他服務器依然可以繼續(xù)提供服務,為此粥惧,Redis提供了復制(replication)功能键畴,可以實現(xiàn)當一臺數據庫的數據更新后,自動將更新的數據同步到其他數據庫上突雪。
Redis服務器分為兩類:一類是主數據庫(Master)起惕,另一類是從數據庫(Slave);
主數據庫可以進行讀寫操作,當寫操作導致數據變化時咏删,會自動將數據同步給從數據庫惹想。
從數據庫一般是只讀的,并接受主數據庫同步過來的數據督函。
一個主數據庫可以擁有多個從數據庫嘀粱,而一個從數據庫卻只能擁有一個主數據庫。
優(yōu)點
1辰狡、一個主锋叨,可以有多個從,并以非阻塞的方式完成數據同步搓译;
2悲柱、從服務器提供讀服務锋喜,分散主服務的壓力些己,實現(xiàn)讀寫分離;
3嘿般、從服務器之間可以彼此連接和同步請求段标,減少主服務同步壓力;缺點
1炉奴、不具備容錯和恢復功能逼庞,主服務存在單點風險;
2瞻赶、Redis的主從復制采用全量復制赛糟,需要服務器有足夠的空余內存派任;
3、主從模式較難支持在線擴容璧南;
2掌逛、哨兵模式——Sentinel 集群
Redis提供的sentinel(哨兵)機制,通過sentinel模式啟動redis后司倚,自動監(jiān)控Master/Slave的運行狀態(tài)豆混,基本原理是:心跳機制+投票裁決。
簡單來說动知,哨兵的作用就是監(jiān)控redis系統(tǒng)的運行狀況皿伺,它的功能包括以下兩個:
- 1、監(jiān)控主數據庫和從數據庫是否正常運行
- 2盒粮、主數據庫出現(xiàn)故障時鸵鸥,自動將從數據庫轉換為主數據庫
哨兵模式主要以下幾個內容:
- 監(jiān)控(Monitoring):Sentinel會定期檢查主從服務器是否處于正常工作狀態(tài)
- 提醒(Notification):當被監(jiān)控的某個Redis服務器出現(xiàn)異常時,Sentinel可以通過API向管理員或者其他應用程序發(fā)送通知
- 自動故障遷移(Antomatic failover):當一個主服務器不能正常工作時拆讯,Sentinel會開始一次自動故障遷移操作脂男,它會將失效主服務器的其中一個從服務器升級為新的主服務器,并讓失效主服務器的其他從服務器改為復制新的主服務器种呐;當客戶端試圖連接失效的主服務器時宰翅,集群也會向客戶端返回新主服務器的地址,使得集群可以使用新主服務器代替失效服務器
Redis Sentinel 是一個分布式系統(tǒng)爽室,你可以在一個架構中運行多個Sentinel進程(progress)
優(yōu)點
1汁讼、哨兵模式主從可以切換,具備基本的故障轉移能力阔墩;
2嘿架、哨兵模式具備主從模式的所有優(yōu)點缺點
1、哨兵模式也很難支持在線擴容操作
2啸箫、集群的配置信息管理比較復雜
3耸彪、集群模式
3.1 Redis Cluster
Redis Cluster是一種服務器Sharding技術,采用CRC16算法來實現(xiàn)數據的分片忘苛,3.0版本開始正式提供蝉娜,采用無中心架構,每個節(jié)點保存數據和整個集群狀態(tài)扎唾,每個節(jié)點都和其他所有節(jié)點連接召川。
Cluster集群結構特點:
1、Redis Cluster所有物理節(jié)點都映射到[0-16383]slot上(不一定均勻分布)胸遇,Cluster負責維護節(jié)點荧呐、桶(slot)、值之間的關系;
2倍阐、在Redis集群中放置一個key-value時概疆,根據CRC16(16) mod 16384的值,從之前劃分的16384個桶中選擇一個峰搪;
3届案、所有的Redis節(jié)點彼此互聯(lián)(PING_PONG機制),內部使用二進制協(xié)議優(yōu)化傳輸效率罢艾;
4楣颠、超過半數的節(jié)點檢測到某個幾點失效時盒齿,則判定該節(jié)點失效航闺;
5吃环、使用端與Redis節(jié)點連接骤宣,不需要中間proxy層瓮顽,直接可以操作居触,使用端不需要連接集群所有節(jié)點叠穆,連接集群中任意一個可用節(jié)點即可瘦材。
優(yōu)點
1期奔、無中心架構侧馅,節(jié)點間數據共享,可動態(tài)調整數據分布呐萌;
2馁痴、節(jié)點可動態(tài)添加刪除,擴張性比較靈活肺孤;
3罗晕、部分節(jié)點異常,不影響整體集群的可用性赠堵;缺點
1小渊、集群實現(xiàn)比較復雜;
2茫叭、批量操作指令(mget酬屉、mset等)支持有限;
3揍愁、事務操作支持有限
Jedis客戶端實現(xiàn):JedisCluster
3.2 Redis Sharding
Redis Sharding 屬于客戶端sharding分片技術呐萨,采用一致性Hash算法來實現(xiàn)數據的分片,3.0版本以前基本上使用分片實現(xiàn)集群吗垮。
Redis Sharding特點:
- 各個Redis節(jié)點獨立垛吗,之間無關系
- 某個Redis節(jié)點掛了凹髓,整個集群不可用烁登,所以需要對每個節(jié)點做主從備份
- 主從備份方案一般通過讀寫分離設置,每個master至少兩個slaver,只有這樣master掛掉后饵沧,才能選舉其中一個Slaver成為新的master锨络,原來master節(jié)點加入集群后成為新master的slaver節(jié)點
- redis主從切換對客戶端jedis使用時透明的,即redis發(fā)生了主從切換并不影響jedis的使用
缺點:
節(jié)點擴展和收縮不友好
Jedis客戶端實現(xiàn):ShardedJedis
4狼牺、哨兵Sentinel Sharding集群模式
如果既想要哨兵模式提供的自動監(jiān)控和故障轉移機制羡儿,又想要Sharding集群的分片機制,那么該怎么辦呢是钥?
在服務端掠归,以Sharding集群啟動,同時悄泥,使得Redis Sentinel分布式系統(tǒng)監(jiān)聽多個Master節(jié)點虏冻;
在客戶端,自定義一個類弹囚,繼承redis.clients.util.Pool厨相,實現(xiàn)redis線程池;
以jedis為例鸥鹉,自定義線程池實現(xiàn)如下蛮穿,參考jedis源碼redis.clients.jedis.JedisSentinelPool,redis.clients.jedis.ShardedJedisPool
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.util.Hashing;
import redis.clients.util.Pool;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
/**
* Jedis不能同時支持Shareded和Sentinel毁渗。
*
* 這里是把單master改成多master践磅,同時把Jedis改成ShardedJedis。
* 支持多主機集群
*/
public class ShardedJedisSentinelPoolExt extends Pool<ShardedJedis> {
public static final int MAX_RETRY_SENTINEL = 10;
private static final Logger logger = LoggerFactory.getLogger(LoggerType.COMMON);
protected GenericObjectPoolConfig poolConfig;
protected int timeout = Protocol.DEFAULT_TIMEOUT;
private int sentinelRetry = 0;
protected String password;
protected int database = Protocol.DEFAULT_DATABASE;
protected Set<MasterListener> masterListeners = new HashSet<>();
private volatile List<HostAndPort> currentHostMasters;
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels) {
this(masters, sentinels, new GenericObjectPoolConfig(),
Protocol.DEFAULT_TIMEOUT, null, Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels, String password) {
this(masters, sentinels, new GenericObjectPoolConfig(),
Protocol.DEFAULT_TIMEOUT, password);
}
public ShardedJedisSentinelPoolExt(final GenericObjectPoolConfig poolConfig, Set<String> masters, Set<String> sentinels) {
this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT, null,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout,
final String password) {
this(masters, sentinels, poolConfig, timeout, password,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final int timeout) {
this(masters, sentinels, poolConfig, timeout, null,
Protocol.DEFAULT_DATABASE);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, final String password) {
this(masters, sentinels, poolConfig, Protocol.DEFAULT_TIMEOUT,
password);
}
public ShardedJedisSentinelPoolExt(Set<String> masters, Set<String> sentinels,
final GenericObjectPoolConfig poolConfig, int timeout,
final String password, final int database) {
this.poolConfig = poolConfig;
this.timeout = timeout;
this.password = password;
this.database = database;
List<String> convertList = new ArrayList<>(masters);
List<HostAndPort> masterList = initSentinels(sentinels, convertList);
initPool(masterList);
}
@Override
public void destroy() {
for (MasterListener m : masterListeners) {
m.shutdown();
}
super.destroy();
}
public List<HostAndPort> getCurrentHostMaster() {
return currentHostMasters;
}
private void initPool(List<HostAndPort> masters) {
if (!equalsObj(currentHostMasters, masters)) {
StringBuilder sb = new StringBuilder();
for (HostAndPort master : masters) {
sb.append(master.toString());
sb.append(" ");
}
logger.info("Created ShardedJedisPool to master at [" + sb.toString() + "]");
List<JedisShardInfo> shardMasters = makeShardInfoList(masters);
initPool(poolConfig, new ShardedJedisFactory(shardMasters, Hashing.MURMUR_HASH, null));
currentHostMasters = masters;
}
}
private static boolean equalsObj(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters) {
if (currentShardMasters != null && shardMasters != null && checkListSize(currentShardMasters,shardMasters)) {
for (int i = 0; i < currentShardMasters.size(); i++) {
if (!currentShardMasters.get(i).equals(shardMasters.get(i)))
return false;
}
return true;
}
return false;
}
private static boolean checkListSize(List<HostAndPort> currentShardMasters, List<HostAndPort> shardMasters){
return (currentShardMasters.size() == shardMasters.size())? true : false;
}
private List<JedisShardInfo> makeShardInfoList(List<HostAndPort> masters) {
List<JedisShardInfo> shardMasters = new ArrayList<>();
for (HostAndPort master : masters) {
JedisShardInfo jedisShardInfo = new JedisShardInfo(master.getHost(), master.getPort(), timeout);
jedisShardInfo.setPassword(password);
shardMasters.add(jedisShardInfo);
}
return shardMasters;
}
private List<HostAndPort> initSentinels(Set<String> sentinels, final List<String> masters) {
Map<String, HostAndPort> masterMap = new HashMap<>();
List<HostAndPort> shardMasters = new ArrayList<>();
logger.info("Trying to find all master from available Sentinels...");
for (String masterName : masters) {
HostAndPort master = null;
boolean fetched = false;
while (!fetched && sentinelRetry < MAX_RETRY_SENTINEL) {
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
logger.info("Connecting to Sentinel " + hap);
try( Jedis jedis = new Jedis(hap.getHost(), hap.getPort())) {
master = masterMap.get(masterName);
if (master == null) {
List<String> hostAndPort = jedis.sentinelGetMasterAddrByName(masterName);
if (hostAndPort != null && ! hostAndPort.isEmpty()) {
master = toHostAndPort(hostAndPort);
logger.info("Found Redis master at " + master);
shardMasters.add(master);
masterMap.put(masterName, master);
fetched = true;
jedis.disconnect();
break;
}
}
} catch (JedisConnectionException e) {
logger.error("Cannot connect to sentinel running @ " + hap + ". Trying next one.",e);
}
}
if (null == master) {
try {
logger.info("All sentinels down, cannot determine where is "
+ masterName + " master is running... sleeping 1000ms, Will try again.");
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error(e.getMessage());
Thread.currentThread().interrupt();
}
fetched = false;
sentinelRetry++;
}
}
// Try MAX_RETRY_SENTINEL times.
if (!fetched && sentinelRetry >= MAX_RETRY_SENTINEL) {
logger.info("All sentinels down and try " + MAX_RETRY_SENTINEL + " times, Abort.");
throw new JedisConnectionException("Cannot connect all sentinels, Abort.");
}
}
// All shards master must been accessed.
if (! masters.isEmpty() && masters.size() == shardMasters.size()) {
logger.info("Starting Sentinel listeners...");
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel.split(":")));
MasterListener masterListener = new MasterListener(masters, hap.getHost(), hap.getPort());
masterListeners.add(masterListener);
masterListener.start();
}
}
return shardMasters;
}
private static HostAndPort toHostAndPort(List<String> getMasterAddrByNameResult) {
String host = getMasterAddrByNameResult.get(0);
int port = Integer.parseInt(getMasterAddrByNameResult.get(1));
return new HostAndPort(host, port);
}
/**
* PoolableObjectFactory custom impl.
*/
protected static class ShardedJedisFactory implements PooledObjectFactory<ShardedJedis> {
private List<JedisShardInfo> shards;
private Hashing algo;
private Pattern keyTagPattern;
public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
@Override
public PooledObject<ShardedJedis> makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return new DefaultPooledObject<>(jedis);
}
@Override
public void destroyObject(PooledObject<ShardedJedis> pooledShardedJedis) throws Exception {
final ShardedJedis shardedJedis = pooledShardedJedis.getObject();
for (Jedis jedis : shardedJedis.getAllShards()) {
try {
jedis.quit();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
try {
jedis.disconnect();
} catch (Exception e) {
logger.error(e.getMessage(),e);
}
}
}
@Override
public boolean validateObject(PooledObject<ShardedJedis> pooledShardedJedis) {
try {
ShardedJedis jedis = pooledShardedJedis.getObject();
for (Jedis shard : jedis.getAllShards()) {
if (!"PONG".equals(shard.ping())) {
return false;
}
}
return true;
} catch (Exception ex) {
logger.error(ex.getMessage(),ex);
return false;
}
}
@Override
public void activateObject(PooledObject<ShardedJedis> p) throws Exception {
// Do nothing because of X and Y.
}
@Override
public void passivateObject(PooledObject<ShardedJedis> p) throws Exception {
// Do nothing because of X and Y.
}
}
protected class JedisPubSubAdapter extends JedisPubSub {
@Override
public void onMessage(String channel, String message) {
// Do nothing because of X and Y.
}
@Override
public void onPMessage(String pattern, String channel, String message) {
// Do nothing because of X and Y.
}
@Override
public void onPSubscribe(String pattern, int subscribedChannels) {
// Do nothing because of X and Y.
}
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
// Do nothing because of X and Y.
}
@Override
public void onSubscribe(String channel, int subscribedChannels) {
// Do nothing because of X and Y.
}
@Override
public void onUnsubscribe(String channel, int subscribedChannels) {
// Do nothing because of X and Y.
}
}
protected class MasterListener extends Thread {
protected List<String> masters;
protected String host;
protected int port;
protected long subscribeRetryWaitTimeMillis = 5000;
protected Jedis jedis;
protected AtomicBoolean running = new AtomicBoolean(false);
protected MasterListener() {
}
public MasterListener(List<String> masters, String host, int port) {
this.masters = masters;
this.host = host;
this.port = port;
}
public MasterListener(List<String> masters, String host, int port,
long subscribeRetryWaitTimeMillis) {
this(masters, host, port);
this.subscribeRetryWaitTimeMillis = subscribeRetryWaitTimeMillis;
}
@Override
public void run() {
running.set(true);
while (running.get()) {
jedis = new Jedis(host, port);
try {
jedis.subscribe(new JedisPubSub() {
@Override
public void onMessage(String channel, String message) {
logger.info("Sentinel " + host + ":" + port + " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
int index = masters.indexOf(switchMasterMsg[0]);
if (index >= 0) {
HostAndPort newHostMaster = toHostAndPort(Arrays.asList(switchMasterMsg[3], switchMasterMsg[4]));
List<HostAndPort> newHostMasters = new ArrayList<>();
for (int i = 0; i < masters.size(); i++) {
newHostMasters.add(null);
}
Collections.copy(newHostMasters, currentHostMasters);
newHostMasters.set(index, newHostMaster);
initPool(newHostMasters);
} else {
StringBuilder sb = new StringBuilder();
for (String masterName : masters) {
sb.append(masterName);
sb.append(",");
}
logger.info("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0]
+ ", our monitor master name are ["
+ sb + "]");
}
} else {
logger.info("Invalid message received on Sentinel "
+ host
+ ":"
+ port
+ " on channel +switch-master: "
+ message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) {
if (running.get()) {
logger.info("Lost connection to Sentinel at " + host
+ ":" + port
+ ". Sleeping 5000ms and retrying.");
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
logger.error(e.getMessage(),e1);
Thread.currentThread().interrupt();
}
} else {
logger.info("Unsubscribing from Sentinel at " + host + ":"
+ port);
}
}
}
}
public void shutdown() {
try {
logger.info("Shutting down listener on " + host + ":" + port);
running.set(false);
// This isn't good, the Jedis object is not thread safe
jedis.disconnect();
} catch (Exception e) {
logger.error("Caught exception while shutting down: " , e);
}
}
}
}