A data-processing demo based on HDFS, Hive, and MySQL; a spare-time side project.
There are many ways to collect data; in real projects the data is usually reported by clients. For easy local testing, this project reads a CSV file instead; later the data will be fetched automatically with Python.
Link: , extraction code: r23c
The dataset is small; the emphasis is on functionality.
Pipeline: clean the data, run statistical analysis, store the results on HDFS, load them into Hive, then export to MySQL with Sqoop.
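The final Sqoop step of the pipeline can be sketched as follows. This is a minimal sketch that only assembles the command string; the JDBC URL, credentials, table name, and export directory are illustrative placeholders, not values from the project:

```java
import java.util.ArrayList;
import java.util.List;

public class SqoopExportCommand {
    // Assembles a `sqoop export` command that pushes a Hive table's HDFS
    // directory into a MySQL table. All parameter values are hypothetical.
    static String build(String jdbcUrl, String mysqlTable, String exportDir) {
        List<String> parts = new ArrayList<>();
        parts.add("sqoop export");
        parts.add("--connect " + jdbcUrl);
        parts.add("--username root --password ****");
        parts.add("--table " + mysqlTable);
        parts.add("--export-dir " + exportDir);
        parts.add("--input-fields-terminated-by ','"); // matches the Hive field delimiter below
        return String.join(" \\\n  ", parts);
    }

    public static void main(String[] args) {
        System.out.println(build("jdbc:mysql://localhost:3306/demo",
                "result_table", "/user/hive/warehouse/result_table"));
    }
}
```

Note that `--input-fields-terminated-by` must match the delimiter used when the Hive table was created, otherwise Sqoop cannot split the exported rows.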
public String transfer(File file, String folderPath, String fileName) throws Exception {
    if (!opened) {
        throw new Exception("FileSystem was not opened!");
    }
    // Make sure the target folder exists, then create the target file
    fs.mkdirs(new Path(folderPath));
    Path filePath = new Path(folderPath, StrUtils.isEmpty(fileName) ? file.getName() : fileName);
    fs.createNewFile(filePath);
    // Copy the local file into HDFS in COPY_BUFFERSIZE chunks
    FSDataOutputStream append = fs.append(filePath);
    byte[] bytes = new byte[COPY_BUFFERSIZE];
    int size;
    FileInputStream fileInputStream = new FileInputStream(file);
    while ((size = fileInputStream.read(bytes)) > 0) {
        append.write(bytes, 0, size);
    }
    fileInputStream.close();
    append.close();
    return filePath.toUri().toString();
}
// Table name: one table per day
String yyyyMMdd = hiveTable + DateUtil.formatDate(new Date(), "yyyyMMdd");
// Column name/type parameters
Map<String, String> map = new HashMap<>();
map.put("title", "STRING");
map.put("discountPrice", "STRING");
map.put("price", "STRING");
map.put("address", "STRING");
map.put("count", "STRING");
// Create the table (split by day)
hiveDataService.createHiveTable(yyyyMMdd, map);
// Load the HDFS data into the Hive table
hiveDataService.loadHiveIntoTable(dfsPath, yyyyMMdd);

/**
 * @param tableName     Hive table name
 * @param parametersMap column name/type pairs
 */
@Override
public void createHiveTable(String tableName, Map<String, String> parametersMap) {
    StringBuffer sql = new StringBuffer("CREATE TABLE IF NOT EXISTS ");
    sql.append(tableName);
    StringBuffer sb = new StringBuffer();
    parametersMap.forEach((k, v) -> sb.append(k + " " + v + ","));
    sql.append("(" + sb.deleteCharAt(sb.length() - 1) + ")");
    sql.append(" ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n' "); // field/line delimiters
    sql.append("STORED AS TEXTFILE"); // stored as plain text
    Log.info("Create table [" + tableName + "] ");
    try {
        jdbcTemplate.execute(sql.toString());
    } catch (DataAccessException dae) {
        Log.error(dae.fillInStackTrace());
    }
}

/**
 * @param filePath  HDFS file path
 * @param tableName table name
 */
@Override
public void loadHiveIntoTable(String filePath, String tableName) {
    StringBuffer sql = new StringBuffer("LOAD DATA INPATH ");
    sql.append("'" + filePath + "' INTO TABLE " + tableName);
    Log.info("Load data into " + tableName);
    try {
        jdbcTemplate.execute(sql.toString());
    } catch (DataAccessException dae) {
        Log.error(dae.fillInStackTrace());
    }
}
One step in the code above uses LOAD DATA to move the file into Hive. While discussing the project with a friend, he pointed out that the data can instead be loaded directly through an external table, so the code became:
External tables
Benefits: an external table only references the existing HDFS directory instead of moving the files, and dropping the table does not delete the underlying data.
---------------------------------Java code-----------------------------------------
/**
 * Load data through an external table.
 *
 * @param tableName     Hive table name
 * @param parametersMap column name/type pairs
 * @param dfsUrl        HDFS data directory
 */
@Override
public synchronized void createOuterHiveTable(String tableName, Map<String, String> parametersMap, String dfsUrl) {
    StringBuffer sql = new StringBuffer("CREATE EXTERNAL TABLE IF NOT EXISTS ");
    sql.append(tableName);
    StringBuffer sb = new StringBuffer();
    parametersMap.forEach((k, v) -> sb.append(k + " " + v + ","));
    sql.append("(" + sb.deleteCharAt(sb.length() - 1) + ")");
    sql.append(" PARTITIONED BY (day STRING)");
    sql.append(" ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' " +
            " COLLECTION ITEMS TERMINATED BY '\002'" +
            " MAP KEYS TERMINATED BY '\003'" +
            " LINES TERMINATED BY '\n' "); // field/line delimiters
    sql.append("LOCATION '" + dfsUrl + "'"); // HDFS directory the external table reads from
    Log.info("Create EXTERNAL table [" + tableName + "] ");
    try {
        jdbcTemplate.execute(sql.toString());
    } catch (DataAccessException dae) {
        Log.error(dae.fillInStackTrace());
    }
}
------------------------------------Sql---------------------------------------------
CREATE EXTERNAL TABLE IF NOT EXISTS xx_outer_partitioned(
  affiliatedbasenum STRING,
  locationid STRING,
  pickupdatedispatchingbasenum STRING)
PARTITIONED BY (day STRING)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  COLLECTION ITEMS TERMINATED BY '\002'
  MAP KEYS TERMINATED BY '\003'
  LINES TERMINATED BY '\n'
LOCATION '<dfsUrl>'
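One detail worth noting for the partitioned external table: files dropped under LOCATION only become visible once the partition itself is registered, e.g. with ALTER TABLE ... ADD PARTITION (or MSCK REPAIR TABLE). A small sketch of building that statement in the same style as the service methods above; the table name and directory in the example are illustrative:

```java
public class AddPartitionSql {
    // Builds the ALTER TABLE statement that registers one day's partition
    // directory with the external table, so Hive can see its data.
    static String addPartition(String table, String day, String dir) {
        return "ALTER TABLE " + table
                + " ADD IF NOT EXISTS PARTITION (day='" + day + "')"
                + " LOCATION '" + dir + "'";
    }

    public static void main(String[] args) {
        System.out.println(addPartition("xx_outer_partitioned", "20240101",
                "/data/xx/day=20240101"));
    }
}
```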