设为首页 加入收藏

TOP

我读《通过Go来处理每分钟达百万的数据请求》(一)
2017-09-30 13:22:47 】 浏览:9532
Tags:我读 《通过 处理 分钟 百万 数据 请求

我读《通过Go来处理每分钟达百万的数据请求》

原文

原文作者为Malwarebytes公司的首席架构师Marcio Castilho http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/

问题描述

当我们的服务端需要处理大量的耗时任务时,我们一般都会考虑将耗时任务异步处理。

简单粗暴法

golang恰恰给我们的异步处理带来了很大的便利--go func()。然而,绝大多数的时候,我们不能简单粗暴的创建协程来处理异步任务,原因是不可控。虽然协程相对于线程占用的系统资源更少,但这并不代表我们可以无休止的创建协程。积水成江,不停创建协程也有压垮系统的风险。这里引用原作者的demo,一个执行耗时任务的handler。

func payloadHandler(w http.ResponseWriter, r *http.Request) {
    if r.Method != "POST" {
             w.WriteHeader(http.StatusMethodNotAllowed)
             return
    }
    // Read the body into a string for json decoding
     var content = &PayloadCollection{}
     err := json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&content)
    if err != nil {
             w.Header().Set("Content-Type", "application/json; charset=UTF-8")
             w.WriteHeader(http.StatusBadRequest)
             return
     }

    // Go through each payload and queue items individually to be posted to S3
    for _, payload := range content.Payloads {
        go payload.UploadToS3()   // <----- DON'T DO THIS
    }
    w.WriteHeader(http.StatusOK)
}

这就是我们遇到的第一个问题,简单粗暴起协程处理耗时任务导致的系统不可控性。我们自然而然就会想,怎么能让系统更可控呢。

优雅的方法

一个很自然的思路是,建立任务队列。golang提供了线程安全的任务队列实现方式--带缓冲的channal。但是这样只是延后了请求的爆发。作者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将作者的代码修改了一下,单文件执行,以记录这一模式。

package main

import (
    "fmt"
    "reflect"
    "time"
)

var (
    MaxWorker = 10
)

type Payload struct {
    Num int
}

//待执行的工作
type Job struct {
    Payload Payload
}

//任务channal
var JobQueue chan Job

//执行任务的工作者单元
type Worker struct {
    WorkerPool chan chan Job //工作者池--每个元素是一个工作者的私有任务channal
    JobChannel chan Job      //每个工作者单元包含一个任务管道 用于获取任务
    quit       chan bool     //退出信号
    no         int           //编号
}

//创建一个新工作者单元
func NewWorker(workerPool chan chan Job, no int) Worker {
    fmt.Println("创建一个新工作者单元")
    return Worker{
        WorkerPool: workerPool,
        JobChannel: make(chan Job),
        quit:       make(chan bool),
        no:         no,
    }
}

//循环 监听任务和结束信号
func (w Worker) Start() {
    go func() {
        for {
            // register the current worker into the worker queue.
            w.WorkerPool <- w.JobChannel
            fmt.Println("w.WorkerPool <- w.JobChannel", w)
            select {
            case job := <-w.JobChannel:
                fmt.Println("job := <-w.JobChannel")
                // 收到任务
                fmt.Println(job)
                time.Sleep(100 * time.Second)
            case <-w.quit:
                // 收到退出信号
                return
            }
        }
    }()
}

// 停止信号
func (w Worker) Stop() {
    go func() {
        w.quit <- true
    }()
}

//调度中心
type Dispatcher struct {
    //工作者池
    WorkerPool chan chan Job
    //工作者数量
    MaxWorkers int
}

//创建调度中心
func NewDispatcher(maxWorkers int) *Dispatcher {
    pool := make(chan chan Job, maxWorkers)
    return &Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}

//工作者池的初始化
func (d *Dispatcher) Run() {
    // starting n number of workers
    for i := 1; i < d.MaxWorkers+1; i++ {
        worker := NewWorker(d.WorkerPool, i)
        worker.Start()
    }
    go d.dispatch()
}

//调度
func (d *Dispatcher) dispatch() {
    for {
        select {
        case job := <-JobQueue:
            fmt.Println("job := <-JobQueue:")
            go func(job Job) {
                //等待空闲worker (任务多的时候会阻塞这里)
                jobChannel := <-d.WorkerPool
                fmt.Println("jobChannel := <-d.WorkerPool", reflect.TypeOf(jobChannel))
                // 将任务放到上述woker的私有任务channal中
                jobChannel <- job
                fmt.Println("jobChannel <- job")
            }(job)
        }
    }
}

func main() {
    JobQueue = make(chan Job, 10)
    dispatcher := NewDispatcher
首页 上一页 1 2 3 下一页 尾页 1/3/3
】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇golang 用tar打包文件或文件夹 下一篇我学习go的五个感悟(译)

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目