publicclassWikiMapper5extendsMapper<IntWritable ,VectorAndPrefsWritable,VarLongWritable,VectorWritable>{ publicvoidmap(IntWritable key,VectorAndPrefsWritable vectorAndPref,Context context)throws IOException, InterruptedException{ Vector coo=vectorAndPref.getVector(); List<Long> userIds=vectorAndPref.getUserIDs(); List<Float> prefValues=vectorAndPref.getValues(); //System.out.println("alluserids:"+userIds); for(int i=0;i<userIds.size();i++){ long userID=userIds.get(i); float prefValue=prefValues.get(i); Vector par=coo.times(prefValue); context.write(new VarLongWritable(userID), new VectorWritable(par)); //System.out.println(",userid:"+userID+",vector:"+par); // if the user id = 3 is the same as my paper then is right } } }
publicclassWiKiCombiner5extendsReducer<VarLongWritable,VectorWritable,VarLongWritable,VectorWritable> { publicvoidreduce(VarLongWritable key, Iterable<VectorWritable> values,Context context)throws IOException, InterruptedException{ Vector partial=null; for(VectorWritable v:values){ partial=partial==null?v.get():partial.plus(v.get()); } context.write(key, new VectorWritable(partial)); System.out.println("userid:"+key.toString()+",vecotr:"+partial);// here also should be the same as my paper's result } }
publicclassWiKiReducer5extendsReducer<VarLongWritable,VectorWritable,VarLongWritable,RecommendedItemsWritable> { privateint recommendationsPerUser=RECOMMENDATIONSPERUSER; private String path=JOB1OUTPATH; privatestatic FastMap<Integer,String> map=new FastMap<Integer,String>(); publicvoidsetup(Context context)throws IOException{ Configuration conf=new Configuration(); FileSystem fs=FileSystem.get(URI.create(path), conf); Path tempPath=new Path(path); SequenceFile.Reader reader=null; try { reader=new SequenceFile.Reader(fs, tempPath, conf); Writable key=(Writable)ReflectionUtils.newInstance(reader.getKeyClass(),conf); Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); // long position = reader.getPosition(); while (reader.next(key, value)) { map.put(Integer.parseInt(key.toString()), value.toString()); // System.out.println(key.toString()+","+value.toString()); // position = reader.getPosition(); // beginning of next record } } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } }
publicvoidreduce(VarLongWritable key, Iterable<VectorWritable> values,Context context)throws IOException, InterruptedException{ int userID=(int)key.get(); Vector rev=null; for(VectorWritable vec:values){ rev=rev==null? vec.get():rev.plus(vec.get()); } Queue<RecommendedItem>topItems=new PriorityQueue<RecommendedItem>( recommendationsPerUser+1, Collections.reverseOrder(ByValueRecommendedItemComparator.getInstance()) ); Iterator<Vector.Element>recommendationVectorIterator= rev.iterateNonZero(); while(recommendationVectorIterator.hasNext()){ Vector.Element e=recommendationVectorIterator.next(); int index=e.index(); System.out.println("Vecotr.element.indxe:"+index); // test here find the index is item id or not ** test result : index is item if(!hasItem(userID,String.valueOf(index))){ float value=(float) e.get(); if(topItems.size()<recommendationsPerUser){ // here only set index topItems.add(new GenericRecommendedItem(index,value)); }elseif(value>topItems.peek().getValue()){ topItems.add(new GenericRecommendedItem(index,value)); topItems.poll(); } } } List<RecommendedItem>recom=new ArrayList<RecommendedItem>(topItems.size()); recom.addAll(topItems); Collections.sort(recom,ByValueRecommendedItemComparator.getInstance()); context.write(key, new RecommendedItemsWritable(recom)); } publicstaticbooleanhasItem(int user,String item){ // to check whether the user has rate the item boolean flag=false; String items=map.get(user); if(items.contains(item)){ flag=true; } return flag; } }