Web日志流处理的MapReduce程序

阅读: 评论:0

Web日志流处理的MapReduce程序

Web日志流处理的MapReduce程序

我的这两个项目代码地址:
Collections排序:

MapReduce排序:

这两个项目里面会有一些测试的代码,可以忽略。

使用Collections.sort排序

WeblogBean

package com.thp.Bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Bean that mirrors one raw web-server access-log record; the field layout
 * intentionally matches the external data source so parsing stays a straight
 * positional mapping.
 *
 * <p>NOTE(review): field delimiter reconstructed as '\001' (the Hive default);
 * the published text shows a mangled "01" — confirm against the repository.
 *
 * @author 汤小萌
 */
public class WeblogBean implements Writable {

	private boolean valid = true;		// whether this record passed validation
	private String remote_addr;			// client IP address
	private String remote_user;			// client user name; "-" means absent
	private String time_local;			// access time (normalized later)
	private String request;				// requested URL and HTTP protocol
	private String status;				// HTTP status code; 200 means success
	private String body_bytes_sent;		// bytes of the response body sent to the client
	private String http_referer;		// page the user came from
	private String http_user_agent;		// client browser information

	/** Populates every field in one call. */
	public void set(boolean valid, String remote_addr, String remote_user, String time_local,
			String request, String status, String body_bytes_sent,
			String http_referer, String http_user_agent) {
		this.valid = valid;
		this.remote_addr = remote_addr;
		this.remote_user = remote_user;
		this.time_local = time_local;
		this.request = request;
		this.status = status;
		this.body_bytes_sent = body_bytes_sent;
		this.http_referer = http_referer;
		this.http_user_agent = http_user_agent;
	}

	public boolean isValid() { return valid; }
	public void setValid(boolean valid) { this.valid = valid; }
	public String getRemote_addr() { return remote_addr; }
	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
	public String getRemote_user() { return remote_user; }
	public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
	public String getTime_local() { return time_local; }
	public void setTime_local(String time_local) { this.time_local = time_local; }
	public String getRequest() { return request; }
	public void setRequest(String request) { this.request = request; }
	public String getStatus() { return status; }
	public void setStatus(String status) { this.status = status; }
	public String getBody_bytes_sent() { return body_bytes_sent; }
	public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
	public String getHttp_referer() { return http_referer; }
	public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
	public String getHttp_user_agent() { return http_user_agent; }
	public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

	/**
	 * Serializes all fields. Null strings are written as "" because
	 * {@link DataOutput#writeUTF} rejects null.
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeBoolean(this.valid);
		out.writeUTF(null == remote_addr ? "" : remote_addr);
		out.writeUTF(null == remote_user ? "" : remote_user);
		out.writeUTF(null == time_local ? "" : time_local);
		out.writeUTF(null == request ? "" : request);
		out.writeUTF(null == status ? "" : status);
		out.writeUTF(null == body_bytes_sent ? "" : body_bytes_sent);
		out.writeUTF(null == http_referer ? "" : http_referer);
		out.writeUTF(null == http_user_agent ? "" : http_user_agent);
	}

	/** Deserializes fields in the exact order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.valid = in.readBoolean();
		this.remote_addr = in.readUTF();
		this.remote_user = in.readUTF();
		this.time_local = in.readUTF();
		this.request = in.readUTF();
		this.status = in.readUTF();
		this.body_bytes_sent = in.readUTF();
		this.http_referer = in.readUTF();
		this.http_user_agent = in.readUTF();
	}

	/**
	 * Joins all fields with the Hive default field delimiter '\001' so the
	 * emitted line can be loaded into Hive directly.
	 */
	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(this.valid);
		sb.append("\001").append(this.getRemote_addr());
		sb.append("\001").append(this.getRemote_user());
		sb.append("\001").append(this.getTime_local());
		sb.append("\001").append(this.getRequest());
		sb.append("\001").append(this.getStatus());
		sb.append("\001").append(this.getBody_bytes_sent());
		sb.append("\001").append(this.getHttp_referer());
		sb.append("\001").append(this.getHttp_user_agent());
		return sb.toString();
	}
}

PageViewsBean

package com.thp.Bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One page view inside a visit (session): which page, when, how long the
 * visitor stayed, and the view's position ({@code step}) within the session.
 *
 * @author 汤小萌
 */
public class PageViewsBean implements Writable {

	private String session;			// session id
	private String remote_addr;		// client IP address
	private String timeStr;			// access time
	private String request;			// requested URL
	private int step;				// position of this page view within the visit
	private String staylong;		// stay duration
	private String referal;			// page the visitor came from
	private String useragent;		// browser information
	private String bytes_send;		// bytes sent for this view
	private String status;			// HTTP status of this request

	/** Populates every field in one call. */
	public void set(String session, String remote_addr, String useragent, String timeStr,
			String request, int step, String staylong, String referal,
			String bytes_send, String status) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.useragent = useragent;
		this.timeStr = timeStr;
		this.request = request;
		this.step = step;
		this.staylong = staylong;
		this.referal = referal;
		this.bytes_send = bytes_send;
		this.status = status;
	}

	public String getSession() { return session; }
	public void setSession(String session) { this.session = session; }
	public String getRemote_addr() { return remote_addr; }
	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
	public String getTimeStr() { return timeStr; }
	public void setTimeStr(String timeStr) { this.timeStr = timeStr; }
	public String getRequest() { return request; }
	public void setRequest(String request) { this.request = request; }
	public int getStep() { return step; }
	public void setStep(int step) { this.step = step; }
	public String getStaylong() { return staylong; }
	public void setStaylong(String staylong) { this.staylong = staylong; }
	public String getReferal() { return referal; }
	public void setReferal(String referal) { this.referal = referal; }
	public String getUseragent() { return useragent; }
	public void setUseragent(String useragent) { this.useragent = useragent; }
	public String getBytes_send() { return bytes_send; }
	public void setBytes_send(String bytes_send) { this.bytes_send = bytes_send; }
	public String getStatus() { return status; }
	public void setStatus(String status) { this.status = status; }

	/** Serializes all fields; order must match {@link #readFields}. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(session);
		out.writeUTF(remote_addr);
		out.writeUTF(timeStr);
		out.writeUTF(request);
		out.writeInt(step);
		out.writeUTF(staylong);
		out.writeUTF(referal);
		out.writeUTF(useragent);
		out.writeUTF(bytes_send);
		out.writeUTF(status);
	}

	/** Deserializes fields in the exact order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.timeStr = in.readUTF();
		this.request = in.readUTF();
		this.step = in.readInt();
		this.staylong = in.readUTF();
		this.referal = in.readUTF();
		this.useragent = in.readUTF();
		this.bytes_send = in.readUTF();
		this.status = in.readUTF();
	}
}

VisitBean

package com.thp.Bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One visit (session): entry/exit time and page, referer, and the number
 * of pages viewed during the session.
 *
 * @author 汤小萌
 */
public class VisitBean implements Writable {

	private String session;		// session id
	private String remote_addr;	// client IP address
	private String inTime;		// time the visit started
	private String outTime;		// time the visit ended
	private String inPage;		// first page of the visit
	private String outPage;		// last page of the visit
	private String referal;		// page the visitor came from
	private int pageVisits;		// number of pages viewed in this visit

	/** Populates every field in one call. */
	public void set(String session, String remote_addr, String inTime, String outTime,
			String inPage, String outPage, String referal, int pageVisits) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.inTime = inTime;
		this.outTime = outTime;
		this.inPage = inPage;
		this.outPage = outPage;
		this.referal = referal;
		this.pageVisits = pageVisits;
	}

	public String getSession() { return session; }
	public void setSession(String session) { this.session = session; }
	public String getRemote_addr() { return remote_addr; }
	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
	public String getInTime() { return inTime; }
	public void setInTime(String inTime) { this.inTime = inTime; }
	public String getOutTime() { return outTime; }
	public void setOutTime(String outTime) { this.outTime = outTime; }
	public String getInPage() { return inPage; }
	public void setInPage(String inPage) { this.inPage = inPage; }
	public String getOutPage() { return outPage; }
	public void setOutPage(String outPage) { this.outPage = outPage; }
	public String getReferal() { return referal; }
	public void setReferal(String referal) { this.referal = referal; }
	public int getPageVisits() { return pageVisits; }
	public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }

	/** Serializes all fields; order must match {@link #readFields}. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(session);
		out.writeUTF(remote_addr);
		out.writeUTF(inTime);
		out.writeUTF(outTime);
		out.writeUTF(inPage);
		out.writeUTF(outPage);
		out.writeUTF(referal);
		out.writeInt(pageVisits);
	}

	/** Deserializes fields in the exact order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.inTime = in.readUTF();
		this.outTime = in.readUTF();
		this.inPage = in.readUTF();
		this.outPage = in.readUTF();
		this.referal = in.readUTF();
		this.pageVisits = in.readInt();
	}

	/** Joins all fields with the Hive default delimiter '\001'. */
	@Override
	public String toString() {
		return session + "\001" + remote_addr + "\001" + inTime + "\001"
				+ outTime + "\001" + inPage + "\001" + outPage + "\001"
				+ referal + "\001" + pageVisits;
	}
}

预处理解析类

package com.thp.Bean;

import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;

/**
 * Parses one raw access-log line into a {@link WeblogBean} and flags
 * invalid records (too few fields, unparseable time, or status >= 400).
 *
 * @author 汤小萌
 */
public class WeblogParser {

	// NOTE(review): shared SimpleDateFormat instances are not thread-safe;
	// acceptable only if parser() is called from a single thread per task — confirm.
	public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
	public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

	/**
	 * Splits a log line on spaces; expected token layout:
	 * <pre>
	 *  0) 194.237.142.21          client IP
	 *  1) -                       remote user ("-" when absent)
	 *  2) -
	 *  3) [18/Sep/2013:06:49:18   timestamp (leading '[')
	 *  4) +0000]                  timezone
	 *  5) "GET                    method
	 *  6) /wp-content/...png      request path
	 *  7) HTTP/1.1"               protocol
	 *  8) 304                     status
	 *  9) 0                       body bytes sent
	 * 10) "-"                     referer
	 * 11+) "Mozilla/4.0 ..."      user agent (may span several tokens)
	 * </pre>
	 *
	 * @param line one raw log line
	 * @return a populated bean; {@code valid} is false for malformed lines
	 */
	public static WeblogBean parser(String line) {
		WeblogBean weblogBean = new WeblogBean();
		String[] arr = line.split(" ");
		if (arr.length > 11) {
			weblogBean.setRemote_addr(arr[0]);
			weblogBean.setRemote_user(arr[1]);
			String time_local = formatDate(arr[3].substring(1)); // drop leading '['
			if (null == time_local) {
				time_local = "-invalid_time-";
			}
			weblogBean.setTime_local(time_local);
			weblogBean.setRequest(arr[6]);
			weblogBean.setStatus(arr[8]);
			weblogBean.setBody_bytes_sent(arr[9]);
			weblogBean.setHttp_referer(arr[10]);
			if (arr.length > 12) {
				// the user agent contained spaces: re-join the remaining tokens
				StringBuilder sb = new StringBuilder();
				for (int i = 11; i < arr.length; i++) {
					sb.append(arr[i]);
				}
				weblogBean.setHttp_user_agent(sb.toString());
			} else {
				weblogBean.setHttp_user_agent(arr[11]);
			}
			if (Integer.parseInt(weblogBean.getStatus()) >= 400) {
				// status >= 400 means the request failed
				weblogBean.setValid(false);
			}
			if ("-invalid_time-".equals(weblogBean.getTime_local())) {
				weblogBean.setValid(false);
			}
		} else {
			weblogBean.setValid(false);
		}
		return weblogBean;
	}

	/**
	 * Filters static resources: any request outside the configured page set
	 * is treated as static content (js/css/images) and marked invalid.
	 */
	public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
		if (!pages.contains(bean.getRequest())) {
			bean.setValid(false);
		}
	}

	/**
	 * Converts "18/Sep/2013:06:49:18" to "2013-09-18 06:49:18".
	 *
	 * @return the reformatted time, or null when parsing fails
	 */
	public static String formatDate(String time_local) {
		try {
			return sdf2.format(sdf1.parse(time_local));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return null;
	}

	@Test
	public void testSpilt() {
		String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
		String[] arr = str.split(" ");
		int i = 1;
		for (String s : arr) {
			System.out.println(i + " ) " + s);
			i++;
		}
	}

	@Test
	public void testProp() throws IOException {
	}

	public static void main(String[] args) throws IOException {
		Properties pop = new Properties();
		InputStream is = WeblogParser.class.getClassLoader()
				.getResourceAsStream("com/thp/bigdata/webClick/mrBean/url_1.propeties");
		pop.load(is);
		String str = (String) pop.get("url");
		System.out.println(str);
	}
}

MapReduce 程序

1 . 日志的预处理:

package com.thp.pre;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.Bean.WeblogBean;
import com.thp.Bean.WeblogParser;

/**
 * Pre-processes the raw logs to extract real page views:
 * 1) normalizes the time format,
 * 2) fills defaults for missing fields,
 * 3) marks each record valid/invalid and drops the invalid ones.
 *
 * Map-only job: the cleaned record itself is the output key.
 *
 * @author 汤小萌
 */
public class WeblogPreProcess {

	static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
		Set<String> pages = new HashSet<String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();

		/**
		 * Loads the whitelist of "real page" URLs from the classpath resource
		 * url.propeties (key "url", comma-separated); everything else is
		 * later treated as a static resource.
		 */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			Properties pop = new Properties();
			InputStream in = WeblogPreProcess.class.getClassLoader()
					.getResourceAsStream("url.propeties");
			pop.load(in);
			String urlStr = pop.getProperty("url");
			for (String url : urlStr.split(",")) {
				pages.add(url);
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			WeblogBean weblogBean = WeblogParser.parser(line);
			// pluggable step: filter js/images/css and other static resources
			WeblogParser.filterStaticResource(weblogBean, pages);
			if (weblogBean.isValid()) {	// invalid records are silently dropped here
				k.set(weblogBean.toString());
				context.write(k, v);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(WeblogPreProcess.class);
		job.setMapperClass(WeblogPreProcessMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// FileInputFormat.setInputPaths(job, new Path(args[0]));
		// FileOutputFormat.setOutputPath(job, new Path(args[1]));
		FileInputFormat.setInputPaths(job, new Path("f:/weblog/input"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog/output"));
		job.setNumReduceTasks(0);	// map-only: no reduce phase needed
		job.waitForCompletion(true);
	}
}

2.分析出点击流:

package com.thp.click; // NOTE(review): package name was garbled in the published source — confirm against the repository

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.Bean.WeblogBean;

/**
 * Builds the click-stream "pageViews" model from the cleaned logs:
 * <ul>
 *   <li>groups hits by client IP, sorts each group by time,</li>
 *   <li>splits a group into visits (sessions) wherever two consecutive hits
 *       are more than 30 minutes apart, tagging each visit with a random UUID,</li>
 *   <li>emits, per page view: session id, time, url, stay time and step
 *       within the visit, plus referer / user-agent / bytes / status.</li>
 * </ul>
 *
 * @author 汤小萌
 */
public class ClickStream {

	static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WeblogBean> {
		Text k = new Text();
		WeblogBean v = new WeblogBean();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// fields are separated by the Hive default delimiter '\001'
			String[] fields = line.split("\001");
			if (fields.length < 9) {
				return; // malformed record, skip
			}
			v.set("true".equals(fields[0]), fields[1], fields[2], fields[3],
					fields[4], fields[5], fields[6], fields[7], fields[8]);
			if (v.isValid()) {	// only valid records flow downstream
				k.set(v.getRemote_addr());	// group all hits of one client IP
				context.write(k, v);
			}
		}
	}

	static class ClickStreamReducer extends Reducer<Text, WeblogBean, NullWritable, Text> {
		Text v = new Text();

		@Override
		protected void reduce(Text key, Iterable<WeblogBean> values, Context context)
				throws IOException, InterruptedException {
			ArrayList<WeblogBean> beans = new ArrayList<WeblogBean>();
			// Hadoop reuses the value object while iterating, so every bean
			// must be deep-copied before being stored.
			for (WeblogBean bean : values) {
				WeblogBean weblogBean = new WeblogBean();
				try {
					BeanUtils.copyProperties(weblogBean, bean);
				} catch (IllegalAccessException | InvocationTargetException e) {
					e.printStackTrace();
				}
				beans.add(weblogBean);
			}
			// Sort this client's hits chronologically.
			Collections.sort(beans, new Comparator<WeblogBean>() {
				@Override
				public int compare(WeblogBean o1, WeblogBean o2) {
					try {
						Date d1 = toDate(o1.getTime_local());
						Date d2 = toDate(o2.getTime_local());
						if (d1 == null || d2 == null) {
							return 0;
						}
						return d1.compareTo(d2);
					} catch (ParseException e) {
						e.printStackTrace();
					}
					return 0;
				}
			});

			int step = 1;
			String session = UUID.randomUUID().toString();
			for (int i = 0; i < beans.size(); i++) {
				WeblogBean bean = beans.get(i);
				if (1 == beans.size()) {
					// single hit: emit it with the default stay time of 60s
					v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001"
							+ bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001"
							+ (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent()
							+ "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
					context.write(NullWritable.get(), v);
					session = UUID.randomUUID().toString();
					break;
				}
				if (i == 0) {
					// with 2+ hits the first iteration only establishes "previous"
					continue;
				}
				try {
					long timeDiff = timeDiff(bean.getTime_local(), beans.get(i - 1).getTime_local());
					if (timeDiff < 30 * 60 * 1000) {
						// same session: emit the previous page with its real stay time
						v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user()
								+ "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest()
								+ "\001" + step + "\001" + (timeDiff / 1000) + "\001"
								+ beans.get(i - 1).getHttp_referer() + "\001" + beans.get(i - 1).getHttp_user_agent()
								+ "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
						context.write(NullWritable.get(), v);
						step++;
					} else {
						// gap over 30 min: the previous page closes its visit;
						// emit it with the default 60s stay, then start a new session
						v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user()
								+ "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest()
								+ "\001" + (step) + "\001" + (60) + "\001"
								+ beans.get(i - 1).getHttp_referer() + "\001" + beans.get(i - 1).getHttp_user_agent()
								+ "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
						context.write(NullWritable.get(), v);
						step = 1;
						session = UUID.randomUUID().toString();
					}
				} catch (ParseException e) {
					e.printStackTrace();
				}
				if (i == beans.size() - 1) {
					// last hit of the group: emit it with the default 60s stay
					v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001"
							+ bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001"
							+ (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent()
							+ "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
					context.write(NullWritable.get(), v);
				}
			}
		}

		private String toStr(Date date) {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return sdf.format(date);
		}

		private Date toDate(String timeStr) throws ParseException {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.UK);
			return sdf.parse(timeStr);
		}

		/** Millisecond difference: time1 - time2. */
		private long timeDiff(String time1, String time2) throws ParseException {
			Date d1 = toDate(time1);
			Date d2 = toDate(time2);
			return d1.getTime() - d2.getTime();
		}
	}

	public static void main(String[] args) throws IllegalArgumentException, IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStream.class);
		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(WeblogBean.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		// FileInputFormat.setInputPaths(job, new Path(args[0]));
		// FileOutputFormat.setOutputPath(job, new Path(args[1]));
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_1/output"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_1/pageviews"));
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path("f:/weblog_1/pageviews"))) {
			// delete a stale output directory so the job can run repeatedly
			fs.delete(new Path("f:/weblog_1/pageviews"), true);
		}
		job.waitForCompletion(true);
	}
}

3.进一步梳理出visit模型:

package com.thp.click; // NOTE(review): package name was garbled in the published source — confirm against the repository

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.thp.Bean.PageViewsBean;
import com.thp.Bean.VisitBean;

/**
 * Reduces the pageViews model further into the visit model.
 *
 * Output format per visit:
 * sessionid  start-time  out-time  start-page  out-page  pagecounts ...
 *
 * @author 汤小萌
 */
public class ClickStreamVisit {

	static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
		PageViewsBean pvBean = new PageViewsBean();
		Text k = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			// fields are separated by the Hive default delimiter '\001'
			String[] fields = line.split("\001");
			int step = Integer.parseInt(fields[5]);	// step within the visit
			pvBean.set(fields[0], fields[1], fields[2], fields[3],
					fields[4], step, fields[6], fields[7], fields[8], fields[9]);
			k.set(pvBean.getSession());	// group page views by session id
			context.write(k, pvBean);
		}
	}

	static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

		@Override
		protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context)
				throws IOException, InterruptedException {
			// Deep-copy the page views (Hadoop reuses the value object), then
			// sort them by step so the first/last page of the visit are known.
			ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
			for (PageViewsBean pvBean : pvBeans) {
				PageViewsBean bean = new PageViewsBean();
				try {
					BeanUtils.copyProperties(bean, pvBean);
					pvBeanList.add(bean);
				} catch (IllegalAccessException | InvocationTargetException e) {
					e.printStackTrace();
				}
			}
			Collections.sort(pvBeanList, new Comparator<PageViewsBean>() {
				@Override
				public int compare(PageViewsBean o1, PageViewsBean o2) {
					// Integer.compare honors the Comparator contract (returns 0
					// for equal steps — the original never returned 0)
					return Integer.compare(o1.getStep(), o2.getStep());
				}
			});

			// Head and tail page views become the visit's entry/exit records.
			VisitBean visitBean = new VisitBean();
			visitBean.setInPage(pvBeanList.get(0).getRequest());
			visitBean.setInTime(pvBeanList.get(0).getTimeStr());
			visitBean.setOutPage(pvBeanList.get(pvBeanList.size() - 1).getRequest());
			visitBean.setOutTime(pvBeanList.get(pvBeanList.size() - 1).getTimeStr());
			visitBean.setPageVisits(pvBeanList.size());	// pages viewed in this visit
			visitBean.setRemote_addr(pvBeanList.get(0).getRemote_addr());	// visitor IP
			visitBean.setReferal(pvBeanList.get(0).getReferal());	// referer of the visit
			visitBean.setSession(session.toString());
			context.write(NullWritable.get(), visitBean);
		}
	}

	public static void main(String[] args) throws IllegalArgumentException, IOException,
			ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStreamVisit.class);
		job.setMapperClass(ClickStreamVisitMapper.class);
		job.setReducerClass(ClickStreamVisitReducer.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(PageViewsBean.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(VisitBean.class);
		// FileInputFormat.setInputPaths(job, new Path(args[0]));
		// FileOutputFormat.setOutputPath(job, new Path(args[1]));
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_1/pageviews"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_1/visitout"));
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path("f:/weblog_1/visitout"))) {
			// delete a stale output directory so the job can run repeatedly
			fs.delete(new Path("f:/weblog_1/visitout"), true);
		}
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}

	/**
	 * 2018年11月29日 上午9:00:57 — dead code in the original; kept only so the
	 * public interface is unchanged. NOTE(review): consider removing.
	 *
	 * @param a unused
	 */
	public void a1(int a) {
		new StringBuffer();
	}
}

使用MapReduce自身的排序

WeblogBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;

import org.apache.hadoop.io.WritableComparable;

/**
 * Bean that mirrors one raw web-server access-log record; the field layout
 * matches the external data source. This variant is comparable so MapReduce
 * itself sorts records by (client IP, access time).
 *
 * @author 汤小萌
 */
public class WeblogBean implements WritableComparable<WeblogBean> {

	private boolean valid = true;		// whether this record passed validation
	private String remote_addr;			// client IP address
	private String remote_user;			// client user name; "-" means absent
	private String time_local;			// access time (normalized later)
	private String request;				// requested URL and HTTP protocol
	private String status;				// HTTP status code; 200 means success
	private String body_bytes_sent;		// bytes of the response body sent to the client
	private String http_referer;		// page the user came from
	private String http_user_agent;		// client browser information

	/** Populates every field in one call. */
	public void set(boolean valid, String remote_addr, String remote_user, String time_local,
			String request, String status, String body_bytes_sent,
			String http_referer, String http_user_agent) {
		this.valid = valid;
		this.remote_addr = remote_addr;
		this.remote_user = remote_user;
		this.time_local = time_local;
		this.request = request;
		this.status = status;
		this.body_bytes_sent = body_bytes_sent;
		this.http_referer = http_referer;
		this.http_user_agent = http_user_agent;
	}

	public boolean isValid() { return valid; }
	public void setValid(boolean valid) { this.valid = valid; }
	public String getRemote_addr() { return remote_addr; }
	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
	public String getRemote_user() { return remote_user; }
	public void setRemote_user(String remote_user) { this.remote_user = remote_user; }
	public String getTime_local() { return time_local; }
	public void setTime_local(String time_local) { this.time_local = time_local; }
	public String getRequest() { return request; }
	public void setRequest(String request) { this.request = request; }
	public String getStatus() { return status; }
	public void setStatus(String status) { this.status = status; }
	public String getBody_bytes_sent() { return body_bytes_sent; }
	public void setBody_bytes_sent(String body_bytes_sent) { this.body_bytes_sent = body_bytes_sent; }
	public String getHttp_referer() { return http_referer; }
	public void setHttp_referer(String http_referer) { this.http_referer = http_referer; }
	public String getHttp_user_agent() { return http_user_agent; }
	public void setHttp_user_agent(String http_user_agent) { this.http_user_agent = http_user_agent; }

	/**
	 * Serializes all fields. Null strings are written as "" because
	 * {@link DataOutput#writeUTF} rejects null.
	 */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeBoolean(this.valid);
		out.writeUTF(null == remote_addr ? "" : remote_addr);
		out.writeUTF(null == remote_user ? "" : remote_user);
		out.writeUTF(null == time_local ? "" : time_local);
		out.writeUTF(null == request ? "" : request);
		out.writeUTF(null == status ? "" : status);
		out.writeUTF(null == body_bytes_sent ? "" : body_bytes_sent);
		out.writeUTF(null == http_referer ? "" : http_referer);
		out.writeUTF(null == http_user_agent ? "" : http_user_agent);
	}

	/** Deserializes fields in the exact order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.valid = in.readBoolean();
		this.remote_addr = in.readUTF();
		this.remote_user = in.readUTF();
		this.time_local = in.readUTF();
		this.request = in.readUTF();
		this.status = in.readUTF();
		this.body_bytes_sent = in.readUTF();
		this.http_referer = in.readUTF();
		this.http_user_agent = in.readUTF();
	}

	/** Joins all fields with the Hive default field delimiter '\001'. */
	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(this.valid);
		sb.append("\001").append(this.getRemote_addr());
		sb.append("\001").append(this.getRemote_user());
		sb.append("\001").append(this.getTime_local());
		sb.append("\001").append(this.getRequest());
		sb.append("\001").append(this.getStatus());
		sb.append("\001").append(this.getBody_bytes_sent());
		sb.append("\001").append(this.getHttp_referer());
		sb.append("\001").append(this.getHttp_user_agent());
		return sb.toString();
	}

	/**
	 * Orders records by client IP first, then by access time.
	 * The IP MUST be compared first — comparing only the time would interleave
	 * records of different clients and break per-client session reconstruction.
	 */
	@Override
	public int compareTo(WeblogBean o) {
		int ipCompareResult = this.getRemote_addr().compareTo(o.getRemote_addr());
		if (ipCompareResult != 0) {
			return ipCompareResult;
		}
		// same IP: order by access time
		SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.UK);
		try {
			Date d1 = sdf.parse(this.getTime_local());
			Date d2 = sdf.parse(o.getTime_local());
			if (d1 == null || d2 == null) {
				return 0;
			}
			return d1.compareTo(d2);
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return 0;
	}
}

PageViewsBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.hadoop.io.WritableComparable;
import org.junit.Test;

/**
 * One page view inside a visit, comparable by (session, step) so that
 * MapReduce itself sorts the page views of each session in order.
 *
 * @author 汤小萌
 */
public class PageViewsBean implements WritableComparable<PageViewsBean> {

	private String session;			// session id
	private String remote_addr;		// client IP address
	private String timeStr;			// access time
	private String request;			// requested URL
	private int step;				// position of this page view within the visit
	private String staylong;		// stay duration
	private String referal;			// page the visitor came from
	private String useragent;		// browser information
	private String bytes_send;		// bytes sent for this view
	private String status;			// HTTP status of this request

	/** Populates every field in one call. */
	public void set(String session, String remote_addr, String useragent, String timeStr,
			String request, int step, String staylong, String referal,
			String bytes_send, String status) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.useragent = useragent;
		this.timeStr = timeStr;
		this.request = request;
		this.step = step;
		this.staylong = staylong;
		this.referal = referal;
		this.bytes_send = bytes_send;
		this.status = status;
	}

	/** Serializes all fields; order must match {@link #readFields}. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(session);
		out.writeUTF(remote_addr);
		out.writeUTF(timeStr);
		out.writeUTF(request);
		out.writeInt(step);
		out.writeUTF(staylong);
		out.writeUTF(referal);
		out.writeUTF(useragent);
		out.writeUTF(bytes_send);
		out.writeUTF(status);
	}

	/** Deserializes fields in the exact order written by {@link #write}. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.timeStr = in.readUTF();
		this.request = in.readUTF();
		this.step = in.readInt();
		this.staylong = in.readUTF();
		this.referal = in.readUTF();
		this.useragent = in.readUTF();
		this.bytes_send = in.readUTF();
		this.status = in.readUTF();
	}

	/**
	 * Orders by session first, then ascending step; the step order is only
	 * meaningful within one session.
	 * Null sessions compare equal so the unit test below (which leaves the
	 * session null) does not throw. {@code Integer.compare} also returns 0
	 * for equal steps, honoring the Comparator contract (the original's
	 * ternary never returned 0).
	 */
	@Override
	public int compareTo(PageViewsBean o) {
		int sessionCompareResult = (this.session == null || o.getSession() == null)
				? 0 : this.session.compareTo(o.getSession());
		if (sessionCompareResult != 0) {
			return sessionCompareResult;
		}
		return Integer.compare(this.step, o.getStep());
	}

	public String getSession() { return session; }
	public void setSession(String session) { this.session = session; }
	public String getRemote_addr() { return remote_addr; }
	public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
	public String getTimeStr() { return timeStr; }
	public void setTimeStr(String timeStr) { this.timeStr = timeStr; }
	public String getRequest() { return request; }
	public void setRequest(String request) { this.request = request; }
	public int getStep() { return step; }
	public void setStep(int step) { this.step = step; }
	public String getStaylong() { return staylong; }
	public void setStaylong(String staylong) { this.staylong = staylong; }
	public String getReferal() { return referal; }
	public void setReferal(String referal) { this.referal = referal; }
	public String getUseragent() { return useragent; }
	public void setUseragent(String useragent) { this.useragent = useragent; }
	public String getBytes_send() { return bytes_send; }
	public void setBytes_send(String bytes_send) { this.bytes_send = bytes_send; }
	public String getStatus() { return status; }
	public void setStatus(String status) { this.status = status; }

	@Override
	public String toString() {
		return this.session + " " + this.step;
	}

	@Test
	public void testCompareTo() {
		PageViewsBean pvb1 = new PageViewsBean();
		pvb1.set(null, null, null, null, null, 2, null, null, null, null);
		PageViewsBean pvb2 = new PageViewsBean();
		pvb2.set(null, null, null, null, null, 1, null, null, null, null);
		PageViewsBean pvb3 = new PageViewsBean();
		pvb3.set(null, null, null, null, null, 4, null, null, null, null);
		PageViewsBean pvb4 = new PageViewsBean();
		pvb4.set(null, null, null, null, null, 3, null, null, null, null);
		ArrayList<PageViewsBean> list = new ArrayList<PageViewsBean>();
		list.add(pvb1);
		list.add(pvb2);
		list.add(pvb3);
		list.add(pvb4);
		System.out.println(list);
		Collections.sort(list);
		System.out.println(list);
	}
}

VisitBean

package mr.flow.weblog.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * One visit (session) record: client ip, entry/exit time, entry/exit page,
 * the referring page, and the number of pages viewed during the session.
 *
 * Reconstructed from a scrape-corrupted listing in which every
 * "this.xxx = ..." assignment and field read had its leading token stripped.
 */
public class VisitBean implements Writable {

	private String session;      // session id of this visit
	private String remote_addr;  // client ip address
	private String inTime;       // time of the first page view
	private String outTime;      // time of the last page view
	private String inPage;       // first page visited
	private String outPage;      // last page visited
	private String referal;      // page the visitor came from
	private int pageVisits;      // number of pages viewed in this visit

	/** Populates every field in one call (same order as the field declarations). */
	public void set(String session, String remote_addr, String inTime, String outTime,
			String inPage, String outPage, String referal, int pageVisits) {
		this.session = session;
		this.remote_addr = remote_addr;
		this.inTime = inTime;
		this.outTime = outTime;
		this.inPage = inPage;
		this.outPage = outPage;
		this.referal = referal;
		this.pageVisits = pageVisits;
	}

	/** Hadoop serialization; field order must match {@link #readFields} exactly. */
	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(session);
		out.writeUTF(remote_addr);
		out.writeUTF(inTime);
		out.writeUTF(outTime);
		out.writeUTF(inPage);
		out.writeUTF(outPage);
		out.writeUTF(referal);
		out.writeInt(pageVisits);
	}

	/** Hadoop deserialization; reads fields in the same order {@link #write} wrote them. */
	@Override
	public void readFields(DataInput in) throws IOException {
		this.session = in.readUTF();
		this.remote_addr = in.readUTF();
		this.inTime = in.readUTF();
		this.outTime = in.readUTF();
		this.inPage = in.readUTF();
		this.outPage = in.readUTF();
		this.referal = in.readUTF();
		this.pageVisits = in.readInt();
	}

	// NOTE(review): the scraped listing shows the separator as "01"; the original
	// almost certainly used Hive's default field delimiter "\001" (the scraper
	// dropped the backslash-zero) — confirm against downstream Hive table DDL.
	@Override
	public String toString() {
		return session + "\001" + remote_addr + "\001" + inTime + "\001"
				+ outTime + "\001" + inPage + "\001" + outPage + "\001"
				+ referal + "\001" + pageVisits;
	}

	public String getSession() {
		return session;
	}

	public void setSession(String session) {
		this.session = session;
	}

	public String getRemote_addr() {
		return remote_addr;
	}

	public void setRemote_addr(String remote_addr) {
		this.remote_addr = remote_addr;
	}

	public String getInTime() {
		return inTime;
	}

	public void setInTime(String inTime) {
		this.inTime = inTime;
	}

	public String getOutTime() {
		return outTime;
	}

	public void setOutTime(String outTime) {
		this.outTime = outTime;
	}

	public String getInPage() {
		return inPage;
	}

	public void setInPage(String inPage) {
		this.inPage = inPage;
	}

	public String getOutPage() {
		return outPage;
	}

	public void setOutPage(String outPage) {
		this.outPage = outPage;
	}

	public String getReferal() {
		return referal;
	}

	public void setReferal(String referal) {
		this.referal = referal;
	}

	public int getPageVisits() {
		return pageVisits;
	}

	public void setPageVisits(int pageVisits) {
		this.pageVisits = pageVisits;
	}
}

WeblogParser

package mr.flow.weblog.bean;

import java.io.IOException;
import java.io.InputStream;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Properties;
import java.util.Set;

import org.junit.Test;

/**
 * Parses one raw access-log line into a {@link WeblogBean} and marks
 * obviously bad records as invalid.
 *
 * Field layout after splitting on a single space (combined log format):
 * <pre>
 *  0) 194.237.142.21     1) -                 2) -
 *  3) [18/Sep/2013:06:49:18                   4) +0000]
 *  5) "GET               6) /wp-content/uploads/2013/07/rstudio-git3.png
 *  7) HTTP/1.1"          8) 304               9) 0
 * 10) "-"               11) "Mozilla/4.0     12) (compatible;)"
 * </pre>
 */
public class WeblogParser {

	/**
	 * Parses a single log line.
	 *
	 * @param line one raw access-log record
	 * @return a populated WeblogBean; {@code valid} is set to false for short
	 *         records, status codes &gt;= 400, or unparseable timestamps
	 */
	public static WeblogBean parser(String line) {
		WeblogBean weblogBean = new WeblogBean();
		String[] arr = line.split(" ");
		if (arr.length > 11) {
			weblogBean.setRemote_addr(arr[0]);
			weblogBean.setRemote_user(arr[1]);
			// arr[3] starts with '[' — strip it before parsing the timestamp
			String time_local = formatDate(arr[3].substring(1));
			if (null == time_local) time_local = "-invalid_time-";
			weblogBean.setTime_local(time_local);
			weblogBean.setRequest(arr[6]);
			weblogBean.setStatus(arr[8]);
			weblogBean.setBody_bytes_sent(arr[9]);
			weblogBean.setHttp_referer(arr[10]);
			// The user-agent may itself contain spaces — rejoin the remainder.
			if (arr.length > 12) {
				StringBuffer sb = new StringBuffer();
				for (int i = 11; i < arr.length; i++) {
					sb.append(arr[i]);
				}
				weblogBean.setHttp_user_agent(sb.toString());
			} else {
				weblogBean.setHttp_user_agent(arr[11]);
			}
			// Status >= 400 means the request failed.
			if (Integer.parseInt(weblogBean.getStatus()) >= 400) {
				weblogBean.setValid(false);
			}
			if ("-invalid_time-".equals(weblogBean.getTime_local())) {
				weblogBean.setValid(false);
			}
		} else {
			weblogBean.setValid(false);
		}
		return weblogBean;
	}

	/**
	 * Filters static resources: any request url not in the configured page set
	 * is treated as a static asset (js/css/image) and marked invalid.
	 */
	public static void filterStaticResource(WeblogBean bean, Set<String> pages) {
		if (!pages.contains(bean.getRequest())) {
			bean.setValid(false);
		}
	}

	// NOTE(review): SimpleDateFormat is not thread-safe; these shared static
	// instances are only safe because each map task is single-threaded.
	public static SimpleDateFormat sdf1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
	public static SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

	/**
	 * Converts "18/Sep/2013:06:49:18" into "2013-09-18 06:49:18".
	 *
	 * @return the reformatted timestamp, or null when parsing fails
	 */
	public static String formatDate(String time_local) {
		try {
			return sdf2.format(sdf1.parse(time_local));
		} catch (ParseException e) {
			e.printStackTrace();
		}
		return null;
	}

	@Test
	public void testSpilt() {
		String str = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] \"GET /wp-content/uploads/2013/07/rstudio-git3.png HTTP/1.1\" 304 0 \"-\" \"Mozilla/4.0 (compatible;)\"";
		String[] arr = str.split(" ");
		int i = 1;
		for (String s : arr) {
			System.out.println(i + " ) " + s);
			i++;
		}
	}

	@Test
	public void testProp() throws IOException {
	}

	public static void main(String[] args) throws IOException {
		Properties pop = new Properties();
		InputStream is = WeblogParser.class.getClassLoader()
				.getResourceAsStream("com/thp/bigdata/webClick/mrBean/url_1.propeties");
		pop.load(is);
		String str = (String) pop.get("url");
		System.out.println(str);
	}
}

比较器
IpGroupingComparator

package mr.flow.weblog.bean;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 自定义的聚合规则* 当key的ip相同的时候,就放入同一个reduce进行处理* @author 汤小萌**/
public class IpGroupingComparator extends WritableComparator {public IpGroupingComparator() {super(WeblogBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {WeblogBean aBean = (WeblogBean) a;WeblogBean bBean = (WeblogBean) b;Remote_addr()Remote_addr());}}

SessionIdGroupingComparator

package mr.flow.weblog.bean;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;/*** 自定义的聚合规则* 相同的sessionId要进入同一个reduce进行处理* @author 汤小萌* @date 2018年11月28日 下午8:55:13*/
public class SessionIdGroupingComparator extends WritableComparator {public SessionIdGroupingComparator() {super(PageViewsBean.class, true);}@Overridepublic int compare(WritableComparable a, WritableComparable b) {PageViewsBean aBean = (PageViewsBean) a;PageViewsBean bBean = (PageViewsBean) b;// System.out.Session()  + " -- " + Session());// System.out.Session()Session()));Session()Session());}
}

1. WeblogPreProcess 日志预处理

package mr.flow.weblog.pre;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.WeblogBean;
import mr.flow.weblog.bean.WeblogParser;

/**
 * Pre-processes the raw access log to extract real page views (PV):
 * 1) normalizes the timestamp format;
 * 2) fills default values for missing fields;
 * 3) marks each record valid/invalid and drops the invalid ones.
 *
 * Map-only job (numReduceTasks = 0).
 */
public class WeblogPreProcess {

	static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

		// Dynamic-page urls loaded from url.propeties; everything else is
		// treated as a static resource and filtered out.
		Set<String> pages = new HashSet<String>();
		Text k = new Text();
		NullWritable v = NullWritable.get();

		/** Loads the site's url whitelist from the classpath resource. */
		@Override
		protected void setup(Context context) throws IOException, InterruptedException {
			Properties pop = new Properties();
			InputStream in = this.getClass().getClassLoader().getResourceAsStream("url.propeties");
			pop.load(in);
			String urlStr = pop.getProperty("url");
			String[] urls = urlStr.split(",");
			for (String url : urls) {
				pages.add(url);
			}
		}

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString();
			WeblogBean weblogBean = WeblogParser.parser(line);
			// Pluggable step: filter js/image/css static resources.
			WeblogParser.filterStaticResource(weblogBean, pages);
			if (weblogBean.isValid()) {  // invalid records are silently dropped
				k.set(weblogBean.toString());
				context.write(k, v);
			}
		}
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(WeblogPreProcess.class);
		job.setMapperClass(WeblogPreProcessMapper.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// For cluster runs, take the paths from args instead:
		// FileInputFormat.setInputPaths(job, new Path(args[0]));
		// FileOutputFormat.setOutputPath(job, new Path(args[1]));
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/input/access.log.fensi"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/output"));
		// Delete a stale output directory so the job can rerun.
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path("f:/weblog_2/output"))) {
			fs.delete(new Path("f:/weblog_2/output"), true);
		}
		job.setNumReduceTasks(0);  // map-only: emit cleaned records directly
		job.waitForCompletion(true);
	}
}

2. ClickStream

// NOTE(review): the package name was destroyed by the scrape ("package mr.;");
// restored to match the sibling bean package's prefix — confirm against the repo.
package mr.flow.weblog.clickstream;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.IpGroupingComparator;
import mr.flow.weblog.bean.WeblogBean;

/**
 * Builds the click-stream "pageviews" model from the pre-processed log.
 *
 * Splits each ip's records into sessions (gap &gt; 30 min starts a new session),
 * tags every page view with a sessionId, its step number within the session,
 * and its stay time, and carries along http_referer / http_user_agent /
 * body_bytes_sent / status.
 */
public class ClickStream {

	static class ClickStreamMapper extends Mapper<LongWritable, Text, WeblogBean, Text> {

		WeblogBean k = new WeblogBean();
		Text v = new Text();  // value is unused downstream; emitted empty

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			// Input fields are delimited with "\001" by the pre-process job.
			String[] fields = value.toString().split("\001");
			if (fields.length < 9) return;
			k.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3],
					fields[4], fields[5], fields[6], fields[7], fields[8]);
			if (k.isValid()) {  // only valid records continue downstream
				context.write(k, v);
			}
		}
	}

	/**
	 * Emits, per page view:
	 * sessionId ip user time_local request step stay_seconds http_referer
	 * http_user_agent body_bytes_sent status
	 *
	 * Records arrive grouped by ip (IpGroupingComparator) and sorted by time,
	 * so each record is written out one iteration late: we need the NEXT
	 * record's timestamp to compute the previous record's stay time.
	 */
	static class ClickStreamReducer extends Reducer<WeblogBean, Text, Text, NullWritable> {

		Text k = new Text();
		NullWritable v = NullWritable.get();

		@Override
		protected void reduce(WeblogBean beanKey, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			int step = 1;                                        // position of the page within the session
			String sessionId = UUID.randomUUID().toString();     // id for the current session
			String lastTimeStr = null;                           // previous record's access time
			String lastSaveStr = null;                           // previous record's trailing fields
			String lastIpAndUser = null;                         // previous record's ip + user
			String lastUrl = null;                               // previous record's url
			Long stayTime = 0L;                                  // gap between consecutive views (ms)

			for (Text value : values) {
				if (lastTimeStr != null) {
					try {
						// beanKey now holds the NEXT record; lastTimeStr is the previous one.
						stayTime = timeDiff(beanKey.getTime_local(), lastTimeStr);
					} catch (ParseException e) {
						e.printStackTrace();
					}
					if (stayTime < 30 * 60 * 1000) {
						// Gap < 30 min: same session; stay time is the real gap (seconds).
						k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001"
								+ lastTimeStr + "\001" + step + "\001" + (stayTime / 1000) + "\001" + lastSaveStr);
						context.write(k, v);
						step++;
					} else {
						// Gap > 30 min: close the session with a default 60s stay,
						// then start a fresh session for the current record.
						k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001"
								+ lastTimeStr + "\001" + step + "\001" + (60) + "\001" + lastSaveStr);
						context.write(k, v);
						step = 1;
						sessionId = UUID.randomUUID().toString();
					}
				}
				// Remember this record; it is emitted on the next iteration
				// (or by the flush below if it is the last one).
				lastTimeStr = beanKey.getTime_local();
				lastSaveStr = beanKey.getHttp_referer() + "\001" + beanKey.getHttp_user_agent() + "\001"
						+ beanKey.getBody_bytes_sent() + "\001" + beanKey.getStatus();
				lastUrl = beanKey.getRequest();
				lastIpAndUser = beanKey.getRemote_addr() + "\001" + beanKey.getRemote_user();
			}
			// Flush the final record of the group with the default 60s stay time.
			k.set(sessionId + "\001" + lastIpAndUser + "\001" + lastUrl + "\001"
					+ lastTimeStr + "\001" + step + "\001" + (60) + "\001" + lastSaveStr);
			context.write(k, v);
		}

		// ********************** helpers **********************

		private Date toDate(String timeStr) throws ParseException {
			SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
			return sdf.parse(timeStr);
		}

		/** Millisecond difference time1 - time2. */
		private long timeDiff(String time1, String time2) throws ParseException {
			Date d1 = toDate(time1);
			Date d2 = toDate(time2);
			return d1.getTime() - d2.getTime();
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStream.class);
		job.setMapperClass(ClickStreamMapper.class);
		job.setReducerClass(ClickStreamReducer.class);
		job.setMapOutputKeyClass(WeblogBean.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(NullWritable.class);
		// Same-ip records must reach the same reduce() call.
		job.setGroupingComparatorClass(IpGroupingComparator.class);
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/output"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/pageviews"));
		// Delete a stale output directory so the job can rerun.
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path("f:/weblog_2/pageviews"))) {
			fs.delete(new Path("f:/weblog_2/pageviews"), true);
		}
		job.waitForCompletion(true);
	}
}

3. ClickStreamVisit

// NOTE(review): the package name was destroyed by the scrape ("package mr.;");
// restored to match the sibling bean package's prefix — confirm against the repo.
package mr.flow.weblog.clickstream;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import mr.flow.weblog.bean.PageViewsBean;
import mr.flow.weblog.bean.SessionIdGroupingComparator;
import mr.flow.weblog.bean.VisitBean;

/**
 * Aggregates the PageViews model by sessionId into one VisitBean per session:
 * sessionId, entry time, exit time, entry page, exit page, referer, page count.
 */
public class ClickStreamVisit {

	static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, PageViewsBean, Text> {

		// The map output value could be NullWritable; Text is kept only because
		// it was used for debugging during development.
		PageViewsBean beanKey = new PageViewsBean();
		Text v = new Text();

		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String[] fields = value.toString().split("\001");
			// step (position within the session) must be parsed to int so the
			// key sorts numerically.
			int step = Integer.parseInt(fields[5]);
			beanKey.set(fields[0], fields[1], fields[2], fields[3],
					fields[4], step, fields[6], fields[7], fields[8], fields[9]);
			v.set(beanKey.getSession() + " " + step);
			context.write(beanKey, v);
		}
	}

	static class ClickStreamVisitReducer extends Reducer<PageViewsBean, Text, NullWritable, VisitBean> {

		NullWritable k = NullWritable.get();
		// The first and last page views of the visit are copied into this bean.
		VisitBean visitBean = new VisitBean();

		@Override
		protected void reduce(PageViewsBean beanKey, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			ArrayList<PageViewsBean> pvBeanList = new ArrayList<PageViewsBean>();
			for (Text str : values) {
				// Cannot add beanKey directly: Hadoop reuses the same key object
				// on every iteration, so each element must be a fresh copy.
				PageViewsBean pvBean = new PageViewsBean();
				try {
					BeanUtils.copyProperties(pvBean, beanKey);
					pvBeanList.add(pvBean);
				} catch (IllegalAccessException | InvocationTargetException e) {
					e.printStackTrace();
				}
			}
			// First record of the visit (records are sorted by step).
			visitBean.setInPage(pvBeanList.get(0).getRequest());
			visitBean.setInTime(pvBeanList.get(0).getTimeStr());
			// Last record of the visit.
			visitBean.setOutPage(pvBeanList.get(pvBeanList.size() - 1).getRequest());
			visitBean.setOutTime(pvBeanList.get(pvBeanList.size() - 1).getTimeStr());
			// Number of pages viewed during this visit.
			visitBean.setPageVisits(pvBeanList.size());
			// Visitor ip.
			visitBean.setRemote_addr(pvBeanList.get(0).getRemote_addr());
			// Referer of this visit.
			visitBean.setReferal(pvBeanList.get(0).getReferal());
			visitBean.setSession(pvBeanList.get(0).getSession());
			context.write(k, visitBean);
		}
	}

	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
		Configuration conf = new Configuration();
		Job job = Job.getInstance(conf);
		job.setJarByClass(ClickStreamVisit.class);
		job.setMapperClass(ClickStreamVisitMapper.class);
		job.setReducerClass(ClickStreamVisitReducer.class);
		job.setMapOutputKeyClass(PageViewsBean.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(NullWritable.class);
		// BUG FIX: the original called setOutputKeyClass twice; the second call
		// must declare the output VALUE class.
		job.setOutputValueClass(VisitBean.class);
		// Same-session records must reach the same reduce() call.
		job.setGroupingComparatorClass(SessionIdGroupingComparator.class);
		// NOTE(review): input should be the ClickStream job's output directory;
		// the scraped listing showed only "f:/weblog_2/" — confirm.
		FileInputFormat.setInputPaths(job, new Path("f:/weblog_2/pageviews"));
		FileOutputFormat.setOutputPath(job, new Path("f:/weblog_2/visitout"));
		// Delete a stale output directory so the job can rerun.
		FileSystem fs = FileSystem.get(conf);
		if (fs.exists(new Path("f:/weblog_2/visitout"))) {
			fs.delete(new Path("f:/weblog_2/visitout"), true);
		}
		boolean res = job.waitForCompletion(true);
		System.exit(res ? 0 : 1);
	}
}

本文发布于:2024-02-03 00:57:28,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170689309747614.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:程序   日志   Web   MapReduce
留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22 网站地图23