ttern.findall(log_line)
if not res:
print('日志未匹配到请求URL,已忽略:\n%s' % log_line)
continue
method = res[0][0]
url = res[0][1].split('?')[0] # 去掉了 ?及后面的url参数
# 提取耗时
res = url_time_taken_extractor.findall(log_line)
if res:
time_taken = float(res[0])
else:
print('未从日志提取到请求耗时,已忽略日志:\n%s' % log_line)
continue
# 存储解析后的日志信息
self.log_line_parsed_queue.append({'method': method,
'url': url,
'time_taken': time_taken,
})
def collect_statistics(self):
    """Consume parsed log records and collect statistics from them.

    A single worker is dispatched through ``Parallel``/``delayed`` under the
    ``multiprocessing`` backend with ``n_jobs=1``. The worker keeps draining
    ``self.log_line_parsed_queue`` until the queue is empty and
    ``self.log_parsing_finished`` is truthy.
    """

    def _collect_statistics():
        # Keep running while records are pending or parsing has not been
        # flagged as finished yet.
        while self.log_line_parsed_queue or not self.log_parsing_finished:
            if not self.log_line_parsed_queue:
                # Nothing to consume right now: pause briefly instead of
                # spinning, so this loop does not burn a CPU core while
                # waiting for producers to append more records.
                time.sleep(0.01)
                continue
            log_info = self.log_line_parsed_queue.popleft()
            # do something with log_info

    with parallel_backend("multiprocessing", n_jobs=1):
        Parallel()(delayed(_collect_statistics)() for _ in range(1))
def run(self, file_path_list):
    """Start the reader, parser and statistics threads, then wait for them.

    One ``read_log_file`` thread is started per entry in ``file_path_list``
    and each path is recorded in ``self.files_read_list``. Two more threads
    run ``start_processes_for_log_parsing`` and ``collect_statistics``. The
    calling thread then polls until it is the only live thread, printing a
    progress message every half second, and finally prints the elapsed time.
    """
    # Read the log files concurrently, one thread per file.
    for log_path in file_path_list:
        reader = threading.Thread(
            name="read_log_file",
            target=self.read_log_file,
            args=(log_path,),
        )
        reader.start()
        self.files_read_list.append(log_path)

    # Start log parsing first, then statistics collection; each thread is
    # named after the method it runs.
    for worker_name in ("start_processes_for_log_parsing", "collect_statistics"):
        threading.Thread(target=getattr(self, worker_name), name=worker_name).start()

    started_at = datetime.now()
    while True:
        # Only the calling thread is left: all workers have finished.
        if threading.active_count() <= 1:
            break
        print('程序正在努力解析日志...')
        time.sleep(0.5)
    finished_at = datetime.now()

    elapsed = finished_at - started_at
    print('解析完成', 'start', started_at, 'end', finished_at, '耗时', elapsed)
if __name__ == "__main__":
    # Script entry point: build a parser and run it over the two log files.
    log_parser = LogParser()
    log_files = ['access.log', 'access2.log']
    log_parser.run(log_files)
注意:
需要合理地配置单次读取文件数据块的大小:过大或过小都可能导致数据读取速度变慢。在笔者的实践环境下,每次读取10M~15M是一个比较高效的配置。
|