设为首页 加入收藏

TOP

HBase 项目
2019-04-14 13:45:42 】 浏览:19
Tags:HBase 项目
版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_35641192/article/details/81150512

1、涉及概念梳理:命名空间
1、 命名空间的结构
这里写图片描述
1) Table:表,所有的表都是命名空间的成员,即表必属于某个命名空间,如果没有指定,则在 default 默认的命名空间中。

2) RegionServer group: 一个命名空间包含了默认的 RegionServer Group。

3) Permission: 权限,命名空间能够让我们来定义访问控制列表 ACL(Access Control List)。
例如,创建表,读取表,删除,更新等等操作。

4) Quota: 限额,可以强制一个命名空间可包含的 region 的数量。(属性:hbase.quota.enabled)

2、命名空间的使用
1) 创建命名空间
hbase(main):002:0> create_namespace 'ns_school'

2) 创建表时指定命名空间
hbase(main):004:0> create 'ns_school:tbl_student','info'

3) 观察 HDFS 中的目录结构的变化

img-blog.csdn.net/20180721233003649watermark/2/text/aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM1NjQxMTky/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70" alt="这里写图片描述" title="">

2、微博系统
1、需求分析
1) 微博内容的浏览,数据库表设计
2) 用户社交体现:关注用户,取关用户
3) 拉取关注的人的微博内容
这里写图片描述

2、 代码实现
代码设计总览:
1) 创建命名空间以及表名的定义
2) 创建微博内容表
3) 创建用户关系表
4) 创建用户微博内容接收邮件表
5) 发布微博内容
6) 添加关注用户
7) 移除(取关)用户
8) 获取关注的人的微博内容
9) 测试

创建命名空间以及表名的定义

    // HBase的配置对象
    private Configuration conf = HBaseConfiguration.create();

    // 创建weibo这个业务的命名空间,3张表
    // private static final byte[] NS_WEIBO = Bytes.toBytes("ns_weibo");
    private static final byte[] TABLE_CONTENT = Bytes.toBytes("ns_weibo:content");
    private static final byte[] TABLE_RELATION = Bytes.toBytes("ns_weibo:relation");
    private static final byte[] TABLE_INBOX = Bytes.toBytes("ns_weibo:inbox");

    // 初始化表与命名空间
    public void init() throws IOException {
        // 创建微博业务命名空间
        initNamespace();
        // 创建微博内容表
        initTableContent();
        // 创建用户关系表
        initTableRelation();
        // 创建收件箱表
        initTableInbox();
    }

    // 创建微博业务命名空间
    private void initNamespace() throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        // 创建命名空间描述器
        NamespaceDescriptor ns_weibo = NamespaceDescriptor.create("ns_weibo").addConfiguration("creator", "Joker")
                .addConfiguration("create_time", String.valueOf(System.currentTimeMillis())).build();
        admin.createNamespace(ns_weibo);
        admin.close();
        connection.close();
    }

创建微博内容表
表结构:

方法名 creatTableeContent
Table Name ns_weibo:content
RowKey 用户 ID_时间戳
ColumnFamily info
ColumnLabel 标题,内容,图片
Version 1个版本

代码:

    // /**
    // * 表名:ns_weibo:content
    // * 列族名:info
    // * 列名:content
    // * rowkey:用户id_时间戳
    // * value:微博内容(文字内容,图片URL,视频URL,语音URL)
    // * versions:1
    // *
    // * @throws IOException
    // */
    private void initTableContent() throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        // 创建表描述器
        HTableDescriptor contentTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_CONTENT));
        // 创建列描述器
        HColumnDescriptor infoColumnDescriptor = new HColumnDescriptor("info");
        // 设置块缓存
        infoColumnDescriptor.setBlockCacheEnabled(true);
        // 设置块缓存大小 2M
        infoColumnDescriptor.setBlocksize(2 * 1024 * 1024);
        // 设置版本确界
        infoColumnDescriptor.setMinVersions(1);
        infoColumnDescriptor.setMaxVersions(1);

        // 将列描述器添加到表描述器中
        contentTableDescriptor.addFamily(infoColumnDescriptor);
        // 创建表
        admin.createTable(contentTableDescriptor);
        admin.close();
        connection.close();
    }

创建用户关系表
表结构:

方法名 createTableRelations
Table Name ns_weibo:relation
RowKey 用户 ID
ColumnFamily attends、 fans
ColumnLabel 关注用户 ID,粉丝用户 ID
ColumnValue 用户 ID
Version 1个版本

代码:

    // /**
    // * 表名:ns_weibo:relation
    // * 列族名:attends,fans
    // * 列名:用户id
    // * value:用户id
    // * rowkey:当前操作人的用户id
    // * versions:1
    // *
    // * @throws IOException
    // */
    private void initTableRelation() throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();
        // 创建用户关系表描述器
        HTableDescriptor relationTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_RELATION));

        // 创建attends列描述器
        HColumnDescriptor attendsColumnDescriptor = new HColumnDescriptor("attends");
        // 设置块缓存
        attendsColumnDescriptor.setBlockCacheEnabled(true);
        // 设置块缓存大小 2M
        attendsColumnDescriptor.setBlocksize(2 * 1024 * 1024);
        // 设置版本
        attendsColumnDescriptor.setMinVersions(1);
        attendsColumnDescriptor.setMaxVersions(1);

        // 创建fans列描述器
        HColumnDescriptor fansColumnDescriptor = new HColumnDescriptor("fans");
        // 设置块缓存
        fansColumnDescriptor.setBlockCacheEnabled(true);
        // 设置块缓存大小 2M
        fansColumnDescriptor.setBlocksize(2 * 1024 * 1024);
        // 设置版本
        fansColumnDescriptor.setMinVersions(1);
        fansColumnDescriptor.setMaxVersions(1);

        // 将两个列描述器添加到表描述器中
        relationTableDescriptor.addFamily(attendsColumnDescriptor);
        relationTableDescriptor.addFamily(fansColumnDescriptor);

        // 创建表
        admin.createTable(relationTableDescriptor);
        admin.close();
        connection.close();
    }

创建微博收件箱表
表结构:

方法名 createTableInbox
Table Name ns_weibo:inbox
RowKey 用户 ID
ColumnFamily info
ColumnLabel 用户 ID
ColumnValue 取微博内容的 RowKey
Version 10

代码:

    // /**
    // * 表名:ns_weibo:inbox
    // * 列族:info
    // * 列:当前用户所关注的人的用户id
    // * value:微博rowkey
    // * rowkey:用户id
    // * versions:10
    // *
    // * @throws IOException
    // */
    private void initTableInbox() throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin();

        HTableDescriptor inboxTableDescriptor = new HTableDescriptor(TableName.valueOf(TABLE_INBOX));
        HColumnDescriptor infoColumnDescriptor = new HColumnDescriptor("info");
        // 设置块缓存
        infoColumnDescriptor.setBlockCacheEnabled(true);
        // 设置块缓存大小 2M
        infoColumnDescriptor.setBlocksize(2 * 1024 * 1024);
        // 设置版本
        infoColumnDescriptor.setMinVersions(10);
        infoColumnDescriptor.setMaxVersions(10);

        inboxTableDescriptor.addFamily(infoColumnDescriptor);
        admin.createTable(inboxTableDescriptor);
        admin.close();
        connection.close();
    }

发布微博内容
a、 微博内容表中添加 1 条数据
b、 微博收件箱表对所有粉丝用户添加数据

    // /**
    // * 发布微博
    // * a、向微博内容表中添加刚发布的内容,多了一个微博rowkey
    // * b、向发布微博人的粉丝的收件箱表中,添加该微博rowkey
    // * @throws IOException
    // */
    public void publishContent(String uid, String content) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        // 得到微博表对象
        Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT));
        // a
        // 组装rowkey
        long ts = System.currentTimeMillis();
        String rowkey = uid + "_" + ts;
        // 添加微博内容到微博表
        Put contentPut = new Put(Bytes.toBytes(rowkey));
        contentPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes("content"), Bytes.toBytes(content));
        contentTable.put(contentPut);
        // b
        // 查询用户关系表,得到当前用户的fans用户id
        Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION));
        // 获取粉丝的用户id
        Get get = new Get(Bytes.toBytes(uid));
        get.addFamily(Bytes.toBytes("fans"));

        // 先取出所有fans的用户id,存放于一个集合之中
        List<byte[]> fans = new ArrayList<>();

        Result result = relationTable.get(get);
        for (Cell cell : result.rawCells()) {
            // 取出当前用户所有的粉丝uid
            fans.add(CellUtil.cloneva lue(cell));
        }

        // 如果没有粉丝,则不需要操作粉丝的收件箱表
        if (fans.size() <= 0)
            return;

        // 开始操作收件箱表
        Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX));

        // 封装用于操作粉丝收件箱表的Put对象集合
        List<Put> puts = new ArrayList<>();
        for (byte[] fansRowKey : fans) {
            Put inboxPut = new Put(fansRowKey);
            inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(uid), ts, Bytes.toBytes(rowkey));
            puts.add(inboxPut);
        }
        // 向收件箱表放置数据
        inboxTable.put(puts);

        // 关闭表与连接器,释放资源
        inboxTable.close();
        relationTable.close();
        contentTable.close();
        connection.close();
    }

添加关注用户
a、 在微博用户关系表中,对当前主动操作的用户添加新关注的好友
b、 在微博用户关系表中,对被关注的用户添加新的粉丝
c、 微博收件箱表中添加所关注的用户发布的微博

    // /**
    // * 添加关注
    // * a、在用户关系表中,对当前主动操作的用户id进行添加关注的操作
    // * b、在用户关系表中,对被关注的人的用户id,添加粉丝操作
    // * c、对当前操作的用户的收件箱表中,添加他所关注的人的最近的微博rowkey
    // * @param args
    // * @throws IOException
    // */
    public void addAttends(String uid, String... attends) throws IOException {
        // 参数过滤:如果没有传递关注的人的uid,则直接返回
        if (attends == null || attends.length <= 0 || uid == null)
            return;
        // a
        Connection connection = ConnectionFactory.createConnection(conf);
        Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION));
        List<Put> puts = new ArrayList<>();
        // 在微博用户关系表中,添加新关注的好友
        Put attendPut = new Put(Bytes.toBytes(uid));
        for (String attend : attends) {
            // 为当前用户添加关注人
            attendPut.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend), Bytes.toBytes(attend));
            // b
            // 被关注的人,添加粉丝(uid)
            Put fansPut = new Put(Bytes.toBytes(attend));
            fansPut.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid), Bytes.toBytes(uid));
            puts.add(fansPut);
        }
        puts.add(attendPut);
        relationTable.put(puts);
        // c
        // 取得微博内容表
        Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT));
        Scan scan = new Scan();
        // 用于存放扫描出来的我所关注的人的微博rowkey
        List<byte[]> rowkeys = new ArrayList<>();

        for (String attend : attends) {
            // 1002_152321283837374
            // 扫描微博rowkey,使用rowfilter过滤器
            RowFilter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(attend + "_"));
            scan.setFilter(filter);
            // 通过该scan扫描结果
            ResultScanner resultScanner = contentTable.getScanner(scan);
            Iterator<Result> iterator = resultScanner.iterator();
            while (iterator.hasNext()) {
                Result result = iterator.next();
                rowkeys.add(result.getRow());
            }
        }
        // 将取出的微博rowkey放置于当前操作的这个用户的收件箱表中
        // 如果所关注的人,没有一条微博,则直接返回
        if (rowkeys.size() <= 0)
            return;

        // 操作inboxTable
        Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX));
        Put inboxPut = new Put(Bytes.toBytes(uid));
        for (byte[] rowkey : rowkeys) {
            String rowkeyString = Bytes.toString(rowkey);
            String attendUID = rowkeyString.split("_")[0];
            String attendWeiboTS = rowkeyString.split("_")[1];
            inboxPut.addColumn(Bytes.toBytes("info"), Bytes.toBytes(attendUID), Long.valueOf(attendWeiboTS), rowkey);
        }
        inboxTable.put(inboxPut);

        // 关闭,释放资源
        inboxTable.close();
        contentTable.close();
        relationTable.close();
        connection.close();
    }

移除(取关)用户
a、在微博用户关系表中,对当前主动操作的用户移除取关的好友(attends)
b、在微博用户关系表中,对被取关的用户移除粉丝
c、微博收件箱中删除取关的用户发布的微博

    // /**
    // * 取关操作
    // * a、在用户关系表中,删除你要取关的那个人的用户id
    // * b、在用户关系表中,删除被你取关的那个人的粉丝中的当前操作用户id
    // * c、删除微博收件箱表中你取关的人所发布的微博的rowkey
    // * @throws IOException
    // */
    public void removeAttends(String uid, String... attends) throws IOException {
        // 参数过滤:如果没有传递关注的人的uid,则直接返回
        if (attends == null || attends.length <= 0 || uid == null)
            return;

        Connection connection = ConnectionFactory.createConnection(conf);
        // a
        // 得到用户关系表
        Table relationTable = connection.getTable(TableName.valueOf(TABLE_RELATION));
        Delete attendDelete = new Delete(Bytes.toBytes(uid));
        List<Delete> deletes = new ArrayList<>();
        for (String attend : attends) {
            // b 在对面用户关系表中移除粉丝
            attendDelete.addColumn(Bytes.toBytes("attends"), Bytes.toBytes(attend));
            Delete delete = new Delete(Bytes.toBytes(attend));
            delete.addColumn(Bytes.toBytes("fans"), Bytes.toBytes(uid));
            deletes.add(delete);
        }
        deletes.add(attendDelete);
        relationTable.delete(deletes);

        // c
        Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX));

        Delete delete = new Delete(Bytes.toBytes(uid));
        for (String attend : attends) {
            delete.addColumns(Bytes.toBytes("info"), Bytes.toBytes(attend));
        }
        inboxTable.delete(delete);

        // 释放资源
        inboxTable.close();
        relationTable.close();
        connection.close();
    }

获取关注的人的微博内容
a、从微博收件箱中获取所关注的用户的微博 RowKey
b、根据获取的 RowKey,得到微博内容
message类:

public class Message {
    private String uid;
    private long timestamp;
    private String content;

    public String getUid() {
        return uid;
    }

    public void setUid(String uid) {
        this.uid = uid;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public String getContent() {
        return content;
    }

    public void setContent(String content) {
        this.content = content;
    }

    @Override
    public String toString() {
        Date date = new Date(timestamp);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return "Message [用户ID:" + uid + "\n, 发布时间:" + sdf.format(date) + "\n, 微博内容:" + content + "]\n";
    }

}

代码:

    // /**
    // * 查看微博内容
    // * a、从微博收件箱中获取所有关注的人发布的微博的微博rowkey
    // * b、根据得到的微博rowkey,去微博内容表中得到数据
    // * c、将取出的数据解码然后封装到Message对象中
    // * @throws IOException
    // */
    public List<Message> getAttendsContent(String uid) throws IOException {
        // a
        Connection connection = ConnectionFactory.createConnection(conf);
        Table inboxTable = connection.getTable(TableName.valueOf(TABLE_INBOX));
        // 从收件箱表中获取微博rowkey
        Get inboxGet = new Get(Bytes.toBytes(uid));
        inboxGet.addFamily(Bytes.toBytes("info"));
        // 每个Cell中存储了10个版本,我们只取出最新的5个版本
        inboxGet.setMaxVersions(5);

        Result inboxResult = inboxTable.get(inboxGet);
        // 准备一个存放所有微博rowkey的集合
        List<byte[]> rowkeys = new ArrayList<>();
        Cell[] inboxCells = inboxResult.rawCells();
        // 组装rowkes集合
        for (Cell cell : inboxCells) {
            rowkeys.add(CellUtil.cloneva lue(cell));
        }

        // b
        // 根据微博rowkeys,去内容表中取得微博实际内容的数据
        Table contentTable = connection.getTable(TableName.valueOf(TABLE_CONTENT));
        // 用于批量获取所有微博数据
        List<Get> contentGets = new ArrayList<>();
        for (byte[] rowkey : rowkeys) {
            Get contentGet = new Get(rowkey);
            contentGets.add(contentGet);
        }
        // 所有的结果数据
        List<Message> messages = new ArrayList<>();
        Result[] contentResults = contentTable.get(contentGets);
        for (Result r : contentResults) {
            Cell[] cs = r.rawCells();
            for (Cell c : cs) {
                // 取得contentTable中的rowkey
                String rk = Bytes.toString(r.getRow());
                // 发布微博人的UID
                String publishUID = rk.split("_")[0];
                long publishTS = Long.valueOf(rk.split("_")[1]);

                Message msg = new Message();
                msg.setUid(publishUID);
                msg.setTimestamp(publishTS);
                msg.setContent(Bytes.toString(CellUtil.cloneva lue(c)));

                messages.add(msg);
            }
        }

        contentTable.close();
        inboxTable.close();
        connection.close();

        return messages;
    }

测试

    // 发布微博
    public static void publishWeiBoTest(Weibo weiBo, String uid, String content) throws IOException {
        weiBo.publishContent(uid, content);
    }

    // 关注
    public static void addAttendTest(Weibo weiBo, String uid, String... attends) throws IOException {
        weiBo.addAttends(uid, attends);
    }

    // 取关
    public static void removeAttendTest(Weibo weiBo, String uid, String... attends) throws IOException {
        weiBo.removeAttends(uid, attends);
    }

    // 刷微博
    public static void scanWeiBoContentTest(Weibo weiBo, String uid) throws IOException {
        List<Message> list = weiBo.getAttendsContent(uid);
        System.out.println(list);
    }

    public static void main(String[] args) throws Exception {
        Weibo wb = new Weibo();
        // wb.init();

        // publishWeiBoTest(wb, "1002", "哦,我的上帝,我要踢爆他的屁股");
        // publishWeiBoTest(wb, "1002", "哦,我的上帝,我还要踢爆他的屁股");
        // publishWeiBoTest(wb, "1002", "哦,我的上帝,我非要踢爆他的屁股");
        // publishWeiBoTest(wb, "1003", "哦,我的上帝,我也要踢爆他的屁股");
        //
        // addAttendTest(wb, "1001", "1002", "1003");
        // removeAttendTest(wb, "1001", "1002");
        // scanWeiBoContentTest(wb, "1001");

        addAttendTest(wb, "1003", "1002", "1001");
        scanWeiBoContentTest(wb, "1003");

        publishWeiBoTest(wb, "1001", "嘿嘿嘿11");
        publishWeiBoTest(wb, "1001", "嘿嘿嘿22");
        publishWeiBoTest(wb, "1001", "嘿嘿嘿33");
        publishWeiBoTest(wb, "1001", "嘿嘿嘿44");
        publishWeiBoTest(wb, "1001", "嘿嘿嘿55");
        publishWeiBoTest(wb, "1001", "嘿嘿嘿66");
        scanWeiBoContentTest(wb, "1003");
    }

编程开发网
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇搭建三节点的 Hbase 环境及动态添.. 下一篇HBase RegionServe监控

评论

帐  号: 密码: (新用户注册)
验 证 码:
表  情:
内  容:

array(4) { ["type"]=> int(8) ["message"]=> string(24) "Undefined variable: jobs" ["file"]=> string(32) "/mnt/wp/cppentry/do/bencandy.php" ["line"]=> int(217) }