1. 引言
是的吹菱,Orleans v3.0.0 已經(jīng)發(fā)布了故河,并已經(jīng)完全支持 .NET Core 3.0吱韭。
所以,Orleans 系列是時(shí)候繼續(xù)了鱼的,抱歉理盆,讓大家久等了。
萬(wàn)丈高樓平地起凑阶,這一節(jié)我們就先來(lái)了解下Orleans的基本使用猿规。
2. 模板項(xiàng)目講解
在上一篇文章中,我們了解到Orleans 作為.NET 分布式框架宙橱,其主要包括三個(gè)部分:Client姨俩、Grains、Silo Host(Server)师郑。因此环葵,為了方便講解,創(chuàng)建如下的項(xiàng)目結(jié)構(gòu)進(jìn)行演示:
這里有幾點(diǎn)需要說(shuō)明:
- Orleans.Grains: 類庫(kù)項(xiàng)目宝冕,用于定義Grain的接口以及實(shí)現(xiàn)积担,需要引用
Microsoft.Orleans.CodeGenerator.MSBuild
和Microsoft.Orleans.Core.Abstractions
NuGet包。 - Orleans.Server:控制臺(tái)項(xiàng)目猬仁,為 Silo 宿主提供宿主環(huán)境帝璧,需要引用
Microsoft.Orleans.Server
和Microsoft.Extensions.Hosting
NuGet包阐虚,以及Orleans.Grains
項(xiàng)目称龙。 - Orleans.Client:控制臺(tái)項(xiàng)目,用于演示如何借助Orleans Client建立與Orleans Server的連接迟赃,需要引用
Microsoft.Orleans.Client
和Microsoft.Extensions.Hosting
NuGet包诈闺,同時(shí)添加Orleans.Grains
項(xiàng)目引用渴庆。
3. 第一個(gè)Grain
Grain作為Orleans的第一公民,以及Virtual Actor的實(shí)際代言人,想吃透Orleans襟雷,那Grain就是第一道坎刃滓。
先看一個(gè)簡(jiǎn)單的Demo,我們來(lái)模擬統(tǒng)計(jì)網(wǎng)站的實(shí)時(shí)在線用戶耸弄。
在Orlean s.Grains
添加ISessionControl
接口咧虎,主要用戶登錄狀態(tài)的管理。
public interface ISessionControlGrain : IGrainWithStringKey
{
Task Login(string userId);
Task Logout(string userId);
Task<int> GetActiveUserCount();
}
可以看見(jiàn)Grain的定義很簡(jiǎn)單计呈,只需要指定繼承自IGrain的接口就好砰诵。這里面繼承自IGrainWithStringKey
,說(shuō)明該Grain 的Identity Key(身份標(biāo)識(shí))為string
類型捌显。同時(shí)需要注意的是
Grain 的方法申明茁彭,返回值必須是: Task、Task<T>扶歪、ValueTask<T>理肺。
緊接著定義SessionControlGrain
來(lái)實(shí)現(xiàn)ISessionControlGrain
接口。
public class SessionControlGrain : Grain, ISessionControlGrain
{
private List<string> LoginUsers { get; set; } = new List<string>();
public Task Login(string userId)
{
//獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型善镰,GetPrimaryKeyString());
var appName = this.GetPrimaryKeyString();
LoginUsers.Add(userId);
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
public Task Logout(string userId)
{
//獲取當(dāng)前Grain的身份標(biāo)識(shí)
var appName = this.GetPrimaryKey();
LoginUsers.Remove(userId);
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
public Task<int> GetActiveUserCount()
{
return Task.FromResult(LoginUsers.Count);
}
}
實(shí)現(xiàn)也很簡(jiǎn)單妹萨,Grain的實(shí)現(xiàn)要繼承自Grain
基類。代碼中我們定義了一個(gè)List<string>
集合用于保存登錄用戶媳禁。
4. 第一個(gè)Silo Host(Server)
定義一個(gè)Silo用于暴露Grain提供的服務(wù),在Orleans.Server.Program
中添加以下代碼用于啟動(dòng)Silo Host画切。
static Task Main(string[] args)
{
Console.Title = typeof(Program).Namespace;
// define the cluster configuration
return Host.CreateDefaultBuilder()
.UseOrleans((builder) =>
{
builder.UseLocalhostClustering()
.AddMemoryGrainStorageAsDefault()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
.ConfigureApplicationParts(parts =>
parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences());
}
)
.ConfigureServices(services =>
{
services.Configure<ConsoleLifetimeOptions>(options =>
{
options.SuppressStatusMessages = true;
});
})
.ConfigureLogging(builder => { builder.AddConsole(); })
.RunConsoleAsync();
}
-
Host.CreateDefaultBuilder()
:創(chuàng)建泛型主機(jī)提供宿主環(huán)境竣稽。 -
UseOrleans
:用來(lái)配置Oleans。 -
UseLocalhostClustering()
:用于在開(kāi)發(fā)環(huán)境下指定連接到本地集群霍弹。 -
Configure<ClusterOptions>
:用于指定連接到那個(gè)集群毫别。 -
Configure<EndpointOptions>
:用于配置silo與silo、silo與client之間的通信端點(diǎn)典格。開(kāi)發(fā)環(huán)境下可僅指定回環(huán)地址作為集群間通信的IP地址岛宦。 -
ConfigureApplicationParts()
:用于指定暴露哪些Grain服務(wù)。
以上就是開(kāi)發(fā)環(huán)境下耍缴,Orleans Server的基本配置砾肺。對(duì)于詳細(xì)的配置也可以先參考Orleans Server Configuration。后續(xù)也會(huì)有專門(mén)的一篇文章來(lái)詳解防嗡。
5. 第一個(gè)Client
客戶端的定義也很簡(jiǎn)單变汪,主要是創(chuàng)建IClusterClient
對(duì)象建立于Orleans Server的連接。因?yàn)?code>IClusterClient最好能在程序啟動(dòng)之時(shí)就建立連接蚁趁,所以可以通過(guò)繼承IHostedService
來(lái)實(shí)現(xiàn)裙盾。
在Orleans.Client
中定義ClusterClientHostedService
繼承自IHostedService
。
public class ClusterClientHostedService : IHostedService
{
public IClusterClient Client { get; }
private readonly ILogger<ClusterClientHostedService> _logger;
public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider)
{
_logger = logger;
Client = new ClientBuilder()
.UseLocalhostClustering()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.ConfigureLogging(builder => builder.AddProvider(loggerProvider))
.Build();
}
public Task StartAsync(CancellationToken cancellationToken)
{
var attempt = 0;
var maxAttempts = 100;
var delay = TimeSpan.FromSeconds(1);
return Client.Connect(async error =>
{
if (cancellationToken.IsCancellationRequested)
{
return false;
}
if (++attempt < maxAttempts)
{
_logger.LogWarning(error,
"Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
attempt, maxAttempts);
try
{
await Task.Delay(delay, cancellationToken);
}
catch (OperationCanceledException)
{
return false;
}
return true;
}
else
{
_logger.LogError(error,
"Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
attempt, maxAttempts);
return false;
}
});
}
public async Task StopAsync(CancellationToken cancellationToken)
{
try
{
await Client.Close();
}
catch (OrleansException error)
{
_logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown.");
}
}
}
代碼講解:
- 構(gòu)造函數(shù)中通過(guò)借助
ClientBuilder()
來(lái)初始化IClusterClient
。其中UseLocalhostClustering()
用于連接到開(kāi)發(fā)環(huán)境中的localhost 集群番官。并通過(guò)Configure<ClusterOptions>
指定連接到哪個(gè)集群庐完。(需要注意的是,這里的ClusterId必須與Orleans.Server中配置的保持一致徘熔。
Client = new ClientBuilder()
.UseLocalhostClustering()
.Configure<ClusterOptions>(options =>
{
options.ClusterId = "Hello.Orleans";
options.ServiceId = "Hello.Orleans";
})
.ConfigureLogging(builder => builder.AddProvider(loggerProvider))
.Build();
- 在
StartAsync
方法中通過(guò)調(diào)用Client.Connect
建立與Orleans Server的連接门躯。同時(shí)定義了一個(gè)重試機(jī)制。
緊接著我們需要將ClusterClientHostedService
添加到Ioc容器近顷,添加以下代碼到Orleans.Client.Program
中:
static Task Main(string[] args)
{
Console.Title = typeof(Program).Namespace;
return Host.CreateDefaultBuilder()
.ConfigureServices(services =>
{
services.AddSingleton<ClusterClientHostedService>();
services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>());
services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client);
services.AddHostedService<HelloOrleansClientHostedService>();
services.Configure<ConsoleLifetimeOptions>(options =>
{
options.SuppressStatusMessages = true;
});
})
.ConfigureLogging(builder =>
{
builder.AddConsole();
})
.RunConsoleAsync();
}
對(duì)于ClusterClientHostedService
生音,并沒(méi)有選擇直接通過(guò)services.AddHostedService<T>
的方式注入,是因?yàn)槲覀冃枰⑷朐摲?wù)中提供的IClusterClient
(單例)窒升,以供其他類去消費(fèi)缀遍。
緊接著,定義一個(gè)HelloOrleansClientHostedService
用來(lái)消費(fèi)定義的ISessionControlGrain
饱须。
public class HelloOrleansClientHostedService : IHostedService
{
private readonly IClusterClient _client;
private readonly ILogger<HelloOrleansClientHostedService> _logger;
public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger)
{
_client = client;
_logger = logger;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
// 模擬控制臺(tái)終端用戶登錄
await MockLogin("Hello.Orleans.Console");
// 模擬網(wǎng)頁(yè)終端用戶登錄
await MockLogin("Hello.Orleans.Web");
}
/// <summary>
/// 模擬指定應(yīng)用的登錄
/// </summary>
/// <param name="appName"></param>
/// <returns></returns>
public async Task MockLogin(string appName)
{
//假設(shè)我們需要支持不同端登錄用戶域醇,則只需要將項(xiàng)目名稱作為身份標(biāo)識(shí)。
//即可獲取一個(gè)代表用來(lái)維護(hù)當(dāng)前項(xiàng)目登錄狀態(tài)的的單例Grain蓉媳。
var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
ParallelLoopResult result = Parallel.For(0, 10000, (index) =>
{
var userId = $"User-{index}";
sessionControl.Login(userId);
});
if (result.IsCompleted)
{
//ParallelLoopResult.IsCompleted 只是返回所有循環(huán)創(chuàng)建完畢譬挚,并不保證循環(huán)的內(nèi)部任務(wù)創(chuàng)建并執(zhí)行完畢
//所以,此處手動(dòng)延遲5秒后再去讀取活動(dòng)用戶數(shù)酪呻。
await Task.Delay(TimeSpan.FromSeconds(5));
var activeUserCount = await sessionControl.GetActiveUserCount();
_logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}");
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Closed!");
return Task.CompletedTask; ;
}
}
代碼講解:
這里定義了一個(gè)MockLogin
用于模擬不同終端10000個(gè)用戶的并發(fā)登錄减宣。
- 通過(guò)構(gòu)造函數(shù)注入需要的
IClusterClient
。 - 通過(guò)指定Grain接口以及身份標(biāo)識(shí)玩荠,就可以通過(guò)Client 獲取對(duì)應(yīng)的Grain漆腌,進(jìn)而消費(fèi)Grain中暴露的方法。
var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
這里需要注意的是阶冈,指定的身份標(biāo)識(shí)為終端應(yīng)用的名稱闷尿,那么在整個(gè)應(yīng)用生命周期內(nèi),將有且僅有一個(gè)代表這個(gè)終端應(yīng)用的Grain女坑。 - 使用
Parallel.For
模擬并發(fā) -
ParallelLoopResult.IsCompleted
只是返回所有循環(huán)任務(wù)創(chuàng)建完畢填具,并不代表循環(huán)的內(nèi)部任務(wù)執(zhí)行完畢。
6. 啟動(dòng)第一個(gè) Orleans 應(yīng)用
先啟動(dòng)Orleans.Server
:
再啟動(dòng)
Orleans.Client
:從上面的運(yùn)行結(jié)果來(lái)看匆骗,模擬兩個(gè)終端10000個(gè)用戶的并發(fā)登錄劳景,最終輸出的活動(dòng)用戶數(shù)量均為10000個(gè)。
回顧整個(gè)實(shí)現(xiàn)碉就,并沒(méi)有用到諸如鎖枢泰、并發(fā)集合等避免并發(fā)導(dǎo)致的線程安全問(wèn)題,但卻輸出正確的期望結(jié)果铝噩,這就正好說(shuō)明了Orleans強(qiáng)大的并發(fā)控制特性衡蚂。
public class SessionControlGrain : Grain, ISessionControlGrain
{
// 未使用并發(fā)集合
private List<string> LoginUsers { get; set; } = new List<string>();
public Task Login(string userId)
{
//獲取當(dāng)前Grain的身份標(biāo)識(shí)(因?yàn)镮SessionControlGrain身份標(biāo)識(shí)為string類型窿克,GetPrimaryKeyString());
var appName = this.GetPrimaryKeyString();
LoginUsers.Add(userId);//未加鎖
Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
return Task.CompletedTask;
}
....
}
7. 小結(jié)
通過(guò)簡(jiǎn)單的演示,想必你對(duì)Orleans的編程實(shí)現(xiàn)有了基本的認(rèn)知毛甲,并體會(huì)到其并發(fā)控制的強(qiáng)大之處年叮。
這只是簡(jiǎn)單的入門(mén)演練,Orleans很多強(qiáng)大的特性玻募,后續(xù)再結(jié)合具體場(chǎng)景進(jìn)行詳細(xì)闡述只损。
源碼已上傳至GitHub:Hello.Orleans