一、整体架构部署图, 如下:
本图只是一个大概的描述,真实的情况会有所差异,后台部署采用二级负载均衡:一级lvs,二级nginx。日志框架采用flume(两种source:syslogtcp和avro),日志分析采用hadoop。 nginx和web server上的服务程序都需要写日志,其中nginx部分并不支持远程写日志功能,需要开发独立的模块。这也就是本文的重点,其他部分不做具体阐述。
二、需求
除了记录一些常规的日志信息外,根据项目需求还要记录每个http请求从发起到处理整个过程的时间信息。这就要求给每一个http请求加上一个唯一标示,经过考虑决定在http的header部分加入“id:uuid“,同时为了保证http请求到达时间的准确性,将日志模块放在nginx http 11个处理流程中的第一个阶段(NGX_HTTP_POST_READ_HEAD_PHASE)。达这一步的实现也是在nginx中。此外,还有一些其他信息:IP,url,reachtime等。
三、注意事项
nginx写flume采用socket+json的方式,主要考虑以下几个问题:
1、效率。采用有效长连接,也就是在nginx启动的时候就创建好socket链接,并且设置非阻塞模式。
2、异常处理。主要异常都来自于socket链路。连接失败,发送失败等等。
3、内存管理。nginx有自己的一套管理系统,自己写的模块要能跟上nginx的流程自我创建和销毁。
4、url的获取。完整的url需要自己拼凑。
5、如何插入id。这个要分析源码了。
6、配置文件。日志模块的启动以及flume syslogtcp的ip port都需要在配置文件中设置。
需求和技术方案都已经确定,接下来就是写代码了。
四、代码
1、变量。
考虑到使用的变量比较多,封装成结构体形式
typedef struct {
ngx_int_t on; /**<是否开启日志记录 on/off*/
ngx_fd_t file_fd; /**<文件fd,用户debug状态下同步写入文件*/
ngx_fd_t flume_fd; /**<flume server fd,用于发送数据*/
ngx_uint_t local_port; /**<本地主机port*/
char *local_ip; /**<本地主机ip*/
ngx_uint_t header_key_hash; /**<header_key的hash值*/
char *header_key; /**<值为:"id"*/
size_t header_key_len; /**<"id"的长度,2*/
char *header_value; /**<header value,实际上是36位uuid*/
size_t header_value_len; /**<长度,37*/
ngx_url_t flume_url; /**<flume socket ip:port*/
ngx_int_t server_addr_index; /**<*/
ngx_int_t server_port_index; /**<*/
ngx_int_t schema_index; /**<*/
ngx_int_t request_uri_index; /**<*/
ngx_http_variable_value_t *schema_value; /**<http/https 用于拼凑url,下同*/
ngx_http_variable_value_t *server_addr_value; /**<>*/
ngx_http_variable_value_t *request_uri_value; /**<>*/
ngx_http_variable_value_t *server_port_value; /**<>*/
}ngx_http_record_log_main_conf_t;
配置文件中,日志功能的打开和flume socket 设置分别在http的全局范围内,使用命令:record_request_log和flume_server。
static ngx_command_t ngx_http_record_log_commands[] = {
{ ngx_string("record_request_log"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_http_request_log,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
{ ngx_string("flume_server"),
NGX_HTTP_MAIN_CONF|NGX_CONF_TAKE1,
ngx_http_flume_server,
NGX_HTTP_MAIN_CONF_OFFSET,
0,
NULL },
ngx_null_command
};
static ngx_http_module_t ngx_http_record_log_module_ctx = {
NULL, /* preconfiguration */
ngx_http_record_log_init, /* postconfiguration */
ngx_http_record_log_create_main_conf, /* create main configuration */
ngx_http_record_log_init_main_conf, /* init main configuration */
NULL, /* create server configuration */
NULL, /* merge server configuration */
NULL, /* create location configuration */
NULL /* merge location configuration */
};
ngx_module_t ngx_http_record_log_module = {
NGX_MODULE_V1,
&ngx_http_record_log_module_ctx, /* module context */
ngx_http_record_log_commands, /* module directives */
NGX_HTTP_MODULE, /* module type */
NULL, /* init master */
NULL, /* init module */
NULL, /* init process */
NULL, /* init thread */
NULL, /* exit thread */
ngx_http_record_log_exit_process, /* exit process */
NULL, /* exit master */
NGX_MODULE_V1_PADDING
};
回调函数ngx_http_record_log_create_main_conf和ngx_http_record_log_init_main_conf用于完成结构体
ngx_http_record_log_main_conf_t的创建和初始化。其中,使用的变量都是在nginx的内存池中创建的,
这样内存就交给nginx统一处理,不用担心出现内存泄露或者内存碎片。
static void *
ngx_http_record_log_create_main_conf(ngx_conf_t *cf)
{
ngx_http_record_log_main_conf_t *rlmcf;
rlmcf = ngx_pcalloc(cf->pool, sizeof(ngx_http_record_log_main_conf_t));
if (rlmcf == NULL) {
return NULL;
}
rlmcf->header_key = ngx_pcalloc(cf->pool,3);
if(rlmcf->header_key == NULL)
{
return NULL;
}
rlmcf->local_ip = ngx_pcalloc(cf->pool,16);
if(rlmcf->local_ip == NULL)
{
return NULL;
}
rlmcf->header_value = ngx_pcalloc(cf->pool,37);
if(rlmcf->header_value == NULL)
{
return NULL;
}
return rlmcf;
}
static char *
ngx_http_record_log_init_main_conf(ngx_conf_t *cf, void *conf)
{
ngx_http_record_log_main_conf_t *rlmcf = conf;
rlmcf->header_key_len = 2;
strcpy(rlmcf->header_key,"id");
rlmcf->header_key[rlmcf->header_key_len] = '\0';
rlmcf->header_key_hash = ngx_hash(ngx_hash(0,'i'),'d');
rlmcf->header_value_len = 36;
return NGX_CONF_OK;
}
2、socket。
在nginx启动读取配置文件的时候,创建socket连接,但是这时有可能flume尚未启动,所以不能保证链接建立成功。如果不成功,在发送日志信息的时候会再一次建立socket连接。
ngx_int_t
ngx_init_flume_log_fd(ngx_http_record_log_main_conf_t *rlmcf,ngx_log_t * log)
{
//ngx_connection_t * c;
//struct ifreq temp;
///定义sockfd
rlmcf->flume_fd = ngx_socket(AF_INET,SOCK_STREAM, 0);
//ngx_log_debug1(NGX_LOG_DEBUG_EVENT, log, 0, "socket %d", flume_fd);
if (rlmcf->flume_fd == -1) {
ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
ngx_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd): 创建flume socket失败!!!\n");
return NGX_ERROR;
}
///连接服务器,成功返回0,错误返回-1
if (connect(rlmcf->flume_fd, rlmcf->flume_url.addrs->sockaddr, rlmcf->flume_url.addrs->socklen) < 0)
{
ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
"[RECORD_LOG]-(ngx_init_flume_log_fd):conncet flume server failed\n");
if (ngx_close_socket(rlmcf->flume_fd) == -1) {
ngx_log_error(NGX_LOG_ERR, log, ngx_socket_errno,
ngx_close_socket_n "[RECORD_LOG]-(ngx_init_flume_log_fd):failed to close flume fd\n");
}
rlmcf->flume_fd = -1; ///如果不是设置-1,flume_fd仍然是个有效的fd,在写日志的时候会提示错误88(非socket操作),导致不能重新连接(因为设定是错如9或者32才重新建立连接的)
///这里只是借用返回值 NGX_AGAIN,其并非NGX_AGAIN的本意.
///只是在ngx_write_flumelog()中表明连接失败,防止陷入死循环
return NGX_AGAIN;
}
else
{
///set nonblock
ngx_nonblocking(rlmcf->flume_fd);
}
return NGX_OK;
}
当nginx结束退出时,需要close socket连接。
static void
ngx_http_record_log_exit_process(ngx_cycle_t *cycle)
{
ngx_http_record_log_main_conf_t *rlmc;
rlmc = ngx_http_cycle_get_module_main_conf(ngx_cycle,
ngx_http_record_log_module);
if (ngx_close_socket(rlmc->flume_fd) == -1) {
ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 关闭flume fd失败 %V failed\n",strerror(errno));
}
else
ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):关闭flume fd\n");
#ifdef NGX_DEBUG
if (ngx_close_socket(rlmc->file_fd) == -1) {
ngx_log_error(NGX_LOG_ERR, cycle->log, ngx_socket_errno,
ngx_close_socket_n "[RECORD_LOG]-(ngx_http_record_log_exit_process): 关闭file fd失败 %V failed",strerror(errno));
}
else
ngx_log_error(NGX_LOG_EMERG,cycle->log,0,"[RECORD_LOG]-(ngx_http_record_log_exit_process):关闭 file fd\n");
#endif
}
3、插入id。
ngx_int_t
ngx_insert_id_into_headers(ngx_http_record_log_main_conf_t *rlmcf,ngx_http_request_t *r)
{
ngx_table_elt_t *h;
h = ngx_list_push(&r->headers_in.headers);
if (h == NULL) {
ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
"[RECORD_LOG_TIME]-(ngx_insert_id_into_headers):失败!!!在request headers中分配空间失败,无法插入id\n");
return NGX_ERROR;
}
h->hash = rlmcf->header_key_hash;
r->header_hash = h->hash;
h->key.len = rlmcf->header_key_len;
h->key.data = (u_char*)rlmcf->header_key;
h->key.data[h->key.len] = '\0';
h->value.len = rlmcf->header_value_len;
h->value.data = (u_char*)rlmcf->header_value;
h->value.data[h->value.len] = '\0';
h->lowcase_key = (u_char*)rlmcf->header_key;
return NGX_OK;
}
4、设置http post_read 阶段的处理handler
static ngx_int_t
ngx_http_record_log_init(ngx_conf_t *cf)
{
ngx_http_handler_pt *h;
ngx_http_core_main_conf_t *cmcf;
cmcf = ngx_http_conf_get_module_main_conf(cf, ngx_http_core_module);
h = ngx_array_push(&cmcf->phases[NGX_HTTP_POST_READ_PHASE].handlers);
if (h == NULL) {
return NGX_ERROR;
}
*h = ngx_http_record_log_handler;
return NGX_OK;
}
5、拼凑url
url=$schema+$server_addr+$request_uri
目前是采用这种方式,并且没有加port,因为都是80端口的,也没有加参数,目前用不到,等以后用到的时候可以再加。
...
tmp = ngx_snprintf(tmp,rlmcf->schema_value->len+9,",\"url\":\"%s:",rlmcf->schema_value->data);
tmp = ngx_snprintf(tmp,rlmcf->server_addr_value->len+2,"//%s",rlmcf->server_addr_value->data);
tmp = ngx_snprintf(tmp,rlmcf->request_uri_value->len, "%s",rlmcf->request_uri_value->data);
...
6、发送日志信息
...
do{
len = send(rlmcf->flume_fd,msg,msg_len,0);
if(len <= 0)
{
ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
"[RECORD_LOG_TIME]-ngx_write_flumelog:失败!!!发送的到flume server的数据长度小于0,错误:%s\n",
strerror(errno));
///对方关闭了连接或者是无效的连接
if(errno == 32 || errno == 9)
{
close(rlmcf->flume_fd);
t_errno = errno;
if(NGX_OK != ngx_init_flume_log_fd(rlmcf,r->connection->log))
{
ngx_log_error(NGX_LOG_ERR,r->connection->log,ngx_errno,
"[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立连接失败!!,错误:%s\n",strerror(errno));
return NGX_ERROR;
}
ngx_log_error(NGX_LOG_EMERG,r->connection->log,ngx_errno,
"[RECORD_LOG_TIME]-ngx_write_flumelog:重新建立连接!!\n");
}
else
return NGX_ERROR;
}
else
{
break;
}
}while(t_errno == 9||t_errno == 32);
...
总结:
日志模块的功能比较简单,但是若想写出清晰规范的代码,需要对nginx的处理流程和内存结构有一定的了解。笔者也是看了很久的源代码才磕磕碰碰写出来的。若有写的不到位的地方还望指出,不胜感激。其实,考虑到效率问题,作为负载均衡来说还是能不加拦截模块就不加。