大数据离线处理数据项目(二)数据清洗 ETL 编写MapReduce程序实现数据清洗

阅读: 评论:0

大数据离线处理数据项目(二)数据清洗 ETL 编写MapReduce程序实现数据清洗

大数据离线处理数据项目(二)数据清洗 ETL 编写MapReduce程序实现数据清洗

简介:

实现的功能:对采集到的日志数据进行清洗,过滤无效数据、静态资源

方法:编写MapReduce进行处理

涉及到的类:

1)实体类Bean

描述日志数据的各个字段:如客户端的ip、请求的url、请求状态等等...

2)工具类

用来处理Bean:设置日志的有效或无效,过滤无效日志

3)Map类

编写Map程序

4)Driver类

先进行日志数据分析:

1、日志数据拆分

举一条日志数据为例进行分析: 

194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1" 304 0 "-" "Mozilla/4.0 (compatible;)"

日志数据都是有一定的规律的。看这条日志数据每个数据彼此之间都是通过空格来隔开的,所以我们可以:

1)每一条日志数据通过空格进行分割;

2)定义一个字符串数组,来接收每一条日志数据;

//创建一个实体Bean对象
WebLogBean webLogBean = new WebLogBean(); 
//把每一行line代表一条日志数据进行拆分并且存储到一个字符串数组里
String[] arr = line.split(" ");

但并不是所有的日志数据都会像上面的一样,可能有的很短(数据不全),有的很长(用户浏览器信息字段较长)

所以我们要对字符串数组进行判断:

1)字符串长度小于11的,即是数据不全,直接不要

2)长度大于11

长度大于11,并且大于12的,说明最后一个字段:用户浏览器信息过长

此时我们把字符串数组前面的数据写进Bean实体,把字符串数组中关于用户浏览器的字符串存储到一个StringBuilder对象中,最后再写进Bean实体的用户浏览器信息里即可

StringBuilder sb = new StringBuilder();for(int i=11;i<arr.length;i++){  //从a[11]开始,直至末尾,把字段都加进sb里面sb.append(arr[i]);
}

长度大于11,但不大于12的,即长度等于12,“标准”的日志数据,直接把字符串数组各个数据写进Bean实体即可

所以日志数据有三种情况:1、“标准” 2、数据不全 3、数据较长

2、日志数据处理

1)时间

如果字符串数组中获取到的请求时间为“null”或者为空,即把实体"-invalid_time-"写进Bean中

if(null==time_local || "".equals(time_local)) {time_local="-invalid_time-";
}
webLogBean.setTime_local(time_local);

或者说时间没获取到,也认为是无效数据

if("-invalid_time-".Time_local())){webLogBean.setValid(false);
}

2)状态码

如果请求状态码大于400,即说明请求出错,我们视之为无效数据并写入Bean

if (Integer.Status()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);
}

3)字符串数组长度

如果字符串数组长度小于11——数据不全,无效!把Bean写为null

webLogBean=null;

4)请求的url不在我们的集合内,视为无效!(集合可自定义)

public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
if (!Request())) {//如果请求的url不在我们定义的集合内,则视为静态资源,设为falsebean.setValid(false);}
}

小结

日志数据:

1、小于11:无效 

2、大于11:有效,写入 

3、大于12:用户浏览器较长,处理后写入

Bean无效的三个点:

1、时间为空、为null、或者时间获取不到

2、请求为静态资源

3、请求的url不属于集合(自定义) 

目录结构如下:

老规矩,先上pom文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns=".0.0"xmlns:xsi=""xsi:schemaLocation=".0.0 .0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.chen</groupId><artifactId>bigDataProject_1202ETL</artifactId><version>1.0-SNAPSHOT</version><properties><mavenpiler.source>8</mavenpiler.source><mavenpiler.target>8</mavenpiler.target><!--设置项目的编码为UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--使用java8进行编码--><mavenpiler.source>1.8</mavenpiler.source><!--使用java8来进行源码编译--><mavenpiler.target>1.8</mavenpiler.target><!--设置hadoop的版本--><hadoop.version>3.1.2</hadoop.version></properties><!--jar包的依赖--><dependencies><!--测试的依赖坐标--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency><!--日志打印的依赖坐标--><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--hadoop的通用模块的依赖坐标--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>${hadoop.version}</version></dependency><!--hadoop的对HDFS分布式文件系统访问的技术支持的依赖坐标--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>${hadoop.version}</version></dependency><!--hadoop的客户端访问的依赖坐标--><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>${hadoop.version}</version></dependency></dependencies>
</project>

实体类WebLogBean: 

package com.bean;import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;public class WebLogBean  implements Writable {private boolean valid = true;// 判断数据是否合法private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体a内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {this.valid = _addr = remote__user = remote_user;this.time_local = time_quest = request;this.status = status;this.body_bytes_sent = body_bytes_sent;this.http_referer = http_referer;this.http_user_agent = http_user_agent;}public String getRemote_addr() {return remote_addr;}public void setRemote_addr(String remote_addr) {_addr = remote_addr;}public String getRemote_user() {return remote_user;}public void setRemote_user(String remote_user) {_user = remote_user;}public String getTime_local() {return this.time_local;}public void setTime_local(String time_local) {this.time_local = time_local;}public String getRequest() {return request;}public void setRequest(String request) {quest = request;}public String getStatus() {return status;}public void setStatus(String status) {this.status = status;}public String getBody_bytes_sent() {return body_bytes_sent;}public void setBody_bytes_sent(String body_bytes_sent) {this.body_bytes_sent = body_bytes_sent;}public String getHttp_referer() {return http_referer;}public void setHttp_referer(String http_referer) {this.http_referer = http_referer;}public String getHttp_user_agent() {return http_user_agent;}public void setHttp_user_agent(String http_user_agent) {this.http_user_agent = http_user_agent;}public boolean isValid() {return valid;}public void setValid(boolean valid) {this.valid = valid;}/*** 01是hive当中默认的分隔符,不会出现用户手打出来的情况* @return*/@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("01").Remote_addr());sb.append("01").Remote_user());sb.append("01").Time_local());sb.append("01").Request());sb.append("01").Status());sb.append("01").Body_bytes_sent());sb.append("01").Http_referer());sb.append("01").Http_user_agent());String();}@Overridepublic void readFields(DataInput in) throws IOException {this.valid = in.readBoolean();_addr = in.readUTF();_user = in.readUTF();this.time_local = in.readUTF();quest = in.readUTF();this.status = in.readUTF();this.body_bytes_sent = in.readUTF();this.http_referer = in.readUTF();this.http_user_agent = in.readUTF();}@Overridepublic void write(DataOutput out) throws IOException {out.writeBoolean(this.valid);out.writeUTF(null==remote_addr?"":remote_addr);out.writeUTF(null==remote_user?"":remote_user);out.writeUTF(null==time_local?"":time_local);out.writeUTF(null==request?"":request);out.writeUTF(null==status?"":status);out.writeUTF(null==body_bytes_sent?"":body_bytes_sent);out.writeUTF(null==http_referer?"":http_referer);out.writeUTF(null==http_user_agent?"":http_user_agent);}}

工具类WebLogParser

package com.chen.preETL.utils;import com.bean.WebLogBean;
ParseException;
SimpleDateFormat;
import java.util.Locale;
import java.util.Set;
//这是一个工具类,用来写Bean实体的信息,
public class WebLogParser {public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();//通过空格来对我们的数据进行切割,然后拼接字符串,将我们同一个字段里面的数据拼接到一起//222.66.59.174  -- [18/Sep/2013:06:53:30 +0000] "GET /images/my.jpg HTTP/1.1" 200 19939 "" "Mozilla/5.0 (Windows NT 6.1; WOW64; rv:23.0) Gecko/20100101 Firefox/23.0"String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);//将我们的字符串转换成中文习惯的字符串//  [18/Sep/2013:06:52:32 +0000]//   18/Sep/2013:06:52:32------》2013-09-18 06:52:32String time_local = formatDate(arr[3].substring(1)); //获取从1开始到结束的字符//将获取到的时间进行判断  为null或者为空 则设置为invalidtime无效时间if(null==time_local || "".equals(time_local)) {time_local="-invalid_time-";}webLogBean.setTime_local(time_local);webLogBean.setRequest(arr[6]);  //写入用户请求的urlwebLogBean.setStatus(arr[8]);  //写入返回的状态码webLogBean.setBody_bytes_sent(arr[9]);   //写入返回的内容的字节大小webLogBean.setHttp_referer(arr[10]);  //写入用户访问开源 即从哪个页面跳转过来//如果useragent元素较多,拼接useragent。//  "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0; .NET CLR 1.1.4322; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.4506.2152; .NET CLR 3.5.30729; MDDR; InfoPath.2; .NET4.0C)"if (arr.length > 12) {  //如果大于12 即说明最后一个字段(用户浏览器信息)太长  进行处理(把它全加到最后一个字段里)//StringBuilder类似StringBuffer对象是一个字符序列可变的字符串,即可以把对象进行修改、重新赋值 区别:StringBuilder无线程安全,性能略高StringBuilder sb = new StringBuilder();for(int i=11;i<arr.length;i++){  //从a[11]开始,直至末尾,把字段都加进sb里面sb.append(arr[i]);}webLogBean.setHttp_user_String());   //最后再把sb转换为字符串写进实体Bean中} else { //大于11 不大于12,即等于12  如果等于12则直接写进实体BeanwebLogBean.setHttp_user_agent(arr[11]);}//如果请求状态码大于400值,就认为是请求出错了,请求出错的数据直接认为是无效数据if (Integer.Status()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}//如果获取时间没拿到,那么也是认为是无效的数据if("-invalid_time-".Time_local())){webLogBean.setValid(false);}} else {    //如果切出来的数组长度小于11个,说明数据不全,,直接丢掉//58.215.204.118 - - [18/Sep/2013:06:52:33 +0000] "-" 400 0 "-" "-"webLogBean=null;}return webLogBean;  //返回实体Bean}//总结:  字段: 1、小于11:无效   2、大于11:有效,写入  3、大于12:是最后一个字段(用户浏览器信息)太长,// 只需定义一个StringBuilder存储它们,然后将这个StringBuilder写入最后一个字段即可
// Bean为无效的三个点:1、时间无效 2、请求为静态资源 3、请求的url非我们自定义的url//根据自定义url来过滤  如果这个Bean请求的url不属于page集合,则它不是我们想要的日志数据,淘汰public static void filtStaticResource(WebLogBean bean, Set<String> pages) {if (!Request())) {bean.setValid(false);}}//格式化时间方法public static String formatDate(String time_local) {try {return df2.format(df1.parse(time_local));} catch (ParseException e) {return null;}}}

WeblogPreProcessMapper

package com.chen.preETL.mapper;import com.bean.WebLogBean;
import com.chen.preETL.utils.WebLogParser;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;
import java.util.HashSet;
import java.util.Set;public class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable>
{// 用来存储网站url分类数据  即自定义的url  可根据此集合来过滤日志数据,用户请求的url如果不在此集合内,即为“无效”数据Set<String> pages = new HashSet<String>();Text k = new Text();NullWritable v = ();/*** map阶段的初始化方法* 从外部配置文件中加载网站的有用url分类数据 存储到maptask的内存中,用来对日志数据进行过滤* 过滤掉我们日志文件当中的一些静态资源,包括js   css  img  等请求日志都需要过滤掉*/@Overrideprotected void setup(Context context) throws IOException, InterruptedException {//定义一个集合pages.add("/about");pages.add("/black-ip-list/");pages.add("/cassandra-clustor/");pages.add("/finance-rhive-repurchase/");pages.add("/hadoop-family-roadmap/");pages.add("/hadoop-hive-intro/");pages.add("/hadoop-zookeeper-intro/");pages.add("/hadoop-mahout-roadmap/");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//得到我们一行数据String line = String();WebLogBean webLogBean = WebLogParser.parser(line);if (webLogBean != null) {  //只有当字段小于11个时(数据不全),webLogBean才会是null// 过滤js/图片/css等静态资源WebLogParser.filtStaticResource(webLogBean, pages);if (!webLogBean.isValid()) return;k.String());context.write(k, v);}}
}

WeblogEtlPreProcessDriver

package com.chen.preETL.driver;import com.chen.preETL.mapper.WeblogPreProcessMapper;
import org.f.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import java.io.IOException;public class WeblogEtlPreProcessDriver {static {   //在windows上运行就必须加上dll文件try {// 设置 HADOOP_HOME 目录System.setProperty("hadoop.home.dir", "E:\winutils-master\hadoop-3.0.0");// 加载库文件System.load("E:\winutils-master\hadoop-3.0.0\bin\hadoop.dll");} catch (UnsatisfiedLinkError e) {println("Native code library failed to load.n" + e);it(1);}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = new Configuration();Job job = Instance(configuration);FileInputFormat.addInputPath(job,new Path("D:\data\ETL_Input")); //设置输入文件路径job.setInputFormatClass(TextInputFormat.class); //输入文件类型FileOutputFormat.setOutputPath(job,new Path("///D:\data\weblogPreOut2")); //设置处理完的文件的存放路径job.setOutputFormatClass(TextOutputFormat.class);  //输出文件类型job.setJarByClass(WeblogEtlPreProcessDriver.class);  //设置driver类为本类job.setMapperClass(WeblogPreProcessMapper.class);//指定运行的map类job.setOutputKeyClass(Text.class);//设置输出的key的数据类型job.setOutputValueClass(NullWritable.class); //设置输出的value的数据类型job.setNumReduceTasks(0);boolean res = job.waitForCompletion(true);}
}

其中Driver类:

配置了日志数据文件存放及输出位置:

设置输入文件类型、输出文件类型;指定运行的map类;

设置了map类输出的key、value的数据类型

由于程序是在Windows上进行调试运行,所以需要配置hadoop.dll文件:

static {   //在windows上运行就必须加上dll文件try {// 设置 HADOOP_HOME 目录System.setProperty("hadoop.home.dir", "E:\winutils-master\hadoop-3.0.0");// 加载库文件System.load("E:\winutils-master\hadoop-3.0.0\bin\hadoop.dll");} catch (UnsatisfiedLinkError e) {println("Native code library failed to load.n" + e);it(1);}}

执行程序过滤日志前的文件,一万五千条不到: 

执行程序:

成功运行查看结果:

进过清洗,日志数据从15000到76:

 

 

本文发布于:2024-02-03 00:58:02,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170689311947616.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:数据   离线   程序   项目   MapReduce
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22/a> 网站地图23