.Net分表分库动态化处理

介绍

本期主角:ShardingCore 一款ef-core下高性能、轻量级针对分表分库读写分离的解决方案,具有零依赖、零学习成本、零业务代码入侵

背景

最近有个小伙伴来问我,分表下他有一批数据,这个数据是白天可能会相对比较频繁数据录入,但是到了晚上可能基本上就没有对应的数据了,因为看到了我的框架,本来想以按小时来实现分表但是这么以来可能会导致一天有24张表,表多的情况下还导致了数据分布不均匀,这是一个很严重的问题因为可能以24小时制会让8-17这几张白天的表数据很多,但是晚上和凌晨的表基本没有数据,没有数据其实意味着这些表其实不需要去查询,基于这个情况想来问我应该如何实现这个自定义的路由。

听了他的需求,其实我这边又进行了一次确认,针对这个场景更多的其实是这个小伙伴需要的是按需分片,实时建表,来保证需要的数据进行合理的插入,那么我们应该如何在ShardingCore下实现这么一个需求呢,废话不多说直接开始吧~~~

创建项目

本次需求我们以mysql作为测试数据库,然后使用efcore6作为数据库驱动orm来实现怎么处理才能达到这个效果的分表分库(本次只涉及分表)。

新建一个项目

添加依赖

//请安装最新版本第一个版本号6代表efcore的版本号
Install-Package ShardingCore -Version 6.4.3.4

Install-Package Microsoft.EntityFrameworkCore.SqlServer -Version 6.0.1

新建一个对象表,配置对应的数据库映射关系并且关联到dbcontext

//创建数据库对象
    public class OrderByHour
    {
        public string Id { get; set; }
        public DateTime CreateTime { get; set; }
        public string Name { get; set; }
    }
//映射对象结构到数据库
    public class OrderByHourMap:IEntityTypeConfiguration<OrderByHour>
    {
        public void Configure(EntityTypeBuilder<OrderByHour> builder)
        {
            builder.HasKey(o => o.Id);
            builder.Property(o => o.Id).IsRequired().HasMaxLength(50);
            builder.Property(o => o.Name).IsRequired().HasMaxLength(128);
            builder.ToTable(nameof(OrderByHour));
        }
    }
//创建dbcontext为efcore所用上下文
    public class DefaultDbContext:AbstractShardingDbContext,IShardingTableDbContext
    {
        public DefaultDbContext(DbContextOptions<DefaultDbContext> options) : base(options)
        {
        }

        protected override void OnModelCreating(ModelBuilder modelBuilder)
        {
            base.OnModelCreating(modelBuilder);
            modelBuilder.ApplyConfiguration(new OrderByHourMap());
        }

        public IRouteTail RouteTail { get; set; }
    }

到这边其实只需要启动时候依赖注入

services.AddDbContext<DefaultDbContext>(o=>o.UseMySql(xxxx));

那么efcore就可以运行了,这么一看其实并没有很复杂而且IEntityTypeConfiguration也不是必须的,efcore允许使用attribute来实现
当然DefaultDbContext:AbstractShardingDbContext,IShardingTableDbContext这一部分在原生efcore中应该是DefaultDbContext:DbContext

创建分片路由

首先我们来看一下ShardingCore针对分片路由的自定义情况的分析,通过文档我们可以了解到,如果想要使用自定义路由那么你只需要自己新建一个路由并且继承实现AbstractShardingOperatorVirtualTableRoute,当然这是分表的,分库是AbstractShardingOperatorVirtualDataSourceRoute.

接下来我们新建一个路由并且实现分表操作。


    public class orderByHourRoute : AbstractShardingOperatorVirtualTableRoute<OrderByHour, DateTime>
    {
        public override string ShardingKeyToTail(object shardingKey)
        {
            throw new NotImplementedException();
        }

        public override List<string> GetAllTails()
        {
            throw new NotImplementedException();
        }

        public override void Configure(EntityMetadataTableBuilder<OrderByHour> builder)
        {
            throw new NotImplementedException();
        }

        public override Expression<Func<string, bool>> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
        {
            throw new NotImplementedException();
        }
    }

接下来我们依次来实现并且说明各个接口。

  • ShardingKeyToTail:将你的对象转成数据库的后缀尾巴,比如你是按月分片,那么你的分片值大概率是datetime,那么只需要datetime.ToString("yyyyMM")就可以获取到分片后缀
  • GetAllTails:返回集合,集合是数据库现有的当前表的所有后缀,仅程序启动时被调用,这个接口就是需要你返回当前数据库中当前表在系统里面有多少张表,然后返回这些表的后缀
  • Configure:配置当前对象按什么字段分片
  • GetRouteToFilter:因为ShardingCore内存有当前所有表的后缀,假设后缀为list集合,返回的Expression<Func<string, bool>>在经过AndOr后的组合进行Compile(),然后对list.Where(expression.Compile()).ToList()就可以返回对应的本次查询的后缀信息

废话不多说针对这个条件我们直接开始操作完成路由的实现

路由的编写

1.ShardingKeyToTail:因为我们是按小时分表所以格式化值后缀我们采用日期格式化

//因为分片建是DateTime类型所以直接强转
        public override string ShardingKeyToTail(object shardingKey)
        {
            var dateTime = (DateTime)shardingKey;
            return ShardingKeyFormat(dateTime);
        }
        private string ShardingKeyFormat(DateTime dateTime)
        {
            var tail = $"{dateTime:yyyyMMddHH}";

            return tail;
        }

2.Configure:分表配置


        public override void Configure(EntityMetadataTableBuilder<OrderByHour> builder)
        {
            builder.ShardingProperty(o => o.CreateTime);
        }

3.GetRouteToFilter:路由比较,因为是时间字符串的后缀具有和按年,按月等相似的属性所以我们直接参考默认路由来实现


        public override Expression<Func<string, bool>> GetRouteToFilter(DateTime shardingKey, ShardingOperatorEnum shardingOperator)
        {
            var t = ShardingKeyFormat(shardingKey);
            switch (shardingOperator)
            {
                case ShardingOperatorEnum.GreaterThan:
                case ShardingOperatorEnum.GreaterThanOrEqual:
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) >= 0;
                case ShardingOperatorEnum.LessThan:
                {
                    var currentHourBeginTime = new DateTime(shardingKey.Year,shardingKey.Month,shardingKey.Day,shardingKey.Hour,0,0);
                    //处于临界值 不应该被返回
                    if (currentHourBeginTime == shardingKey)
                        return tail => String.Compare(tail, t, StringComparison.Ordinal) < 0;
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
                }
                case ShardingOperatorEnum.LessThanOrEqual:
                    return tail => String.Compare(tail, t, StringComparison.Ordinal) <= 0;
                case ShardingOperatorEnum.Equal: return tail => tail == t;
                default:
                {
                    return tail => true;
                }
            }
        }

4.GetAllTails:比较特殊我们因为并不是连续生成的所以没办法使用起始时间然后一直推到当前时间来实现后缀的返回,只能依靠ado.net的能力读取数据库然后返回对应的表后缀,当然你也可以使用redis等三方工具来存储

//1.构造函数注入 IVirtualDataSourceManager<DefaultDbContext> virtualDataSourceManager

//2/mysql的ado.net读取数据库表(sqlserver和mysql有差异自行百度或者查看ShardingCore的SqlServerTableEnsureManager类)
        private const string CurrentTableName = nameof(OrderByHour);
        private const string Tables = "Tables";
        private const string TABLE_SCHEMA = "TABLE_SCHEMA";
        private const string TABLE_NAME = "TABLE_NAME";

        private readonly ConcurrentDictionary<string, object?> _tails = new ConcurrentDictionary<string, object?>();
        /// <summary>
        /// 如果你是非mysql数据库请自行实现这个方法返回当前类在数据库已经存在的后缀
        /// 仅启动时调用
        /// </summary>
        /// <returns></returns>
        public override List<string> GetAllTails()
        {
            //启动寻找有哪些表后缀
            using (var connection = new MySqlConnection(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultConnectionString))
            {
                connection.Open();
                var database = connection.Database;
                
                using (var dataTable = connection.GetSchema(Tables))
                {
                    for (int i = 0; i < dataTable.Rows.Count; i++)
                    {
                        var schema = dataTable.Rows[i][TABLE_SCHEMA];
                        if (database.Equals($"{schema}", StringComparison.OrdinalIgnoreCase))
                        {
                            var tableName = dataTable.Rows[i][TABLE_NAME]?.ToString()??string.Empty;
                            if (tableName.StartsWith(CurrentTableName, StringComparison.OrdinalIgnoreCase))
                            {
                                //如果没有下划线那么需要CurrentTableName.Length有下划线就要CurrentTableName.Length+1
                                _tails.TryAdd(tableName.Substring(CurrentTableName.Length),null);
                            }
                        }
                    }
                }
            }
            return _tails.Keys.ToList();
        }

动态创建添加表

到目前为止我们已经完成了路由的静态分片的处理,但是还有一点需要处理就是如何在插入值得时候判断当前有没有对应的数据库表是否需要创建等操作

查看AbstractShardingOperatorVirtualTableRoute分表抽象类的父类我们发现当前抽象类有两个地方会调用路由的获取判断方法

  • DoRouteWithPredicate:使用条件路由也就是where后面的表达式
  • RouteWithValue:使用值路由也就是我们的新增和修改整个对象的时候会被调用

所以通过上述流程的梳理我们可以知道只需要在RouteWithValue处进行动手脚即可,又因为我们需要动态建表所以我们可以参考默认路由的自动建表的代码进行参考
AbstractShardingAutoCreateOperatorVirtualTableRoute下的ExecuteAsync


        private readonly IVirtualDataSourceManager<DefaultDbContext> _virtualDataSourceManager;
        private readonly IVirtualTableManager<DefaultDbContext> _virtualTableManager;
        private readonly IShardingTableCreator<DefaultDbContext> _shardingTableCreator;
        private readonly ConcurrentDictionary<string, object?> _tails = new ConcurrentDictionary<string, object?>();
        private readonly object _lock = new object();

        public OrderByHourRoute(IVirtualDataSourceManager<DefaultDbContext> virtualDataSourceManager,IVirtualTableManager<DefaultDbContext> virtualTableManager, IShardingTableCreator<DefaultDbContext> shardingTableCreator)
        {
            _virtualDataSourceManager = virtualDataSourceManager;
            _virtualTableManager = virtualTableManager;
            _shardingTableCreator = shardingTableCreator;
        }

        public override IPhysicTable RouteWithValue(List<IPhysicTable> allPhysicTables, object shardingKey)
        {
            var shardingKeyToTail = ShardingKeyToTail(shardingKey);

            if (!_tails.TryGetValue(shardingKeyToTail,out var _))
            {
                lock (_lock)
                {
                    if (!_tails.TryGetValue(shardingKeyToTail,out var _))
                    {
                        var virtualTable = _virtualTableManager.GetVirtualTable(typeof(OrderByHour));
//必须先执行AddPhysicTable在进行CreateTable
                        _virtualTableManager.AddPhysicTable(virtualTable, new DefaultPhysicTable(virtualTable, shardingKeyToTail));
                        try
                        {
                            _shardingTableCreator.CreateTable<OrderByHour>(_virtualDataSourceManager.GetCurrentVirtualDataSource().DefaultDataSourceName, shardingKeyToTail);
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine("尝试添加表失败" + ex);
                        }

                        _tails.TryAdd(shardingKeyToTail,null);
                    }
                }
            }

            var needRefresh = allPhysicTables.Count != _tails.Count;
            if (needRefresh)
            {
                var virtualTable = _virtualTableManager.GetVirtualTable(typeof(OrderByHour));
                //修复可能导致迭代器遍历时添加的bug
                var keys = _tails.Keys.ToList();
                foreach (var tail in keys)
                {
                    var hashSet = allPhysicTables.Select(o=>o.Tail).ToHashSet();
                    if (!hashSet.Contains(tail))
                    {
                        var tables = virtualTable.GetAllPhysicTables();
                        var physicTable = tables.FirstOrDefault(o=>o.Tail==tail);
                        if (physicTable!= null)
                        {
                            allPhysicTables.Add(physicTable);
                        }
                    }
                }
            }
            var physicTables = allPhysicTables.Where(o => o.Tail== shardingKeyToTail).ToList();
            if (physicTables.IsEmpty())
            {
                throw new ShardingCoreException($"sharding key route not match {EntityMetadata.EntityType} -> [{EntityMetadata.ShardingTableProperty.Name}] ->【{shardingKey}】 all tails ->[{string.Join(",", allPhysicTables.Select(o=>o.FullName))}]");
            }

            if (physicTables.Count > 1)
                throw new ShardingCoreException($"more than one route match table:{string.Join(",", physicTables.Select(o => $"[{o.FullName}]"))}");
            return physicTables[0];
        }

通过和父类的比较我们只是在对应的根据值判断当前系统是否存在xx表如果不存在就在ShardingCore上插入AddPhysicTable然后CreateTable最后_tails.TryAdd(shardingKeyToTail,null);

needRefresh处的代码需要针对如果当前需要和传入的全量表进行匹配因为新加的表后缀不在全量表里面所以需要先进行对其的处理然后再进行执行

启动配置必不可少


ILoggerFactory efLogger = LoggerFactory.Create(builder =>
{
    builder.AddFilter((category, level) => category == DbLoggerCategory.Database.Command.Name && level == LogLevel.Information).AddConsole();
});

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at //aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddShardingDbContext<DefaultDbContext>()
    .AddEntityConfig(o =>
    {
        o.ThrowIfQueryRouteNotMatch = false;
        o.CreateShardingTableOnStart = true;
        o.EnsureCreatedWithOutShardingTable = true;
        o.AddShardingTableRoute<OrderByHourRoute>();
    })
    .AddConfig(o =>
    {
        o.ConfigId = "c1";
        o.AddDefaultDataSource("ds0", "server=127.0.0.1;port=3306;database=shardingTest;userid=root;password=root;");
        o.UseShardingQuery((conn, b) =>
        {
            b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
        });
        o.UseShardingTransaction((conn, b) =>
        {
            b.UseMySql(conn, new MySqlServerVersion(new Version())).UseLoggerFactory(efLogger);
        });
        o.ReplaceTableEnsureManager(sp=>new MySqlTableEnsureManager<DefaultDbContext>());
    }).EnsureConfig();
var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.Services.GetRequiredService<IShardingBootstrapper>().Start();
app.UseAuthorization();

app.MapControllers();

app.Run();

最后我们直接启动运行调试代码

当我们插入一个没有的时间对应的框架会帮我们对应的创建表并且插入数据

最后的最后

demo地址 //github.com/dotnetcore/sharding-core/tree/main/samples/Sample.AutoCreateIfPresent

您都看到这边了确定不点个star或者赞吗,一款.Net不得不学的分库分表解决方案,简单理解为sharding-jdbc在.net中的实现并且支持更多特性和更优秀的数据聚合,拥有原生性能的97%,并且无业务侵入性,支持未分片的所有efcore原生查询