设为首页 加入收藏

TOP

protobuf-gRPC-kafka-thrift-hbase数据传输
2019-01-21 02:33:32 】 浏览:131
Tags:protobuf-gRPC-kafka-thrift-hbase 数据传输
版权声明:本文为博主原创文章,转载请注明来源。开发合作联系luanpenguestc@sina.com https://blog.csdn.net/luanpeng825485697/article/details/81064999

全栈工程师开发手册 (作者:栾鹏)
架构系列文章

在这个流程中:

  1. 需要先定义proto文件
  2. 然后通过grcp工具,生成proto格式数据访问py包和grpc协议传输py包
  3. 启动grpc服务器,接收proto数据,并作为kafka生产者,想kafka发送数据
  4. 启动kafka才能正常接收上面生产者发送的数据
  5. 创建kafka消费者,接收数据,形成thrift命令,并发送给thrift
  6. 启动thrift才能接收这个命令,并进行hbase操作.
  7. 启动hbase,才能接收到thrift发来执行命令

定义proto文件

首先我们要在客户端定义proto数据格式.创建proto_data.proto文件

syntax = "proto3";
package example;


service FormatData {   //定义服务,用在rpc传输中
  rpc DoFormat(actionrequest) returns (actionresponse){}
}
message actionrequest {
  string name = 1;
  int32 age = 2;
  int32 userid = 3;
}
message actionresponse{
  string text=1;
}

然后通过grpc_tools.protoc生成调用该类型数据的py文件和传输该类数据grpc协议的py文件

在proto_data.proto文件目录下执行

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. ./proto_data.proto

会生成:proto_data_pb2.py 与 proto_data_pb2_grpc.py

创建proto数据发送端

客户端调用生成的proto数据库,并发送到proto服务器

下面代码存储为proto_gRPC.py

# 实现了客户端用于发送数据并打印接收到 server 端处理后的数据

import grpc
from proto_data import proto_data_pb2, proto_data_pb2_grpc
import time
_HOST = 'localhost'
_PORT = '8080'


def run():
    conn = grpc.insecure_channel(_HOST + ':' + _PORT)  # 监听频道
    print(conn)
    client = proto_data_pb2_grpc.FormatDataStub(channel=conn)   # 客户端使用Stub类发送请求,参数为频道,为了绑定链接
    print(client)
    i=0
    while True:
        i += 1
        data = proto_data_pb2.actionrequest(userid=i,name='lp'+str(i),age=i)
        response = client.DoFormat(data)   # 返回的结果就是proto中定义的类
        print("发送: name" + str(data.userid))
        time.sleep(1)


if __name__ == '__main__':
    run()

gRPC服务器接收proto数据,发送给kafka

启动gRPC服务器处理此类数据, 并将数据发送给kafka消息队列.

下面代码定义为gRPC_kafka.py

# 实现了 server 端用于接收客户端发送的数据,并对数据进行大写处理后返回给客户端


# 使用前要先启动zookeeper和kafka服务
# 启动zookeeper要cd /home/lp/soft/kafka_2.11-1.1.0,然后 bin/zookeeper-server-start.sh config/zookeeper.properties
# 启动kafka要cd /home/lp/soft/kafka_2.11-1.1.0,然后bin/kafka-server-start.sh config/server.properties


from kafka import KafkaProducer
import time,json
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])  #此处ip可以是多个['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]



import grpc
import time
from concurrent import futures
from proto_data import proto_data_pb2, proto_data_pb2_grpc

_ONE_DAY_IN_SECONDS = 60 * 60 * 24
_HOST = 'localhost'
_PORT = '8080'


# 实现一个派生类,重写rpc中的接口函数.自动生成的grpc文件中比proto中的服务名称多了一个Servicer
class FormatData(proto_data_pb2_grpc.FormatDataServicer):
    # 重写接口函数.输入和输出都是proto中定义的Data类型
    def DoFormat(self, request, context):
        msg = {'userid': request.userid, 'name': request.name, 'age': request.age}  # 解析接收到的数据
        producer.send('test', json.dumps(msg).encode('utf-8'))  # 将数据发送kafka(指定主题,只能发送bytes)
        print('向kafka发送数据:',msg)
        return proto_data_pb2.actionresponse(text=str(time.time()))  # 返回一个类实例



def serve():
    # 定义服务器并设置最大连接数,corcurrent.futures是一个并发库,类似于线程池的概念
    grpcServer = grpc.server(futures.ThreadPoolExecutor(max_workers=4))   # 创建一个服务器
    proto_data_pb2_grpc.add_FormatDataServicer_to_server(FormatData(), grpcServer)  # 在服务器中添加派生的接口服务(自己实现了处理函数)
    grpcServer.add_insecure_port(_HOST + ':' + _PORT)    # 添加监听端口
    grpcServer.start()    #  启动服务器
    try:
        while True:
            time.sleep(_ONE_DAY_IN_SECONDS)
    except KeyboardInterrupt:
        grpcServer.stop(0) # 关闭服务器


if __name__ == '__main__':
    serve()

kafka消费者将数据发送给thrift,存储到hbase

下面文件定义为kafka_hbase.py


# =======连接hbase======
# 使用前需要启动hbase和thrift服务器
# 启动hbase在cd /usr/local/hbase下bin/start-hbase.sh   默认端口为
# 启动thrift服务器/usr/local/hbase/bin执行./hbase-daemon.sh start thrift   默认端口为9090
from thrift.transport import TSocket,TTransport
from thrift.protocol import TBinaryProtocol
from hbase import Hbase

# thrift默认端口是9090
socket = TSocket.TSocket('127.0.0.1',9090)
socket.setTimeout(5000)

transport = TTransport.TBufferedTransport(socket)
protocol = TBinaryProtocol.TBinaryProtocol(transport)

client = Hbase.Client(protocol)
socket.open()




# 做hbase的准备工作
from hbase.ttypes import ColumnDescriptor

alltable = client.getTableNames()   # 获取所有表名
print('所有表格',alltable)
if('test' in alltable):
    allcf = client.getColumnDescriptors('test')  # 获取表的所有列族
    print('test表的列族',allcf)
    allregions = client.getTableRegions('test') # 获取所有与表关联的regions
    print('test表的所有regions',allregions)
else:
    column1 = ColumnDescriptor(name='cf1')    # 定义列族
    column3 = ColumnDescriptor(name='cf2')  # 定义列族
    client.createTable('test', [column1,column3])   # 创建表
    print('创建表test')


# 验证表是否被启用
if(not client.isTableEnabled('test')):
    client.enableTable('test')  # 启用表
    print('启用表test')




# =======链接kafka======
from kafka import KafkaConsumer
consumer = KafkaConsumer('test',auto_offset_reset='latest',bootstrap_servers=['127.0.0.1:9092'])  #参数为接收主题和kafka服务器地址

from hbase.ttypes import Mutation
import json
from hbase.ttypes import Mutation, BatchMutation
# 这是一个永久堵塞的过程,
for message in consumer:  # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
    # =======插入/修改数据=======
    obj = json.loads(str(message.value,'utf-8'))
    print(type(obj),obj)

    mutation1 = Mutation(column='cf1:name', value=obj['name'])
    mutation2 = Mutation(column='cf2:age', value=str(obj['age']))
    batchMutation = BatchMutation('row'+str(obj['userid']), [mutation1, mutation2])
    client.mutateRows('test', [batchMutation])  # 在表中执行一系列批次(单个行上的一系列突变)


全套运型

  1. 首先要开启hbase, 如果使用单机版hbase,会自动启动hbase自带的zookeeper
cd /usr/local/hbase
bin/start-hbase.sh
  1. 启动thrift
/usr/local/hbase/bin
./hbase-daemon.sh start thrift
  1. 启动kafka
启动zookeeper
cd /home/lp/soft/kafka_2.11-1.1.0
bin/zookeeper-server-start.sh config/zookeeper.properties

启动kafka
cd /home/lp/soft/kafka_2.11-1.1.0
bin/kafka-server-start.sh config/server.properties

4 ) 运行grpc服务器

运行gRPC_kafka.py文件启动grpc服务器

  1. 启动客户端

运行proto_gRPC.py文件

】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Storm集成Kafka和Redis 下一篇druid中 kafka-indexing-service ..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目