设为首页 加入收藏

TOP

Nginx 模块开发之日志模块---实时记录http请求信息写入flume
2018-11-28 18:07:39 】 浏览:350
Tags:Nginx 模块 开发 日志 ---实时 记录 http 请求 信息 写入 flume

一、整体架构部署图, 如下:


本图只是一个大概的描述,真实的情况会有所差异,后台部署采用二级负载均衡:一级lvs,二级nginx。日志框架采用flume(两种source:syslogtcp和avro),日志分析采用hadoop。 nginx和web server上的服务程序都需要写日志,其中nginx部分并不支持远程写日志功能,需要开发独立的模块。这也就是本文的重点,其他部分不做具体阐述。

二、需求

除了记录一些常规的日志信息外,根据项目需求还要记录每个http请求从发起到处理整个过程的时间信息。这就要求给每一个http请求加上一个唯一标示,经过考虑决定在http的header部分加入“id:uuid“,同时为了保证http请求到达时间的准确性,将日志模块放在nginx http 11个处理流程中的第一个阶段(NGX_HTTP_POST_READ_HEAD_PHASE)。达这一步的实现也是在nginx中。此外,还有一些其他信息:IP,url,reachtime等。

三、注意事项

nginx写flume采用socket+json的方式,主要考虑以下几个问题:

1、效率。采用有效长连接,也就是在nginx启动的时候就创建好socket链接,并且设置非阻塞模式。

2、异常处理。主要异常都来自于socket链路。连接失败,发送失败等等。

3、内存管理。nginx有自己的一套管理系统,自己写的模块要能跟上nginx的流程自我创建和销毁。

4、url的获取。完整的url需要自己拼凑。

5、如何插入id。这个要分析源码了。

6、配置文件。日志模块的启动以及flume syslogtcp的ip port都需要在配置文件中设置。

需求和技术方案都已经确定,接下来就是写代码了。


四、代码

1、变量。

考虑到使用的变量比较多,封装成结构体形式

typedef struct {
    ngx_int_t                       on;                   /**<是否开启日志记录 on/off*/

    ngx_fd_t                        file_fd;              /**<文件fd,用户debug状态下同步写入文件*/
    ngx_fd_t                        flume_fd;             /**<flume server fd,用于发送数据*/

    ngx_uint_t                      local_port;           /**<本地主机port*/
    char                           *local_ip;             /**<本地主机ip*/

    ngx_uint_t                      header_key_hash;      /**<header_key的hash值*/
    char                           *header_key;           /**<值为:"id"*/
    size_t                          header_key_len;       /**<"id"的长度,2*/
    char                           *header_value;         /**<header value,实际上是36位uuid*/
    size_t                          header_value_len;     /**<长度,37*/

    ngx_url_t                       flume_url;	          /**<flume socket ip:port*/

    ngx_int_t                       server_addr_index;    /**<*/
    ngx_int_t                       server_port_index;    /**<*/
    ngx_int_t                       schema_index;         /**<*/
    ngx_int_t                       request_uri_index;    /**<*/

    ngx_http_variable_value_t      *schema_value;         /**<http/https 用于拼凑url,下同*/
    ngx_http_variable_value_t      *server_addr_value;    /**<>*/
    ngx_http_variable_value_t      *request_uri_value;    /**<>*/
    ngx_http_variable_value_t      *server_port_value;    /**<>*/

}ngx_http_record_log_main_conf_t;
配置文件中,日志功能的打开和flume socket 设置分别在http的全局范围内,使用命令:record_request_log和flume_server。

static ngx_command_t  ngx_http_record_log_commands[] = {

    { ngx_string("record_request_log"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_http_request_log,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

    { ngx_string("flume_server"),
      NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
      ngx_http_flume_server,
      NGX_HTTP_MAIN_CONF_OFFSET,
      0,
      NULL },

      ngx_null_command
};



static ngx_http_module_t  ngx_http_record_log_module_ctx = {
    NULL,                                  /* preconfiguration */
    ngx_http_record_log_init,              /* postconfiguration */

    ngx_http_record_log_create_main_conf,  /* create main configuration */
    ngx_http_record_log_init_main_conf,    /* init main configuration */

    NULL,                                  /* create server configuration */
    NULL,                                  /* merge server configuration */

    NULL,       /* create location configuration */
    NULL         /* merge location configuration */
};


ngx_module_t  ngx_http_record_log_module = {
    NGX_MODULE_V1,
    &ngx_http_record_log_module_ctx,           /* module context */
    ngx_http_record_log_commands,              /* module directives */
    NGX_HTTP_MODULE,                       /* module type */
    NULL,                                  /* init master */
    NULL,                                  /* init module */
    NULL,                                  /* init process */
    NULL,                                  /* init thread */
    NULL,                                  /* exit thread */
    ngx_http_record_log_exit_process,      /* exit process */
    NULL,                                  /* exit master */
    NGX_MODULE_V1_PADDING
};

回调函数ngx_http_record_log_create_main_conf和ngx_http_record_log_init_main_conf用于完成结构体

ngx_http_record_log_main_conf_t的创建和初始化。其中,使用的变量都是在nginx的内存池中创建的,

这样内存就交给nginx统一处理,不用担心出现内存泄露或者内存碎片。

static void *
ngx_http_record_log_create_main_conf(ngx_conf_t *cf)
{
    ngx_http_record_log_main_conf_t  *rlmcf;

    rlmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_record_log_main_conf_t));
    if (rlmcf == NULL) {
        return NULL;
    }

    rlmcf->header_key = ngx_pcalloc(cf->pool,3);
    if(rlmcf->header_key ==  NULL)
    {
        return NULL;
    }

    rlmcf->local_ip = ngx_pcalloc(cf->pool,16);
    if(rlmcf->local_ip == NULL)
    {
        return NULL;
    }

    rlmcf->header_value = ngx_pcalloc(cf->pool,37);
    if(rlmcf->header_value == NULL)
    {
        return NULL;
    }

    return rlmcf;
}

static char *
ngx_http_record_log_init_main_conf(ngx_conf_t *cf, void *conf)
{

    ngx_http_record_log_main_conf_t  *rlmcf = conf;

    rlmcf->header_key_len = 2;

    strcpy(rlmcf->header_key,"id");

    rlmcf->header_key[rlmcf->header_key_len] = '\0';

    rlmcf->header_key_hash = ngx_hash(ngx_hash(0,'i'),'d');

    rlmcf->header_value_len = 36;

    return NGX_CONF_OK;
}

2、socket。

在nginx启动读取配置文件的时候,创建socket连接,但是这时有可能flume尚未启动,所以不能保证链接建立成功。如果不成功,在发送日志信息的时候会再一次建立socket连接。


ngx_int_t
ngx_init_flume_log_fd(ngx_http_record_log_main_conf_t *rlmcf,ngx_log_t * log)
{
    //ngx_connection_t * c;
    //struct ifreq temp;
    ///定义sockfd
    rlmcf->flume_fd = ngx_socket(AF_INET,SOCK_STREAM, 0);

    //ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "socket %d", flume_fd);

    if (rlmcf->flume_fd == -1) {

        ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                      ngx_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd): 创建flume socket失败!!!\n");
        return NGX_ERROR;
    }

    ///连接服务器,成功返回0,错误返回-1
    if (connect(rlmcf->flume_fd, rlmcf->flume_url.addrs->sockaddr, rlmcf->flume_url.addrs->socklen) < 0)
    {

        ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                           "[RECORD_LOG]-(ngx_init_flume_log_fd):conncet flume server failed\n");

        if (ngx_close_socket(rlmcf->flume_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd):failed to close flume fd\n");
        }
        rlmcf->flume_fd = -1;  ///如果不是设置-1,flume_fd仍然是个有效的fd,在写日志的时候会提示错误88(非socket操作),导致不能重新连接(因为设定是错如9或者32才重新建立连接的)
        ///这里只是借用返回值 NGX_AGAIN,其并非NGX_AGAIN的本意.
        ///只是在ngx_write_flumelog()中表明连接失败,防止陷入死循环
        return NGX_AGAIN;
    }
    else
    {
        ///set nonblock
        ngx_nonblocking(rlmcf->flume_fd);
    }

    return NGX_OK;
}

当nginx结束退出时,需要close socket连接。


static void
ngx_http_record_log_exit_process(ngx_cycle_t *cycle)
{
    ngx_http_record_log_main_conf_t *rlmc;

    rlmc = ngx_http_cycle_get_module_main_conf(ngx_cycle,

                                                ngx_http_record_log_module);
    if (ngx_close_socket(rlmc->flume_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 关闭flume fd失败 %V failed\n",strerror(errno));
    }
    else
        ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):关闭flume fd\n");
    #ifdef NGX_DEBUG


    if (ngx_close_socket(rlmc->file_fd) == -1) {
            ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
                          ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 关闭file fd失败 %V failed",strerror(errno));
    }
    else
        ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):关闭 file fd\n");
    #endif
}


3、插入id。


ngx_int_t
ngx_insert_id_into_headers(ngx_http_record_log_main_conf_t *rlmcf,ngx_http_request_t *r)
{
    ngx_table_elt_t             *h;

    h = ngx_list_push(&r->headers_in.headers);
    if (h == NULL) {
        ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
            "[RECORD_LOG_TIME]-(ngx_insert_id_into_headers):失败!!!在request headers中分配空间失败,无法插入id\n");
        return NGX_ERROR;
    }


    h->hash = rlmcf->header_key_hash;

    r->header_hash = h->hash;

    h->key.len = rlmcf->header_key_len;
    h->key.data = (u_char*)rlmcf->header_key;
    h->key.data[h->key.len] = '\0';

    h->value.len = rlmcf->header_value_len;
    h->value.data = (u_char*)rlmcf->header_value;
    h->value.data[h->value.len] = '\0';

    h->lowcase_key = (u_char*)rlmcf->header_key;

    return NGX_OK;
}


4、设置http post_read 阶段的处理handler


static ngx_int_t
ngx_http_record_log_init(ngx_conf_t *cf)
{
    ngx_http_handler_pt        *h;
    ngx_http_core_main_conf_t  *cmcf;

    cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);

    h = ngx_array_push(&cmcf->phases[NGX_HTTP_POST_READ_PHASE].handlers);
    if (h == NULL) {
        return NGX_ERROR;
    }

    *h = ngx_http_record_log_handler;

    return NGX_OK;
}

5、拼凑url

url=$schema+$server_addr+$request_uri
目前是采用这种方式,并且没有加port,因为都是80端口的,也没有加参数,目前用不到,等以后用到的时候可以再加。

...

tmp = ngx_snprintf(tmp,rlmcf->schema_value->len+9,",\"url\":\"%s:",rlmcf->schema_value->data);

    tmp = ngx_snprintf(tmp,rlmcf->server_addr_value->len+2,"//%s",rlmcf->server_addr_value->data);

    tmp = ngx_snprintf(tmp,rlmcf->request_uri_value->len, "%s",rlmcf->request_uri_value->data);


...

6、发送日志信息


...

 do{
        len = send(rlmcf->flume_fd,msg,msg_len,0);

        if(len <= 0)
        {
            ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
                "[RECORD_LOG_TIME]-ngx_write_flumelog:失败!!!发送的到flume server的数据长度小于0,错误:%s\n",
                    strerror(errno));
            ///对方关闭了连接或者是无效的连接
            if(errno == 32 || errno == 9)
            {
                close(rlmcf->flume_fd);

                t_errno = errno;

                if(NGX_OK != ngx_init_flume_log_fd(rlmcf,r->connection->log))
                {
                    ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
                        "[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立连接失败!!,错误:%s\n",strerror(errno));

                    return NGX_ERROR;
                }

                ngx_log_error(NGX_LOG_EMERG,r->connection->log,ngx_errno,
                "[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立连接!!\n");
            }
            else
                return NGX_ERROR;
        }
        else
        {
            break;
        }

    }while(t_errno == 9||t_errno == 32);

...


总结:

日志模块的功能比较简单,但是若想写出清晰规范的代码,需要对nginx的处理流程和内存结构有一定的了解。笔者也是看了很久的源代码才磕磕碰碰写出来的。若有写的不到位的地方还望指出,不胜感激。其实,考虑到效率问题,作为负载均衡来说还是能不加拦截模块就不加。


】【打印繁体】【投稿】【收藏】 【推荐】【举报】【评论】 【关闭】 【返回顶部
上一篇Flume常用命令 下一篇关于flume 中spooldir传输数据报..

最新文章

热门文章

Hot 文章

Python

C 语言

C++基础

大数据基础

linux编程基础

C/C++面试题目