开发一个 MapReduce(MR)程序 WeblogPreProcess,用于对原始日志进行预处理。
package com.learn.pre;import java.io.IOException;
import java.util.HashSet;
import java.util.Set;import com.learn.bean.WebLogBean;
import com.learn.bean.WebLogParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;/*** 处理原始日志,过滤出真实pv请求* 转换时间格式* 对缺失字段填充默认值* 对记录标记valid和invalid*/public class WeblogPreProcess {static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {/** 用来存储网站url分类数据 */Set<String> pages = new HashSet<>();Text k = new Text();NullWritable v = ();/*** 从外部加载网站url分类数据*/@Overrideprotected void setup(Context context) {pages.add("/about");pages.add("/black-ip-list/");pages.add("/cassandra-clustor/");pages.add("/finance-rhive-repurchase/");pages.add("/hadoop-family-roadmap/");pages.add("/hadoop-hive-intro/");pages.add("/hadoop-zookeeper-intro/");pages.add("/hadoop-mahout-roadmap/");}@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {String line = String();WebLogBean webLogBean = WebLogParser.parser(line);// 过滤js/图片/css等静态资源WebLogParser.filtStaticResource(webLogBean, pages);/* if (!webLogBean.isValid()) return; */k.String());context.write(k, v);}}public static void main(String[] args) throws Exception {Configuration conf = new Configuration();Job job = Instance(conf);job.setJarByClass(WeblogPreProcess.class);job.setMapperClass(WeblogPreProcessMapper.class);job.setOutputKeyClass(Text.class);job.setOutputValueClass(NullWritable.class);FileInputFormat.setInputPaths(job, new Path(args[0]));FileOutputFormat.setOutputPath(job, new Path(args[1]));job.setNumReduceTasks(0);job.waitForCompletion(true);}}
package com.learn.bean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;public class WebLogParser {public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);public static WebLogBean parser(String line) {WebLogBean webLogBean = new WebLogBean();String[] arr = line.split(" ");if (arr.length > 11) {webLogBean.setRemote_addr(arr[0]);webLogBean.setRemote_user(arr[1]);String time_local = formatDate(arr[3].substring(1));if(null==time_local) time_local="-invalid_time-";webLogBean.setTime_local(time_local);webLogBean.setRequest(arr[6]);webLogBean.setStatus(arr[8]);webLogBean.setBody_bytes_sent(arr[9]);webLogBean.setHttp_referer(arr[10]);//如果useragent元素较多,拼接useragentif (arr.length > 12) {StringBuilder sb = new StringBuilder();for(int i=11;i<arr.length;i++){sb.append(arr[i]);}webLogBean.setHttp_user_String());} else {webLogBean.setHttp_user_agent(arr[11]);}if (Integer.Status()) >= 400) {// 大于400,HTTP错误webLogBean.setValid(false);}if("-invalid_time-".Time_local())){webLogBean.setValid(false);}} else {webLogBean.setValid(false);}return webLogBean;}public static void filtStaticResource(WebLogBean bean, Set<String> pages) {if (!Request())) {bean.setValid(false);}}public static String formatDate(String time_local) {try {return df2.format(df1.parse(time_local));} catch (ParseException e) {return null;}}
}
package com.learn.bean;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.Writable;/*** 对接外部数据的层,表结构定义最好跟外部数据源保持一致* 术语: 贴源表**/
public class WebLogBean implements Writable {private boolean valid = true;// 判断数据是否合法private String remote_addr;// 记录客户端的ip地址private String remote_user;// 记录客户端用户名称,忽略属性"-"private String time_local;// 记录访问时间与时区private String request;// 记录请求的url与http协议private String status;// 记录请求状态;成功是200private String body_bytes_sent;// 记录发送给客户端文件主体内容大小private String http_referer;// 用来记录从那个页面链接访问过来的private String http_user_agent;// 记录客户浏览器的相关信息public void set(boolean valid,String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {this.valid = _addr = remote__user = remote_user;this.time_local = time_quest = request;this.status = status;this.body_bytes_sent = body_bytes_sent;this.http_referer = http_referer;this.http_user_agent = http_user_agent;}@Overridepublic String toString() {StringBuilder sb = new StringBuilder();sb.append(this.valid);sb.append("