Skip to content

Commit

Permalink
添加调度限制规则
Browse files Browse the repository at this point in the history
  • Loading branch information
leeveel committed Nov 23, 2022
1 parent 4f0845f commit f97d254
Show file tree
Hide file tree
Showing 8 changed files with 162 additions and 35 deletions.
2 changes: 2 additions & 0 deletions Geek.Server.App/Program.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System.Diagnostics;
using System.Text;
using Geek.Server.App.Common;
using Geek.Server.Core.Actors.Impl;
using Geek.Server.Core.Comps;
using Geek.Server.Core.Hotfix;
using Geek.Server.Core.Storage;
Expand Down Expand Up @@ -62,6 +63,7 @@ private static async Task EnterGameLoop()
try
{
Log.Info($"launch embedded db...");
ActorLimit.Init(ActorLimit.RuleType.None);
GameDB.Init();
GameDB.Open();
Log.Info($"regist comps...");
Expand Down
4 changes: 2 additions & 2 deletions Geek.Server.CodeGenerator/Template/Agent.liquid
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ namespace {{space}}
_ = base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});
{{~ else ~}}
long callChainId = Geek.Server.Core.Actors.Impl.WorkerActor.NextChainId();
_ = base.Actor.WorkerActor.Enqueue(()=>{return base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});}, callChainId, {{mth.timeout}});
_ = base.Actor.WorkerActor.Enqueue(()=>{return base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});}, callChainId, {{mth.discard}}, {{mth.timeout}});
{{~ end ~}}
return {{mth.returntype}}.CompletedTask;

Expand All @@ -27,7 +27,7 @@ namespace {{space}}
{
return base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});
}
return base.Actor.WorkerActor.Enqueue(()=>{return base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});}, chainId, {{mth.timeout}});
return base.Actor.WorkerActor.Enqueue(()=>{return base.{{mth.name}}{{mth.typeparams}}({{mth.paramstr}});}, chainId, {{mth.discard}}, {{mth.timeout}});
{{~ end ~}}{{~#************************************************~}}
}

Expand Down
7 changes: 6 additions & 1 deletion Geek.Server.Core/Actors/Actor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public async Task<ICompAgent> GetCompAgent(Type agentType)
var comp = compDic.GetOrAdd(compType, k => CompRegister.NewComp(this, k));
// 这里对交叉死锁检测的影响?
if (!comp.IsActive)
await SendAsync(comp.Active);
await SendAsyncWithoutCheck(comp.Active);
return comp.GetAgent(agentType);
}

Expand Down Expand Up @@ -129,6 +129,11 @@ public Task SendAsync(Func<Task> work, int timeout = TIME_OUT)
return WorkerActor.SendAsync(work, timeout);
}

public Task SendAsyncWithoutCheck(Func<Task> work, int timeout = TIME_OUT)
{
return WorkerActor.SendAsync(work, timeout, false);
}

public Task<T> SendAsync<T>(Func<Task<T>> work, int timeout = TIME_OUT)
{
return WorkerActor.SendAsync(work, timeout);
Expand Down
4 changes: 2 additions & 2 deletions Geek.Server.Core/Actors/ActorMgr.cs
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public static async Task SaveAll()
var taskList = new List<Task>();
foreach (var actor in actorDic.Values)
{
taskList.Add(actor.SendAsync(() => actor.SaveAllState()));
taskList.Add(actor.SendAsync(async () => await actor.SaveAllState()));
}
await Task.WhenAll(taskList);
Log.Info($"save all state, use: {(DateTime.Now - begin).TotalMilliseconds}ms");
Expand Down Expand Up @@ -179,7 +179,7 @@ public static async Task TimerSave()
return;
if (count < ONCE_SAVE_COUNT)
{
taskList.Add(actor.SendAsync(() => actor.SaveAllState()));
taskList.Add(actor.SendAsync(async () => await actor.SaveAllState()));
count++;
}
else
Expand Down
16 changes: 13 additions & 3 deletions Geek.Server.Core/Actors/ActorType.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,8 @@ public enum ActorType
{
//ID全服唯一类型
None,
Role = 1, // 角色
Guild = 2, //公会
Logger = 4,
Role, // 角色
Guild, //公会

Separator = 128, /*分割线(勿调整,勿用于业务逻辑)*/

Expand All @@ -21,4 +20,15 @@ public enum ActorType

Max = 999,
}

/// <summary>
/// 供ActorLimit检测调用关系
/// </summary>
public enum ActorTypeLevel
{
Role = 1,
Guild,
Server,
}

}
130 changes: 110 additions & 20 deletions Geek.Server.Core/Actors/Impl/ActorLimit.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,41 +4,131 @@

namespace Geek.Server.Core.Actors.Impl
{

/// <summary>
/// 判断Actor交叉死锁
/// </summary>
internal static class ActorLimit
public static class ActorLimit
{
private static readonly Logger Log = LogManager.GetCurrentClassLogger();
/// <summary>
/// 可以按需扩展检查规则
/// </summary>
public enum RuleType
{
None,
/// <summary>
/// 分等级(高等级不能【等待】调用低等级)
/// </summary>
ByLevel,
/// <summary>
/// 禁止双向调用
/// </summary>
NoBidirectionCall
}

internal static readonly ConcurrentDictionary<long, ConcurrentDictionary<long, bool>> CrossDic = new();
interface IRule
{
bool AllowCall(long target);
}

private static bool AllowCall(long self, long target)

private static readonly Logger Log = LogManager.GetCurrentClassLogger();

private static IRule rule;
private static readonly Dictionary<ActorType, int> levelDic = new();
public static void Init(RuleType type)
{
// 自己入自己的队允许,会直接执行
if (self == target)
return true;
if (CrossDic.TryGetValue(target, out var set) && set.ContainsKey(self))
switch (type)
{
Log.Error($"发生交叉死锁,ActorId1:{self} ActorType1:{IdGenerator.GetActorType(self)} ActorId2:{target} ActorType2:{IdGenerator.GetActorType(target)}");
return false;
case RuleType.ByLevel:
rule = new ByLevelRule();
try
{
foreach (ActorTypeLevel foo in Enum.GetValues(typeof(ActorTypeLevel)))
{
ActorType actorType = (ActorType)Enum.Parse(typeof(ActorType), foo.ToString());
levelDic.Add(actorType, (int)foo);
}
}
catch (Exception)
{
throw;
}
break;
case RuleType.NoBidirectionCall:
rule = new NoBidirectionCallRule();
break;
case RuleType.None:
break;
default:
Log.Error($"不支持的rule类型:{type}");
break;
}
}

var selfSet = CrossDic.GetOrAdd(self, k => new());
selfSet.TryAdd(target, false);

public static bool AllowCall(long target)
{
if(rule != null)
rule.AllowCall(target);
return true;
}

internal static bool AllowCall(long target)

class ByLevelRule : IRule
{
var actorId = RuntimeContext.CurActor;
// 从IO线程抛到actor,不涉及入队行为
if (actorId == 0)
public bool AllowCall(long target)
{
var actorId = RuntimeContext.CurActor;
// 从其他线程抛到actor,不涉及入队行为
if (actorId == 0)
return true;
ActorType curType = IdGenerator.GetActorType(actorId);
ActorType targetType = IdGenerator.GetActorType(target);
if (levelDic.ContainsKey(targetType) && levelDic.ContainsKey(curType))
{
//等级高的不能【等待】调用等级低的
if (levelDic[curType] > levelDic[targetType])
{
Log.Error($"不合法的调用路径:{curType}==>{targetType}");
return false;
}
}
return true;
// Actor会在入队成功之后进行设置,这种属于Actor入队
return AllowCall(actorId, target);
}
}


#region NoBidirectionCallRule
class NoBidirectionCallRule : IRule
{
internal readonly ConcurrentDictionary<long, ConcurrentDictionary<long, bool>> CrossDic = new();
private bool AllowCall(long self, long target)
{
// 自己入自己的队允许,会直接执行
if (self == target)
return true;
if (CrossDic.TryGetValue(target, out var set) && set.ContainsKey(self))
{
Log.Error($"发生交叉死锁,ActorId1:{self} ActorType1:{IdGenerator.GetActorType(self)} ActorId2:{target} ActorType2:{IdGenerator.GetActorType(target)}");
return false;
}

var selfSet = CrossDic.GetOrAdd(self, k => new());
selfSet.TryAdd(target, false);

return true;
}

public bool AllowCall(long target)
{
var actorId = RuntimeContext.CurActor;
// 从IO线程抛到actor,不涉及入队行为
if (actorId == 0)
return true;
// Actor会在入队成功之后进行设置,这种属于Actor入队
return AllowCall(actorId, target);
}
}
#endregion

}
}
32 changes: 26 additions & 6 deletions Geek.Server.Core/Actors/Impl/WorkActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,11 @@ private static async Task InnerRun(WorkWrapper wrapper)
return (needEnqueue, chainId);
}

#region 供代码生成器调用
public Task Enqueue(Action work, long callChainId, int timeOut = TIME_OUT)
#region 勿调用(仅供代码生成器调用)
public Task Enqueue(Action work, long callChainId, bool discard=false, int timeOut = TIME_OUT)
{
if (!discard && Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;
var at = new ActionWrapper(work)
{
Owner = this,
Expand All @@ -62,8 +64,10 @@ public Task Enqueue(Action work, long callChainId, int timeOut = TIME_OUT)
ActionBlock.SendAsync(at);
return at.Tcs.Task;
}
public Task<T> Enqueue<T>(Func<T> work, long callChainId, int timeOut = TIME_OUT)
public Task<T> Enqueue<T>(Func<T> work, long callChainId, bool discard = false, int timeOut = TIME_OUT)
{
if (!discard && Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;
var at = new FuncWrapper<T>(work)
{
Owner = this,
Expand All @@ -74,8 +78,10 @@ public Task<T> Enqueue<T>(Func<T> work, long callChainId, int timeOut = TIME_OUT
return at.Tcs.Task;
}

public Task Enqueue(Func<Task> work, long callChainId, int timeOut = TIME_OUT)
public Task Enqueue(Func<Task> work, long callChainId, bool discard = false, int timeOut = TIME_OUT)
{
if (!discard && Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;
var at = new ActionAsyncWrapper(work)
{
Owner = this,
Expand All @@ -86,8 +92,10 @@ public Task Enqueue(Func<Task> work, long callChainId, int timeOut = TIME_OUT)
return at.Tcs.Task;
}

public Task<T> Enqueue<T>(Func<Task<T>> work, long callChainId, int timeOut = TIME_OUT)
public Task<T> Enqueue<T>(Func<Task<T>> work, long callChainId, bool discard = false, int timeOut = TIME_OUT)
{
if (!discard && Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;
var at = new FuncAsyncWrapper<T>(work)
{
Owner = this,
Expand Down Expand Up @@ -130,6 +138,9 @@ public Task SendAsync(Action work, int timeout = Actor.TIME_OUT)
(bool needEnqueue, long chainId) = IsNeedEnqueue();
if (needEnqueue)
{
if (Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;

var at = new ActionWrapper(work)
{
Owner = this,
Expand All @@ -151,6 +162,9 @@ public Task<T> SendAsync<T>(Func<T> work, int timeout = Actor.TIME_OUT)
(bool needEnqueue, long chainId) = IsNeedEnqueue();
if (needEnqueue)
{
if (Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;

var at = new FuncWrapper<T>(work)
{
Owner = this,
Expand All @@ -166,11 +180,14 @@ public Task<T> SendAsync<T>(Func<T> work, int timeout = Actor.TIME_OUT)
}
}

public Task SendAsync(Func<Task> work, int timeout = Actor.TIME_OUT)
public Task SendAsync(Func<Task> work, int timeout = Actor.TIME_OUT, bool checkLock = true)
{
(bool needEnqueue, long chainId) = IsNeedEnqueue();
if (needEnqueue)
{
if (checkLock && Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;

var wrapper = new ActionAsyncWrapper(work)
{
Owner = this,
Expand All @@ -191,6 +208,9 @@ public Task<T> SendAsync<T>(Func<Task<T>> work, int timeout = Actor.TIME_OUT)
(bool needEnqueue, long chainId) = IsNeedEnqueue();
if (needEnqueue)
{
if (Settings.IsDebug && !ActorLimit.AllowCall(Id))
return default;

var wrapper = new FuncAsyncWrapper<T>(work)
{
Owner = this,
Expand Down
2 changes: 1 addition & 1 deletion Geek.Server.Hotfix/Logic/Server/ServerCompAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ public virtual Task<int> GetWorldLevel()
}

[Service]
[TimeOut(12000)]
public virtual Task<bool> IsOnline(long roleId)
{
foreach (var id in Comp.OnlineSet)
Expand Down Expand Up @@ -115,6 +114,7 @@ protected void DoSomething2()
}

[Discard]
[TimeOut(12000)]
protected virtual Task DoSomething3()
{
return Task.CompletedTask;
Expand Down

0 comments on commit f97d254

Please sign in to comment.