Orleans解决并发之痛(二):Grain状态

Grains是Orleans应用程序的构建块,它们是彼此孤立的原子单位,分布的,持久的, 一个典型的Grain是有状态和行为的一个单实例,每个Grain实例的在单线程内执行,Grain之间共享数据通过消息传递,Grains是由Silo自动化管理。

Grain之间传递消息过程中也可能出现死锁的情况,如:Grain A发送消息给Grain B,并等待它的完成,此时Grain B发送一个消息给Grain A,也等待其完成,这时候出现相互等待而造成死锁。Orleans对Grain之间产生的死锁问题解决也是非常简单的,只需要在Grain上加[Reentrant]属性,具体可查看官方Concurrency

Grain状态有好几种存储方式,比如:AzureTableStorage、AzureBlobStorage、SQLStorage、MemoryStorage 等,我们还可以自定义存储。MemoryStorage在测试项目使用没问题,但实际生产环境要使用其他持久存储的方式,因为一旦一个Silo被关闭,内存存储的状态将会消失。

在分布式下,State的使用可以减少很多对数据库层面的压力。当然也不是所有的Grain都推荐使用State,还是看实际业务需求。我们可以想象一个场景,一个商品的库存如果保存在State中,所有请求都共享这个State,在判断是否有剩余商品的时候是不是就不需要每次都去查询数据库了?

定义接口

1
2
3
4
public interface IPersonGrain : IGrainWithStringKey
{
Task SayHelloAsync();
}

实现接口

1
2
3
4
5
6
7
8
9
10
11
public class PersonGrain : Grain, IPersonGrain
{
public Task SayHelloAsync()
{
string primaryKey = this.GetPrimaryKeyString();

Console.WriteLine($"{primaryKey} said hello!");

return Task.CompletedTask;
}
}

为了实现状态存储,我们需要创建一个class:

1
2
3
4
public class PersonGrainState
{
public bool SaidHello { get; set; }
}

修改代码,实现的PersonGrain不应该再继承Grain,而是Grain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
[StorageProvider(ProviderName = "OrleansStorage")]
public class PersonGrain : Grain<PersonGrainState>, IPersonGrain
{
public async Task SayHelloAsync()
{
string primaryKey = this.GetPrimaryKeyString();

bool saidHelloBefore = this.State.SaidHello;
string saidHelloBeforeStr = saidHelloBefore ? " already" : null;

Console.WriteLine($"{primaryKey}{saidHelloBeforeStr} said hello!");

this.State.SaidHello = true;
await this.WriteStateAsync();
}
}

第一次调用这个方法的时候this.State.SaidHello为false,输出’xxx said hello!’。然后我们通过WriteStateAsync修改SaidHello为true,当第二次被调用的时候,从State里取出的SaidHello已经变成了true,则输出’xxx already said hello!’

Orleans 提供了非常简单的API来处理持久化装状态,看方法名就知道什么啥意思了,WriteStateAsync()、ReadStateAsync() 、 ClearStateAsync()。

同时在PersonGrain加了一个StorageProvider属性,参数ProviderName赋值为OrleansStorage,这里需要对Silo的配置文件(OrleansConfiguration.xml)做调整,添加StorageProviders配置,Type表示存储方式,Name表示名称,程序内指定的ProviderName需要和配置中这个名称保持一致。

注意:
当Name为Default时,如果某个Grain使用Default来存储,可以不需要加StorageProvider属性。StorageProviders下可以有多个Provider,每个Provider的Type可以不一样,每个Grain指定的存储方式也可以不一样,ProviderName指定是谁就用谁存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<?xml version="1.0" encoding="utf-8" ?>
<OrleansConfiguration xmlns="urn:orleans">
<Globals>
<SeedNode Address="localhost" Port="11111" />
<StorageProviders>
<Provider Type="Orleans.Storage.MemoryStorage"
Name="OrleansStorage" />
</StorageProviders>
</Globals>
<Defaults>
<Networking Address="localhost" Port="11111" />
<ProxyingGateway Address="localhost" Port="30000" />
</Defaults>
</OrleansConfiguration>

为了验证Grain之间是独立的,在Client加入以下代码:

1
2
3
4
5
6
7
 var joe = GrainClient.GrainFactory.GetGrain<IPersonGrain>("Joe");
joe.SayHelloAsync();
joe.SayHelloAsync();

var sam = GrainClient.GrainFactory.GetGrain<IPersonGrain>("Sam");
sam.SayHelloAsync();
sam.SayHelloAsync();

测试结果:

Test Result

SQL Server 持久存储State

上面提到State以内存存储的方式并不适合生产环境,那下面我们使用SQL Server来实现。

在Silo程序集中安装依赖包:
1
2
Install-Package Microsoft.Orleans.OrleansSqlUtils
Install-Package System.Data.SqlClient
创建数据库和表:
  1. 在SQL Server中创建一个数据库,命名如:OrleansStorage(随意);
  2. 在解决方案下找到目录:packages\Microsoft.Orleans.OrleansSqlUtils.1.5.0\lib\net461\SQLServer,目录下有一个.sql文件,在OrleansStorage数据库下执行这个sql脚本即可;
修改OrleansConfiguration.xml的StorageProviders节点为:
1
2
3
4
5
6
<StorageProviders>
<Provider Type="Orleans.Storage.AdoNetStorageProvider"
Name="OrleansStorage"
AdoInvariant="System.Data.SqlClient"
DataConnectionString="Server=.;Database=OrleansStorage;User ID=sa;Password=123456;"/>
</StorageProviders>
重新启动Silo和Client:

执行完成后查看数据库中表Storage的内容,数据的值是二进制是方式存储。
storage

之后不管重启多少次,输出的结果都是 “xxx already saild hello!” 。

参考链接:

如果对你有帮助就好