Orleans 解决并发之痛(一):单线程

程序在运行过程中有时会莫名其妙出现代码的某些约束或者执行结果和理想状况不一样,正常逻辑怎么会出现这样的情况?到底发生了什么?好像见了鬼!瞬间好无助。

谁来救救我

大多数出现正常逻辑很难解释的时候,我们可能会想到并发问题,因为好像只有并发才会能说服自己。为了验证和解决这个问题,我们可能会尝试一些方案,在并发的情况下我相信很多人都使用过锁,锁确实也能帮忙我们解决问题,不然它干嘛存在。

但随着业务逻辑的持续复杂,锁的使用可能无处不在。首先大家都知道锁本身的机制很耗性能;然后锁本身不涉及什么编程模式,所以在业务代码中融入大量锁对代码本身的稳定性也有一定影响。

经过查找资料,因为本身的项目是基于 .NET,所以发现 Microsoft Orleans 好像可以比较好的满足解决并发的需求。

Orleans 之前,先来扯一扯 Actor 模型

  1. Actor 是以单线程存在的,所有消息都是顺序到达的,每次收到消息后,就放入队列,而它每次也从队列中取出消息体来处理;

  2. 每一个 Actor 有一个 Id 和它对应,一个 Id 对应的 Actor 只会在 集群中 存在一个,使用者只需要通过 Id 就能随时访问不需要关注该 Actor 在集群的什么位置;

  3. 每一个 Actor 看作是一个独立的实体,拥有自己独立的状态。Actor 与 Actor 之间可以进行消息通知;

    注:有状态的 Actor 在集群中一个 Id 只会存在一个实例,无状态的可配置为根据流量存在多个,无状态的情况看具体业务需求。

    Actor System

再来扯一扯 Orleans 框架

Orleans 提供了一个简单的方法来构建大规模、高并发、分布式应用程序,被认为是 Actor 模型的分布式版本,是一种改进的 Actor 模型。在 Orleans 中,Actors 被称作 Grains,采用接口来表示,Actors 的消息用异步方法来接收,方法返回值必须是 Task or Task

Orleans几个核心角色:

Grains(Actors)

Grains 是 Orleans 应用程序的业务逻辑实现与抽象,Grains 是彼此孤立的原子单位,分布的,持久的。 一个典型的 Grain 是有状态和行为的一个单实例。

Silo

Silo 是一个主机服务,里面主要用于执行 Grains,也就是说 Grains 开发完成后需要注册到 Silo 中,然后等待调用。它监听一个端口,用来监听从 Silo 到 Silo 的消息或者从客户端到 Silo 的消息的,典型的 Silo 就是,每台机器运行一个 Silo,会对外暴露网关地址供调用。

Cluster(集群搭建的时候会具体介绍)

大量的 Silo 同时在一起工作就形成了 Orleans 的集群,Orleans 运行完全自动化的集群管理。

Client

具体的应用客户端,可以是控制台、Web 应用程序、WPF 等一切 .NET 端技术。


开始接触 Orleans Sample 的时候,第一感觉项目结构和 gRPC 还挺像的,如果你之前有接触,一定感觉很亲切:

  1. 定义一个接口(Interfaces)
  2. 实现接口(Grains) – 添加引用Interfaces
  3. 启动服务端(Silo)– 添加引用Interfaces,Grains
  4. 启动客户端 (Client)– 添加引用Interfaces

练习过程中对 NuGet 安装 Orleans 相关依赖包可能会有一些模糊,这里说明一下我的具体步骤,希望尽快帮忙实现效果,所有程序集使用 .NET Framework 的版本都是 4.6:

程序集名称 类型 NuGet 依赖包
Microsoft.Orleans.
引用
Interfaces 类库 Core -
Grains 类库 Core Interfaces
Silo 控制台程序 Core
OrleansCodeGenerator
OrleansProviders
OrleansRuntime
Interfaces
Grains
Client 控制台程序 Core
OrleansCodeGenerator
Interfaces

在 Silo 项目中添加配置文件 OrleansConfiguration.xml:

1
2
3
4
5
6
7
8
9
10
<?xml version="1.0" encoding="utf-8" ?>
<OrleansConfiguration xmlns="urn:orleans">
<Globals>
<SeedNode Address="localhost" Port="11111" />
</Globals>
<Defaults>
<Networking Address="localhost" Port="11111" />
<ProxyingGateway Address="localhost" Port="30000" />
</Defaults>
</OrleansConfiguration>

SeedNode:集群中主 Silo 地址,生产环境下不要这么使用。以这种方式配置主 Silo 的情况下,其他 Silo 加入集群需要等主 Silo 先启动。之后会介绍 SystemStore 来维护集群成员关系;
Networking:内部 Silo 与 Silo 之间通信地址;
ProxyingGateway:客户端调用的网关地址;

在 Client 项目中添加配置文件 ClientConfiguration.xml:

1
2
3
4
<?xml version="1.0" encoding="utf-8" ?>
<ClientConfiguration xmlns="urn:orleans">
<Gateway Address="localhost" Port="30000"/>
</ClientConfiguration>

Gateway:配置 Silo 对外的网关地址;

集群下可以配置多个 Gateway 节点,如下:

1
2
<Gateway Address="gateway1" Port="30000"/>
<Gateway Address="gateway2" Port="30000"/>

注意:配置文件需要设置属性 “复制到输出目录”

configuration

Grain 说明:

每个 Grain 都是单实例的,具有唯一标识。根据唯一标识获取 Grain,这个标识可以是 GUID、String、Long、混合类型。

在 Grain 内如果发送消息给其他 Grain,需要使用 this.GrainFactory.GetGrain,不能通过 GrainClient.GrainFactory.GetGrain。

1
var test = GrainClient.GrainFactory.GetGrain<ITest>(0); // long类型的primaryKey 0
1
2
3
4
5
6
7
8
9
10
11
public class TestGrain : Orleans.Grain, ITest
{
private int num = 0;

public Task AddCount()
{
num++;
Console.WriteLine(num);
return Task.CompletedTask;
}
}

Client 说明:

同时启动3个 Task,每个 Task 内并行200次调用 AddCount 方法。如果没有做特殊的处理,num 的结果肯定是乱的,并不会出现一直累加的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private static void DoClientWork()
{
var t1 = Task.Factory.StartNew(() =>
{
AddCount();
});
var t2 = Task.Factory.StartNew(() =>
{
AddCount();
});
var t3 = Task.Factory.StartNew(() =>
{
AddCount();
});
Task.WaitAll(t1, t2, t3);
}

static void AddCount()
{
var test = GrainClient.GrainFactory.GetGrain<ITest>(0);

Parallel.For(0, 200, (i) =>
{
test.AddCount();
});
}

实际上执行最终的结果是600,并不会出现不一致的变化效果,这足以说明同一个 Grain 内部是单线程执行。

Test Result

参考链接:

如果对你有帮助就好