在 http://blog.csdn.net/sunflower_cao/article/details/28266939 中写过,可以通过实现 Writable 和 DBWritable 接口,在 reduce 过程中将结果写入到 MySQL 数据库里边。但是一直有一个问题:只能实现 insert,没法去更新已经存在的数据,这就导致不同的 MapReduce 程序获得的数据只能插入到不同的数据库中,在使用的时候需要建立 view 或者使用复杂的 SQL 语句去查询。今天调查了下,发现可以通过重写 DBOutputFormat 来实现 update。
上代码:
TblsWritable.java
import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import java.sql.PreparedStatement;import java.sql.ResultSet;import java.sql.SQLException;import org.apache.hadoop.io.Writable;import org.apache.hadoop.mapreduce.lib.db.DBWritable;/** * 重写DBWritable * * @author caozw TblsWritable需要向mysql中写入数据 */public class TblsWritable implements Writable, DBWritable { String initTime; String new_android_user; String new_ios_user; String new_total_user; String iosUserTotal; String androidUserTotal; String userTotal; public TblsWritable() { } public TblsWritable(String initTime, String new_android_user, String new_ios_user, String new_total_user, String iosUserTotal, String androidUserTotal, String userTotal) { this.initTime = initTime; this.new_android_user = new_android_user; this.new_ios_user = new_ios_user; this.new_total_user = new_total_user; this.iosUserTotal = iosUserTotal; this.androidUserTotal = androidUserTotal; this.userTotal = userTotal; } @Override public void write(PreparedStatement statement) throws SQLException { statement.setString(1, this.new_android_user); statement.setString(2, this.new_ios_user); statement.setString(3, this.new_total_user); statement.setString(4, this.androidUserTotal); statement.setString(5, this.iosUserTotal); statement.setString(6, this.userTotal); statement.setString(7, this.initTime); } @Override public void readFields(ResultSet resultSet) throws SQLException { this.new_android_user = resultSet.getString(1); this.new_ios_user = resultSet.getString(2); this.new_total_user = resultSet.getString(3); this.androidUserTotal = resultSet.getString(4); this.iosUserTotal = resultSet.getString(5); this.userTotal = resultSet.getString(6); this.initTime = resultSet.getString(7); } @Override public void write(DataOutput out) throws IOException { out.writeUTF(this.new_android_user); out.writeUTF(this.new_ios_user); out.writeUTF(this.new_total_user); out.writeUTF(this.androidUserTotal); out.writeUTF(this.iosUserTotal); 
out.writeUTF(this.userTotal); out.writeUTF(this.initTime); } @Override public void readFields(DataInput in) throws IOException { this.new_android_user = in.readUTF(); this.new_ios_user = in.readUTF(); this.new_total_user = in.readUTF(); this.androidUserTotal = in.readUTF(); this.iosUserTotal = in.readUTF(); this.userTotal = in.readUTF(); this.initTime = in.readUTF(); } public String toString() { return new String(this.initTime + " " + this.new_android_user + " " + this.new_ios_user + " " + this.new_total_user + " " + this.androidUserTotal + " " + this.iosUserTotal + " " + this.userTotal); }}
WriteDataToMysql.java
import java.io.IOException;import java.text.ParseException;import java.text.SimpleDateFormat;import java.util.Date;import java.util.Iterator;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;/** * 将mapreduce的结果数据写入mysql中 * * @author caozw */public class WriteDataToMysql { public static String[] fieldNames = { "INITTIME", "NEW_ANDROID_USER", "NEW_IOS_USER", "NEW_USER_TOTAL", "TOTAL_ANDROID_USER", "TOTAL_IOS_USER", "TOTAL_USER" }; public static String table = "USER_INFO_STATIC"; public static class ConnMysqlMapper extends Mapper<longwritable text intwritable> { enum Counter { LINESKIP, } private final static IntWritable one = new IntWritable(1); private final static IntWritable zero = new IntWritable(0); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { try { String line = value.toString(); String[] strings = line.split("/t"); String initTime = strings[1]; String devType = strings[4]; if (initTime.length() == 19) { SimpleDateFormat sdf = new SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); Date date = sdf.parse(initTime); if ("1".equals(devType)) { context.write(new Text(initTime.substring(0, 10)), one); context.write(new Text(initTime.substring(0, 10)), zero); } } else { // System.err.println(initTime); context.getCounter(Counter.LINESKIP).increment(1); } // } catch (ArrayIndexOutOfBoundsException e) { } catch (ArrayIndexOutOfBoundsException e) { context.getCounter(Counter.LINESKIP).increment(1); return; } catch (ParseException e) { 
context.getCounter(Counter.LINESKIP).increment(1); return; } } } public static class ConnMysqlReducer extends Reducer<text intwritable tblswritable> { static int iosUserTotal = 0; static int androidUserTotal = 0; public void reduce(Text key, Iterable<intwritable> values, Context context) throws IOException, InterruptedException { int android = 0; int ios = 0; int total = 0; for (Iterator<intwritable> itr = values.iterator(); itr.hasNext();) { total++; if (0 == itr.next().get()) { android++; } else { ios++; } } iosUserTotal += ios; androidUserTotal += android; /* * System.err.println(key.toString() + ":" + String.valueOf(android) * + ":" + String.valueOf(ios) + ":" + String.valueOf(total)); */ context.write( new TblsWritable(key.toString(), String.valueOf(android), String.valueOf(ios), String.valueOf(total), String .valueOf(androidUserTotal), String .valueOf(iosUserTotal), String .valueOf(androidUserTotal + iosUserTotal)), null); } } public static void main(String args[]) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://127.0.0.1:3306/XINGXUNTONG", "hadoop", "123456"); Job job = new Job(conf, "test mysql connection"); job.setJarByClass(WriteDataToMysql.class); job.setMapperClass(ConnMysqlMapper.class); job.setReducerClass(ConnMysqlReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setSpeculativeExecution(false); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(MysqlDBOutputFormat.class); //job.setOutputFormatClass(DBOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); // DBOutputFormat.setOutput(job, "test", "initTime", "new_user_total"); //DBOutputFormat.setOutput(job, table, fieldNames); MysqlDBOutputFormat.setOutput(job, table, fieldNames); System.exit(job.waitForCompletion(true) ? 0 : 1); }}</intwritable></intwritable></text></longwritable>
MysqlDBOutputFormat.java
/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */import java.io.IOException;import java.sql.Connection;import java.sql.PreparedStatement;import java.sql.SQLException;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.apache.hadoop.classification.InterfaceAudience;import org.apache.hadoop.classification.InterfaceStability;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.JobContext;import org.apache.hadoop.mapreduce.OutputCommitter;import org.apache.hadoop.mapreduce.OutputFormat;import org.apache.hadoop.mapreduce.RecordWriter;import org.apache.hadoop.mapreduce.TaskAttemptContext;import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;import org.apache.hadoop.mapreduce.lib.db.DBWritable;import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.StringUtils;/** * A OutputFormat that sends the reduce output to a SQL table. * <p> * {@link MysqlDBOutputFormat} accepts <key> pairs, where * key has a type extending DBWritable. Returned {@link RecordWriter} * writes <b>only the key</b> to the database with a batch SQL query. 
* */@InterfaceAudience.Public@InterfaceStability.Stablepublic class MysqlDBOutputFormat<k extends dbwritable v> extends OutputFormat<k> { private static final Log LOG = LogFactory.getLog(MysqlDBOutputFormat.class); public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {} public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { return new FileOutputCommitter(FileOutputFormat.getOutputPath(context), context); } /** * A RecordWriter that writes the reduce output to a SQL table */ @InterfaceStability.Evolving public class DBRecordWriter extends RecordWriter<k v> { private Connection connection; private PreparedStatement statement; public DBRecordWriter() throws SQLException { } public DBRecordWriter(Connection connection , PreparedStatement statement) throws SQLException { this.connection = connection; this.statement = statement; this.connection.setAutoCommit(false); } public Connection getConnection() { return connection; } public PreparedStatement getStatement() { return statement; } /** {@inheritDoc} */ public void close(TaskAttemptContext context) throws IOException { try { statement.executeBatch(); connection.commit(); } catch (SQLException e) { try { connection.rollback(); } catch (SQLException ex) { LOG.warn(StringUtils.stringifyException(ex)); } throw new IOException(e.getMessage()); } finally { try { statement.close(); connection.close(); } catch (SQLException ex) { throw new IOException(ex.getMessage()); } } } /** {@inheritDoc} */ public void write(K key, V value) throws IOException { try { key.write(statement); statement.addBatch(); } catch (SQLException e) { e.printStackTrace(); } } } /** * Constructs the query used as the prepared statement to insert data. * * @param table * the table to insert into * @param fieldNames * the fields to insert into. If field names are unknown, supply an * array of nulls. 
*/ public String constructQuery(String table, String[] fieldNames) { if (fieldNames == null) { throw new IllegalArgumentException( "Field names may not be null"); } StringBuilder query = new StringBuilder(); query.append("UPDATE ").append(table); System.err.println("fieldNames.length:" + fieldNames.length); if (fieldNames.length > 0) { query.append(" SET "); query.append(fieldNames[1] + " = ?"); query.append("," + fieldNames[2] + " = ?"); query.append("," + fieldNames[3] + " = ?"); query.append("," + fieldNames[4] + " = ?"); query.append("," + fieldNames[5] + " = ?"); query.append("," + fieldNames[6] + " = ?"); query.append(" WHERE "); query.append(fieldNames[0] + " = ?"); System.err.println(query.toString()); return query.toString(); } else { return null; } } /** {@inheritDoc} */ public RecordWriter<k v> getRecordWriter(TaskAttemptContext context) throws IOException { DBConfiguration dbConf = new DBConfiguration(context.getConfiguration()); String tableName = dbConf.getOutputTableName(); String[] fieldNames = dbConf.getOutputFieldNames(); if(fieldNames == null) { fieldNames = new String[dbConf.getOutputFieldCount()]; } try { Connection connection = dbConf.getConnection(); PreparedStatement statement = null; statement = connection.prepareStatement( constructQuery(tableName, fieldNames)); return new DBRecordWriter(connection, statement); } catch (Exception ex) { throw new IOException(ex.getMessage()); } } /** * Initializes the reduce-part of the job with * the appropriate output settings * * @param job The job * @param tableName The table to insert data into * @param fieldNames The field names in the table. */ public static void setOutput(Job job, String tableName, String... 
fieldNames) throws IOException { if(fieldNames.length > 0 && fieldNames[0] != null) { DBConfiguration dbConf = setOutput(job, tableName); dbConf.setOutputFieldNames(fieldNames); } else { if (fieldNames.length > 0) { setOutput(job, tableName, fieldNames.length); } else { throw new IllegalArgumentException( "Field names must be greater than 0"); } } } /** * Initializes the reduce-part of the job * with the appropriate output settings * * @param job The job * @param tableName The table to insert data into * @param fieldCount the number of fields in the table. */ public static void setOutput(Job job, String tableName, int fieldCount) throws IOException { DBConfiguration dbConf = setOutput(job, tableName); dbConf.setOutputFieldCount(fieldCount); } private static DBConfiguration setOutput(Job job, String tableName) throws IOException { job.setOutputFormatClass(MysqlDBOutputFormat.class); job.setReduceSpeculativeExecution(false); DBConfiguration dbConf = new DBConfiguration(job.getConfiguration()); dbConf.setOutputTableName(tableName); return dbConf; }}</k></k></k></k></key></p>
MysqlDBOutputFormat.java是将hadoop源码里边的DBOutputFormat.java拿过来重写了里边的constructQuery的方法,在生成sql语句的时候产生update的sql语句来实现的
DBOutputFormat.java里边的constructQuery
public String constructQuery(String table, String[] fieldNames) {
  if(fieldNames == null) {
    throw new IllegalArgumentException("Field names may not be null");
  }
  StringBuilder query = new StringBuilder();
  query.append("INSERT INTO ").append(table);
  if (fieldNames.length > 0 && fieldNames[0] != null) {
    query.append(" (");
    for (int i = 0; i < fieldNames.length; i++) {
      query.append(fieldNames[i]);
      if (i != fieldNames.length - 1) {
        query.append(",");
      }
    }
    query.append(")");
  }
  query.append(" VALUES (");
  for (int i = 0; i < fieldNames.length; i++) {
    query.append("?");
    if (i != fieldNames.length - 1) {
      query.append(",");
    }
  }
  query.append(");");
  return query.toString();
}

重写后:

public String constructQuery(String table, String[] fieldNames) {
  if (fieldNames == null) {
    throw new IllegalArgumentException( "Field names may not be null");
  }
  StringBuilder query = new StringBuilder();
  query.append("UPDATE ").append(table);
  System.err.println("fieldNames.length:" + fieldNames.length);
  if (fieldNames.length > 0) {
    query.append(" SET ");
    query.append(fieldNames[1] + " = ?");
    query.append("," + fieldNames[2] + " = ?");
    query.append("," + fieldNames[3] + " = ?");
    query.append("," + fieldNames[4] + " = ?");
    query.append("," + fieldNames[5] + " = ?");
    query.append("," + fieldNames[6] + " = ?");
    query.append(" WHERE ");
    query.append(fieldNames[0] + " = ?");
    System.err.println(query.toString());
    return query.toString();
  } else {
    return null;
  }
}
按照 Java 的思想,我这里其实可以通过 extends DBOutputFormat 并重写 constructQuery 来实现 update MySQL 的功能,但是我试了死活不行,暂且记下,有时间再来仔细揣摩。
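继承(重写)方式代码(测试未通过),有知道为什么的还请不吝赐教。一个很可能的原因是:继承来的静态方法 DBOutputFormat.setOutput() 内部会调用 job.setOutputFormatClass(DBOutputFormat.class),把之前设置的子类覆盖掉;需要在调用 setOutput() 之后再调用一次 job.setOutputFormatClass(MysqlOutputFormat1.class)。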
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;import org.apache.hadoop.mapreduce.lib.db.DBWritable;public class MysqlOutputFormat1<k extends dbwritable> extends DBOutputFormat<k v> { public String constructQuery(String table, String[] fieldNames) { if (fieldNames == null) { throw new IllegalArgumentException( "Field names may not be null"); } StringBuilder query = new StringBuilder(); query.append("UPDATE ").append(table); System.err.println("fieldNames.length:"+fieldNames.length); if (fieldNames.length > 0) { query.append(" SET "); query.append(fieldNames[1] + " = ?"); query.append("," + fieldNames[2] + " = ?"); query.append("," + fieldNames[3] + " = ?"); query.append("," + fieldNames[4] + " = ?"); query.append("," + fieldNames[5] + " = ?"); query.append("," + fieldNames[6] + " = ?"); query.append(" WHERE "); query.append(fieldNames[0] + " = ?"); System.err.println(query.toString()); return query.toString(); } else { return null; } }}</k></k>
另外,我感觉可以通过 SQL 的 merge 方式生成 merge 语句,从而实现"有则更新,没有则插入"。参考网页 http://en.wikipedia.org/wiki/Merge_%28SQL%29 (注意:MySQL 本身不支持 MERGE 语句,对应的写法是 INSERT ... ON DUPLICATE KEY UPDATE 或 REPLACE INTO。)
--http://en.wikipedia.org/wiki/Merge_%28SQL%29
MERGE INTO tablename USING table_reference ON (condition)
  WHEN MATCHED THEN
    UPDATE SET column1 = value1 [, column2 = value2 ...]
  WHEN NOT MATCHED THEN
    INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...]);

The InnoDB Buffer Pool reduces disk I/O by caching data and index pages, improving database performance. Its working principle includes: 1. Data reading: data is read from the Buffer Pool; 2. Data writing: modified data is written to the Buffer Pool and flushed to disk periodically; 3. Cache management: cached pages are managed with an LRU algorithm; 4. Read-ahead mechanism: adjacent data pages are loaded in advance. Database performance can be optimized by sizing the Buffer Pool appropriately and using multiple instances.

Compared with other programming languages, MySQL is mainly used to store and manage data, while other languages such as Python, Java, and C are used for logical processing and application development. MySQL is known for its high performance, scalability and cross-platform support, suitable for data management needs, while other languages have advantages in their respective fields such as data analytics, enterprise applications, and system programming.

MySQL is worth learning because it is a powerful open source database management system suitable for data storage, management and analysis. 1) MySQL is a relational database that uses SQL to operate data and is suitable for structured data management. 2) The SQL language is the key to interacting with MySQL and supports CRUD operations. 3) The working principle of MySQL includes client/server architecture, storage engine and query optimizer. 4) Basic usage includes creating databases and tables, and advanced usage involves joining tables using JOIN. 5) Common errors include syntax errors and permission issues, and debugging skills include checking syntax and using EXPLAIN commands. 6) Performance optimization involves the use of indexes, optimization of SQL statements and regular maintenance of databases.

MySQL is suitable for beginners to learn database skills. 1. Install MySQL server and client tools. 2. Understand basic SQL queries, such as SELECT. 3. Master data operations: create tables, insert, update, and delete data. 4. Learn advanced skills: subquery and window functions. 5. Debugging and optimization: Check syntax, use indexes, avoid SELECT*, and use LIMIT.

MySQL efficiently manages structured data through table structure and SQL query, and implements inter-table relationships through foreign keys. 1. Define the data format and type when creating a table. 2. Use foreign keys to establish relationships between tables. 3. Improve performance through indexing and query optimization. 4. Regularly backup and monitor databases to ensure data security and performance optimization.

MySQL is an open source relational database management system that is widely used in Web development. Its key features include: 1. Supports multiple storage engines, such as InnoDB and MyISAM, suitable for different scenarios; 2. Provides master-slave replication functions to facilitate load balancing and data backup; 3. Improve query efficiency through query optimization and index use.

SQL is used to interact with MySQL database to realize data addition, deletion, modification, inspection and database design. 1) SQL performs data operations through SELECT, INSERT, UPDATE, DELETE statements; 2) Use CREATE, ALTER, DROP statements for database design and management; 3) Complex queries and data analysis are implemented through SQL to improve business decision-making efficiency.

The basic operations of MySQL include creating databases and tables, and using SQL to perform CRUD operations on data. 1. Create a database: CREATE DATABASE my_first_db; 2. Create a table: CREATE TABLE books (id INT AUTO_INCREMENT PRIMARY KEY, title VARCHAR(100) NOT NULL, author VARCHAR(100) NOT NULL, published_year INT); 3. Insert data: INSERT INTO books (title, author, published_year) VALUES …


Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

Video Face Swap
Swap faces in any video effortlessly with our completely free AI face swap tool!

Hot Article

Hot Tools

MantisBT
Mantis is an easy-to-deploy web-based defect tracking tool designed to aid in product defect tracking. It requires PHP, MySQL and a web server. Check out our demo and hosting services.

Dreamweaver Mac version
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

PhpStorm Mac version
The latest (2018.2.1) professional PHP integrated development tool

WebStorm Mac version
Useful JavaScript development tools