Redis我一直都在使用仗哨,但是目前的Redis3.*版本橫空出世,竟然支持了Redis集群热某,這可解決了我們很久以來的Redis擴(kuò)展的難題透典。但是通過測試讓我又喜又悲刽宪,我一直都是在使ServiceStack.Redis進(jìn)行Redis的相關(guān)操作哀蘑,現(xiàn)有的dll版本已經(jīng)無法滿足Redis3.0下的集群模式诚卸,終于現(xiàn)在有了時間,所以可以開搞了.
那么如何讓ServiceStack.Redis支持集群我們也將陸續(xù)的開展工作绘迁,通過閱讀redis cluster的官方文檔找到了答案合溺,其實(shí)很簡單.
- 了解Redis3.0的集群模式
- 了解Hash Slot的計(jì)算方式
- 了解ServiceStack.Redis
- 尋找切入點(diǎn)
- 代碼實(shí)現(xiàn)
一、Redis3.0集群模式
目前Redis3.0主要模式為主從設(shè)計(jì)缀台,并且在主節(jié)點(diǎn)的算法上采用少數(shù)服從多數(shù)棠赛,每一個主節(jié)點(diǎn)都會產(chǎn)生N-1個副本進(jìn)行主從復(fù)制,如果當(dāng)其中一個主節(jié)點(diǎn)失敗膛腐,集群將自動將其中一個從節(jié)點(diǎn)提升為主繼續(xù)維持整個集群的正常工作.
二睛约、Hash Slot
Redis3.0在Hash Slot上固定設(shè)定為16384個.并且各個節(jié)點(diǎn)都會平均的分布這些slot.
如:
Node A contains hash slots from 0 to 5500.
Node B contains hash slots from 5501 to 11000.
Node C contains hash slots from 11001 to 16384.
當(dāng)我們要獲取一個指定的Key,那么我將通過Slot的算法進(jìn)行計(jì)算出所要操作的Key存在哪一個主節(jié)點(diǎn)內(nèi)哲身,Slot的計(jì)算公式為: Slot = CRC16(KEY)% 16384.
如 key=a 那么他的slot計(jì)算得出為 15495
通過比對A\B\C三個節(jié)點(diǎn)的slots則可以斷定辩涝,這個key應(yīng)該寫入到節(jié)點(diǎn)C.
三、ServiceStack.Redis
類圖結(jié)構(gòu):
通過上面的類圖勘天,我們可以關(guān)注一下string client包含的這些接口怔揩,從類圖上看它將封裝了Redis操作的所有功能捉邢,并以接口對象的形式來實(shí)現(xiàn)具體操作的代碼編寫。
打開源代碼分析哈商膊,RedisClient類為操作具體實(shí)現(xiàn)類伏伐,使用partial class特性來使用多個文件合并類。
PooledRedisClientManager 類將作為RedisClient的管理類進(jìn)行Client分配和一些管理操作,如故障轉(zhuǎn)移.
至少我是是那么認(rèn)為這兩個類是這樣搭配使用的.
四晕拆、切入點(diǎn)
是到了尋找如何下手的時候啦藐翎,看到前面所涉及的一些內(nèi)容,可以設(shè)想一下當(dāng)根據(jù)PooledRedisClientManager對象進(jìn)行了RedisClient初始化,并當(dāng)我們每次根據(jù)Key進(jìn)行操作獲取RedisClient時的無參函數(shù)GetClient(),擴(kuò)展為GetClient(key)來根據(jù)key的Slot選擇該返回的RedisClient對象实幕,這樣就每次都能夠正確的節(jié)點(diǎn)地址進(jìn)行連接操作啦,同理讀取操作也是一樣的.
思路步驟:
- 為ServiceStack.Redis增加集群模式的屬性吝镣,確定開啟和關(guān)閉集群模式
- 增加Cluster擴(kuò)展類并支持一些Cluter模式下的公共方法,此類同樣partial class RedisClient茬缩,提供查詢Cluster Nodes信息功能赤惊、Hash Slot計(jì)算方法以及節(jié)點(diǎn)信息的實(shí)體屬性類.
- 增加帶參的GetClient(key)方法,通過計(jì)算key的slot來返回對應(yīng)的RedisClient.
就是那么簡單凰锡!
五未舟、代碼實(shí)現(xiàn)
在修改代碼的同時為了不破壞原有的代碼結(jié)構(gòu),我們還是使用if else來分隔我們的代碼域,防止由于我們的疏忽而導(dǎo)致以前的程序出現(xiàn)問題,當(dāng)然如果有足夠的把握掂为,也可以直接把這個函數(shù)重寫啦裕膀!
修改PooledRedisClientManage
/// <summary>
/// wanglei add
/// open redis 3.0 cluster mode
/// </summary>
public bool? OpenCluster { get; set; }
尋找到private void InitClient(RedisClient client)方法,加入初始化判斷.
private void InitClient(RedisClient client)
{
if (this.ConnectTimeout != null)
client.ConnectTimeout = this.ConnectTimeout.Value;
if (this.SocketSendTimeout.HasValue)
client.SendTimeout = this.SocketSendTimeout.Value;
if (this.SocketReceiveTimeout.HasValue)
client.ReceiveTimeout = this.SocketReceiveTimeout.Value;
if (this.IdleTimeOutSecs.HasValue)
client.IdleTimeOutSecs = this.IdleTimeOutSecs.Value;
if (this.ForceReconnectInIdle.HasValue)
client.ForceReconnectInIdle = this.ForceReconnectInIdle.Value;
else
client.ForceReconnectInIdle = false;
if (this.NamespacePrefix != null)
client.NamespacePrefix = NamespacePrefix;
if (Db != null && client.Db != Db) //Reset database to default if changed
client.ChangeDb(Db.Value);
//wanglei add
//close the cluster mode default.
if (OpenCluster.HasValue)
{
client.OpenCluster = this.OpenCluster.HasValue;
}
else
{
client.OpenCluster = false;
this.OpenCluster = false;
}
//wanglei add
if (this.OpenCluster.Value)
{
InitClusterInfo(client);
}
}
public List<ClusterNode> ClusterNodes;//wanglei add
private bool _loadClusterNodes = false;//wanglei add
/// <summary>
/// init cluster info
/// wanglei add
/// </summary>
/// <param name="client"></param>
public void InitClusterInfo(RedisClient client)
{
if (_loadClusterNodes) return;
ClusterNodes = new List<ClusterNode>();
string host = string.Format("{0}:{1}", client.Host, client.Port);
string nodes = client.GetClusterInfo();
using (var reader = new StringReader(nodes))
{
string line;
while ((line = reader.ReadLine()) != null)
{
if (string.IsNullOrWhiteSpace(line)) continue;
ClusterNodes.Add(new ClusterNode(line));
}
}
client.FillSlot(ClusterNodes);
//according cluster nodes to reset the read or write client
//1. the slave node as read from the client
//2. the master node as write from the client
lock (readClients)
{
readClients = new RedisClient[0];
ReadOnlyHosts = ClusterNodes.Where(w => w.Slave == true).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
.ToRedisEndPoints();
}
lock (writeClients)
{
writeClients = new RedisClient[0];
ReadWriteHosts = ClusterNodes.Where(w => w.Slave == false).Select(s => string.Format("{0}:{1}", s.Host, s.Port))
.ToRedisEndPoints();
}
_loadClusterNodes = true;
}
無參函數(shù)到有參的過程所涉及到要修改的函數(shù):
//獲取寫入客戶端
public IRedisClient GetClient()
//得到一個寫入角色的客戶端
private RedisClient GetInActiveWriteClient() --此函數(shù)在修改中被注釋掉由private RedisClient GetInActiveWriteClient(string key)代替.
//獲取讀客戶端
public virtual IRedisClient GetReadOnlyClient()
//獲取一個讀角色的客戶端
private RedisClient GetInActiveReadClient() --此函數(shù)在修改中被注釋掉由private RedisClient GetInActiveReadClient(string key)代替.
以上四個函數(shù)為Manage類主要的入口點(diǎn)勇哗,需要利用函數(shù)重載為他們擴(kuò)展一個參數(shù)key來支持slot計(jì)算.
/// <summary>
/// get a readwriteclient by key slot
/// wanglei add
/// </summary>
/// <param name="key">the redis keyword</param>
/// <returns></returns>
public IRedisClient GetClient(string key)
{
lock (writeClients)
{
if (!OpenCluster.Value) //open cluster mode wanglei add
{
AssertValidReadWritePool();
}
RedisClient inActiveClient;
while ((inActiveClient = GetInActiveWriteClient(key)) == null)
{
if (PoolTimeout.HasValue)
{
// wait for a connection, cry out if made to wait too long
if (!Monitor.Wait(writeClients, PoolTimeout.Value))
throw new TimeoutException(PoolTimeoutError);
}
else
Monitor.Wait(writeClients, RecheckPoolAfterMs);
}
WritePoolIndex++;
inActiveClient.Active = true;
InitClient(inActiveClient);
return inActiveClient;
}
}
/// <summary>
/// Returns a Read/Write client (The default) using the hosts defined in ReadWriteHosts
/// </summary>
/// <returns></returns>
public IRedisClient GetClient()
{
//將代碼移植到有參函數(shù)并且調(diào)用.
return this.GetClient(string.Empty);
}
/// <summary>
/// Called within a lock
/// wanglei add
/// </summary>
/// <returns></returns>
private RedisClient GetInActiveWriteClient(string key)
{
//if key is empty then will think not cluster mode (default).
int slot = -1;
if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
{
slot = RedisClient.HashSlot(key);
}
if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
{
ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave == false);
if (clusterNode == null)
{
return null;
}
RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f =>
f.Host == clusterNode.Host && f.Port == clusterNode.Port
);
if (rePoint == null)
{
return null;
}
return InitNewClient(rePoint);
}
else
{
var desiredIndex = WritePoolIndex % writeClients.Length;
//this will loop through all hosts in readClients once even though there are 2 for loops
//both loops are used to try to get the prefered host according to the round robin algorithm
for (int x = 0; x < ReadWriteHosts.Count; x++)
{
var nextHostIndex = (desiredIndex + x) % ReadWriteHosts.Count;
RedisEndpoint nextHost = ReadWriteHosts[nextHostIndex];
for (var i = nextHostIndex; i < writeClients.Length; i += ReadWriteHosts.Count)
{
if (writeClients[i] != null && !writeClients[i].Active && !writeClients[i].HadExceptions)
return writeClients[i];
else if (writeClients[i] == null || writeClients[i].HadExceptions)
{
if (writeClients[i] != null)
writeClients[i].DisposeConnection();
var client = InitNewClient(nextHost);
writeClients[i] = client;
return client;
}
}
}
}
return null;
}
/// <summary>
/// get a readonlyclient by key slot
/// wanglei add
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
public IRedisClient GetReadOnlyClient(string key)
{
lock (readClients)
{
if (!OpenCluster.Value) //wanglei add
{
AssertValidReadOnlyPool();
}
RedisClient inActiveClient;
while ((inActiveClient = GetInActiveReadClient(key)) == null)
{
if (PoolTimeout.HasValue)
{
// wait for a connection, cry out if made to wait too long
if (!Monitor.Wait(readClients, PoolTimeout.Value))
throw new TimeoutException(PoolTimeoutError);
}
else
Monitor.Wait(readClients, RecheckPoolAfterMs);
}
ReadPoolIndex++;
inActiveClient.Active = true;
InitClient(inActiveClient);
return inActiveClient;
}
}
/// <summary>
/// Returns a ReadOnly client using the hosts defined in ReadOnlyHosts.
/// </summary>
/// <returns></returns>
public virtual IRedisClient GetReadOnlyClient()
{
return GetReadOnlyClient(string.Empty);//wanglei add
}
/// <summary>
/// According to the key value Called within a lock
/// wanglei add
/// </summary>
/// <returns></returns>
private RedisClient GetInActiveReadClient(string key)
{
//if key is empty then will think not cluster mode (default).
int slot = -1;
if (OpenCluster.Value && !string.IsNullOrWhiteSpace(key))
{
slot = RedisClient.HashSlot(key);
}
if (OpenCluster.Value && slot > -1) //open cluster mode wanglei add
{
ClusterNode clusterNode = this.ClusterNodes.FirstOrDefault(s => s.Slot1 <= slot && s.Slot2 >= slot && s.Slave==false);
if (clusterNode == null)
{
return null;
}
RedisEndpoint rePoint = ReadWriteHosts.FirstOrDefault(f =>
f.Host == clusterNode.Host && f.Port == clusterNode.Port
);
if (rePoint == null)
{
return null;
}
return InitNewClient(rePoint);
}
else
{
var desiredIndex = ReadPoolIndex % readClients.Length;
//this will loop through all hosts in readClients once even though there are 2 for loops
//both loops are used to try to get the prefered host according to the round robin algorithm
for (int x = 0; x < ReadOnlyHosts.Count; x++)
{
var nextHostIndex = (desiredIndex + x) % ReadOnlyHosts.Count;
var nextHost = ReadOnlyHosts[nextHostIndex];
for (var i = nextHostIndex; i < readClients.Length; i += ReadOnlyHosts.Count)
{
if (readClients[i] != null && !readClients[i].Active && !readClients[i].HadExceptions)
return readClients[i];
else if (readClients[i] == null || readClients[i].HadExceptions)
{
if (readClients[i] != null)
readClients[i].DisposeConnection();
var client = InitNewClient(nextHost);
readClients[i] = client;
return client;
}
}
}
}
return null;
}
新增類
/// <summary>
/// redis cluster client extension
/// wanglei add
/// </summary>
public partial class RedisClient
: IRedisClient
{
internal ClusterNode _clusterNode = null;
internal const int NoSlot = -1;
internal const int RedisClusterSlotCount = 16384;
/// <summary>
/// Get Cluster Nodes Info
/// wanglei add
/// </summary>
internal string GetClusterInfo()
{
return SendExpectString(Commands.Cluster, "NODES".ToUtf8Bytes());
}
/// <summary>
/// Fill Slot Range To all
/// </summary>
/// <param name="clusterNodes"></param>
internal void FillSlot(List<ClusterNode> clusterNodes)
{
List<ClusterNode> nodes = clusterNodes.Where(s => s.ParentNodeId != "-").ToList();
foreach (var node in nodes)
{
ClusterNode n = clusterNodes.FirstOrDefault(f => f.NodeID == node.ParentNodeId);
if(n != null)
{
node.Slot1 = n.Slot1;
node.Slot2 = n.Slot2;
}
}
}
internal static unsafe int IndexOf(byte* ptr, byte value, int start, int end)
{
for (int offset = start; offset < end; offset++)
if (ptr[offset] == value) return offset;
return -1;
}
static readonly ushort[] crc16tab =
{
0x0000,0x1021,0x2042,0x3063,0x4084,0x50a5,0x60c6,0x70e7,
0x8108,0x9129,0xa14a,0xb16b,0xc18c,0xd1ad,0xe1ce,0xf1ef,
0x1231,0x0210,0x3273,0x2252,0x52b5,0x4294,0x72f7,0x62d6,
0x9339,0x8318,0xb37b,0xa35a,0xd3bd,0xc39c,0xf3ff,0xe3de,
0x2462,0x3443,0x0420,0x1401,0x64e6,0x74c7,0x44a4,0x5485,
0xa56a,0xb54b,0x8528,0x9509,0xe5ee,0xf5cf,0xc5ac,0xd58d,
0x3653,0x2672,0x1611,0x0630,0x76d7,0x66f6,0x5695,0x46b4,
0xb75b,0xa77a,0x9719,0x8738,0xf7df,0xe7fe,0xd79d,0xc7bc,
0x48c4,0x58e5,0x6886,0x78a7,0x0840,0x1861,0x2802,0x3823,
0xc9cc,0xd9ed,0xe98e,0xf9af,0x8948,0x9969,0xa90a,0xb92b,
0x5af5,0x4ad4,0x7ab7,0x6a96,0x1a71,0x0a50,0x3a33,0x2a12,
0xdbfd,0xcbdc,0xfbbf,0xeb9e,0x9b79,0x8b58,0xbb3b,0xab1a,
0x6ca6,0x7c87,0x4ce4,0x5cc5,0x2c22,0x3c03,0x0c60,0x1c41,
0xedae,0xfd8f,0xcdec,0xddcd,0xad2a,0xbd0b,0x8d68,0x9d49,
0x7e97,0x6eb6,0x5ed5,0x4ef4,0x3e13,0x2e32,0x1e51,0x0e70,
0xff9f,0xefbe,0xdfdd,0xcffc,0xbf1b,0xaf3a,0x9f59,0x8f78,
0x9188,0x81a9,0xb1ca,0xa1eb,0xd10c,0xc12d,0xf14e,0xe16f,
0x1080,0x00a1,0x30c2,0x20e3,0x5004,0x4025,0x7046,0x6067,
0x83b9,0x9398,0xa3fb,0xb3da,0xc33d,0xd31c,0xe37f,0xf35e,
0x02b1,0x1290,0x22f3,0x32d2,0x4235,0x5214,0x6277,0x7256,
0xb5ea,0xa5cb,0x95a8,0x8589,0xf56e,0xe54f,0xd52c,0xc50d,
0x34e2,0x24c3,0x14a0,0x0481,0x7466,0x6447,0x5424,0x4405,
0xa7db,0xb7fa,0x8799,0x97b8,0xe75f,0xf77e,0xc71d,0xd73c,
0x26d3,0x36f2,0x0691,0x16b0,0x6657,0x7676,0x4615,0x5634,
0xd94c,0xc96d,0xf90e,0xe92f,0x99c8,0x89e9,0xb98a,0xa9ab,
0x5844,0x4865,0x7806,0x6827,0x18c0,0x08e1,0x3882,0x28a3,
0xcb7d,0xdb5c,0xeb3f,0xfb1e,0x8bf9,0x9bd8,0xabbb,0xbb9a,
0x4a75,0x5a54,0x6a37,0x7a16,0x0af1,0x1ad0,0x2ab3,0x3a92,
0xfd2e,0xed0f,0xdd6c,0xcd4d,0xbdaa,0xad8b,0x9de8,0x8dc9,
0x7c26,0x6c07,0x5c64,0x4c45,0x3ca2,0x2c83,0x1ce0,0x0cc1,
0xef1f,0xff3e,0xcf5d,0xdf7c,0xaf9b,0xbfba,0x8fd9,0x9ff8,
0x6e17,0x7e36,0x4e55,0x5e74,0x2e93,0x3eb2,0x0ed1,0x1ef0
};
/// <summary>
/// get slot number by key
/// </summary>
/// <param name="key"></param>
/// <returns></returns>
internal static unsafe int HashSlot(string key)
{
if (string.IsNullOrEmpty(key)) return NoSlot;
unchecked
{
var blob = System.Text.Encoding.UTF8.GetBytes(key);
fixed (byte* ptr = blob)
{
int offset = 0, count = blob.Length, start, end;
if ((start = IndexOf(ptr, (byte)'{', 0, count - 1)) >= 0
&& (end = IndexOf(ptr, (byte)'}', start + 1, count)) >= 0
&& --end != start)
{
offset = start + 1;
count = end - start; // note we already subtracted one via --end
}
uint crc = 0;
for (int i = 0; i < count; i++)
crc = ((crc << 8) ^ crc16tab[((crc >> 8) ^ ptr[offset++]) & 0x00FF]) & 0x0000FFFF;
return (int)(crc % RedisClusterSlotCount);
}
}
}
}
在獲取客戶端的時候昼扛,ServiceStack.Redis的設(shè)計(jì)者為了能夠更好地獲取RedisClient使用了輪詢算法來依次獲取每一個Client Pool 內(nèi)的對象,但是切換到了集群模式則不同于單例模式欲诺,他的key slot完全阻礙了輪詢的路徑抄谐,所以為了能最少的影響代碼我使用了if else來完全的切割了代碼。
當(dāng)獲取只讀客戶端時候扰法,集群模式也不同于單例蛹含,它還是依靠主節(jié)點(diǎn)來達(dá)到目的,所以我也是使用的ReadWriteClient來返回想要的Client對象.
所有代碼已經(jīng)修改完成.可以測試?yán)玻?/p>
六塞颁、測試結(jié)果##
測試代碼:
PooledRedisClientManager prcm = new PooledRedisClientManager(RedisConfig.ReadWriteHost, RedisConfig.ReadOnlyHost,
new RedisClientManagerConfig
{
MaxWritePoolSize = RedisConfig.MaxWritePoolSize,
MaxReadPoolSize = RedisConfig.MaxReadPoolSize,
AutoStart = RedisConfig.AutoStart
});
prcm.OpenCluster = true; //我們?yōu)樗_啟集群模式.
string key_tmp = "key";
string value = "wanglei_test_value";
//測試寫
for (int i = 1; i <= 10; i++)
{
string key = key_tmp + i.ToString();
using (IRedisClient irc = prcm.GetClient(key))
{
irc.SetEntry(key, value+i.ToString());
}
}
//測試讀
for (int i = 1; i <= 10; i++)
{
string key = key_tmp + i.ToString();
using (IRedisClient irc = prcm.GetReadOnlyClient(key))
{
Console.WriteLine(irc.GetValue(key));
}
}
寫入數(shù)據(jù)分布情況:
七浦箱、一些問題##
經(jīng)過測試,大部分的key操作目前已經(jīng)完全的兼容Redis3.0祠锣,但是我發(fā)現(xiàn)還有一些key操作沒有解決酷窥,如set中的sunion操作,如果在集群模式下使用伴网,兩個key不在同一個節(jié)點(diǎn)下就會出現(xiàn)找不到key slot這樣的問題蓬推,原生的redis-cli -c的集群模式也會有同樣的問題,所以目前這類問題還沒有得到解決澡腾,不知以后redis的作者會不會對這種操作有所改善.
如果您有任何問題拳氢,可以及時的交流,做技術(shù)的喜歡交流募逞!