検索
ホームページデータベースmysql チュートリアルHadoop 中利用 地图reduce 读写 mysql 数据

Hadoop 中利用 地图reduce 读写 mysql 数据

Jun 07, 2016 pm 04:27 PM
hadoopmysqlreduce利用地図読み書き

Hadoop 中利用 mapreduce 读写 mysql 数据 问题导读 1.hadoop mapreduce的通过哪两个类可以读取数据源? 2.如果没有mysql驱动包,一般会是什么问题? 3.如何添加包? 有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后

Hadoop 中利用 mapreduce 读写 mysql 数据

问题导读
1.hadoop mapreduce的通过哪两个类可以读取数据源?
2.如果没有mysql驱动包,一般会是什么问题?
3.如何添加包?




有时候我们在项目中会遇到输入结果集很大,但是输出结果很小,比如一些 pv、uv 数据,然后为了实时查询的需求,或者一些 OLAP 的需求,我们需要 mapreduce 与 mysql 进行数据的交互,而这些特性正是 hbase 或者 hive 目前亟待改进的地方。

好了言归正传,简单的说说背景、原理以及需要注意的地方:

1、为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。

至少在我的 0.20.203 中的 org.apache.hadoop.mapreduce.lib 下是没见到 db 包,所以本文也是以老版的 API 来为例说明的。

3、运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

(1)在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

(2)a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /hdfsPath/

? ?? ? b)在mr程序提交job前,添加语句:DistributedCache.addFileToClassPath(new Path(“/hdfsPath/mysql- connector-java- 5.1.0-bin.jar”), conf);

(3)虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。、

4、测试数据:

  1. CREATE TABLE `t` (
  2. `id` int DEFAULT NULL,
  3. `name` varchar(10) DEFAULT NULL
  4. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  5. CREATE TABLE `t2` (
  6. `id` int DEFAULT NULL,
  7. `name` varchar(10) DEFAULT NULL
  8. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  9. insert into t values (1,"june"),(2,"decli"),(3,"hello"),
  10. ? ? ? ? (4,"june"),(5,"decli"),(6,"hello"),(7,"june"),
  11. ? ? ? ? (8,"decli"),(9,"hello"),(10,"june"),
  12. ? ? ? ? (11,"june"),(12,"decli"),(13,"hello");
复制代码


5、代码:

  1. import java.io.DataInput;
  2. import java.io.DataOutput;
  3. import java.io.IOException;
  4. import java.sql.PreparedStatement;
  5. import java.sql.ResultSet;
  6. import java.sql.SQLException;
  7. import java.util.Iterator;
  8. import org.apache.hadoop.filecache.DistributedCache;
  9. import org.apache.hadoop.fs.Path;
  10. import org.apache.hadoop.io.LongWritable;
  11. import org.apache.hadoop.io.Text;
  12. import org.apache.hadoop.io.Writable;
  13. import org.apache.hadoop.mapred.JobClient;
  14. import org.apache.hadoop.mapred.JobConf;
  15. import org.apache.hadoop.mapred.MapReduceBase;
  16. import org.apache.hadoop.mapred.Mapper;
  17. import org.apache.hadoop.mapred.OutputCollector;
  18. import org.apache.hadoop.mapred.Reducer;
  19. import org.apache.hadoop.mapred.Reporter;
  20. import org.apache.hadoop.mapred.lib.IdentityReducer;
  21. import org.apache.hadoop.mapred.lib.db.DBConfiguration;
  22. import org.apache.hadoop.mapred.lib.db.DBInputFormat;
  23. import org.apache.hadoop.mapred.lib.db.DBOutputFormat;
  24. import org.apache.hadoop.mapred.lib.db.DBWritable;
  25. /**
  26. * Function: 测试 mr 与 mysql 的数据交互,此测试用例将一个表中的数据复制到另一张表中
  27. * ? ? ? ? ? ? ? ? ? ? ? ???实际当中,可能只需要从 mysql 读,或者写到 mysql 中。
  28. * date: 2013-7-29 上午2:34:04
  29. * @author june
  30. */
  31. public class Mysql2Mr {
  32. ? ? ? ? // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
  33. ? ? ? ? // CREATE TABLE studentinfo (
  34. ? ? ? ? // id INTEGER NOT NULL PRIMARY KEY,
  35. ? ? ? ? // name VARCHAR(32) NOT NULL);
  36. ? ? ? ? public static class StudentinfoRecord implements Writable, DBWritable {
  37. ? ? ? ? ? ? ? ? int id;
  38. ? ? ? ? ? ? ? ? String name;
  39. ? ? ? ? ? ? ? ? public StudentinfoRecord() {
  40. ? ? ? ? ? ? ? ? }
  41. ? ? ? ? ? ? ? ? public void readFields(DataInput in) throws IOException {
  42. ? ? ? ? ? ? ? ? ? ? ? ? this.id = in.readInt();
  43. ? ? ? ? ? ? ? ? ? ? ? ? this.name = Text.readString(in);
  44. ? ? ? ? ? ? ? ? }
  45. ? ? ? ? ? ? ? ? public String toString() {
  46. ? ? ? ? ? ? ? ? ? ? ? ? return new String(this.id + " " + this.name);
  47. ? ? ? ? ? ? ? ? }
  48. ? ? ? ? ? ? ? ? @Override
  49. ? ? ? ? ? ? ? ? public void write(PreparedStatement stmt) throws SQLException {
  50. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setInt(1, this.id);
  51. ? ? ? ? ? ? ? ? ? ? ? ? stmt.setString(2, this.name);
  52. ? ? ? ? ? ? ? ? }
  53. ? ? ? ? ? ? ? ? @Override
  54. ? ? ? ? ? ? ? ? public void readFields(ResultSet result) throws SQLException {
  55. ? ? ? ? ? ? ? ? ? ? ? ? this.id = result.getInt(1);
  56. ? ? ? ? ? ? ? ? ? ? ? ? this.name = result.getString(2);
  57. ? ? ? ? ? ? ? ? }
  58. ? ? ? ? ? ? ? ? @Override
  59. ? ? ? ? ? ? ? ? public void write(DataOutput out) throws IOException {
  60. ? ? ? ? ? ? ? ? ? ? ? ? out.writeInt(this.id);
  61. ? ? ? ? ? ? ? ? ? ? ? ? Text.writeString(out, this.name);
  62. ? ? ? ? ? ? ? ? }
  63. ? ? ? ? }
  64. ? ? ? ? // 记住此处是静态内部类,要不然你自己实现无参构造器,或者等着抛异常:
  65. ? ? ? ? // Caused by: java.lang.NoSuchMethodException: DBInputMapper.()
  66. ? ? ? ? // http://stackoverflow.com/questions/7154125/custom-mapreduce-input-format-cant-find-constructor
  67. ? ? ? ? // 网上脑残式的转帖,没见到一个写对的。。。
  68. ? ? ? ? public static class DBInputMapper extends MapReduceBase implements
  69. ? ? ? ? ? ? ? ? ? ? ? ? Mapper {
  70. ? ? ? ? ? ? ? ? public void map(LongWritable key, StudentinfoRecord value,
  71. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector collector, Reporter reporter) throws IOException {
  72. ? ? ? ? ? ? ? ? ? ? ? ? collector.collect(new LongWritable(value.id), new Text(value.toString()));
  73. ? ? ? ? ? ? ? ? }
  74. ? ? ? ? }
  75. ? ? ? ? public static class MyReducer extends MapReduceBase implements
  76. ? ? ? ? ? ? ? ? ? ? ? ? Reducer {
  77. ? ? ? ? ? ? ? ? @Override
  78. ? ? ? ? ? ? ? ? public void reduce(LongWritable key, Iterator values,
  79. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? OutputCollector output, Reporter reporter) throws IOException {
  80. ? ? ? ? ? ? ? ? ? ? ? ? String[] splits = values.next().toString().split(" ");
  81. ? ? ? ? ? ? ? ? ? ? ? ? StudentinfoRecord r = new StudentinfoRecord();
  82. ? ? ? ? ? ? ? ? ? ? ? ? r.id = Integer.parseInt(splits[0]);
  83. ? ? ? ? ? ? ? ? ? ? ? ? r.name = splits[1];
  84. ? ? ? ? ? ? ? ? ? ? ? ? output.collect(r, new Text(r.name));
  85. ? ? ? ? ? ? ? ? }
  86. ? ? ? ? }
  87. ? ? ? ? public static void main(String[] args) throws IOException {
  88. ? ? ? ? ? ? ? ? JobConf conf = new JobConf(Mysql2Mr.class);
  89. ? ? ? ? ? ? ? ? DistributedCache.addFileToClassPath(new Path("/tmp/mysql-connector-java-5.0.8-bin.jar"), conf);
  90. ? ? ? ? ? ? ? ? conf.setMapOutputKeyClass(LongWritable.class);
  91. ? ? ? ? ? ? ? ? conf.setMapOutputValueClass(Text.class);
  92. ? ? ? ? ? ? ? ? conf.setOutputKeyClass(LongWritable.class);
  93. ? ? ? ? ? ? ? ? conf.setOutputValueClass(Text.class);
  94. ? ? ? ? ? ? ? ? conf.setOutputFormat(DBOutputFormat.class);
  95. ? ? ? ? ? ? ? ? conf.setInputFormat(DBInputFormat.class);
  96. ? ? ? ? ? ? ? ? // // mysql to hdfs
  97. ? ? ? ? ? ? ? ? // conf.setReducerClass(IdentityReducer.class);
  98. ? ? ? ? ? ? ? ? // Path outPath = new Path("/tmp/1");
  99. ? ? ? ? ? ? ? ? // FileSystem.get(conf).delete(outPath, true);
  100. ? ? ? ? ? ? ? ? // FileOutputFormat.setOutputPath(conf, outPath);
  101. ? ? ? ? ? ? ? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://192.168.1.101:3306/test",
  102. ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? "root", "root");
  103. ? ? ? ? ? ? ? ? String[] fields = { "id", "name" };
  104. ? ? ? ? ? ? ? ? // 从 t 表读数据
  105. ? ? ? ? ? ? ? ? DBInputFormat.setInput(conf, StudentinfoRecord.class, "t", null, "id", fields);
  106. ? ? ? ? ? ? ? ? // mapreduce 将数据输出到 t2 表
  107. ? ? ? ? ? ? ? ? DBOutputFormat.setOutput(conf, "t2", "id", "name");
  108. ? ? ? ? ? ? ? ? // conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  109. ? ? ? ? ? ? ? ? conf.setMapperClass(DBInputMapper.class);
  110. ? ? ? ? ? ? ? ? conf.setReducerClass(MyReducer.class);
  111. ? ? ? ? ? ? ? ? JobClient.runJob(conf);
  112. ? ? ? ? }
  113. }
复制代码



6、结果:

执行两次后,你可以看到mysql结果:

  1. mysql> select * from t2;
  2. +------+-------+
  3. | id? ?| name??|
  4. +------+-------+
  5. |? ? 1 | june??|
  6. |? ? 2 | decli |
  7. |? ? 3 | hello |
  8. |? ? 4 | june??|
  9. |? ? 5 | decli |
  10. |? ? 6 | hello |
  11. |? ? 7 | june??|
  12. |? ? 8 | decli |
  13. |? ? 9 | hello |
  14. |? ?10 | june??|
  15. |? ?11 | june??|
  16. |? ?12 | decli |
  17. |? ?13 | hello |
  18. |? ? 1 | june??|
  19. |? ? 2 | decli |
  20. |? ? 3 | hello |
  21. |? ? 4 | june??|
  22. |? ? 5 | decli |
  23. |? ? 6 | hello |
  24. |? ? 7 | june??|
  25. |? ? 8 | decli |
  26. |? ? 9 | hello |
  27. |? ?10 | june??|
  28. |? ?11 | june??|
  29. |? ?12 | decli |
  30. |? ?13 | hello |
  31. +------+-------+
  32. 26 rows in set (0.00 sec)
  33. mysql>
复制代码


7、日志:

  1. 13/07/29 02:33:03 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
  2. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Creating mysql-connector-java-5.0.8-bin.jar in /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp-work--8372797484204470322 with rwxr-xr-x
  3. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  4. 13/07/29 02:33:03 INFO filecache.TrackerDistributedCacheManager: Cached hdfs://192.168.1.101:9000/tmp/mysql-connector-java-5.0.8-bin.jar as /tmp/hadoop-june/mapred/local/archive/-8943686319031389138_-1232673160_640840668/192.168.1.101/tmp/mysql-connector-java-5.0.8-bin.jar
  5. 13/07/29 02:33:03 INFO mapred.JobClient: Running job: job_local_0001
  6. 13/07/29 02:33:03 INFO mapred.MapTask: numReduceTasks: 1
  7. 13/07/29 02:33:03 INFO mapred.MapTask: io.sort.mb = 100
  8. 13/07/29 02:33:03 INFO mapred.MapTask: data buffer = 79691776/99614720
  9. 13/07/29 02:33:03 INFO mapred.MapTask: record buffer = 262144/327680
  10. 13/07/29 02:33:03 INFO mapred.MapTask: Starting flush of map output
  11. 13/07/29 02:33:03 INFO mapred.MapTask: Finished spill 0
  12. 13/07/29 02:33:03 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
  13. 13/07/29 02:33:04 INFO mapred.JobClient:??map 0% reduce 0%
  14. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  15. 13/07/29 02:33:06 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
  16. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  17. 13/07/29 02:33:06 INFO mapred.Merger: Merging 1 sorted segments
  18. 13/07/29 02:33:06 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 235 bytes
  19. 13/07/29 02:33:06 INFO mapred.LocalJobRunner:?
  20. 13/07/29 02:33:06 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
  21. 13/07/29 02:33:07 INFO mapred.JobClient:??map 100% reduce 0%
  22. 13/07/29 02:33:09 INFO mapred.LocalJobRunner: reduce > reduce
  23. 13/07/29 02:33:09 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
  24. 13/07/29 02:33:09 WARN mapred.FileOutputCommitter: Output path is null in cleanup
  25. 13/07/29 02:33:10 INFO mapred.JobClient:??map 100% reduce 100%
  26. 13/07/29 02:33:10 INFO mapred.JobClient: Job complete: job_local_0001
  27. 13/07/29 02:33:10 INFO mapred.JobClient: Counters: 18
  28. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Input Format Counters?
  29. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Read=0
  30. 13/07/29 02:33:10 INFO mapred.JobClient:? ?File Output Format Counters?
  31. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Bytes Written=0
  32. 13/07/29 02:33:10 INFO mapred.JobClient:? ?FileSystemCounters
  33. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_READ=1211691
  34. 13/07/29 02:33:10 INFO mapred.JobClient:? ???HDFS_BYTES_READ=1081704
  35. 13/07/29 02:33:10 INFO mapred.JobClient:? ???FILE_BYTES_WRITTEN=2392844
  36. 13/07/29 02:33:10 INFO mapred.JobClient:? ?Map-Reduce Framework
  37. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output materialized bytes=239
  38. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input records=13
  39. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce shuffle bytes=0
  40. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Spilled Records=26
  41. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output bytes=207
  42. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map input bytes=13
  43. 13/07/29 02:33:10 INFO mapred.JobClient:? ???SPLIT_RAW_BYTES=75
  44. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine input records=0
  45. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input records=13
  46. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce input groups=13
  47. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Combine output records=0
  48. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Reduce output records=13
  49. 13/07/29 02:33:10 INFO mapred.JobClient:? ???Map output records=13
复制代码







MapReduce直接连接Mysql获取数据






Mysql中数据:

  1. mysql> select * from lxw_tbls;
  2. +---------------------+----------------+
  3. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  4. +---------------------+----------------+
  5. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  6. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  7. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  8. | tt? ?? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  9. | tab_partition? ?? ? | MANAGED_TABLE??|
  10. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  11. | lxw_hbase_user_info | MANAGED_TABLE??|
  12. | t? ?? ?? ?? ?? ?? ? | EXTERNAL_TABLE |
  13. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  14. +---------------------+----------------+
  15. 9 rows in set (0.01 sec)
  16. mysql> select * from lxw_tbls where TBL_NAME like 'lxw%' order by TBL_NAME;
  17. +---------------------+----------------+
  18. | TBL_NAME? ?? ?? ?? ?| TBL_TYPE? ?? ? |
  19. +---------------------+----------------+
  20. | lxw_hbase_table_1? ?| MANAGED_TABLE??|
  21. | lxw_hbase_user_info | MANAGED_TABLE??|
  22. | lxw_jobid? ?? ?? ???| MANAGED_TABLE??|
  23. | lxw_t? ?? ?? ?? ?? ?| MANAGED_TABLE??|
  24. | lxw_t1? ?? ?? ?? ???| MANAGED_TABLE??|
  25. | lxw_test_table? ?? ?| EXTERNAL_TABLE |
  26. +---------------------+----------------+
  27. 6 rows in set (0.00 sec)
复制代码


MapReduce程序代码,ConnMysql.java:

  1. package com.lxw.study;
  2. import java.io.DataInput;
  3. import java.io.DataOutput;
  4. import java.io.IOException;
  5. import java.net.URI;
  6. import java.sql.PreparedStatement;
  7. import java.sql.ResultSet;
  8. import java.sql.SQLException;
  9. import java.util.Iterator;
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.filecache.DistributedCache;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.Path;
  14. import org.apache.hadoop.io.LongWritable;
  15. import org.apache.hadoop.io.Text;
  16. import org.apache.hadoop.io.Writable;
  17. import org.apache.hadoop.mapreduce.Job;
  18. import org.apache.hadoop.mapreduce.Mapper;
  19. import org.apache.hadoop.mapreduce.Reducer;
  20. import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
  21. import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
  22. import org.apache.hadoop.mapreduce.lib.db.DBWritable;
  23. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  24. public class ConnMysql {
  25. ? ?? ???
  26. ? ?? ???private static Configuration conf = new Configuration();
  27. ? ?? ???
  28. ? ?? ???static {
  29. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/hdfs-site.xml"));
  30. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/mapred-site.xml"));
  31. ? ?? ?? ?? ?? ? conf.addResource(new Path("F:/lxw-hadoop/core-site.xml"));
  32. ? ?? ?? ?? ?? ? conf.set("mapred.job.tracker", "10.133.103.21:50021");
  33. ? ?? ???}
  34. ? ?? ???
  35. ? ?? ???public static class TblsRecord implements Writable, DBWritable {
  36. ? ?? ?? ?? ?? ? String tbl_name;
  37. ? ?? ?? ?? ?? ? String tbl_type;
  38. ? ?? ?? ?? ?? ? public TblsRecord() {
  39. ? ?? ?? ?? ?? ? }
  40. ? ?? ?? ?? ?? ? @Override
  41. ? ?? ?? ?? ?? ? public void write(PreparedStatement statement) throws SQLException {
  42. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  43. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(1, this.tbl_name);
  44. ? ?? ?? ?? ?? ?? ?? ?? ?statement.setString(2, this.tbl_type);
  45. ? ?? ?? ?? ?? ? }
  46. ? ?? ?? ?? ?? ? @Override
  47. ? ?? ?? ?? ?? ? public void readFields(ResultSet resultSet) throws SQLException {
  48. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  49. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = resultSet.getString(1);
  50. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = resultSet.getString(2);
  51. ? ?? ?? ?? ?? ? }
  52. ? ?? ?? ?? ?? ? @Override
  53. ? ?? ?? ?? ?? ? public void write(DataOutput out) throws IOException {
  54. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  55. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_name);
  56. ? ?? ?? ?? ?? ?? ?? ?? ?Text.writeString(out, this.tbl_type);
  57. ? ?? ?? ?? ?? ? }
  58. ? ?? ?? ?? ?? ? @Override
  59. ? ?? ?? ?? ?? ? public void readFields(DataInput in) throws IOException {
  60. ? ?? ?? ?? ?? ?? ?? ?? ?// TODO Auto-generated method stub
  61. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_name = Text.readString(in);
  62. ? ?? ?? ?? ?? ?? ?? ?? ?this.tbl_type = Text.readString(in);
  63. ? ?? ?? ?? ?? ? }
  64. ? ?? ?? ?? ?? ? public String toString() {
  65. ? ?? ?? ?? ?? ?? ?? ?? ?return new String(this.tbl_name + " " + this.tbl_type);
  66. ? ?? ?? ?? ?? ? }
  67. ? ?? ???}
  68. ? ?? ???public static class ConnMysqlMapper extends Mapper {
  69. ? ?? ?? ?? ?? ? public void map(LongWritable key,TblsRecord values,Context context)?
  70. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  71. ? ?? ?? ?? ?? ?? ?? ?? ?context.write(new Text(values.tbl_name), new Text(values.tbl_type));
  72. ? ?? ?? ?? ?? ? }
  73. ? ?? ???}
  74. ? ?? ???
  75. ? ?? ???public static class ConnMysqlReducer extends Reducer {
  76. ? ?? ?? ?? ?? ? public void reduce(Text key,Iterable values,Context context)?
  77. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???throws IOException,InterruptedException {
  78. ? ?? ?? ?? ?? ?? ?? ?? ?for(Iterator itr = values.iterator();itr.hasNext();) {
  79. ? ?? ?? ?? ?? ?? ?? ?? ?? ?? ???context.write(key, itr.next());
  80. ? ?? ?? ?? ?? ?? ?? ?? ?}
  81. ? ?? ?? ?? ?? ? }
  82. ? ?? ???}
  83. ? ?? ???
  84. ? ?? ???public static void main(String[] args) throws Exception {
  85. ? ?? ?? ?? ?? ? Path output = new Path("/user/lxw/output/");
  86. ? ?? ?? ?? ?? ??
  87. ? ?? ?? ?? ?? ? FileSystem fs = FileSystem.get(URI.create(output.toString()), conf);
  88. ? ?? ?? ?? ?? ? if (fs.exists(output)) {
  89. ? ?? ?? ?? ?? ?? ?? ?? ?fs.delete(output);
  90. ? ?? ?? ?? ?? ? }
  91. ? ?? ?? ?? ?? ??
  92. ? ?? ?? ?? ?? ? //mysql的jdbc驱动
  93. ? ?? ?? ?? ?? ? DistributedCache.addFileToClassPath(new Path(??
  94. ? ?? ?? ?? ?? ?? ?? ?? ???"hdfs://hd022-test.nh.sdo.com/user/liuxiaowen/mysql-connector-java-5.1.13-bin.jar"), conf);??
  95. ? ?? ?? ?? ?? ??
  96. ? ?? ?? ?? ?? ? DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",??
  97. ? ?? ?? ?? ?? ?? ?? ?? ???"jdbc:mysql://10.133.103.22:3306/hive", "hive", "hive");??
  98. ? ?? ?? ?? ?? ??
  99. ? ?? ?? ?? ?? ? Job job = new Job(conf,"test mysql connection");
  100. ? ?? ?? ?? ?? ? job.setJarByClass(ConnMysql.class);
  101. ? ?? ?? ?? ?? ??
  102. ? ?? ?? ?? ?? ? job.setMapperClass(ConnMysqlMapper.class);
  103. ? ?? ?? ?? ?? ? job.setReducerClass(ConnMysqlReducer.class);
  104. ? ?? ?? ?? ?? ??
  105. ? ?? ?? ?? ?? ? job.setOutputKeyClass(Text.class);
  106. ? ?? ?? ?? ?? ? job.setOutputValueClass(Text.class);
  107. ? ?? ?? ?? ?? ??
  108. ? ?? ?? ?? ?? ? job.setInputFormatClass(DBInputFormat.class);
  109. ? ?? ?? ?? ?? ? FileOutputFormat.setOutputPath(job, output);
  110. ? ?? ?? ?? ?? ??
  111. ? ?? ?? ?? ?? ? //列名
  112. ? ?? ?? ?? ?? ? String[] fields = { "TBL_NAME", "TBL_TYPE" };?
  113. ? ?? ?? ?? ?? ? //六个参数分别为:
  114. ? ?? ?? ?? ?? ? //1.Job;2.Class extends DBWritable>
  115. ? ?? ?? ?? ?? ? //3.表名;4.where条件
  116. ? ?? ?? ?? ?? ? //5.order by语句;6.列名
  117. ? ?? ?? ?? ?? ? DBInputFormat.setInput(job, TblsRecord.class,
  118. ? ?? ?? ?? ?? ?? ?? ?"lxw_tbls", "TBL_NAME like 'lxw%'", "TBL_NAME", fields);??
  119. ? ?? ?? ?? ?? ??
  120. ? ?? ?? ?? ?? ? System.exit(job.waitForCompletion(true) ? 0 : 1);
  121. ? ?? ???}
  122. ? ?? ???
  123. }
复制代码


运行结果:

  1. [lxw@hd025-test ~]$ hadoop fs -cat /user/lxw/output/part-r-00000
  2. lxw_hbase_table_1? ?? ? MANAGED_TABLE
  3. lxw_hbase_user_info? ???MANAGED_TABLE
  4. lxw_jobid? ?? ? MANAGED_TABLE
  5. lxw_t? ?MANAGED_TABLE
  6. lxw_t1??MANAGED_TABLE
  7. lxw_test_table??EXTERNAL_TABLE
复制代码

http://www.aboutyun.com/forum.php?highlight=MapReduce+MySQL&mod=viewthread&tid=7405

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
MySQLの役割:WebアプリケーションのデータベースMySQLの役割:WebアプリケーションのデータベースApr 17, 2025 am 12:23 AM

WebアプリケーションにおけるMySQLの主な役割は、データを保存および管理することです。 1.MYSQLは、ユーザー情報、製品カタログ、トランザクションレコード、その他のデータを効率的に処理します。 2。SQLクエリを介して、開発者はデータベースから情報を抽出して動的なコンテンツを生成できます。 3.MYSQLは、クライアントサーバーモデルに基づいて機能し、許容可能なクエリ速度を確保します。

MySQL:最初のデータベースを構築しますMySQL:最初のデータベースを構築しますApr 17, 2025 am 12:22 AM

MySQLデータベースを構築する手順には次のものがあります。1。データベースとテーブルの作成、2。データの挿入、および3。クエリを実行します。まず、createdAtabaseおよびcreateTableステートメントを使用してデータベースとテーブルを作成し、InsertINTOステートメントを使用してデータを挿入し、最後にSelectステートメントを使用してデータを照会します。

MySQL:データストレージに対する初心者向けのアプローチMySQL:データストレージに対する初心者向けのアプローチApr 17, 2025 am 12:21 AM

MySQLは、使いやすく強力であるため、初心者に適しています。 1.MYSQLはリレーショナルデータベースであり、CRUD操作にSQLを使用します。 2。インストールは簡単で、ルートユーザーのパスワードを構成する必要があります。 3.挿入、更新、削除、および選択してデータ操作を実行します。 4. Orderby、Where and Joinは複雑なクエリに使用できます。 5.デバッグでは、構文をチェックし、説明を使用してクエリを分析する必要があります。 6.最適化の提案には、インデックスの使用、適切なデータ型の選択、優れたプログラミング習慣が含まれます。

MySQLは初心者に優しいですか?学習曲線の評価MySQLは初心者に優しいですか?学習曲線の評価Apr 17, 2025 am 12:19 AM

MySQLは初心者に適しています。1)インストールと構成、2)リッチラーニングリソース、3)直感的なSQL構文、4)強力なツールサポート。それにもかかわらず、初心者はデータベースの設計、クエリの最適化、セキュリティ管理、データのバックアップなどの課題を克服する必要があります。

SQLはプログラミング言語ですか?用語を明確にするSQLはプログラミング言語ですか?用語を明確にするApr 17, 2025 am 12:17 AM

はい、sqlisaprogramginglanguagespecializedfordatamanamanagement.1)それはdeclarative、focusingonwhattoachieveratherthanhow.2)

酸性の特性(原子性、一貫性、分離、耐久性)を説明します。酸性の特性(原子性、一貫性、分離、耐久性)を説明します。Apr 16, 2025 am 12:20 AM

酸性属性には、原子性、一貫性、分離、耐久性が含まれ、データベース設計の基礎です。 1.原子性は、トランザクションが完全に成功するか、完全に失敗することを保証します。 2.一貫性により、データベースがトランザクションの前後に一貫性を保証します。 3.分離により、トランザクションが互いに干渉しないようにします。 4.永続性により、トランザクションの提出後にデータが永久に保存されることが保証されます。

MySQL:データベース管理システムとプログラミング言語MySQL:データベース管理システムとプログラミング言語Apr 16, 2025 am 12:19 AM

MySQLは、データベース管理システム(DBMS)であるだけでなく、プログラミング言語にも密接に関連しています。 1)DBMSとして、MySQLはデータを保存、整理、取得するために使用され、インデックスを最適化するとクエリのパフォーマンスが向上する可能性があります。 2)SQLとPythonに埋め込まれたプログラミング言語とSQLalchemyなどのORMツールを使用すると、操作を簡素化できます。 3)パフォーマンスの最適化には、インデックス、クエリ、キャッシュ、ライブラリ、テーブル分割、およびトランザクション管理が含まれます。

MySQL:SQLコマンドでデータの管理MySQL:SQLコマンドでデータの管理Apr 16, 2025 am 12:19 AM

MySQLはSQLコマンドを使用してデータを管理します。 1.基本コマンドには、select、挿入、更新、削除が含まれます。 2。高度な使用には、参加、サブクエリ、および集計関数が含まれます。 3.一般的なエラーには、構文、ロジック、パフォーマンスの問題が含まれます。 4。最適化のヒントには、インデックスの使用、Select*の回避、制限の使用が含まれます。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

SecLists

SecLists

SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

WebStorm Mac版

WebStorm Mac版

便利なJavaScript開発ツール

mPDF

mPDF

mPDF は、UTF-8 でエンコードされた HTML から PDF ファイルを生成できる PHP ライブラリです。オリジナルの作者である Ian Back は、Web サイトから「オンザフライ」で PDF ファイルを出力し、さまざまな言語を処理するために mPDF を作成しました。 HTML2FPDF などのオリジナルのスクリプトよりも遅く、Unicode フォントを使用すると生成されるファイルが大きくなりますが、CSS スタイルなどをサポートし、多くの機能強化が施されています。 RTL (アラビア語とヘブライ語) や CJK (中国語、日本語、韓国語) を含むほぼすべての言語をサポートします。ネストされたブロックレベル要素 (P、DIV など) をサポートします。

VSCode Windows 64 ビットのダウンロード

VSCode Windows 64 ビットのダウンロード

Microsoft によって発売された無料で強力な IDE エディター

DVWA

DVWA

Damn Vulnerable Web App (DVWA) は、非常に脆弱な PHP/MySQL Web アプリケーションです。その主な目的は、セキュリティ専門家が法的環境でスキルとツールをテストするのに役立ち、Web 開発者が Web アプリケーションを保護するプロセスをより深く理解できるようにし、教師/生徒が教室環境で Web アプリケーションを教え/学習できるようにすることです。安全。 DVWA の目標は、シンプルでわかりやすいインターフェイスを通じて、さまざまな難易度で最も一般的な Web 脆弱性のいくつかを実践することです。このソフトウェアは、