使用Spark分析泄露的2000W开房数据 2015-08-19 21:00

说明

CSDN泄露的用户数据的格式如下:

Name,CardNo,Descriot,CtfTp,CtfId,Gender,Birthday,Address,Zip,Dirty,District1,District2,District3,District4,District5,District6,FirstNm,LastNm,Duty,Mobile,Tel,Fax,EMail,Nation,Taste,Education,Company,CTel,CAddress,CZip,Family,Version,id

部分字段的值含有逗号,这部分值用引号包起来。

分析开房次数TOP50用户

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
var hotel1RDD = sc.textFile("/data/hotel/1-200W.csv")
var hotel2RDD = sc.textFile("/data/hotel/200W-400W.csv")
var hotel3RDD = sc.textFile("/data/hotel/400W-600W.csv")
var hotel4RDD = sc.textFile("/data/hotel/600W-800W.csv")
var hotel5RDD = sc.textFile("/data/hotel/800W-1000W.csv")
var hotel6RDD = sc.textFile("/data/hotel/1000W-1200W.csv")
var hotel7RDD = sc.textFile("/data/hotel/1200W-1400W.csv")
var hotel8RDD = sc.textFile("/data/hotel/1400W-1600W.csv")
var hotel9RDD = sc.textFile("/data/hotel/1600w-1800w.csv")
var hotel10RDD = sc.textFile("/data/hotel/1800w-2000w.csv")

var hotelRDD = hotel1RDD ++ hotel2RDD ++ hotel3RDD ++ hotel4RDD 
    ++ hotel5RDD ++ hotel6RDD ++ hotel7RDD ++ hotel8RDD ++ hotel9RDD ++ hotel10RDD
// 部分字段的值含有逗号,这部分值用引号包起来,对于这些数据暂时过滤掉不处理
var tmpRDD1 = hotelRDD.map(line => line.toString().split(",")).filter(_.length == 33)
var tmpRDD2 = tmpRDD1.map(x => (x(0),x(3),x(4)))
var tmpRDD3 = tmpRDD2.map(arr => (arr,1))
var tmpRDD4 = tmpRDD3.reduceByKey((x,y) => x + y)
var tmpRDD5 = tmpRDD4.sortBy(x => x._2, false)
tmpRDD5.take(50).foreach(println)

分析离店时段分布

统计一天中,各个小时离店的人数

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// 部分字段的值含有逗号,这部分值用引号包起来,对于这些数据暂时过滤掉不处理
    hotelRDD.map(line => line.toString().split(",")).filter(_.length == 33).
    // 部分记录没有时间列,需要过滤掉
    filter(arr => arr(31).trim.length != 0).
    // 提出出时间字段
    map(x => (x(31))).
    // 分隔时间字符串
    map(_.split("[- :]")).
    // 过滤掉非法的时间
    filter(_.length == 6).
    // 提取出时间中的小时部分
    map(arr => arr(3).toInt).
    map((_,1)).
    // 统计小时的个数
    reduceByKey(_ + _).
    // 按小时值升序排序
    sortBy(c => c, true).
    foreach(println)

性别分布统计

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
// 部分字段的值含有逗号,这部分值用引号包起来,对于这些数据暂时过滤掉不处理
    hotelRDD.map(line => line.toString().split(",")).filter(_.length == 33).
    // 部分记录没有时间列,需要过滤掉
    filter(arr => arr(31).trim.length != 0).
    // 提出出性别字段
    map(x => (x(5))).
    // 过滤掉长度不是1的性别
    filter(_.trim.length == 1).
    filter(v => (v == "F" || v == "M")).
    map((_,1)).
    // 统计性别的人数
    reduceByKey(_ + _).
    foreach(println)

分析年龄分布

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
var hotelRDD = sc.textFile("file:///opt/bigdata/hotel/test.csv")
    // 部分字段的值含有逗号,这部分值用引号包起来,对于这些数据暂时过滤掉不处理
    hotelRDD.map(line => line.toString().split(",")).filter(_.length == 33).
    // 证件类型是身份证
    filter(_(3) == "ID").
    // 只挑出18位的身份证号
    filter(_(4).length == 18).
    // 出生年份
    map(_(4).substring(6,10)).
    // 合法的出生年份
    filter("^[0-9]{4}$".r.pattern.matcher(_).matches).
    // 年纪
    map(2015 - _.toInt).
    // 有效的年纪
    filter(v => (v > 0 && v < 110)).
    // 计数
    map((_,1)).
    // 统计各年纪的人数
    reduceByKey(_ + _).
    sortByKey(true).
    foreach(println)

关联开房记录与CSDN用户数据

通过email进行关联。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
var hotelRDD = sc.textFile("file:///opt/bigdata/hotel/test.csv")
// 部分字段的值含有逗号,这部分值用引号包起来,对于这些数据暂时过滤掉不处理
var rdd1 = hotelRDD.map(line => line.toString().split(",")).filter(_.length == 33).
    filter(arr => "^[a-z0-9._%\\-+]+@(?:[a-z0-9\\-]+\\.)+[a-z]{2,4}$".r.pattern.matcher(arr(22)).matches).
    map(arr => (arr(22), (arr(0), arr(3), arr(4)))).
    distinct

var csdnRDD = sc.textFile("file:///opt/bigdata/csdn/csdn.1")
var rdd2 = csdnRDD.map(line => line.toString().split(",")).map( arr => (arr(2), (arr(0), arr(1))))

//(dzz104@21cn.com,(艾国超,ID,430103197809181026,yanyi512,OFT15104))
//(dzz104@21cn.com,(戴子涵,ID,430681198808190028,yanyi512,OFT15104))
// 开房记录中,部分用户使用相同的email.
rdd1.join(rdd2).mapValues(x => (x._1._1, x._1._2, x._1._3, x._2._1, x._2._2)).take(50).foreach(println)
Tags: #Spark    Post on Spark