Net6Configuration & Options 源碼分析 Part3 IOptionsMonitor 是如何接收到配置文件變更並同步數據源的

配置源的同步 IOptionsMonitor 使用

//以下demo演示使用IOptionsMonitor重新載入配置併當重新載入配置是執行回調函數

var configuration = new ConfigurationBuilder().AddJsonFile(path: "profile.json",
                                                           optional: false,
                                                           reloadOnChange: true).Build();
new ServiceCollection().AddOptions().Configure<Profile>(configuration).BuildServiceProvider().GetRequiredService<IOptionsMonitor<Profile>>().OnChange(profile => Console.WriteLine($"data reload: {profile.Age}"));
Console.Read();

public class Profile
{
    public int Age { get; set; }
}

配置源的同步 IOptionsMonitor 源碼分析

當文件變更時如何向外發送通知的以及 Reload data。

以JsonConfiguration為例:
FileConfigurationProvider通過FileProvider.Watch當文件發生改變的時候會調用Load,load方法做了兩件事情,1.調用子類同名虛方完成具體數據的reload data(由具體實現類:JsonConfigurationProvider)2。提供調用OnReload(由父類ConfigurationProvider實現)。完成對外發送data change的通知。OnReload內調用了_reloadToken.OnReload發送回調通知併產生一個新的ConfigurationReloadToken重新賦值給_reloadToken,通知註冊到FileConfigurationProvider._reloadToken的回調,那麼想接收到文件改變的消息只需要通過GetReloadToken()得到_reloadToken屬性並將回調函數註冊進去即可。
如下是此三個類的繼承關係JsonConfiguration->FileConfigurationProvider->ConfigurationProvider
知道了這些在看下ConfigurationRoot。

public abstract class FileConfigurationProvider : ConfigurationProvider, IDisposable
{
    public FileConfigurationProvider(FileConfigurationSource source!!)
    {
        Source = source;
        if (Source.ReloadOnChange && Source.FileProvider != null)
        {
            _changeTokenRegistration = ChangeToken.OnChange(
                () => Source.FileProvider.Watch(Source.Path!),
                () =>
                {
                    // 重新從JsonFile Load 數據並
                    Load(reload: true);
                });
        }
    }

    private void Load(bool reload)
    {
        IFileInfo? file = Source.FileProvider?.GetFileInfo(Source.Path ?? string.Empty);
        using Stream stream = OpenRead(file);
        try
        {
            // 此處調用具體實現類的Load 方法例如JsonConfigurationProvider
            Load(stream);
        }
       
        // 發送OnReload 並重新生成ConfigurationReloadToken共下次使用。
        OnReload();
    }
}

public class JsonConfigurationProvider : FileConfigurationProvider
{
    public JsonConfigurationProvider(JsonConfigurationSource source) : base(source) { }

    public override void Load(Stream stream)
    {
        Data = JsonConfigurationFileParser.Parse(stream);
    }
}

public abstract class ConfigurationProvider : IConfigurationProvider
{
    protected void OnReload()
    {
        ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _reloadToken, new ConfigurationReloadToken());
        previousToken.OnReload();
    }

    public IChangeToken GetReloadToken()
    {
        return _reloadToken;
    }
}

ConfigurationRoot會循環調用把所有的providers 並通過IConfigurationProvider.GetReloadToken()得到FileConfigurationProvider._reloadToken,然後註冊上RaiseChanged作為回調函數。以文件系統為例,當文件發生改動時會調用此回調函數,此回調函數又會調用ConfigurationRoot的_changeToken.OnReload()向外發送通知。
ConfigurationChangeTokenSource:註冊的時機為ConfigurationChangeTokenSource.Configure.

我們作為使用者註冊的回調事件就是註冊在OptionsMonitor._onChange中。當用戶使用OptionsMonitor時,其在構造方法通過DI拿到使用ConfigurationChangeTokenSource作為包裝類,其包裝的是ConfigurationRoot._changeToken,並把自身的事件OptionsMonitor._onChange作為回調函數註冊在包裝類ConfigurationChangeTokenSource.包裝的ConfigurationRoot._changeToken中。自此完成了整個回調鏈條。

// ConfigurationRoot向IConfigurationProvider註冊回調函數拼接回調鏈條。
public class ConfigurationRoot : IConfigurationRoot, IDisposable
{
    _providers = providers;
    _changeTokenRegistrations = new List<IDisposable>(providers.Count);
    foreach (IConfigurationProvider p in providers)
    {
        p.Load();
        // 回調鏈條拼接
        _changeTokenRegistrations.Add(ChangeToken.OnChange(() => p.GetReloadToken(), () => RaiseChanged()));
    }

    private void RaiseChanged()
    {
        ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _changeToken, new ConfigurationReloadToken());
        previousToken.OnReload();
    }
}

// ConfigurationChangeTokenSource 包裝類與註冊 OptionsConfigurationServiceCollectionExtensions
public class ConfigurationChangeTokenSource<TOptions> : IOptionsChangeTokenSource<TOptions>
{
    private IConfiguration _config;

    public ConfigurationChangeTokenSource(IConfiguration config) : this(Options.DefaultName, config){}

    public IChangeToken GetChangeToken()
    {
        return _config.GetReloadToken();
    }
}

public static class OptionsConfigurationServiceCollectionExtensions
{
     public static IServiceCollection Configure<[DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.All)] TOptions>(this IServiceCollection services!!, string? name, IConfiguration config!!, Action<BinderOptions>? configureBinder) where TOptions : class
    {
        services.AddOptions();
        services.AddSingleton<IOptionsChangeTokenSource<TOptions>>(new ConfigurationChangeTokenSource<TOptions>(name, config));
        return services.AddSingleton<IConfigureOptions<TOptions>>(new NamedConfigureFromConfigurationOptions<TOptions>(name, config, configureBinder));
    }
}

public class OptionsMonitor<[DynamicallyAccessedMembers(Options.DynamicallyAccessedMembers)] TOptions> : IOptionsMonitor<TOptions>, IDisposable
    where TOptions : class
{
    internal event Action<TOptions, string>? _onChange;
    public OptionsMonitor(IOptionsFactory<TOptions> factory, IEnumerable<IOptionsChangeTokenSource<TOptions>> sources, IOptionsMonitorCache<TOptions> cache)
    {
        ChangeToken.OnChange(
                          () => source.GetChangeToken(),
                          (name) => InvokeChanged(name),
                          source.Name);
        private void InvokeChanged(string? name)
        {
            name = name ?? Options.DefaultName;
            _cache.TryRemove(name);
            TOptions options = Get(name);
            if (_onChange != null)
            {
                _onChange.Invoke(options, name);
            }
        }
 
    }
     public IDisposable OnChange(Action<TOptions, string> listener)
    {
        var disposable = new ChangeTrackerDisposable(this, listener);
        _onChange += disposable.OnChange;
        return disposable;
    }
}


總結

整個過程回調使用了兩個ConfigurationReloadToken分別是。1. FileConfigurationProvider提供了一個ConfigurationReloadToken 2.提供了一個ConfigurationRoot._changeToken 。回調鏈條的拼接是。1.FileConfigurationProvider構造函數中文件的Watch與FileConfigurationProvider._reloadToken同時在這裡也完成了數據的reload data 2 ConfigurationRoot的構造函數中與IConfigurationProvider._reloadToken進行的回調鏈條拼接 。第三次拼接是把用戶註冊的回調函註冊在OptionsMonito的event上,OptionsMonito在構造函數中通過DI容器獲取到ConfigurationRoot._changeToken中包裝類。並把event作為回調函數進行註冊.

通過以上程式碼分析,當我們向創建一個具有相同通知機制的回調鏈條並且有多次通知 需要利用CancellationToken與 ChangeToken.OnChange 進行鏈接,同時要注意每次鏈接後向下發送消息時,要重新生成changeToken,因為changeToken的特性是只能發送一次消息。向多次必須重新生成ChangeToken例如

ConfigurationReloadToken previousToken = Interlocked.Exchange(ref _changeToken, new ConfigurationReloadToken());
previousToken.OnReload();