[Website Clickstream Data Analysis] 04


1. Main goals

  • Filter out "non-conforming" records
  • Convert and normalize field formats
  • According to the downstream statistical requirements, filter and split out base data for the different topics (different section paths)

2. Implementation

Develop an MR program, WeblogPreProcess.

package com.learn.pre;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

import com.learn.bean.WebLogBean;
import com.learn.bean.WebLogParser;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Processes the raw logs: keeps only real pv requests,
 * converts the time format, fills default values for missing fields,
 * and marks each record as valid or invalid.
 */
public class WeblogPreProcess {

    static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        /** holds the list of real page urls of the site */
        Set<String> pages = new HashSet<>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        /**
         * Load the site's page url list (hard-coded here; in practice it could come from an external source).
         */
        @Override
        protected void setup(Context context) {
            pages.add("/about");
            pages.add("/black-ip-list/");
            pages.add("/cassandra-clustor/");
            pages.add("/finance-rhive-repurchase/");
            pages.add("/hadoop-family-roadmap/");
            pages.add("/hadoop-hive-intro/");
            pages.add("/hadoop-zookeeper-intro/");
            pages.add("/hadoop-mahout-roadmap/");
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLogBean webLogBean = WebLogParser.parser(line);
            // mark static resources (js/images/css, etc.) as invalid
            WebLogParser.filtStaticResource(webLogBean, pages);
            /* if (!webLogBean.isValid()) return; */
            k.set(webLogBean.toString());
            context.write(k, v);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(WeblogPreProcess.class);
        job.setMapperClass(WeblogPreProcessMapper.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setNumReduceTasks(0);
        job.waitForCompletion(true);
    }
}
package com.learn.bean;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class WebLogParser {

    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    public static WebLogBean parser(String line) {
        WebLogBean webLogBean = new WebLogBean();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setRemote_addr(arr[0]);
            webLogBean.setRemote_user(arr[1]);
            String time_local = formatDate(arr[3].substring(1));
            if (null == time_local) time_local = "-invalid_time-";
            webLogBean.setTime_local(time_local);
            webLogBean.setRequest(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setBody_bytes_sent(arr[9]);
            webLogBean.setHttp_referer(arr[10]);
            // if the user agent was split into several tokens, join them back together
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for (int i = 11; i < arr.length; i++) {
                    sb.append(arr[i]);
                }
                webLogBean.setHttp_user_agent(sb.toString());
            } else {
                webLogBean.setHttp_user_agent(arr[11]);
            }
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                // status >= 400 means an HTTP error
                webLogBean.setValid(false);
            }
            if ("-invalid_time-".equals(webLogBean.getTime_local())) {
                webLogBean.setValid(false);
            }
        } else {
            webLogBean.setValid(false);
        }
        return webLogBean;
    }

    public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
        if (!pages.contains(bean.getRequest())) {
            bean.setValid(false);
        }
    }

    public static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException e) {
            return null;
        }
    }
}
package com.learn.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Bean describing one raw log record. As a source-aligned ("staging") structure,
 * its field layout should stay consistent with the external data source.
 */
public class WebLogBean implements Writable {

    private boolean valid = true;     // whether the record is valid
    private String remote_addr;       // client ip address
    private String remote_user;       // client user name, "-" when absent
    private String time_local;        // access time and time zone
    private String request;           // requested url and http protocol
    private String status;            // request status; 200 on success
    private String body_bytes_sent;   // size of the body sent to the client
    private String http_referer;      // page the visitor came from
    private String http_user_agent;   // client browser information

    public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request,
                    String status, String body_bytes_sent, String http_referer, String http_user_agent) {
        this.valid = valid;
        this.remote_addr = remote_addr;
        this.remote_user = remote_user;
        this.time_local = time_local;
        this.request = request;
        this.status = status;
        this.body_bytes_sent = body_bytes_sent;
        this.http_referer = http_referer;
        this.http_user_agent = http_user_agent;
    }

    // getters and setters for each field are omitted here for brevity;
    // WebLogParser and the MR programs below rely on them.

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.valid);
        sb.append("\001").append(this.getRemote_addr());
        sb.append("\001").append(this.getRemote_user());
        sb.append("\001").append(this.getTime_local());
        sb.append("\001").append(this.getRequest());
        sb.append("\001").append(this.getStatus());
        sb.append("\001").append(this.getBody_bytes_sent());
        sb.append("\001").append(this.getHttp_referer());
        sb.append("\001").append(this.getHttp_user_agent());
        return sb.toString();
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.valid = in.readBoolean();
        this.remote_addr = in.readUTF();
        this.remote_user = in.readUTF();
        this.time_local = in.readUTF();
        this.request = in.readUTF();
        this.status = in.readUTF();
        this.body_bytes_sent = in.readUTF();
        this.http_referer = in.readUTF();
        this.http_user_agent = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(this.valid);
        out.writeUTF(null == remote_addr ? "" : remote_addr);
        out.writeUTF(null == remote_user ? "" : remote_user);
        out.writeUTF(null == time_local ? "" : time_local);
        out.writeUTF(null == request ? "" : request);
        out.writeUTF(null == status ? "" : status);
        out.writeUTF(null == body_bytes_sent ? "" : body_bytes_sent);
        out.writeUTF(null == http_referer ? "" : http_referer);
        out.writeUTF(null == http_user_agent ? "" : http_user_agent);
    }
}

Run the MR job to preprocess the data:

hadoop jar weblog.jar  com.learn.pre.WeblogPreProcess /weblog/input /weblog/preout
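
Each output line of this job is one WebLogBean.toString() record: the valid flag followed by the parsed fields, joined by the \001 separator. A record looks roughly like the following (illustrative values only, with the normally invisible \001 separator written out for readability):

true\0011.202.186.37\001-\0012013-09-18 06:51:35\001/hadoop-hive-intro/\001200\00114722\001"http://example.com/"\001"Mozilla/5.0"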

3. Building the clickstream model data

Many of the required metrics are much easier to derive from a clickstream model than from the raw records, so the preprocessing stage also uses MR programs to generate the clickstream model data.
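
As a quick illustration of why the model helps (a sketch only: it assumes the pageviews output is later mapped to a Hive table such as the ods_click_pageviews sketched in section 3.1 below), a metric like average visit depth becomes a simple aggregation:

-- hypothetical table and partition; average number of pages viewed per session
select avg(pv_cnt) as avg_visit_depth
from (
  select session, count(1) as pv_cnt
  from ods_click_pageviews
  where datestr = '2013-09-18'
  group by session
) t;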

3.1 Clickstream model: the pageviews table

package com.learn;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.Locale;
import java.util.UUID;

import com.learn.bean.WebLogBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Turns the cleaned logs into the clickstream pageviews model data.
 *
 * Input: the output of the preprocessing step.
 *
 * Separates the records into sessions and gives each visit (session) a session-id (random uuid).
 * For every page viewed within a session it records the request time, the url, the stay time,
 * and the page's step number within that session.
 * Keeps referral_url, body_bytes_send and useragent.
 */
public class ClickStreamThree {

    static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {

        Text k = new Text();
        WebLogBean v = new WebLogBean();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\001");
            if (fields.length < 9) return;
            // set the parsed fields into the weblog bean
            v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3],
                    fields[4], fields[5], fields[6], fields[7], fields[8]);
            // only valid records go into the later steps
            if (v.isValid()) {
                k.set(v.getRemote_addr());
                context.write(k, v);
            }
        }
    }

    static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {

        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
            ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
            // collect all records of one visitor, then sort them by access time
            try {
                for (WebLogBean bean : values) {
                    WebLogBean webLogBean = new WebLogBean();
                    try {
                        BeanUtils.copyProperties(webLogBean, bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    beans.add(webLogBean);
                }
                // sort the beans in chronological order
                Collections.sort(beans, new Comparator<WebLogBean>() {
                    @Override
                    public int compare(WebLogBean o1, WebLogBean o2) {
                        try {
                            Date d1 = toDate(o1.getTime_local());
                            Date d2 = toDate(o2.getTime_local());
                            if (d1 == null || d2 == null)
                                return 0;
                            return d1.compareTo(d2);
                        } catch (Exception e) {
                            e.printStackTrace();
                            return 0;
                        }
                    }
                });

                /*
                 * Split the ordered beans into visits and number the pages of each visit with step.
                 */
                int step = 1;
                String session = UUID.randomUUID().toString();
                for (int i = 0; i < beans.size(); i++) {
                    WebLogBean bean = beans.get(i);
                    // if there is only one record, output it directly
                    if (1 == beans.size()) {
                        // use a default stay time of 60s
                        v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                                + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001"
                                + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                        context.write(NullWritable.get(), v);
                        session = UUID.randomUUID().toString();
                        break;
                    }
                    // with more than one record, skip the first one; it is output while handling the second
                    if (i == 0) {
                        continue;
                    }
                    // time gap between this record and the previous one
                    long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
                    // if the gap is < 30 minutes, output the previous page view as part of the current visit
                    if (timeDiff < 30 * 60 * 1000) {
                        v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001"
                                + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                        context.write(NullWritable.get(), v);
                        step++;
                    } else {
                        // if the gap is > 30 minutes, output the previous page view, then reset step to start a new visit
                        v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001"
                                + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                        context.write(NullWritable.get(), v);
                        // after emitting the previous record, reset the step counter and open a new session
                        step = 1;
                        session = UUID.randomUUID().toString();
                    }
                    // if this is the last record, output it directly
                    if (i == beans.size() - 1) {
                        // use a default stay time of 60s
                        v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001"
                                + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001"
                                + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                        context.write(NullWritable.get(), v);
                    }
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }

        private String toStr(Date date) {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.format(date);
        }

        private Date toDate(String timeStr) throws ParseException {
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
            return df.parse(timeStr);
        }

        private long timeDiff(String time1, String time2) throws ParseException {
            Date d1 = toDate(time1);
            Date d2 = toDate(time2);
            return d1.getTime() - d2.getTime();
        }

        private long timeDiff(Date time1, Date time2) {
            return time1.getTime() - time2.getTime();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClickStreamThree.class);
        job.setMapperClass(ClickStreamMapper.class);
        job.setReducerClass(ClickStreamReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLogBean.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}

 

package com.learn.bean;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class PageViewsBean implements Writable {

    private String session;
    private String remote_addr;
    private String timestr;
    private String request;
    private int step;
    private String staylong;
    private String referal;
    private String useragent;
    private String bytes_send;
    private String status;

    public void set(String session, String remote_addr, String useragent, String timestr, String request,
                    int step, String staylong, String referal, String bytes_send, String status) {
        this.session = session;
        this.remote_addr = remote_addr;
        this.useragent = useragent;
        this.timestr = timestr;
        this.request = request;
        this.step = step;
        this.staylong = staylong;
        this.referal = referal;
        this.bytes_send = bytes_send;
        this.status = status;
    }

    // getters and setters for each field are omitted here for brevity;
    // ClickStreamVisit below relies on them.

    @Override
    public void readFields(DataInput in) throws IOException {
        this.session = in.readUTF();
        this.remote_addr = in.readUTF();
        this.timestr = in.readUTF();
        this.request = in.readUTF();
        this.step = in.readInt();
        this.staylong = in.readUTF();
        this.referal = in.readUTF();
        this.useragent = in.readUTF();
        this.bytes_send = in.readUTF();
        this.status = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(timestr);
        out.writeUTF(request);
        out.writeInt(step);
        out.writeUTF(staylong);
        out.writeUTF(referal);
        out.writeUTF(useragent);
        out.writeUTF(bytes_send);
        out.writeUTF(status);
    }
}

Generate the pageviews model data:

hadoop jar weblogpreprocess.jar com.learn.ClickStreamThree /user/hive/warehouse/dw_click.db/test_ods_weblog_origin/datestr=2013-09-20/ /test-click/pageviews/

Table structure:
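
The original table definition is not preserved in this copy of the post; the DDL below is a sketch reconstructed from the fields emitted by ClickStreamThree (session, ip, remote_user, time, request, step, stay time, referer, user agent, bytes sent, status). The table name ods_click_pageviews and the datestr partition are assumptions.

drop table if exists ods_click_pageviews;
create table ods_click_pageviews(
session         string,
remote_addr     string,
remote_user     string,
time_local      string,
request         string,
visit_step      int,
page_staylong   string,
http_referer    string,
http_user_agent string,
body_bytes_sent string,
status          string)
partitioned by (datestr string)
row format delimited fields terminated by '\001';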

3.2 Clickstream model: the visit table

Note: one "visit" = N consecutive requests.

Deriving per-visitor "visit" information directly from the raw data with HQL alone is difficult, so a MapReduce program is used first to turn the raw data into "visit" records; HQL can then build further multi-dimensional statistics on top of them.

An MR program reads the pageviews data and works out, for each visit, its start/end time and page information, as illustrated below.
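
For intuition, the transformation looks roughly like this (made-up values, separators shown as '|' instead of \001): three pageview records sharing one session id collapse into a single visit record that keeps the first/last page and timestamps plus the page count.

pageviews:
S1 | 1.2.3.4 | 2013-09-18 07:58:50 | /hadoop-zookeeper-intro/ | step=1
S1 | 1.2.3.4 | 2013-09-18 08:02:10 | /hadoop-hive-intro/      | step=2
S1 | 1.2.3.4 | 2013-09-18 08:05:40 | /about                   | step=3

visit:
S1 | 1.2.3.4 | inTime=2013-09-18 07:58:50 | outTime=2013-09-18 08:05:40 | inPage=/hadoop-zookeeper-intro/ | outPage=/about | pageVisits=3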

package com.learn;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

import com.learn.bean.PageViewsBean;
import com.learn.bean.VisitBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * Derives the visit model from the pageviews model output:
 * sessionid  start-time  out-time  start-page  out-page  pagecounts  ......
 */
public class ClickStreamVisit {

    // send the data to the reducer keyed by session
    static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {

        PageViewsBean pvBean = new PageViewsBean();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] fields = line.split("\001");
            int step = Integer.parseInt(fields[5]);
            // set(String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
            // example input record: 299d6b78-9571-4fa9-bcc2-f2567c46df34  72.46.128.140  -  2013-09-18 07:58:50  /hadoop-zookeeper-intro/  1  60  "/"  "Mozilla/5.0"  14722  200
            pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
            k.set(pvBean.getSession());
            context.write(k, pvBean);
        }
    }

    static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {

        @Override
        protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
            // sort the pageview beans of this session by step
            ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
            for (PageViewsBean pvBean : pvBeans) {
                PageViewsBean bean = new PageViewsBean();
                try {
                    BeanUtils.copyProperties(bean, pvBean);
                    pvBeansList.add(bean);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
                @Override
                public int compare(PageViewsBean o1, PageViewsBean o2) {
                    return o1.getStep() > o2.getStep() ? 1 : -1;
                }
            });

            // take the first and last pageview records of this visit and fill the VisitBean
            VisitBean visitBean = new VisitBean();
            // first record of the visit
            visitBean.setInPage(pvBeansList.get(0).getRequest());
            visitBean.setInTime(pvBeansList.get(0).getTimestr());
            // last record of the visit
            visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
            visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
            // number of pages visited in this visit
            visitBean.setPageVisits(pvBeansList.size());
            // visitor ip
            visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
            // referal of this visit
            visitBean.setReferal(pvBeansList.get(0).getReferal());
            visitBean.setSession(session.toString());

            context.write(NullWritable.get(), visitBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(ClickStreamVisit.class);
        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageViewsBean.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}

 

hadoop jar weblogpreprocess.jar com.learn.ClickStreamVisit /weblog/sessionout /weblog/visitout

Next, create the clickstream visit model table in the Hive warehouse:

drop table if exists click_stream_visit;
create table click_stream_visit(
session     string,
remote_addr string,
inTime      string,
outTime     string,
inPage      string,
outPage     string,
referal     string,
pageVisits  int)
partitioned by (datestr string);

Then load the visit data produced by the MR job into the visit model table:

load data inpath '/weblog/visitout' into table click_stream_visit partition(datestr='2013-09-18');
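
From here, the additional multi-dimensional statistics mentioned above are plain HQL over click_stream_visit. A minimal sketch (the metric choices are examples, not part of the original text):

-- average pages per visit for the day
select avg(pageVisits) as avg_pages_per_visit
from click_stream_visit
where datestr = '2013-09-18';

-- top 10 entry pages by number of visits
select inPage, count(1) as visits
from click_stream_visit
where datestr = '2013-09-18'
group by inPage
order by visits desc
limit 10;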
