在 Oracle 数据库中实现 MapReduce(二)

2014-11-24 17:12:59 · 作者: · 浏览: 1
:= length(document);

-- For every word within a document
while (istart <= len) loop
pos := instr(document, sep, istart);

if (pos = 0) then
word_rec.word := substr(document, istart);
pipe row (word_rec);
istart := len + 1;
else
word_rec.word := substr(document, istart, pos - istart);
pipe row (word_rec);
istart := pos + 1;
end if;

end loop; -- end loop for a single document

end loop; -- end loop for all documents

return;

end mapper;

--
-- reducer: collapses a stream of words into (word, count) rows.
--
-- Parameters:
--   in_cur  ref cursor yielding one word per row. The parallel_enable
--           clause hash-partitions the rows on "word", and the cluster
--           clause groups equal words together, so each run of equal
--           values seen by the fetch loop is one complete group.
--
-- Returns (pipelined):
--   one wordcnt_t row per distinct word, carrying the word and the number
--   of times it was fetched. NULL words (Oracle treats a zero-length
--   string as NULL) are counted as a group of their own, which matches
--   what "select word, count(*) ... group by word" reports.
--
-- NOTE(review): word_cur_t, wordcnts_t and wordcnt_t are declared outside
-- this block; wordcnt_t is assumed to be a record with "word" and "count"
-- fields -- confirm against the package specification.
--
function reducer(in_cur in word_cur_t) return wordcnts_t
  pipelined parallel_enable (partition in_cur by hash(word))
  cluster in_cur by (word)
is
  word_count wordcnt_t;
  next_word  varchar2(4000);
  -- Explicit "a group is open" flag. A NULL word_count.word cannot be used
  -- as the sentinel because NULL is itself a value the cursor may deliver.
  in_group   boolean := false;
begin

  word_count.count := 0;

  loop

    fetch in_cur into next_word;
    exit when in_cur%notfound;

    if not in_group then

      -- Very first row: open the first group.
      word_count.word  := next_word;
      word_count.count := 1;
      in_group := true;

    elsif (next_word = word_count.word)
       or (next_word is null and word_count.word is null) then

      -- Same word as the open group (NULL-safe comparison).
      word_count.count := word_count.count + 1;

    else

      -- The word changed: emit the finished group and open a new one.
      pipe row (word_count);
      word_count.word  := next_word;
      word_count.count := 1;

    end if;

  end loop;

  -- Emit the last open group; nothing is emitted for an empty input.
  if in_group then
    pipe row (word_count);
  end if;

  -- Release the input cursor once it has been fully consumed.
  close in_cur;

  return;

end reducer;

end;
/


-- Select statements

-- Word count using only the mapper: the table function splits every
-- documents.a value on a single space, and plain SQL aggregation then
-- counts the occurrences of each word.
with mapped_words as (
    select value(map_result).word as word
    from table(
             oracle_map_reduce.mapper(
                 cursor(select a from documents),
                 ' '
             )
         ) map_result
)
select
    word,
    count(*)
from mapped_words
group by word;

-- Word count using the full pipeline: the mapper's output rows are fed,
-- through a cursor expression, into the reducer table function, which
-- returns the per-word counts.
select *
from table(
         oracle_map_reduce.reducer(
             cursor(
                 -- Map step: one row per word of every documents.a value,
                 -- split on a single space.
                 select value(map_result).word as word
                 from table(
                          oracle_map_reduce.mapper(
                              cursor(select a from documents),
                              ' '
                          )
                      ) map_result
             )
         )
     );


英文原文:In-Database MapReduce (Map-Reduce)