已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习
这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就学习一下人家的源码,为了方便看这个代码,我将这个包进行了简化,也是用于方便理解,代码放到了:https://github.com/pythonsite/tail, 这个代码包可能无法正常用,只是为了方面理解tail这个包,以及学习人家的代码
精简后的代码目录
│ tail.go │ └─watch filechanges.go inotify.go inotify_tracker.go watch.go
tail.go: 这里包含着tail包的核心代码,主要的逻辑处理时在这个里面
watch: 这个包主要用于对文件的监控,用于将文件的变化通知到tail.如:文件修改了,文件删除了,文件内容追加了
tail.go 代码分析
在tail.go中主要有几下几个结构体:
// Line 结构体用于存读每行的时候的对象 type Line struct { Text string //当前行的内容 Time time.Time // 时间 Err error // Error from tail } type SeekInfo struct { Offset int64 Whence int } // 关于配置的结构体 type Config struct { Location *SeekInfo ReOpen bool MustExist bool // 要打开的文件是否必须存在 Poll bool Pipe bool Follow bool // 是否继续读取新的一行,可以理解为tail -f 命令 } // 核心的结构体Tail type Tail struct { Filename string // 要打开的文件名 Lines chan *Line // 用于存每行内容的Line结构体 Config watcher watch.FileWatcher changes *watch.FileChanges tomb.Tomb file *os.File reader *bufio.Reader lk sync.Mutex }
Line 结构体用于存读取文件的每行内容
Tail 是核心的结构体,我们使用tail这个包的时候其实就是会先调用初始化这个struct的方法TailFile,如我在写日志收集的时候的使用:
tail,err := tail.TailFile(conf.LogPath,tail.Config{ ReOpen:true, Follow:true, Location:&tail.SeekInfo{Offset:0,Whence:2}, MustExist:false, Poll:true, })
既然我们使用的时候就会在最开始的时候调用tail.TailFile方法,就直接看这个方法:
// 主要用于Tail结构体的初始化 func TailFile(filename string, config Config) (*Tail, error) { t := &Tail { Filename: filename, Lines: make(chan *Line), Config: config, } t.watcher = watch.NewInotifyFileWatcher(filename) if t.MustExist { var err error t.file, err = OpenFile(t.Filename) if err != nil { return nil, err } } go t.tailFileSync() return t, nil }
从这个代码里我们就可以看到它首先初始化了Tail结构体并且对Tail中的watcher进行的复制,先暂时不看watch相关的内容
然后就是关于文件是否必须存在的判断处理,最后开启了一个一个线程执行tailFileSync()方法,我们接着看tailFileSync方法
func (tail *Tail) tailFileSync(){ defer tail.Done() defer tail.close() if !tail.MustExist { err := tail.reopen() if err != nil { if err != tomb.ErrDying { tail.Kill(err) } return } } tail.openReader() var offset int64 var err error // 一行行读文件内容 for { if !tail.Pipe { offset,err = tail.Tell() if err != nil { tail.Kill(err) return } } line, err := tail.readLine() if err == nil { // 将读取的一行内容放到chan中 tail.sendLine(line) } else if err == io.EOF { // 表示读到文件的最后了 // 如果Follow 设置为false的话就不会继续读文件 if !tail.Follow { if line != "" { tail.sendLine(line) } return } // 如果Follow设置为True则会继续读 if tail.Follow && line != "" { err := tail.seekTo(SeekInfo{Offset: offset, Whence: 0}) if err != nil { tail.Kill(err) return } } // 如果读到文件最后,文件并没有新的内容增加 err := tail.waitForChanges() if err != nil { if err != ErrStop { tail.Kill(err) } return } } else { // 既不是文件结尾,也没有error tail.Killf("error reading %s :%s", tail.Filename, err) return } select { case <- tail.Dying(): if tail.Err() == errStopAtEOF { continue } return default: } } }
这个方法里主要是先调用了openReader方法,这个方法其实并没有做什么,只是对tail.reqader进行了赋值:tail.reader = bufio.NewReader(tail.file)
接着就是循环一行行的读文件
在循环里最开始判断了tail.Pipe的值,这个值一般开始我