Massive-Data-Process-Lab3-K-means-Clustering-of-Netflix-Data

实验目标

使用K-means聚类方法对Netflix电影进行聚类.输入电影集合以及每部电影的用户评价列表,输出400个相关电影集合.

实验步骤

  • 数据预处理,将电影评论信息进行整合,一个电影输出一行 movieid, rating list
  • Canopy Selection
  • Mark Data set by Canopy
  • K-means Iteration
  • Viewer

具体实现

Data Prep

原始数据是一个Movie的信息一个文件,为了处理,需要将其转换为一个Movie 一行的形式

Map阶段:

输入文件是大量小文件,因此直接采用WholeFileInputFormat, 即Map的输入value为整个文件的内容,从中解析出MovieID, 以及对应的用户评论列表即可,注意这里需要对用户评论列表按照userID进行排序,以提高后面几步计算距离时的效率

1
2
3
4
5
6
7
8
9
10
11
String content = new String(value.getBytes(),0, value.getLength());
String [] lines = content.split("\n");
Text movie = new Text(lines[0].replace(":", ""));
List<RatingPair> user_list = new ArrayList<>();
StringBuffer rating_list = new StringBuffer();
for(int i = 1; i < lines.length; ++i) {
String []info = lines[i].split(",");
user_list.add(new RatingPair(info[0], info[1]));
}
// 按用户id排序
Collections.sort(user_list);

不需要reduce函数。

Canopy Selection

在这个步骤中,使用Simple Distance 来选择Canopy Centers, 该距离函数是两个电影的相同用户评论数.由于在第一步中已经对用户评论列表进行了按用户ID排序,因此只需要按顺序比对userID,找到超过阈值之后即可返回对应的结果。

具体实现是MAP阶段,各个分节点先处理本地的数据,从中找出Canopy Centers,然后各个节点的Canopy Centers 传入Reduce,reduce从中用同样的算法计算出Canopy Centers,因此这里需要设置reduce num为1

  • SimpleDistance:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    public static int distance(Canopy o1, Canopy o2, int threshold) {
    int count = 0;

    for(int i = 0, j = 0; i < o1.rating.size() && j < o2.rating.size();) {
    if(o1.rating.get(i).userid < o2.rating.get(j).userid) {
    ++i;
    }

    else if(o2.rating.get(j).userid.equals(o1.rating.get(i).userid)) {
    if(++count > threshold) {
    // LOG.warn( o1.id + " " + o2.id + " " + count);
    return count;
    }
    ++i;
    ++j;
    }
    else {
    ++j;
    }
    }
    // LOG.warn( o1.id + " " + o2.id + " " + count);
    return count;
    }
  • Canopy Select算法:

    从point list中随机选取一个点p ,计算该点与当前所有的Canopy Center的距离,如果p与某个Center相同用户数大于阈值8(距离小于T1),则该点属于该Canopy, 如果p与所有的Canopy Center点的相同用户数都小于8,则该p置为一个新的Canopy Center,加入CanopyList.

  • Canopy Selection具体实现:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    for(Text value: values) {
    count += 1;
    Canopy curr = new Canopy(value.toString());
    boolean newCanopy = true;
    for(Canopy p: this.canopyList) {
    if(Canopy.inCanopy(p, curr, threshold)) {
    newCanopy = false;
    break;
    }
    }
    if(newCanopy) {
    this.canopyList.add(curr);
    context.write(new Text(curr.id), new Text(curr.rating.toString()));
    }
    }

Mark Data Set by Canopy

这一步需要对每条数据标记其属于的Canopy,在MAP阶段,首先需要load 第二步中得到的Canopy Center的信息,然后对所有的电影数据,计算其与各个Canopy center的距离,若相同用户排列数超过2个,则该条电影数据属于该Canopy Center,最终输出(movieid, rating_list + canopy_list) 即可。

不需要reduce函数。

1
2
3
4
5
6
7
8
9
10
11
// 本Canopy
Canopy canopy = new Canopy(value.toString());

for(int i = 0; i < canopies.size(); ++i) {
// 遍历中心点
if(Canopy.distance(canopy, canopies.get(i), 2) > 2) {
// 在该Canopy中
canopyList.add(canopies.get(i).id);
}
}
context.write(new Text(canopy.id), new Text(canopy.rating.toString() + "\t" + canopyList.toString()));

K-means Iteration

Map Step

map 阶段的输入为每个Movie的movieid, rating_list, canopy_list, 需要在setup阶段先将第二步中选出的Canopy Center点的数据load入内存,在map函数中每接收一个Movie数据,就计算其与所属的Canopy的距离,并选择距离最近的作为其新的中心maxid,输出为(maxid, movieinfo),此处的距离是余弦距离,其计算方式为$$similarity = \frac{AB}{|A||B|} = \frac{\sum_{i = 1}^{n}Arate(user(i))Brate(user(i)))}{|A||B|}$$

出于计算效率考虑,这里使用余弦相似度的平方作为其距离,数值越大,point越接近。

Reduce Step

reduce 阶段需要计算新的Canopy Center, 输入为(Canopyid, pointlist), 首先遍历属于CanopyID这个集合的所有电影的评论数据,统计每个用户的评论数以及评论总分,然后根据用户的评论数进行排序,选择评论数前1000个用户,计算其平均评分,这些ratinglist作为新的Canopy id对应的数据。输出(canopyID, rating_list)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36

// 将map.entrySet()转换成list, 按照Value 排序
List<Map.Entry<Integer, Integer>> list = new ArrayList<Map.Entry<Integer, Integer>>(userMovieCounts.entrySet());
// 通过比较器来实现排序
Collections.sort(list, new Comparator<Map.Entry<Integer, Integer>>() {

@Override
public int compare(Map.Entry<Integer, Integer> o1, Map.Entry<Integer, Integer> o2) {
// 降序排序
return -o1.getValue().compareTo(o2.getValue());
}
});

// StringBuffer ratinglist = new StringBuffer();

int counter = 0;


List<DataPrep.RatingPair> ratingList = new ArrayList<>();

for (Map.Entry<Integer, Integer> mapping : list) {
// System.out.println(mapping.getKey() + ":" + mapping.getValue());
if(counter > 1000) break;
// LOG.error(mapping.getKey() + " " + mapping.getValue());

int rateCount = mapping.getValue();
Integer averageRate = ratingSum.get(mapping.getKey()) / rateCount;

if(averageRate > 0) {

ratingList.add(new DataPrep.RatingPair(mapping.getKey().toString(), averageRate.toString()));
// ratinglist.append(mapping.getKey() + "," + averageRate);
}
counter += 1;

}

Viewer

输出最终的聚类结果,导入Title文件,map阶段同K-means Iteration的MAP函数,但是在输出时将Movieid 转换为对应的Title,输出最终的聚类结果。

结果示意:

  • 哈利波特:Harry Potter

    • out5New 5轮 1000: (4,3,1)
    • out10Times: 10轮 1000: (4,1,1,2)
  • Star Wars

    • out5News: 4+2+1+1+1
    • out10Times: (5+2+1+1)

Write Up

  • 聚类数: 318

  • 分类结果示例

    1
    2
    10011   {5046=Iron Maiden: Visions of the Beast, 6316=New England Metal Hardcore Festival 2003, 7538=Iron Maiden: The Early Days, 7653=CKY: Infiltrate, 10011=Lamb of God: Killadelphia}
    1028 {1028=The Educational Archives: Vol. 1: Sex & Drugs, 4376=ABBA: The Definitive Collection, 4549=ABBA: Gold: Greatest Hits, 5987=Gates of Heaven, 10896=Errol Morris' First Person: The Complete Series, 10969=Jail Bait, 12449=Bad Girls Go to Hell/ Another Day Another Man, 12691=Vernon, 14696=ABBA: The Winner Takes It All, 14703=ABBA: Super Troupers}

本次实验实现了Kmeans算法和Canopy算法,通过本次实验,我对Hadoop 的MapReduce编程模式理解更加深入了,也对聚类算法有了更多的认识。

打赏