Call Log Analysis: callLog (Part 1)

I. Project Overview
----------------------------------------------
1. Use hadoop + hbase + flume + zookeeper to implement carrier-grade storage, random access, and real-time read/write of massive call-log data. Apply hashing to the rowkey to solve hbase's hot-spot problem, and use coprocessors to handle the system's throughput and query load, as well as to avoid notification storms and infinite recursion caused by intermediate results, giving students hands-on experience with big-data technology as applied in industry.
2. Overall architecture: hadoop architecture and HA configuration; hbase architecture and HA configuration; flume real-time collection; an SSM front-end web layer and its interaction with the HBase back end; periodic job scheduling with hive + oozie; windowed real-time sensitive-word monitoring with Spark Streaming.
3. Design and implementation of the callLogs table in hbase: analysis of call-record content and common query scenarios, and the design and implementation of the rowkey, focusing on the principle of salting and the solution to the hot-spot problem, plus rowkey design guidelines and practical tips.
4. Coprocessor principles and hands-on usage: the design of callee call records, swapping caller/callee and writing the mirrored record synchronously via a coprocessor, and keeping bidirectional queries transparently consistent with the storage order in the callLogs table.
5. HA cluster configuration for Hadoop and HBase: QJM-based NameNode HA, ResourceManager HA, zookeeper's working principles with configuration and hands-on practice, caveats when integrating hbase with Hadoop HA, and client API programming details.

II. Create a New Project
------------------------------------------------
1. Create a new project -- CallLogSystem

III. Create the Simulated Log Generator Module CallLogGenModel
-------------------------------------------------
1. Create the module -- CallLogGenModel, with Maven support
2. Create the class App
3. Write the App class
---------------------------------------
import java.io.FileWriter;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.*;

public class App {
    // phone book: number -> name
    public static Map<String, String> callers = new HashMap<String, String>();
    // phone numbers
    public static List<String> phoneNumbers = new ArrayList<String>();

    static {
        callers.put("15811111111", "史让");
        callers.put("18022222222", "赵嗄");
        callers.put("15133333333", "张锕");
        callers.put("13269364444", "王以");
        callers.put("15032295555", "张噢");
        callers.put("17731086666", "张类");
        callers.put("15338597777", "李平");
        callers.put("15733218888", "杜跑");
        callers.put("15614209999", "任阳");
        callers.put("15778421111", "梁鹏");
        callers.put("18641241111", "郭彤");
        callers.put("15732641111", "刘飞");
        callers.put("13341101111", "段星");
        callers.put("13560191111", "唐华");
        callers.put("18301581111", "杨谋");
        callers.put("13520401111", "温英");
        callers.put("18332561111", "朱宽");
        callers.put("18620191111", "刘宗");
        phoneNumbers.addAll(callers.keySet());
    }

    public static void main(String[] args) {
        if (args == null || args.length == 0) {
            System.out.println("no args");
            System.exit(-1);
        }
        genCallLog(args[0]);
    }

    /**
     * generate call logs
     */
    private static void genCallLog(String logFilePath) {
        try {
            // file writer (append mode)
            FileWriter fw = new FileWriter(logFilePath, true);
            Random random = new Random();
            while (true) {
                // caller
                String caller = phoneNumbers.get(random.nextInt(phoneNumbers.size()));
                String callerName = callers.get(caller);
                // callee (!= caller)
                String callee = phoneNumbers.get(random.nextInt(phoneNumbers.size()));
                while (callee.equals(caller)) {
                    callee = phoneNumbers.get(random.nextInt(phoneNumbers.size()));
                }
                String calleeName = callers.get(callee);
                // call duration (< 10 min)
                int duration = random.nextInt(60 * 10) + 1;
                DecimalFormat df = new DecimalFormat();
                df.applyPattern("000");
                String dur = df.format(duration);
                // call time
                int year = 2018;
                int month = random.nextInt(12);
                int day = random.nextInt(29) + 1;
                int hour = random.nextInt(24);
                int min = random.nextInt(60);
                int sec = random.nextInt(60);
                Calendar calendar = Calendar.getInstance();
                calendar.set(year, month, day, hour, min, sec);
                Date date = calendar.getTime();
                // if the time is in the future, draw a new one
                Date now = new Date();
                if (date.compareTo(now) > 0) {
                    continue;
                }
                SimpleDateFormat dfs = new SimpleDateFormat();
                dfs.applyPattern("yyyy/MM/dd HH:mm:ss");
                String timeStr = dfs.format(date);
                // call log line
                //String callLog = caller + "," + callerName + "," + callee + "," + calleeName + "," + timeStr + "," + dur;
                String callLog = caller + "," + callee + "," + timeStr + "," + dur;
                fw.write(callLog + "\r\n");
                fw.flush();
                Thread.sleep(200);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
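Each generated line is a four-field CSV record. The deterministic miniature below (illustrative only, not part of the module) shows the exact caller,callee,time,duration shape that the downstream HBase consumer later parses:

```java
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Calendar;

// Deterministic miniature of one generated log line, using the same
// "yyyy/MM/dd HH:mm:ss" and "000" patterns as the App class above.
public class LogLineDemo {
    public static String buildLine(String caller, String callee, Calendar cal, int durationSec) {
        DecimalFormat df = new DecimalFormat();
        df.applyPattern("000");                   // zero-pad the duration to 3 digits
        SimpleDateFormat dfs = new SimpleDateFormat();
        dfs.applyPattern("yyyy/MM/dd HH:mm:ss");  // same time pattern as App
        return caller + "," + callee + "," + dfs.format(cal.getTime()) + "," + df.format(durationSec);
    }

    public static void main(String[] args) {
        Calendar cal = Calendar.getInstance();
        cal.set(2018, Calendar.MARCH, 25, 10, 0, 0);
        System.out.println(buildLine("15811111111", "18022222222", cal, 95));
        // -> 15811111111,18022222222,2018/03/25 10:00:00,095
    }
}
```

The fixed "000" pattern keeps every field fixed-width, which matters once the fields are embedded in a sortable rowkey.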
4. Build the jar and run it on Linux (s100/s200)
a. Test the command on Windows first:
cmd> java -cp CallLogGenModel-1.0-SNAPSHOT.jar App d:\calllog\calllog.log
b. Create the directory on ubuntu and run it there:
$> mkdir /home/ubuntu/calllog
$> java -cp /share/calllog/CallLogGenModel-1.0-SNAPSHOT.jar App /home/ubuntu/calllog/calllog.log
c. Write a helper script ~/calllog/calllog.sh:
#!/bin/bash
java -cp /share/calllog/CallLogGenModel-1.0-SNAPSHOT.jar App /home/ubuntu/calllog/calllog.log
d. Make calllog.sh executable:
$calllog> chmod 777 calllog.sh
e. Run the calllog.sh script:
$calllog> ./calllog.sh

IV. Start flume on s100 and s200 to Collect calllog.log in Real Time [s100 s200]
---------------------------------------------------------
1. Write the flume config file [flume/f]
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = exec
# -c +0 : collect from the beginning of the file; -F : keep following new data, otherwise stop when the process stops.
a1.sources.r1.command = tail -F -c +0 /home/ubuntu/calllog/calllog.log

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = calllog
a1.sinks.k1.kafka.bootstrap.servers = s200:9092 s300:9092 s400:9092
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. Start flume on s100 and s200 to begin collecting logs:
$s100> flume-ng agent -f /soft/flume/f -n a1 &
$s200> flume-ng agent -f /soft/flume/f -n a1 &

V. Start the Kafka Cluster
--------------------------------------------------
1. Start the zk cluster [s100 s200 s300]
$> zkServer.sh start
$> xcall.sh jps
2. Start the kafka cluster [s200 s300 s400]
$> /soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties
$> netstat -ano | grep 9092
3. Create the kafka topic
$> kafka-topics.sh --create --zookeeper s100:2181 --replication-factor 3 --partitions 4 --topic calllog
$> kafka-topics.sh --list --zookeeper s100:2181
4. Start a kafka console consumer on s300, consuming the calllog topic collected by flume, for testing.
$s300> kafka-console-consumer.sh --zookeeper s100:2181 --topic calllog
5. Start the log generator app on s100 and s200 and watch the console output on s300.
$s100> ~/calllog/calllog.sh
$s200> ~/calllog/calllog.sh

VI. Write the Real Kafka Consumer for HBase -- Pull Messages from kafka and Store Them in hbase
---------------------------------------------------------------------
1. Start the hadoop cluster [s100 s500 / s200 s300 s400], fully distributed + HA
a. $s100> start-all.sh
b. $s100> xcall.sh jps
6656 Jps
6353 ResourceManager
6261 DFSZKFailoverController
3317 QuorumPeerMain
5818 NameNode
----xcall : jps from s200 ----
6224 DataNode
6721 NodeManager
7025 Jps
6465 JournalNode
3847 QuorumPeerMain
4335 Kafka
----xcall : jps from s300 ----
6088 NodeManager
6409 Jps
4330 Kafka
5595 DataNode
5836 JournalNode
3612 QuorumPeerMain
----xcall : jps from s400 ----
4242 Kafka
5241 DataNode
5738 NodeManager
5482 JournalNode
6059 Jps
----xcall : jps from s500 ----
5317 Jps
5064 DFSZKFailoverController
4826 NameNode
c. Check the web UI: s100:50070
2. Start the hbase cluster [s100 s500 / s200 s300 s400]
a. Start the cluster from s100
$s100> start-hbase.sh
b. Start the backup master on s500
$s500> hbase-daemon.sh start master
c. Check the web UI: s100:16010
3. Create the hbase namespace + table
a. Enter the hbase shell
$s100> hbase shell
b. Create the namespace and the table
hbase> create_namespace 'call'
hbase> create 'call:calllogs','f1'
4. Implementation -- create a kafka consumer subscribed to the calllog topic
a. Create the module CalllogCustomerModel, with maven support
b. Add the maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>groupId</groupId>
    <artifactId>CalllogCustomerModel</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>0.10.0.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.2.4</version>
        </dependency>
    </dependencies>
</project>
c. Create the package calllog.kafka.hbase.customer
d. Write the properties file [resources/kafka.properties]

zookeeper.connect=s100:2181,s200:2181,s300:2181
group.id=calllog
zookeeper.session.timeout.ms=500
zookeeper.sync.time.ms=250
auto.commit.interval.ms=1000
#consume from the beginning
auto.offset.reset=smallest
#topic
topic=calllog
#table name
table.name=call:calllogs
#number of partitions
partition.number=100
#caller flag
caller.flag=0
#pattern for the hash region
hashcode.pattern=00

e. Copy the hbase-site.xml config file into the resources directory
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- fully distributed mode -->
    <property>
        <name>hbase.cluster.distributed</name>
        <value>true</value>
    </property>
    <!-- where hbase stores its data on hdfs -->
    <property>
        <name>hbase.rootdir</name>
        <value>hdfs://mycluster/hbase</value>
    </property>
    <!-- zk addresses -->
    <property>
        <name>hbase.zookeeper.quorum</name>
        <value>192.168.43.131:2181,192.168.43.132:2181,192.168.43.133:2181</value>
    </property>
    <!-- local zk directory -->
    <property>
        <name>hbase.zookeeper.property.dataDir</name>
        <value>/home/ubuntu/zookeeper</value>
    </property>
</configuration>
f. Copy hdfs-site.xml into the resources directory
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
    <property>
        <name>dfs.ha.automatic-failover.enabled</name>
        <value>true</value>
    </property>
    <property>
        <name>dfs.hosts</name>
        <value>/soft/hadoop/f</value>
    </property>
    <property>
        <name>dfs.hosts.exclude</name>
        <value>/soft/hadoop/f</value>
    </property>
    <!-- set ha -->
    <property>
        <name>dfs.nameservices</name>
        <value>mycluster</value>
    </property>
    <!-- the two namenode ids under mycluster -->
    <!-- note: currently only 2 namenodes are allowed -->
    <property>
        <name>dfs.ha.namenodes.mycluster</name>
        <value>nn1,nn2</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn1</name>
        <value>s100:8020</value>
    </property>
    <property>
        <name>dfs.namenode.rpc-address.mycluster.nn2</name>
        <value>s500:8020</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn1</name>
        <value>s100:50070</value>
    </property>
    <property>
        <name>dfs.namenode.http-address.mycluster.nn2</name>
        <value>s500:50070</value>
    </property>
    <property>
        <name>dfs.namenode.shared.edits.dir</name>
        <value>qjournal://s200:8485;s300:8485;s400:8485/mycluster</value>
    </property>
    <property>
        <name>dfs.client.failover.proxy.provider.mycluster</name>
        <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
    </property>
    <property>
        <name>dfs.ha.fencing.methods</name>
        <value>sshfence
shell(/bin/true)</value>
    </property>
    <property>
        <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/ubuntu/.ssh/id_rsa</value>
    </property>
    <property>
        <name>dfs.journalnode.edits.dir</name>
        <value>/home/ubuntu/hadoop/journal</value>
    </property>
    <!-- set ha over -->
</configuration>
g. Write the utility class PropertiesUtil -- loads the properties from the classpath
---------------------------------------------------
package calllog.kafka.hbase.customer;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    public static Properties props;

    static {
        try {
            // load the properties file from the classpath
            InputStream is = ClassLoader.getSystemResourceAsStream("kafka.properties");
            props = new Properties();
            props.load(is);
            is.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * get a property
     */
    public static String getProp(String key) {
        return props.getProperty(key);
    }
}
h. Write the HbaseDao class -- the hbase data access object; hbase is accessed through the dao
1) rowkey design: include the most commonly queried fields, and keep every part fixed-length:
   regionNo[00-99], 1_id[primary number], time, flag[0/1 caller/callee], 2_id[other number], duration
   regionNo[00-99] = hash(1_id[last 4 digits] + time[yyyyMM]) % 100 [number of regions]
2) code
--------------------------------------
package calllog.kafka.hbase.customer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.text.DecimalFormat;

/**
 * hbase data access object
 */
public class HbaseDao {
    private Table table = null;
    private DecimalFormat df = new DecimalFormat();
    // number of rowkey partitions (salt buckets)
    private int partitions;
    // 0 -- caller  1 -- callee
    private String flag;

    public HbaseDao() {
        try {
            // load the configuration (hbase-site.xml on the classpath)
            Configuration conf = HBaseConfiguration.create();
            // create the connection via the factory
            Connection conn = ConnectionFactory.createConnection(conf);
            // get the table
            TableName tbName = TableName.valueOf(PropertiesUtil.getProp("table.name"));
            table = conn.getTable(tbName);
            df.applyPattern(PropertiesUtil.getProp("hashcode.pattern"));
            partitions = Integer.parseInt(PropertiesUtil.getProp("partition.number"));
            flag = PropertiesUtil.getProp("caller.flag");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * put one log line into hbase
     */
    public void put(String log) {
        if (log == null || log.equals("")) {
            return;
        }
        try {
            // rowkey
            String rowKey = "";
            // parse the log line
            String[] strs = log.split(",");
            if (strs != null && strs.length == 4) {
                String caller = strs[0];
                String callee = strs[1];
                String time = strs[2];
                String duration = strs[3];
                // compute the region number
                String hash = getRegionNumber(caller, time);
                rowKey = getRowkey(hash, caller, time, flag, callee, duration);
                // put
                Put p = new Put(Bytes.toBytes(rowKey));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("caller"), Bytes.toBytes(caller));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callee"), Bytes.toBytes(callee));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callTime"), Bytes.toBytes(time));
                p.addColumn(Bytes.toBytes("f1"), Bytes.toBytes("callDuration"), Bytes.toBytes(duration));
                table.put(p);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * compute the region number -- used as the rowkey salt prefix
     */
    public String getRegionNumber(String caller, String calltime) {
        // last four digits of the phone number
        String last4Code = caller.substring(caller.length() - 4);
        // year + month of the call time; strip the separators first ("2018/03/25 ..." -> "201803")
        String month = calltime.replaceAll("\\D", "").substring(0, 6);
        int hash = (Integer.parseInt(last4Code) ^ Integer.parseInt(month)) % partitions;
        return df.format(hash);
    }

    /**
     * build the rowkey
     */
    public String getRowkey(String hash, String caller, String time, String flag, String callee, String dur) {
        return hash + "," + caller + "," + time + "," + flag + "," + callee + "," + dur;
    }
}
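To sanity-check the salting logic in isolation, the region-number computation can be exercised standalone. This sketch mirrors getRegionNumber above, with partition.number=100 and hashcode.pattern=00 from kafka.properties hard-coded so it runs without HBase or the properties file:

```java
import java.text.DecimalFormat;

// Standalone sketch of the rowkey salt computation: XOR the last four
// digits of the phone number with the yyyyMM of the call, mod 100, then
// zero-pad to two digits with the "00" pattern.
public class RegionNumberDemo {
    public static String getRegionNumber(String caller, String calltime) {
        // last four digits of the phone number
        String last4Code = caller.substring(caller.length() - 4);
        // year + month, with the date separators stripped
        String month = calltime.replaceAll("\\D", "").substring(0, 6);
        int hash = (Integer.parseInt(last4Code) ^ Integer.parseInt(month)) % 100;
        DecimalFormat df = new DecimalFormat();
        df.applyPattern("00");
        return df.format(hash);
    }

    public static void main(String[] args) {
        // the same caller in the same month always lands in the same region,
        // so a month's calls for one number can be read with one prefix scan
        System.out.println(getRegionNumber("15811111111", "2018/03/25 10:00:00"));
        System.out.println(getRegionNumber("15811111111", "2018/03/01 08:30:00"));
    }
}
```

Because the salt depends only on (number, month), writes for different numbers scatter across regions (avoiding the hot spot) while queries for one number and month stay within a single salted prefix.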
i. Write the main class -- HbaseCustomer
---------------------------------------------
package calllog.kafka.hbase.customer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * hbase consumer: pulls log messages from kafka and stores them in hbase
 */
public class HbaseCustomer {
    public static void main(String[] args) {
        // hbase dao
        HbaseDao dao = new HbaseDao();
        // consumer config
        ConsumerConfig config = new ConsumerConfig(PropertiesUtil.props);
        // create the consumer (old high-level consumer API)
        ConsumerConnector consumer = Consumer.createJavaConsumerConnector(config);
        // bind the topic
        String topic = PropertiesUtil.getProp("topic");
        Map<String, Integer> map = new HashMap<String, Integer>();
        map.put(topic, new Integer(1));
        // start consuming
        Map<String, List<KafkaStream<byte[], byte[]>>> kafkaMsg = consumer.createMessageStreams(map);
        List<KafkaStream<byte[], byte[]>> msgList = kafkaMsg.get(topic);
        String kafka_hbaseMsg = "";
        for (KafkaStream<byte[], byte[]> msg : msgList) {
            ConsumerIterator<byte[], byte[]> mm = msg.iterator();
            while (mm.hasNext()) {
                MessageAndMetadata<byte[], byte[]> next = mm.next();
                byte[] m = next.message();
                // get the message
                kafka_hbaseMsg = new String(m);
                // write to hbase
                dao.put(kafka_hbaseMsg);
            }
        }
    }
}
j. Build a jar with its dependencies in IDEA: File --> Project Structure --> Artifacts --> ...
k. Run it on Windows to test:
cmd> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer
l. Put the jar into the shared folder and run it on ubuntu, checking that the program works:
$> java -cp CalllogCustomerModel.jar calllog.kafka.hbase.customer.HbaseCustomer

VII. Write the Web Application -- Pull Data from hbase and Visualize It
--------------------------------------------------------
1. Import the SSM project from the previous lesson
File --> Project Structure --> Modules --> + --> import the ssm module --> Maven '+' add the pom
2. Do the web setup:
a. Settings --> Application Server --> add a tomcat server
b. File --> Project Structure --> Artifacts --> + add the ssm web module --> + add the dependent jars and external config files on the right
c. Run --> Edit Configurations --> add a local tomcat app --> Deployment + --> add your own web module (ssm) --> deploy with hot swap
3. Run it and open localhost:8080/user/findall?pn=1 to test
4. Add the Calllog class to the domain package
----------------------------------------------
package com.it18zhang.ssm.domain;

/**
 * calllog domain class -- standard javabean
 */
public class Calllog {
    private String caller;
    private String callee;
    private String callTime;
    private String callDuration;

    public String getCaller() {
        return caller;
    }

    public void setCaller(String caller) {
        this.caller = caller;
    }

    public String getCallee() {
        return callee;
    }

    public void setCallee(String callee) {
        this.callee = callee;
    }

    public String getCallTime() {
        return callTime;
    }

    public void setCallTime(String callTime) {
        this.callTime = callTime;
    }

    public String getCallDuration() {
        return callDuration;
    }

    public void setCallDuration(String callDuration) {
        this.callDuration = callDuration;
    }
}
5. Add the calllog service interface CalllogService
------------------------------------------------------------
package com.it18zhang.ssm.service;

import com.it18zhang.ssm.domain.Calllog;

import java.util.List;

/**
 * Calllog service -- defines the contract for interacting with the back end
 */
public interface CalllogService {
    // query all calllogs
    public List<Calllog> findAll();
}
6. Add CalllogServiceImpl, the implementation of CalllogService that interacts with hbase
-------------------------------------------------------------------------
a. Prepare the required config files: copy [hbase-site.xml / hdfs-site.xml] into the resources directory
b. Add the maven dependencies
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.10.0.1</version>
</dependency>
<dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>1.2.4</version>
</dependency>
c. Write the CalllogServiceImpl class
------------------------------------------
package com.it18zhang.ssm.service.impl;

import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.service.CalllogService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.*;

/**
 * CalllogService implementation
 */
@Service("calllogService")
public class CalllogServiceImpl implements CalllogService {
    private Table table;

    public CalllogServiceImpl() {
        try {
            // load the configuration
            Configuration conf = HBaseConfiguration.create();
            // create the connection via the factory
            Connection conn = ConnectionFactory.createConnection(conf);
            // get the table
            TableName tbName = TableName.valueOf("call:calllogs");
            table = conn.getTable(tbName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * query all calllogs (full table scan)
     */
    public List<Calllog> findAll() {
        List<Calllog> list = new ArrayList<Calllog>();
        try {
            // scan
            Scan scan = new Scan();
            ResultScanner rs = table.getScanner(scan);
            Iterator<Result> it = rs.iterator();
            byte[] family = Bytes.toBytes("f1");
            byte[] callerf = Bytes.toBytes("caller");
            byte[] calleef = Bytes.toBytes("callee");
            byte[] callTimef = Bytes.toBytes("callTime");
            byte[] callDurationf = Bytes.toBytes("callDuration");
            Calllog calllog = null;
            while (it.hasNext()) {
                Result next = it.next();
                String caller = Bytes.toString(next.getValue(family, callerf));
                String callee = Bytes.toString(next.getValue(family, calleef));
                String callTime = Bytes.toString(next.getValue(family, callTimef));
                String callDuration = Bytes.toString(next.getValue(family, callDurationf));
                calllog = new Calllog();
                calllog.setCaller(caller);
                calllog.setCallee(callee);
                calllog.setCallTime(callTime);
                calllog.setCallDuration(callDuration);
                list.add(calllog);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return list;
    }
}
7. Add CalllogController -- drives the web display
-------------------------------------------------------
package com.it18zhang.ssm.controller;

import com.it18zhang.ssm.domain.Calllog;
import com.it18zhang.ssm.service.CalllogService;
import org.springframework.stereotype.Controller;
import org.springframework.ui.Model;
import org.springframework.web.bind.annotation.RequestMapping;

import javax.annotation.Resource;
import java.util.List;

@Controller
public class CalllogController {
    @Resource(name = "calllogService")
    private CalllogService cs;

    @RequestMapping("calllog/findAll")
    public String findAll(Model model) {
        List<Calllog> list = cs.findAll();
        model.addAttribute("calllogs", list);
        return "calllog/calllogList";
    }
}
8. Add the jsp page calllog/calllogList.jsp
<%@ page contentType="text/html;charset=UTF-8" language="java" %>
<%@ taglib uri="http://java.sun.com/jsp/jstl/core" prefix="c" %>
<html>
<head>
    <title>Call Records</title>
    <link rel="stylesheet" type="text/css" href="../css/my.css">
</head>
<body>
<table id="t1" border="1px" class="t-1">
    <tr>
        <td>Caller</td>
        <td>Callee</td>
        <td>Call Time</td>
        <td>Duration</td>
    </tr>
    <c:forEach items="${calllogs}" var="u">
        <tr>
            <td><c:out value="${u.caller}"/></td>
            <td><c:out value="${u.callee}"/></td>
            <td><c:out value="${u.callTime}"/></td>
            <td><c:out value="${u.callDuration}"/></td>
        </tr>
    </c:forEach>
    <tr>
        <td colspan="4" style="text-align: right"></td>
    </tr>
</table>
</body>
</html>

 

Published: 2024-02-04 16:24:06

Link: https://www.4u4v.net/it/170711566357204.html


Tags: phone, log, callLog