Orleans解决并发之痛(一):单线程

程序在运行过程中有时会莫名其妙出现代码的某些约束或者执行结果和理想状况不一样,正常逻辑怎么会出现这样的情况?到底发生了什么?好像见了鬼!瞬间好无助。

谁来救救我

大多数出现正常逻辑很难解释的时候,我们可能会想到并发问题,因为好像只有并发才会能说服自己。为了验证和解决这个问题,我们可能会尝试一些方案,在并发的情况下我相信很多人都使用过锁,锁确实也能帮忙我们解决问题,不然它干嘛存在。

但随着业务逻辑的持续复杂,锁的使用可能无处不在。首先大家都知道锁本身的机制很耗性能;然后锁本身不涉及什么编程模式,所以在业务代码中融入大量锁对代码本身的稳定性也有一定影响。

经过查找资料,因为本身的项目是基于.NET,所以发现Microsoft Orleans好像可以比较好的满足解决并发的需求。

Orleans之前,先来扯一扯Actor模型

  1. Actor是以单线程存在的,所有消息都是顺序到达的,每次收到消息后,就放入队列,而它每次也从队列中取出消息体来处理;

  2. 每一个Actor有一个Id和它对应,一个Id对应的Actor只会在集群中存在一个,使用者只需要通过Id就能随时访问不需要关注该Actor在集群的什么位置;

  3. 每一个Actor看作是一个独立的实体,拥有自己独立的状态。Actor与Actor直接可以进行消息通知;

    注:有状态的 Actor在集群中一个Id只会存在一个实例,无状态的可配置为根据流量存在多个,无状态的情况看具体业务需求。

    Actor System

再来扯一扯Orleans框架

Orleans 提供了一个简单的方法来构建大规模、高并发、分布式应用程序,被认为是Actor模型的分布式版本,是一种改进的Actor模型。在Orleans中,Actors被称作Grains,采用接口来表示,Actors的消息用异步方法来接收,方法返回值必须是Task or Task

Orleans几个核心角色:

Grains(Actors)

Grains是Orleans应用程序的业务逻辑实现与抽象,Grains是彼此孤立的原子单位,分布的,持久的。 一个典型的Grain是有状态和行为的一个单实例。

Silo

Silo是一个主机服务,里面主要用于执行Grains,也就是说Grains开发完成后需要注册到Silo中,然后等待调用。它监听一个端口,用来监听从Silo到Silo的消息或者从客户端到Silo的消息的,典型的Silo就是,每台机器运行一个Silo,会对外暴露网关地址供调用。

Cluster(集群搭建的时候会具体介绍)

大量的Silo同时在一起工作就形成了Orleans的集群,Orleans运行完全自动化的集群管理。

Client

具体的应用客户端,可以是控制台、Web应用程序、WPF等一切.NET端技术。


开始接触Orleans Sample的时候,第一感觉项目结构和gRPC还挺像的,如果你之前有接触,一定感觉很亲切:

  1. 定义一个接口(Interfaces)
  2. 实现接口(Grains) – 添加引用Interfaces
  3. 启动服务端(Silo)– 添加引用Interfaces,Grains
  4. 启动客户端 (Client)– 添加引用Interfaces

练习过程中对Nuget安装Orleans相关依赖包可能会有一些模糊,这里说明一下我的具体步骤,希望尽快帮忙实现效果,所有程序集使用.Net Framework的版本都是4.6:

程序集名称 类型 Nuget依赖包
Microsoft.Orleans.
引用
Interfaces 类库 Core -
Grains 类库 Core Interfaces
Silo 控制台程序 Core
OrleansCodeGenerator
OrleansProviders
OrleansRuntime
Interfaces
Grains
Client 控制台程序 Core
OrleansCodeGenerator
Interfaces

在Silo项目中添加配置文件 OrleansConfiguration.xml:

1
2
3
4
5
6
7
8
9
10
<?xml version="1.0" encoding="utf-8" ?>
<OrleansConfiguration xmlns="urn:orleans">
<Globals>
<SeedNode Address="localhost" Port="11111" />
</Globals>
<Defaults>
<Networking Address="localhost" Port="11111" />
<ProxyingGateway Address="localhost" Port="30000" />
</Defaults>
</OrleansConfiguration>

SeedNode:集群中主Silo地址,生产环境并不可以这么使用,已这种方式配置主Silo的情况下,其他Silo加入集群需要等主Silo先启动。之后会介绍SystemStore来维护集群成员关系;
Networking:内部Silo与Silo之间通信地址;
ProxyingGateway:客户端调用的网关地址;

在Client项目中添加配置文件 ClientConfiguration.xml:

1
2
3
4
<?xml version="1.0" encoding="utf-8" ?>
<ClientConfiguration xmlns="urn:orleans">
<Gateway Address="localhost" Port="30000"/>
</ClientConfiguration>

Gateway:配置Silo对外的网关地址;

集群下可以配置多个Gateway节点,如下:

1
2
<Gateway Address="gateway1" Port="30000"/>
<Gateway Address="gateway2" Port="30000"/>

注意:配置文件需要设置属性 “复制到输出目录”

configuration

Grain说明:

每个Grain都是单实例的,具有唯一标识。根据唯一标识获取Grain,这个标识可以是GUID、String、Long、混合类型。

在Grain内如果发送消息给其他Grain,需要使用 this.GrainFactory.GetGrain,不能通过 GrainClient.GrainFactory.GetGrain。

1
var test = GrainClient.GrainFactory.GetGrain<ITest>(0); // long类型的primaryKey 0
1
2
3
4
5
6
7
8
9
10
11
public class TestGrain : Orleans.Grain, ITest
{
private int num = 0;

public Task AddCount()
{
num++;
Console.WriteLine(num);
return Task.CompletedTask;
}
}

Client说明:

同时启动3个Task,每个Task内并行200次调用AddCount方法。如果没有做特殊的处理,num的结果肯定是乱的,并不会出现一直累加的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static void DoClientWork()
{
var t1 = Task.Factory.StartNew(() =>
{
AddCount();
});
var t2 = Task.Factory.StartNew(() =>
{
AddCount();
});
var t3 = Task.Factory.StartNew(() =>
{
AddCount();
});
Task.WaitAll(t1, t2, t3);
}

static void AddCount()
{
var test = GrainClient.GrainFactory.GetGrain<ITest>(0);

Parallel.For(0, 200, (i) =>
{
test.AddCount();
});
}

实际上执行最终的结果是600,并不会出现不一致的变化效果,这足以说明同一个Grain内部是单线程执行。

Test Result

参考链接:

如果对你有帮助就好