DataTotalMapReduce (流量统计)

阅读: 评论:0

DataTotalMapReduce (流量统计)

DataTotalMapReduce (流量统计)

###### 流量统计
hdfs;
import java.io.IOException;
public class DataTotalMapReduce extends Configured implements Tool {
public static class DataTotalMapper extends Mapper<LongWritable, Text, Text, DataTotalWritable> {
@Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//split by 't' String[] splits = String().split("t") ; //以手机号码作为output key String phoneNum = splits[1]; Text mapOutputKey = new Text(); mapOutputKey.set(phoneNum); // set map output value long upPackNum = Long.parseLong(splits[6]) ; long downPackNum = Long.parseLong(splits[7]) ; long upPayLoad = Long.parseLong(splits[8]) ; long downPayLoad = Long.parseLong(splits[9]) ; DataTotalWritable mapOutputValue = new DataTotalWritable() ; mapOutputValue.set(upPackNum, downPackNum, upPayLoad, downPayLoad); //map output context.write(mapOutputKey, mapOutputValue); } }
public static class DataTotalReducer extends Reducer<Text, DataTotalWritable, Text, DataTotalWritable> {
@Override protected void reduce(Text key, Iterable<DataTotalWritable> values, Context context) throws IOException, InterruptedException { long upPackNumSum = 0; long downPackNumSum = 0; long upPayLoadSum = 0; long downPayLoadSum = 0; //iterator for(DataTotalWritable value : values){ upPackNumSum += UpPackNum() ; downPackNumSum += DownPackNum() ; upPayLoadSum += UpPayLoad() ; downPayLoadSum += DownPayLoad() ; } // set output value DataTotalWritable outputValue = new DataTotalWritable() ; outputValue.set(upPackNumSum, downPackNumSum, upPayLoadSum, downPayLoadSum); // output context.write(key, outputValue); } }
public int run(String[] args) throws Exception {
//Job Configuration conf = Conf(); Job job = Instance(conf); job.setJarByClass(getClass());
//Mapper job.setMapperClass(DataTotalMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(DataTotalWritable.class);
//Reducer job.setReducerClass(DataTotalReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataTotalWritable.class);
//输入路径 Path inPath = new Path(args[0]); FileInputFormat.addInputPath(job, inPath); //输出路径 Path outPath = new Path(args[1]); FileSystem dfs = (conf); if (ists(outPath)) { dfs.delete(outPath, true); } FileOutputFormat.setOutputPath(job, outPath);
//Submit Job boolean isSuccess = job.waitForCompletion(true); return isSuccess ? 0 : 1; }
public static void main(String[] args) throws Exception {
args = new String[] {"hdfs://domain:8020/input", "hdfs://domain:8020/output2"};
// run job Configuration conf = new Configuration(); int status = ToolRunner.run(conf,new DataTotalMapReduce(),args);
it(status); } }
hdfs;
import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable;
public class DataTotalWritable implements Writable { // 上行数据包总数 private long upPackNum ; // 下行数据包总数 private long downPackNum ; // 上行总流量 private long upPayLoad ; // 下行总流量 private long downPayLoad ; public DataTotalWritable() { }
public DataTotalWritable(long upPackNum, long downPackNum, long upPayLoad,long downPayLoad) { this.set(upPackNum, downPackNum, upPayLoad, downPayLoad); } public void set (long upPackNum, long downPackNum, long upPayLoad,long downPayLoad) { this.upPackNum = upPackNum; this.downPackNum = downPackNum; this.upPayLoad = upPayLoad; this.downPayLoad = downPayLoad; } public long getUpPackNum() { return upPackNum; }
public void setUpPackNum(long upPackNum) { this.upPackNum = upPackNum; }
public long getDownPackNum() { return downPackNum; }
public void setDownPackNum(long downPackNum) { this.downPackNum = downPackNum; }
public long getUpPayLoad() { return upPayLoad; }
public void setUpPayLoad(long upPayLoad) { this.upPayLoad = upPayLoad; }
public long getDownPayLoad() { return downPayLoad; }
public void setDownPayLoad(long downPayLoad) { this.downPayLoad = downPayLoad; } //^为异或运算, << 带符号左移, >>带符号右移, >>> 无符号右移 @Override public int hashCode() { final int prime = 31; int result = 1; result = prime * result + (int) (downPackNum ^ (downPackNum >>> 32)); result = prime * result + (int) (downPayLoad ^ (downPayLoad >>> 32)); result = prime * result + (int) (upPackNum ^ (upPackNum >>> 32)); result = prime * result + (int) (upPayLoad ^ (upPayLoad >>> 32)); return result; }
@Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != Class()) return false; DataTotalWritable other = (DataTotalWritable) obj; if (downPackNum != other.downPackNum) return false; if (downPayLoad != other.downPayLoad) return false; if (upPackNum != other.upPackNum) return false; if (upPayLoad != other.upPayLoad) return false; return true; } @Override public String toString() { return upPackNum + "t" + downPackNum + "t" + upPayLoad + "t" + downPayLoad ; }
public void write(DataOutput out) throws IOException { out.writeLong(upPackNum); out.writeLong(downPackNum); out.writeLong(upPayLoad); out.writeLong(downPayLoad); }
public void readFields(DataInput in) throws IOException { this.upPackNum = in.readLong() ; this.downPackNum = in.readLong() ; this.upPayLoad = in.readLong() ; this.downPayLoad = in.readLong() ; }
}

本文发布于:2024-02-05 05:44:20,感谢您对本站的认可!

本文链接:https://www.4u4v.net/it/170725469763542.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

留言与评论(共有 0 条评论)
   
验证码:

Copyright ©2019-2022 Comsenz Inc.Powered by ©

网站地图1 网站地图2 网站地图3 网站地图4 网站地图5 网站地图6 网站地图7 网站地图8 网站地图9 网站地图10 网站地图11 网站地图12 网站地图13 网站地图14 网站地图15 网站地图16 网站地图17 网站地图18 网站地图19 网站地图20 网站地图21 网站地图22 网站地图23