Mahout algorithm source code analysis: Itembased Collaborative Filtering (1) PreparePreferenceMatrixJob (4)

2014-11-24 09:04:20
or[itemid:prefValue,itemid:prefValue,...]
The code then reads the number of users:
[java]
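// read the final value of the USERS counter maintained by ToUserVectorsReducer
int numberOfUsers = (int) toUserVectors.getCounters().findCounter(ToUserVectorsReducer.Counters.USERS).getValue();
// persist the count under the NUM_USERS output path
HadoopUtil.writeInt(numberOfUsers, getOutputPath(NUM_USERS), getConf());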
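The counter trick can be illustrated without Hadoop. The sketch below is hypothetical plain Java, not Mahout code: it assumes, as the counter's name suggests, that ToUserVectorsReducer increments USERS once for each user vector it writes, so grouping preferences by userID and counting the groups yields the same number. The class, record-like type and method names are made up for illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (plain Java, no Hadoop): shows why a counter bumped once
// per reduce() call ends up equal to the number of distinct users.
class UserCountSketch {

  /** One input preference: userID, itemID, prefValue (illustrative type). */
  static final class Pref {
    final long userID;
    final long itemID;
    final float value;

    Pref(long userID, long itemID, float value) {
      this.userID = userID;
      this.itemID = itemID;
      this.value = value;
    }
  }

  /** Returns the final value of a simulated USERS counter. */
  static int countUsers(List<Pref> prefs) {
    // Stand-in for the shuffle: build userID --> {itemID: prefValue}
    Map<Long, Map<Long, Float>> userVectors = new TreeMap<>();
    for (Pref p : prefs) {
      userVectors.computeIfAbsent(p.userID, id -> new TreeMap<>()).put(p.itemID, p.value);
    }
    // Stand-in for the reducer: one reduce() call per userID, each adding 1 to the counter
    int usersCounter = 0;
    for (Long userID : userVectors.keySet()) {
      usersCounter++;
    }
    return usersCounter;
  }

  public static void main(String[] args) {
    List<Pref> prefs = List.of(
        new Pref(1L, 101L, 5.0f), new Pref(1L, 102L, 3.0f),
        new Pref(2L, 101L, 2.0f), new Pref(3L, 103L, 4.5f));
    System.out.println(countUsers(prefs)); // prints 3
  }
}
```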
(3)//build the rating matrix
[java]
Job toItemVectors = prepareJob(getOutputPath(USER_VECTORS), getOutputPath(RATING_MATRIX),
ToItemVectorsMapper.class, IntWritable.class, VectorWritable.class, ToItemVectorsReducer.class,
IntWritable.class, VectorWritable.class);
The input is the output of the second job, in the format: userid-->vector[itemid:prefValue,itemid:prefValue,...]
First, the mapper:
[java]
protected void map(VarLongWritable rowIndex, VectorWritable vectorWritable, Context ctx)
    throws IOException, InterruptedException {
  Vector userRatings = vectorWritable.get();
  // optionally down-sample this user's ratings (a no-op when sampleSize is Integer.MAX_VALUE)
  int numElementsBeforeSampling = userRatings.getNumNondefaultElements();
  userRatings = Vectors.maybeSample(userRatings, sampleSize);
  int numElementsAfterSampling = userRatings.getNumNondefaultElements();
  // the user's int index becomes the column of every emitted item vector
  int column = TasteHadoopUtils.idToIndex(rowIndex.get());
  VectorWritable itemVector = new VectorWritable(new RandomAccessSparseVector(Integer.MAX_VALUE, 1));
  itemVector.setWritesLaxPrecision(true);
  // emit itemIndex --> [userIndex:prefValue] for every item this user rated
  Iterator<Vector.Element> iterator = userRatings.iterateNonZero();
  while (iterator.hasNext()) {
    Vector.Element elem = iterator.next();
    itemVector.get().setQuick(column, elem.get());
    ctx.write(new IntWritable(elem.index()), itemVector);
  }
  ctx.getCounter(Elements.USER_RATINGS_USED).increment(numElementsAfterSampling);
  ctx.getCounter(Elements.USER_RATINGS_NEGLECTED).increment(numElementsBeforeSampling - numElementsAfterSampling);
}
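In the call userRatings = Vectors.maybeSample(userRatings, sampleSize), sampleSize has not been set, so it takes its default value, Integer.MAX_VALUE. maybeSample therefore returns the original vector unchanged, because a vector's number of non-default entries can never exceed Integer.MAX_VALUE: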
[java]
public static Vector maybeSample(Vector original, int sampleSize) {
  // small enough: return the original vector untouched
  if (original.getNumNondefaultElements() <= sampleSize) {
    return original;
  }
  // otherwise copy a fixed-size random sample of the non-zero entries into an empty vector of the same type
  Vector sample = original.like();
  Iterator<Vector.Element> sampledElements =
      new FixedSizeSamplingIterator<Vector.Element>(sampleSize, original.iterateNonZero());
  while (sampledElements.hasNext()) {
    Vector.Element elem = sampledElements.next();
    sample.setQuick(elem.index(), elem.get());
  }
  return sample;
}
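To see what the early return and the sampling branch do, here is a hypothetical plain-Java sketch that uses a TreeMap<Integer, Double> in place of a Mahout Vector. For the sampling branch it uses reservoir sampling; this is a stand-in for FixedSizeSamplingIterator that is only assumed to behave comparably (a uniform sample of fixed size), not a copy of it. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical sketch of the maybeSample logic on a Map-based sparse vector.
class MaybeSampleSketch {

  static Map<Integer, Double> maybeSample(Map<Integer, Double> original, int sampleSize, Random random) {
    // Same early exit as Vectors.maybeSample: the vector is already small enough
    if (original.size() <= sampleSize) {
      return original;
    }
    // Reservoir sampling (Algorithm R), assumed stand-in for FixedSizeSamplingIterator:
    // keep the first sampleSize entries, then replace a random slot with decreasing probability
    List<Map.Entry<Integer, Double>> reservoir = new ArrayList<>(sampleSize);
    int seen = 0;
    for (Map.Entry<Integer, Double> e : original.entrySet()) {
      if (seen < sampleSize) {
        reservoir.add(e);
      } else {
        int j = random.nextInt(seen + 1);
        if (j < sampleSize) {
          reservoir.set(j, e);
        }
      }
      seen++;
    }
    // Counterpart of original.like() + setQuick(): a fresh vector holding only the sampled entries
    Map<Integer, Double> sample = new TreeMap<>();
    for (Map.Entry<Integer, Double> e : reservoir) {
      sample.put(e.getKey(), e.getValue());
    }
    return sample;
  }

  public static void main(String[] args) {
    Map<Integer, Double> userRatings = new TreeMap<>(Map.of(10, 4.0, 11, 2.5, 12, 5.0, 13, 1.0));
    Map<Integer, Double> unsampled = maybeSample(userRatings, Integer.MAX_VALUE, new Random(42));
    System.out.println(unsampled == userRatings); // prints true: nothing neglected
    Map<Integer, Double> sampled = maybeSample(userRatings, 2, new Random(42));
    System.out.println(userRatings.size() - sampled.size()); // prints 2: two ratings neglected
  }
}
```

With the default Integer.MAX_VALUE the very same object comes back, which is why the before/after difference, and hence USER_RATINGS_NEGLECTED, is zero in this job.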
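In map, column is the userID (more precisely, the int index that TasteHadoopUtils.idToIndex derives from it), and the output key elem.index() is the itemID (likewise as an int index). itemVector.get().setQuick(column, elem.get()) simply puts itemVector into the form [userID:prefValue]; the same itemVector object is reused, with each iteration overwriting its single entry before ctx.write emits it. The mapper's output is therefore itemID-->vector[userID:prefValue]. There are also two counters: since no sampling takes place, numElementsBeforeSampling - numElementsAfterSampling = 0, so the counter Elements.USER_RATINGS_NEGLECTED always stays at zero.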
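The transposition done by the mapper can be sketched in plain Java, with maps standing in for sparse vectors. This is a hypothetical illustration, not Mahout code; in particular idToIndex below is an assumed stand-in for TasteHadoopUtils.idToIndex that masks the long's hash to a non-negative int.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (plain Java, no Hadoop/Mahout) of the ToItemVectorsMapper transposition:
// userID --> {itemIndex: prefValue} becomes one (itemIndex, {userIndex: prefValue}) pair per rating.
class ToItemVectorsMapSketch {

  // Assumed stand-in for TasteHadoopUtils.idToIndex: a non-negative int derived from a long ID
  static int idToIndex(long id) {
    return 0x7FFFFFFF & Long.hashCode(id);
  }

  /** One emitted pair: key = item index, value = single-entry vector {userIndex: prefValue}. */
  static final class Emitted {
    final int itemIndex;
    final Map<Integer, Double> itemVector;

    Emitted(int itemIndex, Map<Integer, Double> itemVector) {
      this.itemIndex = itemIndex;
      this.itemVector = itemVector;
    }
  }

  static List<Emitted> map(long userID, Map<Integer, Double> userRatings) {
    int column = idToIndex(userID);
    List<Emitted> out = new ArrayList<>();
    for (Map.Entry<Integer, Double> elem : userRatings.entrySet()) {
      // Mirror iterateNonZero(): skip zero-valued entries
      if (elem.getValue() == 0.0) {
        continue;
      }
      // A fresh map per pair, because all pairs are kept in a list
      Map<Integer, Double> itemVector = new TreeMap<>();
      itemVector.put(column, elem.getValue());
      out.add(new Emitted(elem.getKey(), itemVector));
    }
    return out;
  }

  public static void main(String[] args) {
    Map<Integer, Double> userRatings = new TreeMap<>(Map.of(101, 4.0, 102, 2.0));
    for (Emitted e : map(7L, userRatings)) {
      System.out.println(e.itemIndex + "-->" + e.itemVector);
    }
    // prints:
    // 101-->{7=4.0}
    // 102-->{7=2.0}
  }
}
```

Unlike the real mapper, the sketch allocates a new map for every pair: it collects results in a list, whereas ctx.write serializes each pair as it is written, which is what makes reusing one VectorWritable safe there.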
Next, the reducer:
[java]
protected void reduce(IntWritable row, Iterable<VectorWritable> vectors, Context ctx)
    throws IOException, InterruptedException {
  // combine all single-entry [userID:prefValue] vectors that share this itemID into one vector
  VectorWritable vectorWritable = VectorWritable.merge(vectors.iterator());
  vectorWritable.setWritesLaxPrecision(true);
  ctx.write(row, vectorWritable);
}
merge combines the mapper's single-entry vectors for the same itemID into one vector, giving the form itemID-->vector[userID:prefValue,userID:prefValue,...].
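The shuffle plus merge can be sketched the same way. This hypothetical plain-Java version groups the mapper's pairs by item index (the shuffle) and then merges each group with putAll. That merge is only assumed to match what VectorWritable.merge does for these inputs: each partial vector holds a single user entry, so merging amounts to taking the union of the entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch (plain Java, no Hadoop/Mahout) of the shuffle + ToItemVectorsReducer step.
class ToItemVectorsReduceSketch {

  /** A mapper output pair: item index and a single-entry vector {userIndex: prefValue}. */
  static final class Pair {
    final int itemIndex;
    final Map<Integer, Double> vector;

    Pair(int itemIndex, Map<Integer, Double> vector) {
      this.itemIndex = itemIndex;
      this.vector = vector;
    }
  }

  // Assumed stand-in for VectorWritable.merge: copy every entry of every partial vector into one
  static Map<Integer, Double> merge(Iterator<Map<Integer, Double>> vectors) {
    Map<Integer, Double> accumulator = new TreeMap<>();
    while (vectors.hasNext()) {
      accumulator.putAll(vectors.next());
    }
    return accumulator;
  }

  /** Returns the rating matrix as itemIndex --> {userIndex: prefValue}. */
  static Map<Integer, Map<Integer, Double>> run(List<Pair> mapOutput) {
    // Shuffle: group all values that share the same item key
    Map<Integer, List<Map<Integer, Double>>> grouped = new TreeMap<>();
    for (Pair p : mapOutput) {
      grouped.computeIfAbsent(p.itemIndex, k -> new ArrayList<>()).add(p.vector);
    }
    // Reduce: one merge per item key
    Map<Integer, Map<Integer, Double>> ratingMatrix = new TreeMap<>();
    for (Map.Entry<Integer, List<Map<Integer, Double>>> e : grouped.entrySet()) {
      ratingMatrix.put(e.getKey(), merge(e.getValue().iterator()));
    }
    return ratingMatrix;
  }

  public static void main(String[] args) {
    List<Pair> mapOutput = List.of(
        new Pair(101, Map.of(7, 4.0)),
        new Pair(102, Map.of(7, 2.0)),
        new Pair(101, Map.of(8, 3.0)));
    System.out.println(run(mapOutput)); // prints {101={7=4.0, 8=3.0}, 102={7=2.0}}
  }
}
```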
So the output of this job is: R