设为首页 加入收藏

TOP

Go实现海量日志收集系统(四)(一)
2018-10-19 15:52:07 】 浏览:159
Tags:实现 海量 日志 收集 系统

到这一步,我的收集系统就已经完成很大一部分工作,我们重新看一下我们之前画的图:

我们已经完成前面的部分,剩下是要完成后半部分,将kafka中的数据扔到ElasticSearch,并且最终通过kibana展现出来

ElasticSearch

官网地址这里介绍了非常详细的安装方法:
https://www.elastic.co/downloads/elasticsearch

但是其实这里是需要配置一些东西的,要不然直接启动是会悲剧的,在网上找了一个地址,如果出现类似的错误直接处理就行,我自己已经验证了:
https://blog.csdn.net/liangzhao_jay/article/details/56840941

如下图所示就表示已经安装完成:

 

 

 

 

 

 通过go写一个简单的调用ElasticSearch的例子:

package main

import (
    "fmt"
    elastic "gopkg.in/olivere/elastic.v2"
)

type Tweet struct{
    User string
    Message string
}

func main(){
    client,err := elastic.NewClient(elastic.SetSniff(false),elastic.SetURL("http://192.168.0.118:9200/"))
    if err != nil{
        fmt.Println("connect es error",err)
        return
    }
    fmt.Println("conn es succ")
    tweet := Tweet{User:"olivere name",Message:"Take Five"}
    _, err = client.Index().Index("twitter").Type("tweet").Id("1").BodyJson(tweet).Do()
    if err != nil {
        panic(err)
        return
    }
    fmt.Println("insert succ")
}

logtransfer

logtransfer主要负责从 kafka队列中读取日志信息,并且添加到ElasticSearch中

看那一下logtransfer 目录结构如下:

├── conf
│   └── app.conf
├── es.go
├── etcd.go
├── ip.go
├── kafka.go
├── logs
│   └── transfer.log
└── main.go

conf:存放配置文件
es.go:主要是连接ElasticSearch的部分以及用于将消息放到ElasticSearch中
etcd.go:主要用于做动态的配置更改,当我们需要将kafka中的哪些topic日志内容扔到ElasticSearch中
ip.go: 用于获取当前服务器的ip地址
kafka.go: 主要是kafka的处理逻辑,包括连接kafka以及从kafka中读日志内容
main.go:代码的入口函数

整体大代码框架,通过如图展示:

和之前的logagent中的代码有很多启示是可以复用的或者稍作更改,就可以了,其中es之心的,主要是连接ElasticSearch并将日志内容放进去

es.go的代码内容为:

package main

import (
    "gopkg.in/olivere/elastic.v2"
    "github.com/astaxie/beego/logs"
    "sync"
    "encoding/json"
)

var waitGroup sync.WaitGroup

var client *elastic.Client

func initEs(addr string,) (err error){
    client,err = elastic.NewClient(elastic.SetSniff(false),elastic.SetURL(addr))
    if err != nil{
        logs.Error("connect to es error:%v",err)
        return
    }
    logs.Debug("conn to es success")
    return
}

func reloadKafka(topicArray []string) {
    for _, topic := range topicArray{
        kafkaMgr.AddTopic(topic)
    }
}

func reload(){
    //GetLogConf() 从channel中获topic信息,而这部分信息是从etcd放进去的
    for conf := range GetLogConf(){
        var topicArray []string
        err := json.Unmarshal([]byte(conf),&topicArray)
        if err != nil {
            logs.Error("unmarshal failed,err:%v conf:%v",err,conf)
            continue
        }
        reloadKafka(topicArray)
    }
}

func Run(esThreadNum int) (err error) {
    go reload()
    for i:=0;i<esThreadNum;i++{
        waitGroup.Add(1)
        go sendToEs()
    }
    waitGroup.Wait()
    return
}

type EsMessage struct {
    Message string
}

func sendToEs(){
    // 从msgChan中读取日志内容并扔到elasticsearch中
    for msg:= range GetMessage() {
        var esMsg EsMessage
        esMsg.Message = msg.line
        _,err := client.Index().Index(msg.topic).Type(msg.topic).BodyJson(esMsg).Do()
        if err != nil {
            logs.Error("send to es failed,err:%v",err)
            continue
        }
        logs.Debug("send to es success")
    }
    waitGroup.Done()
}

最终我将logagnet以及logtransfer部署到虚拟机上进行测试的效果是:

 

 

这样当我再次查日志的时候就可以不用登陆每台服

首页 上一页 1 2 下一页 尾页 1/2/2
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇[日常] Linux下的docker实践 下一篇Go 处理yaml类型的配置文件

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目