object MusicRecommend {def musicRecommend()={val conf = new SparkConf().setAppName("musicRecommend")val sc = new SparkContext(conf)
// 处理艺术家数据 val rawArtistData = sc.textFile("ds/artist", 4)
// 由于文件存在非法行 不是用t分割的/*val aritstByID = rawArtistData.map{ l => val (id,name) = l.span(_ != "t")(id.im()) }*/val artistByID = rawArtistData.flatMap{line =>val (id,name) = line.span(_ != 't') if(name.isEmpty()) Noneelse try{Some((id.im()))}catch{case e :Exception => None}}
// 处理艺术家别名数据val rawArtistAlias = sc.textFile("ds/alias")val artistAlias = rawArtistAlias.flatMap{ line =>val tokensim().split("\s+")if(tokens.length != 2){None}else{Some((tokens(0).toInt,tokens(1).toInt))}}.collectAsMap()
// 这里使用广播变量,spark执行一个stage阶段时,会为执行函数建立一个闭包,
// 也就是所有任务需要的信息的2进制形式。闭包包含了函数引用的所有数据结构。
// spark把这个闭包发送到集群上每个executor上.
// 当许多任务使用同一个不可变的数据结构时,我们应该使用广播变量。
// 好处是可以在多个作业和阶段stage之间缓存数据val bArtistAlias = sc.broadcast(artistAlias)
// 处理用户收听记录val rawUserArtistData = sc.textFile("ds/user*")
// 生成训练数据val trainData = rawUserArtistData.map{ line =>val Array(userID,artistID,count) = line.split("\s+").map(_.toInt)val finalArtistID=OrElse(artistID, artistID)Rating(userID,finalArtistID,count)}.cache()
// 使用训练数据进行训练
// MatrixFactorizationModel 其他的参数都是超参数 其值直接影响到最终结果val model = ainImplicit(trainData, 10, 5, 0.01, 1.0)
// 检查推荐结果是否合理val userId = 2093760val rcNum = 5
// 1.获取用户2093760的数据val rawArtistsForUser = rawUserArtistData.map(_.split("\s+")).filter{case Array(user,_,_) => Int == userId}
// 2.获取用户2093760感兴趣的艺术家集val existingProducts = rawArtistsForUser.map {case Array(_,artist,_) => Int}.Set
// 3.找到该艺术家对应的名称,并打印artistByID.filter{case (id,name) =&ains(id)}.llect.foreach(println)
// 获取该用户的几个推荐结果val recommendations = endProducts(userId, rcNum)
// 打印推荐结果recommendations.foreach(println)
// 结果构成Rating(userid,artistID,rating)rating表示一个数值,其值越接近1表示推荐结果越好}
}
本文发布于:2024-02-04 20:44:21,感谢您对本站的认可!
本文链接:https://www.4u4v.net/it/170716009159444.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
留言与评论(共有 0 条评论) |