집 >백엔드 개발 >C#.Net 튜토리얼 >여러 데이터베이스에 빅데이터 일괄 삽입을 구현하기 위한 C# 샘플 코드에 대한 자세한 설명
이 기사에서는 SqlServer, Oracle, SQLite 및 MySQL을 포함한 여러 데이터베이스에 빅데이터 일괄 삽입을 구현하는 C#을 주로 소개합니다.
이전에는 SqlServer가 데이터 일괄 삽입을 지원한다는 것만 알았고 Oracle, SQLite, MySQL도 지원한다는 사실은 몰랐습니다. 하지만 Oracle에서는 Orace.DataAccess 드라이버를 사용해야 합니다. 데이터베이스를 위한 여러 일괄 삽입 솔루션.
먼저 IProvider에는 일괄 삽입을 위한 플러그인 서비스 인터페이스 IBatcherProvider가 있습니다. 이 인터페이스는 이전 기사에서 언급한 바 있습니다.
/// <summary> /// 提供数据批量处理的方法。 /// </summary> public interface IBatcherProvider : IProviderService { /// <summary> /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> /// <param name="batchSize">每批次写入的数据量。</param> void Insert(DataTable dataTable, int batchSize = 10000); }
1. SqlServer 데이터 일괄 삽입
SqlServer 일괄 삽입은 매우 간단합니다. SqlBulkCopy를 사용하면 됩니다. 이 클래스:
/// <summary> /// 为 System.Data.SqlClient 提供的用于批量操作的方法。 /// </summary> public sealed class MsSqlBatcher : IBatcherProvider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public ServiceContext ServiceContext { get; set; } /// <summary> /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> /// <param name="batchSize">每批次写入的数据量。</param> public void Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); //给表名加上前后导符 var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) { DestinationTableName = tableName, BatchSize = batchSize }) { //循环所有列,为bulk添加映射 dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); bulk.WriteToServer(dataTable); bulk.Close(); } } catch (Exception exp) { throw new BatcherException(exp); } finally { connection.TryClose(); } } } }
위에서는 트랜잭션을 사용하지 않습니다. 트랜잭션을 사용하면 성능에 일정한 영향을 미칩니다. 트랜잭션을 사용하려면 SqlBulkCopyOptions.UseInternalTransaction을 설정할 수 있습니다.
2. Oracle 데이터 일괄 삽입
System.Data.OracleClient는 일괄 삽입을 지원하지 않으므로 Oracle.DataAccess만 사용할 수 있습니다. 공급자로서의 구성 요소.
/// <summary> /// Oracle.Data.Access 组件提供的用于批量操作的方法。 /// </summary> public sealed class OracleAccessBatcher : IBatcherProvider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public ServiceContext ServiceContext { get; set; } /// <summary> /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> /// <param name="batchSize">每批次写入的数据量。</param> public void Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { throw new BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); command.ExecuteNonQuery(); } } catch (Exception exp) { throw new BatcherException(exp); } finally { connection.TryClose(); } } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="command"></param> /// <param name="table"></param> /// <returns></returns> private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) { var names = new StringBuilder(); var values = new StringBuilder(); //将一个DataTable的数据转换为数组的数组 var data = table.ToArray(); //设置ArrayBindCount属性 command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); var syntax = database.Provider.GetService<ISyntaxProvider>(); for (var i = 0; i < table.Columns.Count; i++) { var column = table.Columns[i]; var parameter = database.Provider.DbProviderFactory.CreateParameter(); if (parameter == null) { continue; } parameter.ParameterName = column.ColumnName; parameter.Direction = ParameterDirection.Input; parameter.DbType = column.DataType.GetDbType(); parameter.Value = data[i]; if (names.Length > 0) { names.Append(","); values.Append(","); } names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); command.Parameters.Add(parameter); } return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } }
위의 가장 중요한 단계는 DataTable을 배열의 배열 표현, 즉 객체[][]로 변환하는 것입니다. 열이고 두 번째 배열은 행 수이므로 Columns 루프는 마지막 배열을 매개변수 값으로 사용합니다. 즉 매개변수 값은 배열입니다. insert 문은 일반적인 insert 문과 다르지 않습니다.
3. SQLite 데이터 일괄 삽입
SQLite 일괄 삽입에는 트랜잭션 열기만 필요합니다.
public sealed class SQLiteBatcher : IBatcherProvider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public ServiceContext ServiceContext { get; set; } /// <summary> /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> /// <param name="batchSize">每批次写入的数据量。</param> public void Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { DbTransaction transcation = null; try { connection.TryOpen(); transcation = connection.BeginTransaction(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { throw new BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); if (command.CommandText == string.Empty) { return; } var flag = new AssertFlag(); dataTable.EachRow(row => { var first = flag.AssertTrue(); ProcessCommandParameters(dataTable, command, row, first); command.ExecuteNonQuery(); }); } transcation.Commit(); } catch (Exception exp) { if (transcation != null) { transcation.Rollback(); } throw new BatcherException(exp); } finally { connection.TryClose(); } } } private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) { for (var c = 0; c < dataTable.Columns.Count; c++) { DbParameter parameter; //首次创建参数,是为了使用缓存 if (first) { parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); parameter.ParameterName = dataTable.Columns[c].ColumnName; command.Parameters.Add(parameter); } else { parameter = command.Parameters[c]; } parameter.Value = row[c]; } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="table"></param> /// <returns></returns> private string GenerateInserSql(IDatabase database, DataTable table) { var syntax = database.Provider.GetService<ISyntaxProvider>(); var names = new StringBuilder(); var values = new StringBuilder(); var flag = new AssertFlag(); table.EachColumn(column => { if (!flag.AssertTrue()) { names.Append(","); values.Append(","); } names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); }); return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } }
4. MySql 데이터 일괄 삽입
/// <summary> /// 为 MySql.Data 组件提供的用于批量操作的方法。 /// </summary> public sealed class MySqlBatcher : IBatcherProvider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public ServiceContext ServiceContext { get; set; } /// <summary> /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> /// <param name="batchSize">每批次写入的数据量。</param> public void Insert(DataTable dataTable, int batchSize = 10000) { Checker.ArgumentNull(dataTable, "dataTable"); if (dataTable.Rows.Count == 0) { return; } using (var connection = ServiceContext.Database.CreateConnection()) { try { connection.TryOpen(); using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) { if (command == null) { throw new BatcherException(new ArgumentException("command")); } command.Connection = connection; command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); if (command.CommandText == string.Empty) { return; } command.ExecuteNonQuery(); } } catch (Exception exp) { throw new BatcherException(exp); } finally { connection.TryClose(); } } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="command"></param> /// <param name="table"></param> /// <returns></returns> private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) { var names = new StringBuilder(); var values = new StringBuilder(); var types = new List<DbType>(); var count = table.Columns.Count; var syntax = database.Provider.GetService<ISyntaxProvider>(); table.EachColumn(c => { if (names.Length > 0) { names.Append(","); } names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); types.Add(c.DataType.GetDbType()); }); var i = 0; foreach (DataRow row in table.Rows) { if (i > 0) { values.Append(","); } values.Append("("); for (var j = 0; j < count; j++) { if (j > 0) { values.Append(", "); } var isStrType = IsStringType(types[j]); var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); if (parameter != null) { values.Append(parameter.ParameterName); command.Parameters.Add(parameter); } else if (isStrType) { values.AppendFormat("'{0}'", row[j]); } else { values.Append(row[j]); } } values.Append(")"); i++; } return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); } /// <summary> /// 判断是否为字符串类别。 /// </summary> /// <param name="dbType"></param> /// <returns></returns> private bool IsStringType(DbType dbType) { return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; } /// <summary> /// 创建参数。 /// </summary> /// <param name="provider"></param> /// <param name="isStrType"></param> /// <param name="dbType"></param> /// <param name="value"></param> /// <param name="parPrefix"></param> /// <param name="row"></param> /// <param name="col"></param> /// <returns></returns> private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) { //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数 if ((isStrType && value.ToString().IndexOf('\'') != -1) || dbType == DbType.DateTime) { var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); var parameter = provider.DbProviderFactory.CreateParameter(); parameter.ParameterName = name; parameter.Direction = ParameterDirection.Input; parameter.DbType = dbType; parameter.Value = value; return parameter; } return null; } }
MySql 일괄 삽입은 값에 모든 값을 쓰는 것입니다 예를 들어, batcher(id, name) 값(1, '1', 2, '2', 3, '3', ......... 10, '10')을 삽입합니다.
5. 테스트
다음으로 일괄 삽입을 사용한 효과를 확인하기 위해 테스트 케이스를 작성합니다.
public void TestBatchInsert() { Console.WriteLine(TimeWatcher.Watch(() => InvokeTest(database => { var table = new DataTable("Batcher"); table.Columns.Add("Id", typeof(int)); table.Columns.Add("Name1", typeof(string)); table.Columns.Add("Name2", typeof(string)); table.Columns.Add("Name3", typeof(string)); table.Columns.Add("Name4", typeof(string)); //构造100000条数据 for (var i = 0; i < 100000; i++) { table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); } //获取 IBatcherProvider var batcher = database.Provider.GetService<IBatcherProvider>(); if (batcher == null) { Console.WriteLine("不支持批量插入。"); } else { batcher.Insert(table); } //输出batcher表的数据量 var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql)); }))); }
다음 표에는 4개의 데이터베이스가 각각 100,000개의 데이터를 생성하는 데 걸리는 시간이 나와 있습니다.
데이터베이스
|
시간이 많이 걸립니다 |
||||||||||
MsSql | 00:00:02.9376300 | ||||||||||
오라클 | 00:00:01.5155959 | ||||||||||
SQLite | 00:00:01.6275634 | ||||||||||
MySql | 00:00:05.4166891 |
위 내용은 여러 데이터베이스에 빅데이터 일괄 삽입을 구현하기 위한 C# 샘플 코드에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!