>  기사  >  백엔드 개발  >  여러 데이터베이스에 빅데이터 일괄 삽입을 구현하기 위한 C# 샘플 코드에 대한 자세한 설명

여러 데이터베이스에 빅데이터 일괄 삽입을 구현하기 위한 C# 샘플 코드에 대한 자세한 설명

黄舟
黄舟원래의
2017-03-25 11:43:282169검색

이 기사에서는 SqlServer, Oracle, SQLite 및 MySQL을 포함한 여러 데이터베이스에 빅데이터 일괄 삽입을 구현하는 C#을 주로 소개합니다.

이전에는 SqlServer가 데이터 일괄 삽입을 지원한다는 것만 알았고 Oracle, SQLite, MySQL도 지원한다는 사실은 몰랐습니다. 하지만 Oracle에서는 Orace.DataAccess 드라이버를 사용해야 합니다. 데이터베이스를 위한 여러 일괄 삽입 솔루션.

먼저 IProvider에는 일괄 삽입을 위한 플러그인 서비스 인터페이스 IBatcherProvider가 있습니다. 이 인터페이스는 이전 기사에서 언급한 바 있습니다.

/// <summary> 
  /// 提供数据批量处理的方法。 
  /// </summary> 
  public interface IBatcherProvider : IProviderService 
  { 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    void Insert(DataTable dataTable, int batchSize = 10000); 
  }

1. SqlServer 데이터 일괄 삽입

SqlServer 일괄 삽입은 매우 간단합니다. SqlBulkCopy를 사용하면 됩니다. 이 클래스:

/// <summary> 
  /// 为 System.Data.SqlClient 提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MsSqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          //给表名加上前后导符 
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            //循环所有列,为bulk添加映射 
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  }

위에서는 트랜잭션을 사용하지 않습니다. 트랜잭션을 사용하면 성능에 일정한 영향을 미칩니다. 트랜잭션을 사용하려면 SqlBulkCopyOptions.UseInternalTransaction을 설정할 수 있습니다.

2. Oracle 데이터 일괄 삽입

System.Data.OracleClient는 일괄 삽입을 지원하지 않으므로 Oracle.DataAccess만 사용할 수 있습니다. 공급자로서의 구성 요소.

/// <summary> 
  /// Oracle.Data.Access 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class OracleAccessBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      //将一个DataTable的数据转换为数组的数组 
      var data = table.ToArray(); 
 
      //设置ArrayBindCount属性 
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); 
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

위의 가장 중요한 단계는 DataTable을 배열의 배열 표현, 즉 객체[][]로 변환하는 것입니다. 열이고 두 번째 배열은 행 수이므로 Columns 루프는 마지막 배열을 매개변수 값으로 사용합니다. 즉 매개변수 값은 배열입니다. insert 문은 일반적인 insert 문과 다르지 않습니다.

3. SQLite 데이터 일괄 삽입

SQLite 일괄 삽입에는 트랜잭션 열기만 필요합니다.

public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transcation = null; 
        try 
        { 
          connection.TryOpen(); 
          transcation = connection.BeginTransaction(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
 
            var flag = new AssertFlag(); 
            dataTable.EachRow(row => 
              { 
                var first = flag.AssertTrue(); 
                ProcessCommandParameters(dataTable, command, row, first); 
                command.ExecuteNonQuery(); 
              }); 
          } 
          transcation.Commit(); 
        } 
        catch (Exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.Rollback(); 
          } 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        //首次创建参数,是为了使用缓存 
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

4. MySql 데이터 일괄 삽입

/// <summary> 
  /// 为 MySql.Data 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MySqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("&#39;{0}&#39;", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    /// 判断是否为字符串类别。 
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    /// 创建参数。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含&#39;号)和日期型时才添加参数 
      if ((isStrType && value.ToString().IndexOf(&#39;\&#39;&#39;) != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  }

MySql 일괄 삽입은 값에 모든 값을 쓰는 것입니다 ​​​​예를 들어, batcher(id, name) 값(1, '1', 2, '2', 3, '3', ......... 10, '10')을 삽입합니다.

5. 테스트

다음으로 일괄 삽입을 사용한 효과를 확인하기 위해 테스트 케이스를 작성합니다.

    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            //构造100000条数据 
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            //获取 IBatcherProvider 
            var batcher = database.Provider.GetService<IBatcherProvider>(); 
            if (batcher == null) 
            { 
              Console.WriteLine("不支持批量插入。"); 
            } 
            else 
            { 
              batcher.Insert(table); 
            } 
 
            //输出batcher表的数据量 
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); 
            Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql)); 
 
          }))); 
    }

다음 표에는 4개의 데이터베이스가 각각 100,000개의 데이터를 생성하는 데 걸리는 시간이 나와 있습니다.

데이터베이스

数据库

耗用时间

MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891

시간이 많이 걸립니다

MsSql 00:00:02.9376300
오라클 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891

위 내용은 여러 데이터베이스에 빅데이터 일괄 삽입을 구현하기 위한 C# 샘플 코드에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.