찾다
데이터 베이스MySQL 튜토리얼Hadoop 中利用 地图reduce 读写 mysql 数据

Hadoop 中利用 地图reduce 读写 mysql 数据

Jun 07, 2016 pm 04:27 PM
hadoopmysqlreduce사용지도읽고 쓰기

Hadoop 中利用 mapreduce 读写 mysql 数据 问题导读 1.hadoop mapreduce的通过哪两个类可以读取数据源? 2.如果没有mysql驱动包,一般会是什么问题? 3.如何添加包? 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后

Hadoop 中利用 mapreduce 读写 mysql 数据

问题导读
1.hadoop mapreduce的通过哪两个类可以读取数据源?
2.如果没有mysql驱动包,一般会是什么问题?
3.如何添加包?




有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方。

好了言归正传,简单的说说背景、原理以及需要注意的地方:

1、为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。

至少在我的 0.20.203 中的 org.apache.hadoop.mapreduce.lib 下是没见到 db 包,所以本文也是以老版的 API 来为例说明的。

3、运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /hdfsPath/

? ?? ? b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java- 5.1.0-bin.jar”), conf);

(3)虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。、

4、测试数据:

  1. CREATE TABLE `t` (
  2. `id` int DEFAULT NULL,
  3. `name` varchar(10) DEFAULT NULL
  4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  5. CREATE TABLE `t2` (
  6. `id` int DEFAULT NULL,
  7. `name` varchar(10) DEFAULT NULL
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  9. insert into t values (1,"june"),(2,"decli"),(3,"hello"),
  10. ? ? ? ? (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),
  11. ? ? ? ? (8,"decli"),(9,"hello"),(10,"june"),
  12. ? ? ? ? (11,"june"),(12,"decli"),(13,"hello");
复制代码


5、代码:

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import java.sql.PreparedStatement;
  5. import java.sql.ResultSet;
  6. import java.sql.SQLException;
  7. import java.util.Iterator;
  8. import org.apache.hadoop.filecache.DistributedCache;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.Writable;
  13. import org.apache.hadoop.mapred.JobClient;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.MapReduceBase;
  16. import org.apache.hadoop.mapred.Mapper;
  17. import org.apache.hadoop.mapred.OutputCollector;
  18. import org.apache.hadoop.mapred.Reducer;
  19. import org.apache.hadoop.mapred.Reporter;
  20. import org.apache.hadoop.mapred.lib.IdentityReducer;
  21. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  22. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  23. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  24. import org.apache.hadoop.mapred.lib.db.DBWritable;
  25. /**
  26. * Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
  27. * ? ? ? ? ? ? ? ? ? ? ? ???实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
  28. * date: 2013-7-29 上午2:34:04
  29. * @author june
  30. */
  31. public class Mysql2Mr {
  32. ? ? ? ? // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  33. ? ? ? ? // CREATE TABLE studentinfo (
  34. ? ? ? ? // id INTEGER NOT NULL PRIMARY KEY,
  35. ? ? ? ? // name VARCHAR(32) NOT NULL);
  36. ? ? ? ? public static class StudentinfoRecord implements Writable, DBWritable {
  37. ? ? ? ? ? ? ? ? int id;
  38. ? ? ? ? ? ? ? ? String name;
  39. ? ? ? ? ? ? ? ? public StudentinfoRecord() {
  40. ? ? ? ? ? ? ? ? }
  41. ? ? ? ? ? ? ? ? public void readFields(DataInput in) throws IOException {
  42. ? ? ? ? ? ? ? ? ? ? ? ? this.id = in.readInt();
  43. ? ? ? ? ? ? ? ? ? ? ? ? this.name = Text.readString(in);
  44. ? ? ? ? ? ? ? ? }
  45. ? ? ? ? ? ? ? ? public String toString() {
  46. ? ? ? ? ? ? ? ? ? ? ? ? return new String(this.id + " " + this.name);
  47. ? ? ? ? ? ? ? ? }
  48. ? ? ? ? ? ? ? ? @Override
  49. ? ? ? ? ? ? ? ? public void write(PreparedStatement stmt) throws SQLException {
  50. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setInt(1, this.id);
  51. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setString(2, this.name);
  52. ? ? ? ? ? ? ? ? }
  53. ? ? ? ? ? ? ? ? @Override
  54. ? ? ? ? ? ? ? ? public void readFields(ResultSet result) throws SQLException {
  55. ? ? ? ? ? ? ? ? ? ? ? ? this.id = result.getInt(1);
  56. ? ? ? ? ? ? ? ? ? ? ? ? this.name = result.getString(2);
  57. ? ? ? ? ? ? ? ? }
  58. ? ? ? ? ? ? ? ? @Override
  59. ? ? ? ? ? ? ? ? public void write(DataOutput out) throws IOException {
  60. ? ? ? ? ? ? ? ? ? ? ? ? out.writeInt(this.id);
  61. ? ? ? ? ? ? ? ? ? ? ? ? Text.writeString(out, this.name);
  62. ? ? ? ? ? ? ? ? }
  63. ? ? ? ? }
  64. ? ? ? ? // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
  65. ? ? ? ? // Caused by: java.lang.NoSuchMethodException: DBInputMapper.()
  66. ? ? ? ? // http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
  67. ? ? ? ? // 网上脑残式的转帖,没见到一个写对的。。。
  68. ? ? ? ? public static class DBInputMapper extends MapReduceBase implements
  69. ? ? ? ? ? ? ? ? ? ? ? ? Mapper {
  70. ? ? ? ? ? ? ? ? public void map(LongWritable key, StudentinfoRecord value,
  71. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector collector, Reporter reporter) throws IOException {
  72. ? ? ? ? ? ? ? ? ? ? ? ? collector.collect(new LongWritable(value.id), new Text(value.toString()));
  73. ? ? ? ? ? ? ? ? }
  74. ? ? ? ? }
  75. ? ? ? ? public static class MyReducer extends MapReduceBase implements
  76. ? ? ? ? ? ? ? ? ? ? ? ? Reducer {
  77. ? ? ? ? ? ? ? ? @Override
  78. ? ? ? ? ? ? ? ? public void reduce(LongWritable key, Iterator values,
  79. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector output, Reporter reporter) throws IOException {
  80. ? ? ? ? ? ? ? ? ? ? ? ? String[] splits = values.next().toString().split(" ");
  81. ? ? ? ? ? ? ? ? ? ? ? ? StudentinfoRecord r = new StudentinfoRecord();
  82. ? ? ? ? ? ? ? ? ? ? ? ? r.id = Integer.parseInt(splits[0]);
  83. ? ? ? ? ? ? ? ? ? ? ? ? r.name = splits[1];
  84. ? ? ? ? ? ? ? ? ? ? ? ? output.collect(r, new Text(r.name));
  85. ? ? ? ? ? ? ? ? }
  86. ? ? ? ? }
  87. ? ? ? ? public static void main(String[] args) throws IOException {
  88. ? ? ? ? ? ? ? ? JobConf conf = new JobConf(Mysql2Mr.class);
  89. ? ? ? ? ? ? ? ? DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.0.8-bin.jar"), conf);
  90. ? ? ? ? ? ? ? ? conf.setMapOutputKeyClass(LongWritable.class);
  91. ? ? ? ? ? ? ? ? conf.setMapOutputValueClass(Text.class);
  92. ? ? ? ? ? ? ? ? conf.setOutputKeyClass(LongWritable.class);
  93. ? ? ? ? ? ? ? ? conf.setOutputValueClass(Text.class);
  94. ? ? ? ? ? ? ? ? conf.setOutputFormat(DBOutputFormat.class);
  95. ? ? ? ? ? ? ? ? conf.setInputFormat(DBInputFormat.class);
  96. ? ? ? ? ? ? ? ? // // mysql to hdfs
  97. ? ? ? ? ? ? ? ? // conf.setReducerClass(IdentityReducer.class);
  98. ? ? ? ? ? ? ? ? // Path outPath = new Path("/tmp/1");
  99. ? ? ? ? ? ? ? ? // FileSystem.get(conf).delete(outPath, true);
  100. ? ? ? ? ? ? ? ? // FileOutputFormat.setOutputPath(conf, outPath);
  101. ? ? ? ? ? ? ? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.101:3306/test",
  102. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "root", "root");
  103. ? ? ? ? ? ? ? ? String[] fields = { "id", "name" };
  104. ? ? ? ? ? ? ? ? // 从 t 表读数据
  105. ? ? ? ? ? ? ? ? DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
  106. ? ? ? ? ? ? ? ? // mapreduce 将数据输出到 t2 表
  107. ? ? ? ? ? ? ? ? DBOutputFormat.setOutput(conf, "t2", "id", "name");
  108. ? ? ? ? ? ? ? ? // conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  109. ? ? ? ? ? ? ? ? conf.setMapperClass(DBInputMapper.class);
  110. ? ? ? ? ? ? ? ? conf.setReducerClass(MyReducer.class);
  111. ? ? ? ? ? ? ? ? JobClient.runJob(conf);
  112. ? ? ? ? }
  113. }
复制代码



6、结果:

执行两次后,你可以看到mysql结果:

  1. mysql> select * from t2;
  2. +------+-------+
  3. | id? ?| name??|
  4. +------+-------+
  5. |? ? 1 | june??|
  6. |? ? 2 | decli |
  7. |? ? 3 | hello |
  8. |? ? 4 | june??|
  9. |? ? 5 | decli |
  10. |? ? 6 | hello |
  11. |? ? 7 | june??|
  12. |? ? 8 | decli |
  13. |? ? 9 | hello |
  14. |? ?10 | june??|
  15. |? ?11 | june??|
  16. |? ?12 | decli |
  17. |? ?13 | hello |
  18. |? ? 1 | june??|
  19. |? ? 2 | decli |
  20. |? ? 3 | hello |
  21. |? ? 4 | june??|
  22. |? ? 5 | decli |
  23. |? ? 6 | hello |
  24. |? ? 7 | june??|
  25. |? ? 8 | decli |
  26. |? ? 9 | hello |
  27. |? ?10 | june??|
  28. |? ?11 | june??|
  29. |? ?12 | decli |
  30. |? ?13 | hello |
  31. +------+-------+
  32. 26 rows in set (0.00 sec)
  33. mysql>
复制代码


7、日志:

  1. 13/07/29 02:33:03 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Creating mysql-connector-java-5.0.8-bin.jar in /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp-work--8372797484204470322 with rwxr-xr-x
  3. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  4. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  5. 13/07/29 02:33:03 INFO mapred.JobClient: Running job: job_local_0001
  6. 13/07/29 02:33:03 INFO mapred.MapTask: numReduceTasks: 1
  7. 13/07/29 02:33:03 INFO mapred.MapTask: io.sort.mb = 100
  8. 13/07/29 02:33:03 INFO mapred.MapTask: data buffer = 79691776/99614720
  9. 13/07/29 02:33:03 INFO mapred.MapTask: record buffer = 262144/327680
  10. 13/07/29 02:33:03 INFO mapred.MapTask: Starting flush of map output
  11. 13/07/29 02:33:03 INFO mapred.MapTask: Finished spill 0
  12. 13/07/29 02:33:03 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
  13. 13/07/29 02:33:04 INFO mapred.JobClient:??map 0% reduce 0%
  14. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  15. 13/07/29 02:33:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
  16. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  17. 13/07/29 02:33:06 INFO mapred.Merger: Merging 1 sorted segments
  18. 13/07/29 02:33:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 235 bytes
  19. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  20. 13/07/29 02:33:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
  21. 13/07/29 02:33:07 INFO mapred.JobClient:??map 100% reduce 0%
  22. 13/07/29 02:33:09 INFO mapred.LocalJobRunner: reduce > reduce
  23. 13/07/29 02:33:09 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
  24. 13/07/29 02:33:09 WARN mapred.FileOutputCommitter: Output path is null in cleanup
  25. 13/07/29 02:33:10 INFO mapred.JobClient:??map 100% reduce 100%
  26. 13/07/29 02:33:10 INFO mapred.JobClient: Job complete: job_local_0001
  27. 13/07/29 02:33:10 INFO mapred.JobClient: Counters: 18
  28. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Input Format Counters?
  29. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Read=0
  30. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Output Format Counters?
  31. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Written=0
  32. 13/07/29 02:33:10 INFO mapred.JobClient:? ?FileSystemCounters
  33. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_READ=1211691
  34. 13/07/29 02:33:10 INFO mapred.JobClient:? ???HDFS_BYTES_READ=1081704
  35. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_WRITTEN=2392844
  36. 13/07/29 02:33:10 INFO mapred.JobClient:? ?Map-Reduce Framework
  37. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output materialized bytes=239
  38. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input records=13
  39. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce shuffle bytes=0
  40. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Spilled Records=26
  41. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output bytes=207
  42. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input bytes=13
  43. 13/07/29 02:33:10 INFO mapred.JobClient:? ???SPLIT_RAW_BYTES=75
  44. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine input records=0
  45. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input records=13
  46. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input groups=13
  47. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine output records=0
  48. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce output records=13
  49. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output records=13
复制代码







MapReduce直接连接Mysql获取数据






Mysql中数据:

  1. mysql> select * from lxw_tbls;
  2. +---------------------+----------------+
  3. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  4. +---------------------+----------------+
  5. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  6. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  7. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  8. | tt? ?? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  9. | tab_partition? ?? ? | MANAGED_TABLE??|
  10. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  11. | lxw_hbase_user_info | MANAGED_TABLE??|
  12. | t? ?? ?? ?? ?? ?? ? | EXTERNAL_TABLE |
  13. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  14. +---------------------+----------------+
  15. 9 rows in set (0.01 sec)
  16. mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;
  17. +---------------------+----------------+
  18. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  19. +---------------------+----------------+
  20. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  21. | lxw_hbase_user_info | MANAGED_TABLE??|
  22. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  23. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  24. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  25. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  26. +---------------------+----------------+
  27. 6 rows in set (0.00 sec)
复制代码


MapReduce程序代码,ConnMysql.java:

  1. package com.lxw.study;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.sql.PreparedStatement;
  7. import java.sql.ResultSet;
  8. import java.sql.SQLException;
  9. import java.util.Iterator;
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.filecache.DistributedCache;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.Path;
  14. import org.apache.hadoop.io.LongWritable;
  15. import org.apache.hadoop.io.Text;
  16. import org.apache.hadoop.io.Writable;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.Reducer;
  20. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
  21. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. public class ConnMysql {
  25. ? ?? ???
  26. ? ?? ???private static Configuration conf = new Configuration();
  27. ? ?? ???
  28. ? ?? ???static {
  29. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
  30. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
  31. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
  32. ? ?? ?? ?? ?? ? conf.set("mapred.job.tracker", "10.133.103.21:50021");
  33. ? ?? ???}
  34. ? ?? ???
  35. ? ?? ???public static class TblsRecord implements Writable, DBWritable {
  36. ? ?? ?? ?? ?? ? String tbl_name;
  37. ? ?? ?? ?? ?? ? String tbl_type;
  38. ? ?? ?? ?? ?? ? public TblsRecord() {
  39. ? ?? ?? ?? ?? ? }
  40. ? ?? ?? ?? ?? ? @Override
  41. ? ?? ?? ?? ?? ? public void write(PreparedStatement statement) throws SQLException {
  42. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  43. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(1, this.tbl_name);
  44. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(2, this.tbl_type);
  45. ? ?? ?? ?? ?? ? }
  46. ? ?? ?? ?? ?? ? @Override
  47. ? ?? ?? ?? ?? ? public void readFields(ResultSet resultSet) throws SQLException {
  48. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  49. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = resultSet.getString(1);
  50. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = resultSet.getString(2);
  51. ? ?? ?? ?? ?? ? }
  52. ? ?? ?? ?? ?? ? @Override
  53. ? ?? ?? ?? ?? ? public void write(DataOutput out) throws IOException {
  54. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  55. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_name);
  56. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_type);
  57. ? ?? ?? ?? ?? ? }
  58. ? ?? ?? ?? ?? ? @Override
  59. ? ?? ?? ?? ?? ? public void readFields(DataInput in) throws IOException {
  60. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  61. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = Text.readString(in);
  62. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = Text.readString(in);
  63. ? ?? ?? ?? ?? ? }
  64. ? ?? ?? ?? ?? ? public String toString() {
  65. ? ?? ?? ?? ?? ?? ?? ?? ?return new String(this.tbl_name + " " + this.tbl_type);
  66. ? ?? ?? ?? ?? ? }
  67. ? ?? ???}
  68. ? ?? ???public static class ConnMysqlMapper extends Mapper {
  69. ? ?? ?? ?? ?? ? public void map(LongWritable key,TblsRecord values,Context context)?
  70. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  71. ? ?? ?? ?? ?? ?? ?? ?? ?context.write(new Text(values.tbl_name), new Text(values.tbl_type));
  72. ? ?? ?? ?? ?? ? }
  73. ? ?? ???}
  74. ? ?? ???
  75. ? ?? ???public static class ConnMysqlReducer extends Reducer {
  76. ? ?? ?? ?? ?? ? public void reduce(Text key,Iterable values,Context context)?
  77. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  78. ? ?? ?? ?? ?? ?? ?? ?? ?for(Iterator itr = values.iterator();itr.hasNext();) {
  79. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???context.write(key, itr.next());
  80. ? ?? ?? ?? ?? ?? ?? ?? ?}
  81. ? ?? ?? ?? ?? ? }
  82. ? ?? ???}
  83. ? ?? ???
  84. ? ?? ???public static void main(String[] args) throws Exception {
  85. ? ?? ?? ?? ?? ? Path output = new Path("/user/lxw/output/");
  86. ? ?? ?? ?? ?? ??
  87. ? ?? ?? ?? ?? ? FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
  88. ? ?? ?? ?? ?? ? if (fs.exists(output)) {
  89. ? ?? ?? ?? ?? ?? ?? ?? ?fs.delete(output);
  90. ? ?? ?? ?? ?? ? }
  91. ? ?? ?? ?? ?? ??
  92. ? ?? ?? ?? ?? ? //mysql的jdbc驱动
  93. ? ?? ?? ?? ?? ? DistributedCache.addFileToClassPath(new Path(??
  94. ? ?? ?? ?? ?? ?? ?? ?? ???"hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);??
  95. ? ?? ?? ?? ?? ??
  96. ? ?? ?? ?? ?? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",??
  97. ? ?? ?? ?? ?? ?? ?? ?? ???"jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive");??
  98. ? ?? ?? ?? ?? ??
  99. ? ?? ?? ?? ?? ? Job job = new Job(conf,"test mysql connection");
  100. ? ?? ?? ?? ?? ? job.setJarByClass(ConnMysql.class);
  101. ? ?? ?? ?? ?? ??
  102. ? ?? ?? ?? ?? ? job.setMapperClass(ConnMysqlMapper.class);
  103. ? ?? ?? ?? ?? ? job.setReducerClass(ConnMysqlReducer.class);
  104. ? ?? ?? ?? ?? ??
  105. ? ?? ?? ?? ?? ? job.setOutputKeyClass(Text.class);
  106. ? ?? ?? ?? ?? ? job.setOutputValueClass(Text.class);
  107. ? ?? ?? ?? ?? ??
  108. ? ?? ?? ?? ?? ? job.setInputFormatClass(DBInputFormat.class);
  109. ? ?? ?? ?? ?? ? FileOutputFormat.setOutputPath(job, output);
  110. ? ?? ?? ?? ?? ??
  111. ? ?? ?? ?? ?? ? //列名
  112. ? ?? ?? ?? ?? ? String[] fields = { "TBL_NAME", "TBL_TYPE" };?
  113. ? ?? ?? ?? ?? ? //六个参数分别为:
  114. ? ?? ?? ?? ?? ? //1.Job;2.Class extends DBWritable>
  115. ? ?? ?? ?? ?? ? //3.表名;4.where条件
  116. ? ?? ?? ?? ?? ? //5.order by语句;6.列名
  117. ? ?? ?? ?? ?? ? DBInputFormat.setInput(job, TblsRecord.class,
  118. ? ?? ?? ?? ?? ?? ?? ?"lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields);??
  119. ? ?? ?? ?? ?? ??
  120. ? ?? ?? ?? ?? ? System.exit(job.waitForCompletion(true) ? 0 : 1);
  121. ? ?? ???}
  122. ? ?? ???
  123. }
复制代码


运行结果:

  1. [lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000
  2. lxw_hbase_table_1? ?? ? MANAGED_TABLE
  3. lxw_hbase_user_info? ???MANAGED_TABLE
  4. lxw_jobid? ?? ? MANAGED_TABLE
  5. lxw_t? ?MANAGED_TABLE
  6. lxw_t1??MANAGED_TABLE
  7. lxw_test_table??EXTERNAL_TABLE
复制代码

http://www.aboutyun.com/forum.php?highlight=MapReduce+MySQL&mod=viewthread&tid=7405

성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
MySQL의 역할 : 웹 응용 프로그램의 데이터베이스MySQL의 역할 : 웹 응용 프로그램의 데이터베이스Apr 17, 2025 am 12:23 AM

웹 응용 프로그램에서 MySQL의 주요 역할은 데이터를 저장하고 관리하는 것입니다. 1. MySQL은 사용자 정보, 제품 카탈로그, 트랜잭션 레코드 및 기타 데이터를 효율적으로 처리합니다. 2. SQL 쿼리를 통해 개발자는 데이터베이스에서 정보를 추출하여 동적 컨텐츠를 생성 할 수 있습니다. 3.mysql은 클라이언트-서버 모델을 기반으로 작동하여 허용 가능한 쿼리 속도를 보장합니다.

MySQL : 첫 번째 데이터베이스 구축MySQL : 첫 번째 데이터베이스 구축Apr 17, 2025 am 12:22 AM

MySQL 데이터베이스를 구축하는 단계에는 다음이 포함됩니다. 1. 데이터베이스 및 테이블 작성, 2. 데이터 삽입 및 3. 쿼리를 수행하십시오. 먼저 CreateAbase 및 CreateTable 문을 사용하여 데이터베이스 및 테이블을 작성한 다음 InsertInto 문을 사용하여 데이터를 삽입 한 다음 최종적으로 SELECT 문을 사용하여 데이터를 쿼리하십시오.

MySQL : 데이터 저장에 대한 초보자 친화적 인 접근 방식MySQL : 데이터 저장에 대한 초보자 친화적 인 접근 방식Apr 17, 2025 am 12:21 AM

MySQL은 사용하기 쉽고 강력하기 때문에 초보자에게 적합합니다. 1.MySQL은 관계형 데이터베이스이며 CRUD 작업에 SQL을 사용합니다. 2. 설치가 간단하고 루트 사용자 비밀번호를 구성해야합니다. 3. 삽입, 업데이트, 삭제 및 선택하여 데이터 작업을 수행하십시오. 4. Orderby, Where and Join은 복잡한 쿼리에 사용될 수 있습니다. 5. 디버깅은 구문을 확인하고 쿼리를 분석하기 위해 설명을 사용해야합니다. 6. 최적화 제안에는 인덱스 사용, 올바른 데이터 유형 선택 및 우수한 프로그래밍 습관이 포함됩니다.

MySQL 초보자가 친숙합니까? 학습 곡선 평가MySQL 초보자가 친숙합니까? 학습 곡선 평가Apr 17, 2025 am 12:19 AM

MySQL은 다음과 같은 초보자에게 적합합니다. 1) 설치 및 구성이 쉽고, 2) 풍부한 학습 리소스, 3) 직관적 인 SQL 구문, 4) 강력한 도구 지원. 그럼에도 불구하고 초보자는 데이터베이스 디자인, 쿼리 최적화, 보안 관리 및 데이터 백업과 같은 과제를 극복해야합니다.

SQL은 프로그래밍 언어입니까? 용어를 명확하게합니다SQL은 프로그래밍 언어입니까? 용어를 명확하게합니다Apr 17, 2025 am 12:17 AM

예, sqlisaprogramminglanguages-pecializedfordatamanagement.1) 그것은 초점을 맞추고, 초점을 맞추고, 초점을 맞추고, sqlisessentialforquerying, 삽입, 업데이트 및 adletingdataindataindationaldatabase.3) weburer infriendly, itrequires-quirestoamtoavase

산성 특성 (원자력, 일관성, 분리, 내구성)을 설명하십시오.산성 특성 (원자력, 일관성, 분리, 내구성)을 설명하십시오.Apr 16, 2025 am 12:20 AM

산성 속성에는 원자력, 일관성, 분리 및 내구성이 포함되며 데이터베이스 설계의 초석입니다. 1. 원자력은 거래가 완전히 성공적이거나 완전히 실패하도록합니다. 2. 일관성은 거래 전후에 데이터베이스가 일관성을 유지하도록합니다. 3. 격리는 거래가 서로를 방해하지 않도록합니다. 4. 지속성은 거래 제출 후 데이터가 영구적으로 저장되도록합니다.

MySQL : 데이터베이스 관리 시스템 대 프로그래밍 언어MySQL : 데이터베이스 관리 시스템 대 프로그래밍 언어Apr 16, 2025 am 12:19 AM

MySQL은 데이터베이스 관리 시스템 (DBMS) 일뿐 만 아니라 프로그래밍 언어와 밀접한 관련이 있습니다. 1) DBMS로서 MySQL은 데이터를 저장, 구성 및 검색하는 데 사용되며 인덱스 최적화는 쿼리 성능을 향상시킬 수 있습니다. 2) SQL과 같은 ORM 도구를 사용하여 Python에 내장 된 SQL과 프로그래밍 언어를 결합하면 작업을 단순화 할 수 있습니다. 3) 성능 최적화에는 인덱싱, 쿼리, 캐싱, 라이브러리 및 테이블 부서 및 거래 관리가 포함됩니다.

MySQL : SQL 명령으로 데이터 관리MySQL : SQL 명령으로 데이터 관리Apr 16, 2025 am 12:19 AM

MySQL은 SQL 명령을 사용하여 데이터를 관리합니다. 1. 기본 명령에는 선택, 삽입, 업데이트 및 삭제가 포함됩니다. 2. 고급 사용에는 조인, 하위 쿼리 및 집계 함수가 포함됩니다. 3. 일반적인 오류에는 구문, 논리 및 성능 문제가 포함됩니다. 4. 최적화 팁에는 인덱스 사용, 선택*을 피하고 한계 사용이 포함됩니다.

See all articles

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
1 몇 달 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
1 몇 달 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
1 몇 달 전By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 채팅 명령 및 사용 방법
1 몇 달 전By尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

SublimeText3 Linux 새 버전

SublimeText3 Linux 새 버전

SublimeText3 Linux 최신 버전

Atom Editor Mac 버전 다운로드

Atom Editor Mac 버전 다운로드

가장 인기 있는 오픈 소스 편집기

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

VSCode Windows 64비트 다운로드

VSCode Windows 64비트 다운로드

Microsoft에서 출시한 강력한 무료 IDE 편집기