如图右上角所示,Ray中有两类Handler(SubHandler和PartSubHandler),在使用中,SubHandler派生Actor的CoreHandler,PartSubHandler派生SQLToReadHandler,SQLToReadHandler派生Actor的ToReadHandler,使用Ray主要写Actor的CoreHandler和ToReadHandler。
CoreHandler是复合消息路由器,包含的功能有:消息路由器、消息处理器、消息分离器、消息聚合器、消息过滤器、消息丰富器、副本同步协调器。
Tell方法中的Switch块,针对不同的Event类型,将Event事件中的数据分发给不同的事件处理方法CoreHandler承担的是消息路由器的作用。如下代码:
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case CoinAddressGeneratingResponse value: return AcceptCoinAddressAsync(value);
case CoinWithdrawWithholdingFailedMsg value: return RollbackWithholdingAsync(value);
case CoinWithdrawWithheldEvent value: return CreateWithdrawAsync(value);
case CoinOrderCreatedEvent value: return CreateOrderAsync(value);
case CoinOrderCreatedEventV1 value: return CreateOrderAsyncV1(value);
case CoinPlanOrderCreatedEvent value: return CreatePlanOrderAsync(value);
case CoinDepositIncreasedEvent value: return CoinDepositIncreased(value);
case CoinIcoEvent value: return CoinIcoEventHandle(value);
default: return Task.CompletedTask;
}
}
针对不同的Event类型,在该Actor的CoreHandler里处理该事件,CoreHandler承担的是消息处理器的作用。如下代码中,AmountAddEventHandler方法处理AmountTransferEvent事件。
在事件处理代码中,可编写的代码如下:
- 可以只针对当前事件处理。
- 可以获得其他actor引用,调用其他actor的方法(包括actor的只读方法和写操作方法)。
- 可以调用数据访问层进行数据库读写。(写方法建议在ToReadHandler中进行)。
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case AmountTransferEvent value: return AmountAddEventHandler(value);
default: return Task.CompletedTask;
}
}
public Task AmountAddEventHandler(AmountTransferEvent value)
{
var toActor = HandlerStart.Client.GetGrain<IAccount>(value.ToAccountId);
return toActor.AddAmount(value.Amount, value.Id);
}
当需要将较大的消息分割成多个独立的部分,并将这些独立的部分作为其他actor处理的参数时,CoreHandler承担的是分离器的作用。
当需要对不同类型消息中的数据进行聚合统计时,CoreHandler承担的是消息聚合器的作用。 例如:在加密货币交易的场景中,有ETH、BTC、USDT不同的市场,EOS在三个市场中都有交易,现在要统计本周内每个用户EOS的交易量,可以如下操作:
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case CoinTradeSoldEvent value: return ActivitySellStatistic(value);
case CoinTradeBoughtEvent value: return ActivityBuyStatistic(value);
default: return Task.CompletedTask;
}
}
CoreHandler有可能收到它不感兴趣额的消息,并且需要丢弃这些无用消息时,CoreHandler可以承担消息过滤器的作用。
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
switch (data)
{
case AmountTransferEvent value: return AmountAddEventHandler(value);
default: return Task.CompletedTask;//消息过滤
}
}
当需要将收到的消息分进一步丰富,并将丰富后的消息作为其他actor处理的参数时,CoreHandler承担的是消息丰富器的作用。
Ray中有主actor和副本actor两类actor,副本actor用于分担主actor的压力,执行一些异步操作。当使用副本actor,需要主actor与副本actor保持同步时,需要CoreHandler将关注的事件交给副本actor,此时CoreHandler承担的是副本同步协调器的作用。
public override Task Tell(byte[] bytes, IActorOwnMessage<string> data, MessageInfo msg)
{
var replicatedRef = HandlerStart.Client.GetGrain<IAccountRep>(data.StateId);//获得副本actor
var task = replicatedRef.Tell(bytes);//通知副本同步
switch (data)
{
case AmountTransferEvent value: return Task.WhenAll(task, AmountAddEventHandler(value));
default: return task;
}
}
参考Example中,Ray.Handler项目内的AccountCoreHandler。
CoreHandler实际编写中很简单,主要承担消息路由和消息处理器的作用,其他功能为特殊场景提供了切入点。