Heim  >  Artikel  >  Backend-Entwicklung  >  Ausführliche Erläuterung des C#-Beispielcodes zur Implementierung von Big-Data-Batch-Einfügungen in mehreren Datenbanken

Ausführliche Erläuterung des C#-Beispielcodes zur Implementierung von Big-Data-Batch-Einfügungen in mehreren Datenbanken

黄舟
黄舟Original
2017-03-25 11:43:282221Durchsuche

In diesem Artikel wird hauptsächlich C# zur Implementierung von Big-Data-Batch-Einfügungen in mehreren Datenbanken vorgestellt, darunter SqlServer, Oracle, SQLite und MySQL. Interessierte können mehr erfahren.

Bisher wusste ich nur, dass SqlServer das Batch-Einfügen von Daten unterstützt, aber ich wusste nicht, dass Oracle, SQLite und MySQL dies auch unterstützen. Heute werde ich einen Beitrag leisten mehrere Batch-Insertion-Lösungen für Datenbanken.

Zuallererst gibt es in IProvider eine Plug-in-Dienstschnittstelle IBatcherProvider für die Stapeleinfügung. Diese Schnittstelle wurde im vorherigen Artikel erwähnt.

/// <summary> 
  /// 提供数据批量处理的方法。 
  /// </summary> 
  public interface IBatcherProvider : IProviderService 
  { 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    void Insert(DataTable dataTable, int batchSize = 10000); 
  }

1. SqlServer-Datenstapeleinfügung

SqlServer-Stapeleinfügung ist sehr einfach, verwenden Sie einfach SqlBulkCopy. Das Folgende ist diese Art von Implementierung :

/// <summary> 
  /// 为 System.Data.SqlClient 提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MsSqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = (SqlConnection)ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          //给表名加上前后导符 
          var tableName = DbUtility.FormatByQuote(ServiceContext.Database.Provider.GetService<ISyntaxProvider>(), dataTable.TableName); 
          using (var bulk = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, null) 
            { 
              DestinationTableName = tableName,  
              BatchSize = batchSize 
            }) 
          { 
            //循环所有列,为bulk添加映射 
            dataTable.EachColumn(c => bulk.ColumnMappings.Add(c.ColumnName, c.ColumnName), c => !c.AutoIncrement); 
            bulk.WriteToServer(dataTable); 
            bulk.Close(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
  }

Die Verwendung von Transaktionen hat einen gewissen Einfluss auf die Leistung. Wenn Sie Transaktionen verwenden möchten, können Sie SqlBulkCopyOptions.UseInternalTransaction festlegen.

2. Oracle-Daten-Batch-Einfügung

System.Data.OracleClient unterstützt keine Batch-Einfügung, daher können Sie nur Oracle.DataAccess verwenden Komponente als Anbieter.

/// <summary> 
  /// Oracle.Data.Access 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class OracleAccessBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      //将一个DataTable的数据转换为数组的数组 
      var data = table.ToArray(); 
 
      //设置ArrayBindCount属性 
      command.GetType().GetProperty("ArrayBindCount").SetValue(command, table.Rows.Count, null); 
 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      for (var i = 0; i < table.Columns.Count; i++) 
      { 
        var column = table.Columns[i]; 
 
        var parameter = database.Provider.DbProviderFactory.CreateParameter(); 
        if (parameter == null) 
        { 
          continue; 
        } 
        parameter.ParameterName = column.ColumnName; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = column.DataType.GetDbType(); 
        parameter.Value = data[i]; 
 
        if (names.Length > 0) 
        { 
          names.Append(","); 
          values.Append(","); 
        } 
        names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, column.ColumnName)); 
        values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
 
        command.Parameters.Add(parameter); 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

Der wichtigste Schritt oben besteht darin, die Datentabelle in eine Array-Darstellung eines Arrays umzuwandeln, d. h. Objekt[][]. Die Anzahl der Spalten ist die Anzahl der Zeilen, daher verwendet die Schleife Columns das letztere Array als Parameterwert, d. h. der Wert des Parameters ist ein Array. Die INSERT-Anweisung unterscheidet sich nicht von einer normalen INSERT-Anweisung.

3. Batch-Einfügung von SQLite-Daten

Das spezifische Prinzip hierfür ist unbekannt.

public sealed class SQLiteBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        DbTransaction transcation = null; 
        try 
        { 
          connection.TryOpen(); 
          transcation = connection.BeginTransaction(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
 
            var flag = new AssertFlag(); 
            dataTable.EachRow(row => 
              { 
                var first = flag.AssertTrue(); 
                ProcessCommandParameters(dataTable, command, row, first); 
                command.ExecuteNonQuery(); 
              }); 
          } 
          transcation.Commit(); 
        } 
        catch (Exception exp) 
        { 
          if (transcation != null) 
          { 
            transcation.Rollback(); 
          } 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    private void ProcessCommandParameters(DataTable dataTable, DbCommand command, DataRow row, bool first) 
    { 
      for (var c = 0; c < dataTable.Columns.Count; c++) 
      { 
        DbParameter parameter; 
        //首次创建参数,是为了使用缓存 
        if (first) 
        { 
          parameter = ServiceContext.Database.Provider.DbProviderFactory.CreateParameter(); 
          parameter.ParameterName = dataTable.Columns[c].ColumnName; 
          command.Parameters.Add(parameter); 
        } 
        else 
        { 
          parameter = command.Parameters[c]; 
        } 
        parameter.Value = row[c]; 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DataTable table) 
    { 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var flag = new AssertFlag(); 
      table.EachColumn(column => 
        { 
          if (!flag.AssertTrue()) 
          { 
            names.Append(","); 
            values.Append(","); 
          } 
          names.Append(DbUtility.FormatByQuote(syntax, column.ColumnName)); 
          values.AppendFormat("{0}{1}", syntax.ParameterPrefix, column.ColumnName); 
        }); 
      return string.Format("INSERT INTO {0}({1}) VALUES ({2})", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
  }

4. MySql-Daten-Batch-Einfügung

/// <summary> 
  /// 为 MySql.Data 组件提供的用于批量操作的方法。 
  /// </summary> 
  public sealed class MySqlBatcher : IBatcherProvider 
  { 
    /// <summary> 
    /// 获取或设置提供者服务的上下文。 
    /// </summary> 
    public ServiceContext ServiceContext { get; set; } 
 
    /// <summary> 
    /// 将 <see cref="DataTable"/> 的数据批量插入到数据库中。 
    /// </summary> 
    /// <param name="dataTable">要批量插入的 <see cref="DataTable"/>。</param> 
    /// <param name="batchSize">每批次写入的数据量。</param> 
    public void Insert(DataTable dataTable, int batchSize = 10000) 
    { 
      Checker.ArgumentNull(dataTable, "dataTable"); 
      if (dataTable.Rows.Count == 0) 
      { 
        return; 
      } 
      using (var connection = ServiceContext.Database.CreateConnection()) 
      { 
        try 
        { 
          connection.TryOpen(); 
          using (var command = ServiceContext.Database.Provider.DbProviderFactory.CreateCommand()) 
          { 
            if (command == null) 
            { 
              throw new BatcherException(new ArgumentException("command")); 
            } 
            command.Connection = connection; 
 
            command.CommandText = GenerateInserSql(ServiceContext.Database, command, dataTable); 
            if (command.CommandText == string.Empty) 
            { 
              return; 
            } 
            command.ExecuteNonQuery(); 
          } 
        } 
        catch (Exception exp) 
        { 
          throw new BatcherException(exp); 
        } 
        finally 
        { 
          connection.TryClose(); 
        } 
      } 
    } 
 
    /// <summary> 
    /// 生成插入数据的sql语句。 
    /// </summary> 
    /// <param name="database"></param> 
    /// <param name="command"></param> 
    /// <param name="table"></param> 
    /// <returns></returns> 
    private string GenerateInserSql(IDatabase database, DbCommand command, DataTable table) 
    { 
      var names = new StringBuilder(); 
      var values = new StringBuilder(); 
      var types = new List<DbType>(); 
      var count = table.Columns.Count; 
      var syntax = database.Provider.GetService<ISyntaxProvider>(); 
      table.EachColumn(c => 
        { 
          if (names.Length > 0) 
          { 
            names.Append(","); 
          } 
          names.AppendFormat("{0}", DbUtility.FormatByQuote(syntax, c.ColumnName)); 
          types.Add(c.DataType.GetDbType()); 
        }); 
 
      var i = 0; 
      foreach (DataRow row in table.Rows) 
      { 
        if (i > 0) 
        { 
          values.Append(","); 
        } 
        values.Append("("); 
        for (var j = 0; j < count; j++) 
        { 
          if (j > 0) 
          { 
            values.Append(", "); 
          } 
          var isStrType = IsStringType(types[j]); 
          var parameter = CreateParameter(database.Provider, isStrType, types[j], row[j], syntax.ParameterPrefix, i, j); 
          if (parameter != null) 
          { 
            values.Append(parameter.ParameterName); 
            command.Parameters.Add(parameter); 
          } 
          else if (isStrType) 
          { 
            values.AppendFormat("&#39;{0}&#39;", row[j]); 
          } 
          else 
          { 
            values.Append(row[j]); 
          } 
        } 
        values.Append(")"); 
        i++; 
      } 
      return string.Format("INSERT INTO {0}({1}) VALUES {2}", DbUtility.FormatByQuote(syntax, table.TableName), names, values); 
    } 
 
    /// <summary> 
    /// 判断是否为字符串类别。 
    /// </summary> 
    /// <param name="dbType"></param> 
    /// <returns></returns> 
    private bool IsStringType(DbType dbType) 
    { 
return dbType == DbType.AnsiString || dbType == DbType.AnsiStringFixedLength || dbType == DbType.String || dbType == DbType.StringFixedLength; 
    } 
 
    /// <summary> 
    /// 创建参数。 
    /// </summary> 
    /// <param name="provider"></param> 
    /// <param name="isStrType"></param> 
    /// <param name="dbType"></param> 
    /// <param name="value"></param> 
    /// <param name="parPrefix"></param> 
    /// <param name="row"></param> 
    /// <param name="col"></param> 
    /// <returns></returns> 
    private DbParameter CreateParameter(IProvider provider, bool isStrType, DbType dbType, object value, char parPrefix, int row, int col) 
    { 
      //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含&#39;号)和日期型时才添加参数 
      if ((isStrType && value.ToString().IndexOf(&#39;\&#39;&#39;) != -1) || dbType == DbType.DateTime) 
      { 
        var name = string.Format("{0}p_{1}_{2}", parPrefix, row, col); 
        var parameter = provider.DbProviderFactory.CreateParameter(); 
        parameter.ParameterName = name; 
        parameter.Direction = ParameterDirection.Input; 
        parameter.DbType = dbType; 
        parameter.Value = value; 
        return parameter; 
      } 
      return null; 
    } 
  }

MySql-Batch-Einfügung ist das Schreiben aller Werte die Werte der Anweisung. Fügen Sie hier beispielsweise batcher(id, name) value(1, '1', 2, '2', 3, '3', .........) ein. 10, '10').

5. Testen

Als nächstes schreiben Sie einen Testfall, um die Auswirkung der Stapeleinfügung zu sehen.

    public void TestBatchInsert() 
    { 
      Console.WriteLine(TimeWatcher.Watch(() => 
        InvokeTest(database => 
          { 
            var table = new DataTable("Batcher"); 
            table.Columns.Add("Id", typeof(int)); 
            table.Columns.Add("Name1", typeof(string)); 
            table.Columns.Add("Name2", typeof(string)); 
            table.Columns.Add("Name3", typeof(string)); 
            table.Columns.Add("Name4", typeof(string)); 
 
            //构造100000条数据 
            for (var i = 0; i < 100000; i++) 
            { 
              table.Rows.Add(i, i.ToString(), i.ToString(), i.ToString(), i.ToString()); 
            } 
 
            //获取 IBatcherProvider 
            var batcher = database.Provider.GetService<IBatcherProvider>(); 
            if (batcher == null) 
            { 
              Console.WriteLine("不支持批量插入。"); 
            } 
            else 
            { 
              batcher.Insert(table); 
            } 
 
            //输出batcher表的数据量 
            var sql = new SqlCommand("SELECT COUNT(1) FROM Batcher"); 
            Console.WriteLine("当前共有 {0} 条数据", database.ExecuteScalar(sql)); 
 
          }))); 
    }

Die folgende Tabelle listet die Zeit auf, die jede der vier Datenbanken benötigt, um 100.000 Daten zu generieren

tr>
Datenbank

数据库

耗用时间

MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891
Zeitaufwendig
MsSql 00:00:02.9376300
Oracle 00:00:01.5155959
SQLite 00:00:01.6275634
MySql 00:00:05.4166891

Das obige ist der detaillierte Inhalt vonAusführliche Erläuterung des C#-Beispielcodes zur Implementierung von Big-Data-Batch-Einfügungen in mehreren Datenbanken. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn