业务:需要从一个数据库查询百万级数据,在java程序中插入到另一个oracle数据库中
代码:
// Rows per batch: pushFrData derives the number of batches and each batch's start offset from it.
private final int persize = 1000;
/**
 * Pushes the rows of every selected source table in batches of {@code persize}
 * and writes one log entry per table that had matching rows.
 *
 * <p>The number of batches comes from a count query that selects rows with
 * {@code s_ext_timestamp >= startTime} and {@code s_ext_timestamp < endTime}.
 *
 * @param username  user name passed on to the push log
 * @param tableCode comma-separated source table names
 * @param tableName comma-separated display names, matched to {@code tableCode} by position
 * @param startTime lower bound as yyyy-MM-dd; null or empty falls back to 2000-01-01
 * @param endTime   upper bound as yyyy-MM-dd; null or empty falls back to getCurrentDate()
 * @return true when every table was processed, every fetched row was pushed and
 *         every log entry was written; false otherwise
 */
public boolean pushFrData(String username,String tableCode,String tableName,String startTime,String endTime){
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
    System.out.println("导入数据到名录库!");
    // Starts optimistic and is cleared on the first table, row or log entry that fails.
    boolean boo = true;
    String [] str = tableCode.split(",");
    String [] names = tableName.split(",");
    // Compare by content: == on strings compares references, so an empty value
    // supplied by a caller was never replaced by the default.
    if(startTime == null || startTime.isEmpty()){
        startTime = "2000-01-01";
    }
    if(endTime == null || endTime.isEmpty()){
        endTime = getCurrentDate();
    }
    // NOTE(review): '%Y-%m-%d' is not an Oracle format mask; presumably the source
    // database behind frDao accepts it - confirm.
    // NOTE(review): table names and dates are concatenated into SQL; they must come
    // from a trusted source because frDao.query only takes a finished statement.
    String start_Time = "to_date('"+startTime+"','%Y-%m-%d')";
    String end_Time = "to_date('"+endTime+"','%Y-%m-%d')";
    System.out.println("选择推送 "+str.length+" 张表");
    for(int i = 0;i<str.length;i++){
        String table = str[i];
        System.out.println("所选数据表: "+table);
        String [] arr = fillService.getIdenCode(table);
        if(arr == null || arr.length == 0){
            // Without columns there is nothing to select; skip instead of failing on substring(0,-1).
            System.out.println(table+" 未取到字段,跳过");
            boo = false;
            continue;
        }
        // Comma-separated column list of the source table.
        StringBuilder columns = new StringBuilder();
        for(int j = 0;j<arr.length;j++){
            if(j > 0){
                columns.append(',');
            }
            columns.append(arr[j]);
        }
        String iden_code = columns.toString();
        String sql = "select count(*) from "+table +" where s_ext_timestamp >= "+start_Time+" and s_ext_timestamp < "+end_Time;
        System.out.println(sql);
        int table_size = Integer.parseInt(frDao.query(sql).get(0).toString());
        // Ceiling division: a partial last batch still needs one round.
        int times = (table_size + persize - 1) / persize;
        for(int t = 1; t <= times;t++){
            int start = (t-1) * persize;
            List<Object[]> lists = getData(table, iden_code,startTime,endTime,start);
            System.out.println("准备导出第 "+t+" 批数据");
            int fetched = lists == null ? 0 : lists.size();
            // push reports how many rows executed; anything less means some rows failed.
            if(push(lists,table,iden_code) != fetched){
                boo = false;
            }
            System.out.println("已导出第 "+t+" 批数据");
        }
        try{
            if(table_size > 0){
                // Fall back to the table code when fewer display names than codes were supplied.
                String name = i < names.length ? names[i] : table;
                addLog(username,table,name,table_size,"1",sdf.parse(startTime),sdf.parse(endTime));
            }
        }catch (ParseException e){
            System.out.println("日期格式转换异常");
            boo = false;
        }
    }
    return boo;
}
/**
 * Writes one batch of source rows into the target table returned by
 * {@code getMlkTable(table)}, one SQL statement per row.
 *
 * <p>A row is inserted unless the column list contains {@code ETPSID} and the target
 * table already holds a row whose 企业唯一标识 equals that value; in that case the
 * existing row is updated. Every statement also sets a freshly generated 流水号.
 * Rows whose value count differs from the column count are reported and skipped.
 *
 * @param lists     rows to push; each row holds one value per column of {@code iden_code}
 * @param table     source table name
 * @param iden_code comma-separated source column names
 * @return number of rows whose statement executed without an exception
 */
private int push(List<Object []> lists,String table,String iden_code){
    int count = 0;
    if(lists == null || lists.isEmpty()){
        System.out.println(table+" 表共插入 "+count+" 条数据!");
        System.out.println("导入数据为空!");
        return count;
    }

    // Everything below depends only on the table, so it is resolved once per batch
    // instead of once per row and column.
    // NOTE(review): assumes getDateType/getMlkTable/getMlkCode return the same
    // answer for the same arguments - confirm.
    String [] iden = iden_code.split(",");
    String mlk_table = getMlkTable(table);
    String mlk_code = getMlkCode(iden_code,table);
    // NOTE(review): assumes mlk_code lists at least one target column per source
    // column, in the same order - confirm against getMlkCode.
    String [] update_code = mlk_code.split(",");
    boolean [] dateColumn = new boolean[iden.length];
    int etpsidIndex = -1;
    for(int i = 0;i<iden.length;i++){
        dateColumn[i] = getDateType(table, iden[i]);
        if("ETPSID".equals(iden[i])){
            etpsidIndex = i;
        }
    }

    for(int k = 0;k < lists.size();k++){
        Object [] obj = lists.get(k);
        if(obj == null || obj.length != iden.length){
            // Skip the row: building a statement from it would only produce invalid SQL.
            System.out.println("推送数据和字段不一致");
            continue;
        }

        // One SQL literal per column, taken straight from the row so that values
        // containing '|' or quotes stay intact.
        String [] value = new String[obj.length];
        for(int i = 0;i<obj.length;i++){
            value[i] = dateColumn[i] ? toOracleDateLiteral(obj[i]) : toSqlLiteral(obj[i]);
        }
        String etpsid = etpsidIndex >= 0 ? toSqlLiteral(obj[etpsidIndex]) : "";

        // Serial number: a UUID without dashes.
        String uuid = UUID.randomUUID().toString().replace("-","");

        StringBuilder insertValues = new StringBuilder();
        for(int i = 0;i<value.length;i++){
            insertValues.append(value[i]).append(',');
        }
        insertValues.append("'").append(uuid).append("'");
        String sql = "insert into "+mlk_table+"("+mlk_code+") values ("+insertValues+")";

        if(!etpsid.isEmpty()){
            // Incremental data is keyed by 企业唯一标识: update an existing row, otherwise insert.
            String s_sql = "select * from "+mlk_table+" where 企业唯一标识 = "+etpsid;
            int s = mlDao.query(s_sql).size();
            if (s > 0){
                StringBuilder assignments = new StringBuilder();
                for(int j = 0;j < iden.length;j++){
                    assignments.append(update_code[j]).append(" = ").append(value[j]).append(',');
                }
                assignments.append("流水号 = '").append(uuid).append("'");
                sql = "update "+mlk_table+" set "+assignments+" where 企业唯一标识 = "+etpsid;
                System.out.println("更新 "+mlk_table+" 标识 "+etpsid);
            }else{
                System.out.println("插入 "+mlk_table+" 标识 "+etpsid);
            }
        }

        try{
            mlDao.execute(sql);
            count++;
            System.out.println(table+" 推送第 "+count+" 条");
        }catch (Exception e){
            // Best effort per row: report the cause and carry on with the next row.
            System.out.println("sql执行异常!");
            System.out.println(e);
        }
    }
    System.out.println(table+" 表共插入 "+count+" 条数据!");
    return count;
}

/** Renders a value as a quoted SQL string literal; null becomes '' and embedded quotes are doubled. */
private static String toSqlLiteral(Object v){
    if(v == null){
        return "''";
    }
    return "'"+v.toString().replace("'","''")+"'";
}

/** Renders a date value as an Oracle to_date(...) call, dropping fractional seconds such as ".0". */
private static String toOracleDateLiteral(Object v){
    String raw = v == null ? "" : v.toString();
    // Values of two characters or fewer are written as '' rather than parsed.
    if(raw.length() <= 2){
        return "''";
    }
    // The format mask has no fraction part, so cut everything from the dot on.
    int dot = raw.indexOf('.');
    if(dot >= 0){
        raw = raw.substring(0,dot);
    }
    return "to_date('"+raw.replace("'","''")+"','YYYY-MM-DD HH24:MI:SS')";
}
PHP中文网2017-04-18 10:56:09
For reading, you can query the source database with multiple threads. For inserting, see the following links:
https://segmentfault.com/sear...
https://segmentfault.com/sear...
PHP中文网2017-04-18 10:56:09
1. You can use batch processing to save data to the database, such as processing 100 inserted records at a time.
2. Don't process all the data in a single thread. You can split, for example, 10,000 records between two threads; this makes full use of the CPU and at the same time avoids one long blocking operation.
高洛峰2017-04-18 10:56:09
It is recommended to use the batch processing mode of JDBC and search for the following keywords:
addBatch(String query)
executeBatch()
It is recommended to commit the transaction once per batch of about 1,000 rows.
巴扎黑2017-04-18 10:56:09
Lazy man’s method:
Create a database link in the target Oracle database and load the data directly with an `INSERT INTO target_table SELECT ...` statement over that link.