寫在前面
除了批量新增之外,Z.EntityFramework.Plus都實現了,只有批量新增是收費的,所以這里只介紹如何實現批量新增。
思路詳解
一般的數據庫驅動都是帶批量新增功能的,但是EntityFramework支持多種數據庫,要根據業務去實現不同的數據庫的批量插入,就只要找到數據庫驅動的支持文檔就大概可以知道用哪個函數可以實現批量插入的功能了。現在EfCore中的DbContext是可以拿到DbConnection,能拿到這個對象那就意味著我可以直接使用Ado.net去干這個事情,而且Ef還會幫我管理銷毀和連接池等一系列的臟活。
PgSQL批量新增
PgSQL的.NET驅動一般是Npgsql,那么找到它的文檔就應該可以找到對應的批量操作函數,然后按照文檔大概就可以知道怎么做能搞定這個事情。
核心代碼分析
思路就是先根據EF給到的EntityType構建列的映射,然后再將數據一個一個的塞進DataTable中。
最后再使用BeginBinaryImport
語法一次性插入
// Take the ADO.NET connection that the DbContext exposes and view it as an Npgsql connection.
var pgConnection = dbContext.Database.GetDbConnection() as NpgsqlConnection;
// Assemble the COPY statement: quoted table name plus the comma-separated column list.
var columnList = string.Join(",", fields);
var commandFormat = string.Format("COPY \"{0}\"({1}) FROM STDIN BINARY", tableName, columnList);
// BeginBinaryImport is the key call: it starts a binary COPY for the statement above.
using (var importer = pgConnection.BeginBinaryImport(commandFormat))
{
    var pendingRows = dataTable.Rows;
    for (var rowIndex = 0; rowIndex < pendingRows.Count; rowIndex++)
    {
        // Send one row's values to the importer asynchronously.
        await importer.WriteRowAsync(cancellationToken, pendingRows[rowIndex].ItemArray);
    }
    // After every row has been written, complete the import to commit it in one go.
    await importer.CompleteAsync(cancellationToken);
}
示例代碼(鄙人框架是ABP的,所以這里是用的ABP框架作為示例代碼)
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata;
using Microsoft.Extensions.Logging;
using Npgsql;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Volo.Abp.Data;
using Volo.Abp.Domain.Entities;
using Volo.Abp.Guids;
using Volo.Abp.ObjectExtending;
using Volo.Abp.DependencyInjection;
using Volo.Abp.EntityFrameworkCore;
using Volo.Abp.Uow;
namespace EntityFrameworkCore.BulkOperationProvider
{
    /// <summary>
    /// Bulk-insert provider for PostgreSQL. Rows are pushed to the server through
    /// Npgsql's binary COPY import instead of one INSERT per entity.
    /// </summary>
    [ExposeServices(typeof(IEfCoreBulkOperationProvider))]
    public class PgsqlEfCoreBulkOperationProvider : IEfCoreBulkOperationProvider
    {
        protected ILogger<PgsqlEfCoreBulkOperationProvider> Logger { get; private set; }
        protected IGuidGenerator GuidGenerator { get; private set; }

        /// <summary>
        /// Maximum number of rows sent per COPY import. Defaults to 10000; override to tune.
        /// Values below 1 are treated as 1.
        /// </summary>
        protected virtual int BatchSize => 10000;

        public PgsqlEfCoreBulkOperationProvider(ILogger<PgsqlEfCoreBulkOperationProvider> logger, IGuidGenerator guidGenerator)
        {
            Logger = logger;
            GuidGenerator = guidGenerator;
        }

        /// <summary>
        /// Inserts all <paramref name="entities"/> into the table mapped to <typeparamref name="TEntity"/>
        /// using batched binary COPY imports.
        /// </summary>
        /// <param name="dbContext">Context whose model and database connection are used.</param>
        /// <param name="entities">Entities to insert. Enumerated once; Guid ids that are still default get a generated value.</param>
        /// <param name="cancellationToken">Token forwarded to the connection open and the import writes.</param>
        /// <returns>The number of entities written, or 0 when <paramref name="entities"/> is empty.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="entities"/> is null.</exception>
        /// <exception cref="InvalidOperationException">The context's connection is not an <see cref="NpgsqlConnection"/>.</exception>
        [UnitOfWork(IsDisabled = true)]
        public async Task<int> BulkInsertAsync<TDbContext, TEntity>(TDbContext dbContext, IEnumerable<TEntity> entities, CancellationToken cancellationToken = default)
            where TEntity : class, IEntity
            where TDbContext : IEfCoreDbContext
        {
            if (entities == null)
            {
                throw new ArgumentNullException(nameof(entities));
            }

            // Materialize once: the sequence is counted and sliced into batches below, and
            // re-enumerating a lazy source for each of those would repeat its work.
            var entityList = entities as IReadOnlyList<TEntity> ?? entities.ToList();
            var totalCount = entityList.Count;
            if (totalCount < 1)
            {
                return 0;
            }

            var dbSet = dbContext.Set<TEntity>();
            var entityType = dbSet.EntityType;
            var entityProps = entityType.GetProperties().ToList();
            var tableName = entityType.GetTableName();
            var schema = entityType.GetSchema();
            var storeObjectIdentifier = StoreObjectIdentifier.Table(tableName, schema);

            var pgConnection = dbContext.Database.GetDbConnection() as NpgsqlConnection;
            if (pgConnection == null)
            {
                throw new InvalidOperationException("DbConnection is not assignable to [NpgsqlConnection]");
            }

            try
            {
                var needHandleExtraProps = typeof(TEntity).IsAssignableTo<IHasExtraProperties>();

                // Build the quoted column list and the matching DataTable columns.
                var fields = new List<string>(entityProps.Count);
                var dataTable = BuildDataTable(entityProps, storeObjectIdentifier, needHandleExtraProps, fields);

                // Build the COPY statement; the schema is included when the entity maps to one.
                var qualifiedTable = string.IsNullOrEmpty(schema)
                    ? QuoteIdentifier(tableName)
                    : QuoteIdentifier(schema) + "." + QuoteIdentifier(tableName);
                var copyCommand = $"COPY {qualifiedTable}({string.Join(",", fields)}) FROM STDIN BINARY";

                var batchSize = Math.Max(1, BatchSize);

                // Open through EF so an already-open connection is reused and a closed one
                // is opened here and released again in the finally block.
                await dbContext.Database.OpenConnectionAsync(cancellationToken);
                try
                {
                    var offset = 0;
                    while (offset < totalCount)
                    {
                        var end = offset + Math.Min(batchSize, totalCount - offset);

                        // The table is reused between batches, so drop the previous batch's rows first.
                        dataTable.Clear();
                        for (var i = offset; i < end; i++)
                        {
                            dataTable.LoadDataRow(BuildRowValues(entityList[i], entityProps, needHandleExtraProps), true);
                        }

                        using (var writer = pgConnection.BeginBinaryImport(copyCommand))
                        {
                            foreach (DataRow row in dataTable.Rows)
                            {
                                await writer.WriteRowAsync(cancellationToken, row.ItemArray);
                            }
                            // Completing the import is what commits the rows of this batch.
                            await writer.CompleteAsync(cancellationToken);
                        }

                        offset = end;
                    }
                }
                finally
                {
                    await dbContext.Database.CloseConnectionAsync();
                }
            }
            catch (Exception ex)
            {
                // The message is passed as an argument so braces inside it are not parsed as template placeholders.
                Logger.LogError(ex, "PG批量插入出錯,Error->{ErrorMessage}", ex.Message);
                // Rethrow with "throw;" to keep the original stack trace.
                throw;
            }

            return totalCount;
        }

        /// <summary>
        /// Creates an empty DataTable with one column per mapped property and appends the
        /// quoted column names to <paramref name="fields"/> in the same order.
        /// </summary>
        private static DataTable BuildDataTable(IReadOnlyList<IProperty> entityProps, StoreObjectIdentifier storeObjectIdentifier, bool needHandleExtraProps, List<string> fields)
        {
            var dataTable = new DataTable();
            foreach (var prop in entityProps)
            {
                var colName = prop.GetColumnName(storeObjectIdentifier);
                // NOTE(review): PropertyInfo is presumably null for properties without a CLR
                // property (e.g. shadow properties); such a model would fail here - confirm none are mapped.
                var propertyType = prop.PropertyInfo.PropertyType;
                if (needHandleExtraProps && prop.Name == nameof(IHasExtraProperties.ExtraProperties))
                {
                    // ExtraProperties is written as its JSON string, not as the dictionary itself.
                    propertyType = typeof(string);
                }

                // DataColumn is given the underlying type when the property type is Nullable<T>.
                var columnType = Nullable.GetUnderlyingType(propertyType) ?? propertyType;
                fields.Add(QuoteIdentifier(colName));
                dataTable.Columns.Add(new DataColumn(colName, columnType));
            }
            return dataTable;
        }

        /// <summary>
        /// Produces the values of one entity in property order, assigning a Guid id first when needed.
        /// </summary>
        private object[] BuildRowValues<TEntity>(TEntity entity, IReadOnlyList<IProperty> entityProps, bool needHandleExtraProps)
        {
            CheckAndSetId(entity);

            var extraPropsOwner = needHandleExtraProps ? entity as IHasExtraProperties : null;
            var values = new object[entityProps.Count];
            for (var i = 0; i < entityProps.Count; i++)
            {
                var prop = entityProps[i];
                if (extraPropsOwner != null && prop.Name == nameof(IHasExtraProperties.ExtraProperties))
                {
                    values[i] = SerializeExtraObject(extraPropsOwner.ExtraProperties, typeof(TEntity));
                }
                else
                {
                    values[i] = prop.PropertyInfo.GetValue(entity, null);
                }
            }
            return values;
        }

        /// <summary>
        /// Wraps an identifier in double quotes, doubling any embedded double quote.
        /// </summary>
        private static string QuoteIdentifier(string identifier)
        {
            return "\"" + (identifier ?? string.Empty).Replace("\"", "\"\"") + "\"";
        }

        /// <summary>
        /// Assigns a generated id when the entity has a Guid key; other entities are left untouched.
        /// </summary>
        protected virtual void CheckAndSetId<TEntity>(TEntity entity)
        {
            if (entity is IEntity<Guid> entityWithGuidId)
            {
                TrySetGuidId(entityWithGuidId);
            }
        }

        /// <summary>
        /// Sets the id from <see cref="GuidGenerator"/> unless the entity already has a non-default id.
        /// </summary>
        protected virtual void TrySetGuidId(IEntity<Guid> entity)
        {
            if (entity.Id != default)
            {
                return;
            }

            EntityHelper.TrySetId(
                entity,
                () => GuidGenerator.Create(),
                true
            );
        }

        /// <summary>
        /// Serializes the extra properties to JSON, leaving out entries that the object
        /// extension configuration of <paramref name="entityType"/> maps to their own EF Core fields.
        /// </summary>
        /// <param name="extraProperties">Dictionary to serialize; null is treated as empty.</param>
        /// <param name="entityType">Entity type used to look up object extensions; may be null.</param>
        /// <returns>The JSON text of the remaining entries.</returns>
        protected virtual string SerializeExtraObject(ExtraPropertyDictionary extraProperties, Type entityType)
        {
            // Work on a copy so the entity's own dictionary is not modified.
            var copyDictionary = extraProperties == null
                ? new Dictionary<string, object>()
                : new Dictionary<string, object>(extraProperties);

            if (entityType != null)
            {
                var objectExtension = ObjectExtensionManager.Instance.GetOrNull(entityType);
                if (objectExtension != null)
                {
                    foreach (var property in objectExtension.GetProperties())
                    {
                        if (property.IsMappedToFieldForEfCore())
                        {
                            copyDictionary.Remove(property.Name);
                        }
                    }
                }
            }

            return JsonSerializer.Serialize(copyDictionary);
        }
    }
}