Maison  >  Article  >  développement back-end  >  Explication détaillée de l'exemple de code C# pour la mise en œuvre de l'insertion par lots de Big Data dans plusieurs bases de données

Explication détaillée de l'exemple de code C# pour la mise en œuvre de l'insertion par lots de Big Data dans plusieurs bases de données

黄舟
黄舟original
2017-03-25 11:43:282222parcourir

Cet article présente principalement C# pour implémenter l'insertion par lots de Big Data dans plusieurs bases de données, dont SqlServer, Oracle, SQLite et MySQL Ceux qui sont intéressés peuvent en savoir plus.

Auparavant, je savais seulement que SqlServer prenait en charge l'insertion de données par lots, mais je ne savais pas qu'Oracle, SQLite et MySQL le prenaient également en charge. Cependant, Oracle doit utiliser le pilote Orace.DataAccess. plusieurs solutions d'insertion par lots pour les bases de données.

Tout d'abord, il existe une interface de service de plug-in IBatcherProvider dans IProvider pour l'insertion par lots. Cette interface a été mentionnée dans l'article précédent.

/// <summary> 
  /// 提供数据批量处理的方法。 
  /// </summary> 
  public interface IBatcherProvider : IProviderService 
  { 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    void Insert(DataTable dataTable, int batchSize = 10000); 
  }

1. Insertion par lots de données SqlServer

L'insertion par lots SqlServer est très simple, utilisez simplement SqlBulkCopy. :

/// <summary> 
  /// 为 System.Data.SqlClient 提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MsSqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          //给表名加上前后导符 
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            //循环所有列,为bulk添加映射 
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  }

Ce qui précède n'utilise pas de transactions. L'utilisation de transactions aura un certain impact sur les performances. Si vous souhaitez utiliser des transactions, vous pouvez définir SqlBulkCopyOptions.UseInternalTransaction.

2. Insertion par lots de données Oracle

System.Data.OracleClient ne prend pas en charge l'insertion par lots, vous ne pouvez donc utiliser qu'Oracle.DataAccess. composant en tant que fournisseur.

/// <summary> 
  /// Oracle.Data.Access 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class OracleAccessBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      //将一个DataTable的数据转换为数组的数组 
      var data = table.ToArray(); 
 
      //设置ArrayBindCount属性 
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); 
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

L'étape la plus importante ci-dessus consiste à convertir le DataTable en une représentation matricielle d'un tableau, c'est-à-dire un objet[][]. nombre de colonnes. Le tableau est le nombre de lignes, donc la boucle Columns utilise ce dernier tableau comme valeur du paramètre, c'est-à-dire que la valeur du paramètre est un tableau. L'instruction insert n'est pas différente d'une instruction insert normale.

3. Insertion par lots de données SQLite

L'insertion par lots SQLite n'a besoin que de démarrer une transaction. Le principe spécifique de ceci est inconnu.

public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transcation = null; 
        try 
        { 
          connection.TryOpen(); 
          transcation = connection.BeginTransaction(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
 
            var flag = new AssertFlag(); 
            dataTable.EachRow(row => 
              { 
                var first = flag.AssertTrue(); 
                ProcessCommandParameters(dataTable, command, row, first); 
                command.ExecuteNonQuery(); 
              }); 
          } 
          transcation.Commit(); 
        } 
        catch (Exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.Rollback(); 
          } 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        //首次创建参数,是为了使用缓存 
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

4. Insertion par lots de données MySql

/// <summary> 
  /// 为 MySql.Data 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MySqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("&#39;{0}&#39;", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    /// 判断是否为字符串类别。 
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    /// 创建参数。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含&#39;号)和日期型时才添加参数 
      if ((isStrType && value.ToString().IndexOf(&#39;\&#39;&#39;) != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  }

L'insertion par lots MySql consiste à écrire toutes les valeurs​​dans les valeurs de l'instruction Ici, par exemple, insérez les valeurs batcher(id, name) (1, '1', 2, '2', 3, '3', ......... 10, '10').

5. Tests

Ensuite, écrivez un cas de test pour voir l'effet de l'utilisation de l'insertion par lots.

    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            //构造100000条数据 
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            //获取 IBatcherProvider 
            var batcher = database.Provider.GetService<IBatcherProvider>(); 
            if (batcher == null) 
            { 
              Console.WriteLine("不支持批量插入。"); 
            } 
            else 
            { 
              batcher.Insert(table); 
            } 
 
            //输出batcher表的数据量 
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); 
            Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql)); 
 
          }))); 
    }

Le tableau suivant répertorie le temps nécessaire à chacune des quatre bases de données pour générer 100 000 éléments de données

tr>

Base de données

数据库

耗用时间

MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891
Prend du temps
MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn