首页 >数据库 >mysql教程 >大数据量的处理

大数据量的处理

WBOY
WBOY原创
2016-06-07 15:29:051501浏览

最近做的项目中涉及到大数据量的问题,具体问题是:监测数字电视的信号,对传输的码流进行指标监测,每秒监测到20000个流,每个流对应着20多个指标,每秒存储一次将这20000流存储起来,需要保存24小时的数据。 这个问题研究了好几天: 一、文件写入存储:但

最近做的项目中涉及到大数据量的问题,具体问题是:监测数字电视的信号,对传输的码流进行指标监测,每秒监测到20000个流,每个流对应着20多个指标,每秒存储一次将这20000流存储起来,需要保存24小时的数据。

这个问题研究了好几天:

一、文件写入存储:但是如果将一天的17亿条记录都写入到一个文件里,没试过,相信会很慢,而且查询的时候会更慢。如果写入到多个文件,按照流ID可以将数据拆成20000个分类,同时对20000个文件执行写入操作也不现实。

二、数据库存储:文件存储的方式pass掉了之后开始考虑数据库存储

1、首先我用的Oracle进行性能测试:

将表按照流ID进列表分区,分为20000个区,然后每个分区内存储86400条数据(也就是该流从一天的第1秒到86400秒对应的指标数据),需要有索引,主键是全局索引,其余的列我又建了4个分区索引。

第一步创建6个表空间,保证每个表空间都能拓展到32GB大小(Oracle的表空间最大能拓展到32GB)

第二步要创建这个分区表:

-- Create table
create table AAA
(
  ID             number(8),
  StreamID       number(8),
  StreamType     number(1),
  FAvailability  number(5),
  Bandwidth      number(4),
  ValidBandwidth number(4),
  MDI_DF         number(5),
  MDI_MLR        number(5),
  Delay_Time     number(5),
  IPInterval     number(5),
  IPJitter       number(5),
  Time           date,
  MLT15          number(5),
  MLT24          number(5),
  MLS            number(5),
  SliceNum       number(5),
  CachedTime     number(5),
  StuckTime      number(5),
  GetSliceErr    number(5),
  RetransmitRate number(5),
  RepeatRate     number(5),
  SecondsFlag    number(5)
)
partition by list(SecondsFlag)  
(  
   partition p1 values(1) tablespace tbs_haicheng 
  
);  
第三步再为t_stream表创建19999个分区:
DECLARE
parName varchar2(100);
sql_str varchar2(500);
BEGIN
  FOR  I  IN 2..20000 LOOP
    parName:='p'||I;
    sql_str:='ALTER TABLE aaa ADD partition'||' p'||I|| ' VALUES('||I||')';
    execute immediate sql_str;
    END LOOP;
  END; 

第四步为t_stream创建4个分区索引:
-- Create/Recreate indexes 
create index LOCAL_INDEX_REPEATRATE on AAA (REPEATRATE);
create index LOCAL_INDEX_SECONDSFLAG on AAA (SECONDSFLAG);
create index LOCAL_INDEX_STREAM on AAA (STREAMID);
create index LOCAL_INDEX_TIME on AAA (TIME);

第五步创建一个表结构与t_stream相似的表:

create table a
(
  ID             number(8),
  StreamID       number(8),
  StreamType     number(1),
  FAvailability  number(5),
  Bandwidth      number(4),
  ValidBandwidth number(4),
  MDI_DF         number(5),
  MDI_MLR        number(5),
  Delay_Time     number(5),
  IPInterval     number(5),
  IPJitter       number(5),
  Time           date,
  MLT15          number(5),
  MLT24          number(5),
  MLS            number(5),
  SliceNum       number(5),
  CachedTime     number(5),
  StuckTime      number(5),
  GetSliceErr    number(5),
  RetransmitRate number(5),
  RepeatRate     number(5),
  SecondsFlag    number(5)
)
partition by list (SECONDSFLAG)
(
  partition P1 values (1)
    tablespace IPVIEW1
    pctfree 10
    initrans 1
    maxtrans 255
    storage
    (
      initial 64K
      minextents 1
      maxextents unlimited
    )
);
alter table AAA
  add constraint ID primary key (ID)
  using index 
  tablespace TBS_HAICHENG
  pctfree 10
  initrans 2
  maxtrans 255
  storage
  (
    initial 64K
    minextents 1
    maxextents unlimited
  );

第六步向表A中插入86400条数据:
declare
begin
  for i in 1..86400 loop
  insert into a
  (id, streamid, streamtype, favailability, bandwidth, validbandwidth, mdi_df, mdi_mlr, delay_time, ipinterval, ipjitter, time, mlt15, mlt24, mls, slicenum, cachedtime, stucktime, getsliceerr, retransmitrate, repeatrate)
values
  (seq_aaa.nextval, 111, 1, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111, SYSDATE, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111, 1111); 
  end loop;
  end ;

第七步:向t_stream表中copy数据
declare
begin
  FOR I IN 1..20000 LOOP
  insert into aaa
         select seq_aaa.nextval, streamid, streamtype, favailability, bandwidth, validbandwidth, mdi_df, mdi_mlr, delay_time, ipinterval, ipjitter, time, mlt15, mlt24, mls, slicenum, cachedtime, stucktime, getsliceerr, retransmitrate, repeatrate,I from a;
    commit;
    END LOOP;
  end;

注意:实际上,这一部分我是将1-20000分成20份 ,开了20个线程同时执行,每个线程负责向1000个分区中copy数据(向每个分区录入86400条),这时候明白我为什么要创建表A了吧!

然后,就不管他了,玩游戏看电影去了,两天假结束,想起来去看了一眼插入到什么程度了,发现磁盘有的线程还在执行,有的线程由于表空间写满到32Gb无法再拓展而终止了。

看了一下序列已经被调用到6亿多,说明插入进去了6亿多条是数据。

首先是数据占用的空间问题,与估算的相差太多,我开始插入了上百万的数据,通过查看这上百万数据占用的空间估算出17亿数据占用的空间在180G左右,,而我准备出将近200G的磁盘空间以为足够了呢,结果差了这么多,分析下原因,最主要的一点是索引占用的空间:

我原来在预估的时候忘记了为表创建索引,以为没什么大影响,有10G空间足够索引占用了,可是事实大错特错了,通过下面的语句查看了下空间的占用情况:

1、表占用空间(0.008G   这是A表里的86400条数据占用的空间)
select segment_name, sum(bytes)/1024/1024/1024 GB from user_segments where segment_type='TABLE'  group by segment_name;
2、索引占用空间(17.24GB)
select segment_name ,sum(bytes)/1024/1024/1024 GB from user_segments where segment_type IN('INDEX PARTITION','INDEX') group by segment_name;
3、分区表TABLE PARTITION占用空间(63.5GB)
select segment_name,sum(bytes)/1024/1024/1024 GB from user_segments where segment_type='TABLE PARTITION' group by segment_name;
结果分别如下:

 

\
 

\

\

注:第三个图中的SEGMENT_NAME的值为T_STREAM 是上文创建的那个分区表。

我们看到结果发现,实际上表数据占用的空间是64GB,跟原来估算的几乎一致,多出来的部分是被索引占了,总共占用了将近100GB的空间,吓死哥了尴尬

缘何索引占用了这么多的空间?可能是我创建索引的方式不对?后续研究补充!

我们的程序采用的策略是首先将17亿条记录手动录入到数据库中,然后当监测到流指标时候对响应的数据进行update操作,也就是一般每秒执行20000个update语句,测试下性能:

 

declare
j  number ;
begin
  for i in 2000000..2020000 loop
update t_stream
   set 
       streamid = 2,
       streamtype = 2,
       favailability = 2,
       bandwidth = 2,
       validbandwidth = 2,
       mdi_df = 2,
       mdi_mlr = 2,
       delay_time = 2,
       ipinterval = 2,
       ipjitter = 2,
       time = sysdate,
       mlt15 = 2,
       mlt24 = 2,
       mls = 2,
       slicenum = 2,
       cachedtime = 2,
       stucktime = 2,
       getsliceerr = 2,
       retransmitrate = 2,
       repeatrate = 2
        where  id = i ;
  end loop;
  end ;

这种单纯以主键进行修改的时候他要进行全表扫描(所有的分区需要扫描到),效率很低,大约70s执行完,这才只是6亿数据。

所以我们要让他在执行update语句的时候尽量扫描单个分区,也就是说把那个分区字段当参数传递过来,如下语句所示:

declare
j  number ;
begin
  j:=1;
  for i in 2000000..2020000 loop
update aaa
   set 
       streamid = 2,
       streamtype = 2,
       favailability = 2,
       bandwidth = 2,
       validbandwidth = 2,
       mdi_df = 2,
       mdi_mlr = 2,
       delay_time = 2,
       ipinterval = 2,
       ipjitter = 2,
       time = sysdate,
       mlt15 = 2,
       mlt24 = 2,
       mls = 2,
       slicenum = 2,
       cachedtime = 2,
       stucktime = 2,
       getsliceerr = 2,
       retransmitrate = 2,
       repeatrate = 2
        where  id = i ;
        j:=j+1;
  end loop;
  end ;

测试这个代码块执行时间为3s,而且虽然现在是6亿数据,但是就是17亿数据执行时间也差不多是3s的,因为它扫描的永远只是20000个分区。而且我的电脑才四核处理器,服务器上24核呢。执行的肯定会比我电脑快多了吧,所以实现预定需求不成问题。

2、后来由于Oracle是收费的,不让用了,汗一个,接下来研究Mysql。

Mysql在建表以及分区的时候遇到两个问题:

问题一:建分区的时候总提示语法错误,无论怎么改都不让我创建分区,Mysql这么火的数据库不可能不支持分区啊。后来一查才知道Mysq5.0版本不支持分区,是从5.1才开始支持表的分区的尴尬,于是把我的数据库版本更换成5.5的,分区成功创建。

问题二:在Mysql上建20000个分区的过程中发现每次执行到中途就报错停止了,查询了解到Mysql的表分区数量是有限制的,每个表最多能有1024个分区。

这对我们影响不太大,大不了我就建1000个分区,每个分区存放86400*20条数据,相信每个分区百万条数据不算什么。

3、首先sqlite数据库不支持分区只好建立20000个表,由于sqlite不支持存储过程,我也没找到sqlite怎样写循环语句。但是建立20000个表 和 录入那么多的数据我们不可能一条一条的去执行写语句执行,所以需要另想办法,我的解决过程:

首先我想到可以用调用批处理文件的方式插入数据和建表:

建一个 批量建表.bat文件,文件内容如下:

@ECHO OFF 
For /L %%i in (1,1,20000) do (sqlite3.exe hc.db<createTable.bat bbb_%%i) 
pause 

createTable.bat 内容如下:

create table 1%(ID integer primary key autoincrement,
  STREAMID       NUMBER(10),
  STREAMTYPE     NUMBER(1),
  FAVAILABILITY  NUMBER(5),
  BANDWIDTH      NUMBER(4),
  VALIDBANDWIDTH NUMBER(4),
  MDIDF          NUMBER(5),
  MDIMLR         NUMBER(5),
  DELAY_TIME     NUMBER(5),
  IPINTERVAL     NUMBER(5),
  IPJITTER       NUMBER(5),
  TIME           DATE,
  MLT15          NUMBER(5),
  MLT24          NUMBER(5),
  MLS            NUMBER(5),
  SLICENUM       NUMBER(5),
  CACHEDTIME     NUMBER(5),
  STUCKTIME      NUMBER(5),
  GETSLICEERR    NUMBER(5),
  RETRANSMITRATE NUMBER(5),
  REPEATRATE     NUMBER(5),
  SECONDSFLAG    NUMBER(5),
  PART    NUMBER(5)
);

问题出现了,在执行批量建表.bat的时候提示sqlite语法错误。至今也没找到原因:

问题肯定是出现在传递的动态参数上,createTable.bat成功的接到了参数,语句在sqlite中执行不报错,放在bat里就报错。 所以第一次批量建表没成功。

那就用咱们的老本行,写JAVA程序:

需要一个驱动包:sqlitejdbc-v033-nested.jar。

代码如下:

import java.sql.*;
import org.sqlite.JDBC;
/**
 * sqlite创建数据库以及批量建表
 * @time 2014-01-07
 * @author HaiCheng
 *
 */
public class createTable {
	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		try{
			//1,保证SQLite数据库文件的路径首字符为小写,否则报错
			String thisPath = "e:/haicheng.db";
			String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
			 //2,连接SQLite的JDBC
			 Class.forName("org.sqlite.JDBC"); 
			 //建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
			 Connection conn = DriverManager.getConnection(sql);
			 //3,创建表
			 Statement stat = conn.createStatement();
			 for(int i=1 ;i<=20000;i++){
			 String sql1="  create table bbb"+i+"   " +
			 				  " 	(" +
							  " ID             INTEGER primary key autoincrement," +
							  "  STREAMID       NUMBER(10)," +
							  "  STREAMTYPE     NUMBER(1)," +
							  "  FAVAILABILITY  NUMBER(5)," +
							  "  BANDWIDTH      NUMBER(4)," +
							  "  VALIDBANDWIDTH NUMBER(4)," +
							  "  MDI_DF         NUMBER(5)," +
							  "  MDI_MLR        NUMBER(5)," +
							  "  DELAY_TIME     NUMBER(5)," +
							  "  IPINTERVAL     NUMBER(5)," +
							  "  IPJITTER       NUMBER(5)," +
							  "  TIME           DATE," +
							  "  MLT15          NUMBER(5)," +
							  "  MLT24          NUMBER(5)," +
							  "  MLS            NUMBER(5)," +
							  "  SLICENUM       NUMBER(5)," +
							  "  CACHEDTIME     NUMBER(5)," +
							  "  STUCKTIME      NUMBER(5)," +
							  "  GETSLICEERR    NUMBER(5)," +
							  "  RETRANSMITRATE NUMBER(5)," +
							  "  REPEATRATE     NUMBER(5)," +
							  "  SECONDSFLAG    NUMBER(5)," +
							  "  PART    NUMBER(5)" +
							  " 	);";
			 System.out.println(sql1);
			 String sql2="CREATE INDEX index_flag"+i+" ON bbb"+i+"(SECONDSFLAG);";
			 String sql3="CREATE INDEX index_part"+i+" ON bbb"+i+"(PART);";
			 stat.executeUpdate( sql1 );
			 stat.executeUpdate( sql2 );
			 stat.executeUpdate( sql3 );
			 }
			 stat.close();
			 conn.close(); //结束数据库的连接 
		 }
		 catch( Exception e )
		 {
			 e.printStackTrace ( );
		 }
	}

}
import java.sql.*;
import org.sqlite.JDBC;
/**
 * 向第一个表中循环录入数据
 * @author HaiCheng
 *
 */
public class insertData {
	public static void main(String[] args) throws Exception {
		try{
			//1,保证SQLite数据库文件的路径首字符为小写,并且路径为unix路径
			String thisPath = "e:/haicheng.db";
			String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
			//2,连接SQLite的JDBC
			Class.forName("org.sqlite.JDBC"); 
			//建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
			Connection conn = DriverManager.getConnection(sql);

			//4,插入一条数据
			for(int i=1;i<=86400;i++){
				 	PreparedStatement prep = conn.prepareStatement("insert into bbb1(STREAMID) values (?);");
				    prep.setInt(1, 0);
				    prep.addBatch();
				    conn.setAutoCommit(false);
				    prep.executeBatch();
			 }
			 conn.setAutoCommit(true);
			 stat.close();
			 conn.close(); //结束数据库的连接 
			 System.out.println("数据插入成功");
		 }
		 catch( Exception e )
		 {
			 System.out.println("数据插入异常");
			 e.printStackTrace ( );
		 }
	}

}
import java.sql.*;
import org.sqlite.JDBC;
/**
 * 向其余19999个表中批量拷贝数据
 * @author HaiCheng
 *
 */
public class copyData {
	public static void main(String[] args) throws Exception {
		try{
			 //1,保证SQLite数据库文件的路径首字符为小写,并且路径为unix路径
			 String thisPath = "e:/haicheng.db";
			 String sql = "jdbc:sqlite://"+thisPath;//windows && linux都适用
			 //2,连接SQLite的JDBC
			 Class.forName("org.sqlite.JDBC"); 
			 //建立一个数据库名haicheng.db的连接,如果不存在就在当前目录下自动创建
			 Connection conn = DriverManager.getConnection(sql);
			 //3,创建表
			 Statement stat = conn.createStatement();
			 for(int i=2;i<=20000;i++){
			 String sql1="insert into bbb"+i+"  select * from  bbb1";
			 System.out.println(sql1);
			 stat.execute(sql1);
			 }
			 stat.close();
			 conn.close(); //结束数据库的连接 
			 System.out.println("数据插入成功");
		 }
		 catch( Exception e )
		 {
			 System.out.println("数据插入异常");
			 e.printStackTrace ( );
		 }
	}

}
依次执行这三个类,当执行第三个类的时候也就是批量向数据库中录入数据的时候,当数据文件大小达到2G的临界点的时候(不同方式测试多遍都是这种情况),再继续写入数据,那么数据文件就会损坏(文件大小都变了,从2GB变成1MB了)。

分析各种原因:

(1)、正在写入数据的时候断电(排除,没有断电)

(2)、磁盘有坏道(排除,在磁盘中放些其他的文件,换一段空间存储这个数据同样到2GB崩溃)

(3)、数据文件所在磁盘空间不足(排除,硬盘空间足够、sqlite也不像Oracle那样有着表空间的概念)

最终我也没找到什么原因,发帖求助。

-------------------------------------------------------------------------------------------------------------------------

尴尬上面那些还是年前写的东西,也没有写完。最终是sqlite的问题没有解决。目前还是用着Mysql

声明:
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn