Requirement: format a raw access log into a CSV file, run some basic statistics and analysis on it, and use an IP lookup API to resolve each IP's geolocation.
github地址:https://github.com/ShaogHong/my-homework
Just run start_main.py and provide the path to a log file; every file the tool generates is written to the same directory as that log file.
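For example, given a log file named access.log (a hypothetical name), the tool produces, next to it:

access.log已处理.csv                  (option [1], the formatted log)
access.log已处理.csv_IP统计.csv       (options [2]-[10] each write a ranking CSV like this)
access.log已处理.csv_响应状态统计.png  (option [11], the status-code pie chart)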



Launch script start_main.py:
from inquire_tools import Log_format_csv
from inquire_tools.tools2 import BaseAnalyzer, systems, browsers, robots


def safe_input(prompt):
    while True:
        try:
            return input(prompt)
        except Exception as e:
            print(f"错误: {e}")


def print_menu():
    print("[1] 格式化日志为csv(以下操作的前提)")
    print("[2] ip统计排行")
    print("[3] 大洲统计排行")
    print("[4] 国家统计排行")
    print("[5] 城市统计排行")
    print("[6] ip所有者统计排行")
    print("[7] 请求行统计排行")
    print("[8] 用户代理信息机器人统计")
    print("[9] 用户代理信息浏览器统计")
    print("[10] 用户代理信息系统统计")
    print("[11] 状态码统计")
    print("[a] 2-11全部统计")
    print("[q] 退出")


def initialize_analyzer(log_filename):
    """Create the BaseAnalyzer for the formatted CSV on first use."""
    return BaseAnalyzer(f"{log_filename}已处理.csv")


def process_choice(analyzer, choice):
    """Dispatch the user's menu choice to the matching analysis."""
    actions = {
        "2": lambda: analyzer._count_and_save('ip', 'IP统计'),
        "3": lambda: analyzer._count_and_save('大洲', '大洲统计'),
        "4": lambda: analyzer._count_and_save('国家', '国家统计'),
        "5": lambda: analyzer._count_and_save('城市', '城市统计'),
        "6": lambda: analyzer._count_and_save('所有者', '所有者统计'),
        "7": lambda: analyzer._count_and_save('请求行', '请求行统计'),
        "8": lambda: analyzer._user_agent_analysis(robots, '机器人统计'),
        "9": lambda: analyzer._user_agent_analysis(browsers, '浏览器统计'),
        "10": lambda: analyzer._user_agent_analysis(systems, '系统统计'),
        "11": lambda: analyzer.plot_status_codes(),
    }
    if choice == "a":
        for act in actions.values():
            act()
    elif choice in actions:
        actions[choice]()
    else:
        print("无效选择")


print("""响应日志处理工具,感谢您的使用,希望能够帮到您!\n网址:https://zzhanzhang.top 作者:娄南湘先生""")
log_filename = safe_input("把日志文件拖过来:")
analyzer = None
while True:
    print_menu()
    choice = safe_input("选项编号:").lower()
    if choice == "q":
        break
    if choice == "1":
        Log_format_csv.main(log_filename)
        continue
    if analyzer is None:
        analyzer = initialize_analyzer(log_filename)
    try:
        process_choice(analyzer, choice)
    except Exception as e:
        print(f"错误: {e}")
        continue
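For scripted runs, the same pipeline can also be driven without the menu. A minimal sketch (the access.log path is hypothetical; note that Log_format_csv.main still prompts for its own y/n and start-line choices):

from inquire_tools import Log_format_csv
from inquire_tools.tools2 import BaseAnalyzer, browsers

log_filename = "access.log"                           # hypothetical log path
Log_format_csv.main(log_filename)                     # option [1]: writes access.log已处理.csv
analyzer = BaseAnalyzer(f"{log_filename}已处理.csv")
analyzer._count_and_save('ip', 'IP统计')               # option [2]
analyzer._user_agent_analysis(browsers, '浏览器统计')   # option [9]
analyzer.plot_status_codes()                          # option [11]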
Core code tools2.py:
import pandas as pd
from matplotlib import pyplot as plt

# Set a CJK-capable font before any text is drawn, otherwise Chinese labels
# such as the "其他" pie slice render as empty boxes.
plt.rcParams['font.sans-serif'] = ['SimHei']


class BaseAnalyzer:
    def __init__(self, csv_file):
        self.csv_file = csv_file
        self.df = pd.read_csv(csv_file)

    def _count_and_save(self, column_name, output_filename):
        # Frequency table for one column, with percentage and rank.
        counts = self.df[column_name].value_counts().reset_index()
        counts.columns = [column_name, '数量']
        total = counts['数量'].sum()
        counts['百分比'] = counts['数量'].apply(lambda x: round((x / total) * 100, 2))
        counts['排名'] = counts.index + 1
        counts.sort_values(by='数量', ascending=False, inplace=True)
        self._save_to_csv(counts[['排名', column_name, '数量', '百分比']], output_filename)

    def _save_to_csv(self, df, filename):
        df.to_csv(f"{self.csv_file}_{filename}.csv", index=False)
        print(f"统计完成,结果已保存至{self.csv_file}_{filename}.csv")

    def _user_agent_analysis(self, keyword_list, output_filename, column="用户代理信息"):
        # Case-insensitive substring count of each keyword over the UA column.
        keyword_counts = {keyword: 0 for keyword in keyword_list}
        for user_agent in self.df[column]:
            for keyword in keyword_list:
                if keyword.lower() in str(user_agent).lower():
                    keyword_counts[keyword] += 1
        total = len(self.df)
        ranked_data = [(i + 1, keyword, count, round(count / total * 100, 4))
                       for i, (keyword, count) in enumerate(
                           sorted(keyword_counts.items(), key=lambda x: x[1], reverse=True))]
        df = pd.DataFrame(ranked_data, columns=['排名', '关键词', '数量', '百分比'])
        self._save_to_csv(df, output_filename)

    def plot_status_codes(self, column="状态码"):
        status_counts = self.df[column].value_counts()
        total_count = status_counts.sum()
        percentage_threshold = 0.01
        # Fold status codes below 1% of all requests into one "其他" slice.
        filtered_counts = status_counts[status_counts / total_count >= percentage_threshold]
        other_count = status_counts[status_counts / total_count < percentage_threshold].sum()
        if other_count > 0:
            filtered_counts['其他'] = other_count
        plt.figure(figsize=(8, 6))
        plt.pie(filtered_counts, labels=filtered_counts.index,
                autopct=lambda p: f'{p:.2f}% ({int(p * total_count) / 100})')
        plt.title('响应状态码统计')
        code_pie = f'{self.csv_file}_响应状态统计.png'
        plt.savefig(code_pie)
        plt.close()
        print(f"图表已保存至: {code_pie}")


browsers = [
    "Chrome", "Safari", "Firefox", "Internet Explorer", "Edge", "Edg", "Opera", "Brave",
    "BIDUBrowser", "UCBrowser", "SamsungBrowser", "Maxthon", "Netscape", "Konqueror",
    "SeaMonkey", "Camino", "MSIE", "PaleMoon", "Waterfox", "Vivaldi", "Avant Browser",
    "Yandex", "Epic Privacy Browser", "Torch", "SlimBrowser", "Midori", "Dolphin", "Puffin",
    "Silk", "BlackBerry", "IE Mobile", "Android Browser", "Chrome Mobile", "Opera Mini",
    "UCWEB", "QQBrowser", "QQ", "Tor Browser", "Comodo Dragon", "Sogou Explorer",
    "360 Browser", "Baidu Browser", "Samsung Internet", "Opera Coast", "Opera GX",
    "Brave Mobile", "Firefox Focus", "Firefox Reality", "Microsoft Internet Explorer",
    "Microsoft Edge Mobile", "Chromium", "Seamonkey", "K-Meleon", "Avast Secure Browser",
    "Bitwarden Authenticator", "Brave Shields", "Brave Rewards", "CentBrowser", "Coc Coc",
    "Comodo IceDragon", "Disruptor Browser", "GreenBrowser", "Iridium Browser", "K-Ninja",
    "Kiwi Browser", "Lunascape", "Maxthon Cloud Browser", "Orbitum", "QupZilla", "Slimjet",
    "SRWare Iron", "Torch Browser", "UCWeb Browser", "Vivaldi Snapshot", "Xombrero",
    "Opera GX Gaming Browser", "Opera Touch", "Brave Browser for Android",
    "Firefox for Android", "Chrome for Android", "Edge for Android",
    "Samsung Internet for Android", "UC Browser for Android", "QQ Browser for Android",
    "Brave Browser for iOS", "Firefox for iOS", "Safari for iOS", "Opera for iOS",
    "Opera Coast for iOS", "Puffin Browser", "Dolphin Browser", "CM Browser",
    "Flynx Browser", "Ghostery Browser", "Maxthon Browser", "Opera Mini for Windows",
    "Perfect Browser", "Photon Browser"
]

# Note: the original list contained "Gigabot" twice, which made each matching
# request count twice; the duplicate has been removed.
robots = [
    "Googlebot", "Bingbot", "Yahoo", "Slurp", "Baiduspider", "YandexBot", "DuckDuckBot",
    "Sogou web spider", "Exabot", "FacebookBot", "Twitterbot", "LinkedInBot", "Pinterestbot",
    "Applebot", "MJ12bot", "AhrefsBot", "SemrushBot", "RamblerBot", "DotBot", "BingPreview",
    "YandexImages", "Screaming Frog SEO Spider", "SeznamBot", "Embedly", "Slackbot",
    "TelegramBot", "WhatsApp", "Discordbot", "360Spider", "MSNBOT", "NaverBot", "Gigabot",
    "YandexMobileBot", "FacebookExternalHit", "Pinterest", "Ask Jeeves/Teoma",
    "Alexa Crawler", "YodaoBot", "ia_archiver", "CoccocBot", "Vsekorakhiver", "voilabot",
    "mail.ru_bot", "NZZ3", "TurnitinBot", "ScopeusBot", "GrapeshotCrawler", "Curalab",
    "SiteBot", "SitebeamBot", "SEOstats Crawler", "Butterfly Collector", "Genieo Web filter",
    "InfoSeek Robot 1.0", "W3 SiteSearch Crawler", "ZoomSpider.net", "Ezooms", "Teoma",
    "Scooter", "WebAlta Crawler", "Alexa Media Crawler", "MojeekBot", "BLEXBot",
    "YandexSomething", "CrawllyBot", "Wotbox", "SiteExplorer.com", "sogou spider",
    "sogou news spider", "sogou orion spider", "sogou pic spider", "sogou video spider"
]

systems = [
    'AIX', 'Alpine Linux', 'AmigaOS', 'Android', 'Android Auto', 'Android Go',
    'Android Pie', 'Android Q', 'Android R', 'Android S', 'Android TV', 'Android Wear',
    'Arch Linux', 'Bada', 'BeOS', 'BlackBerry', 'BlackBerry OS', 'Bodhi Linux', 'CentOS',
    'Chrome OS', 'Debian', 'Elementary OS', 'Fedora', 'Firefox OS', 'FreeBSD', 'Gentoo',
    'HP-UX', 'Haiku', 'IRIX', 'KDE neon', 'KaOS', 'KaiOS', 'KaiOSWindows', 'Kali Linux',
    'Kindle', 'Kubuntu', 'Linux', 'Lubuntu', 'MX Linux', 'Mac OS X', 'Mac Os', 'Macintosh',
    'Manjaro', 'Mint', 'MorphOS', 'NetBSD', 'Nintendo', 'Nintendo Wii', 'Nokia', 'OS/2',
    'OpenBSD', 'Palm OS', 'Parrot OS', 'PlayStation', 'Q4OS', 'QNX', 'RISC OS',
    'Raspberry Pi OS', 'Red Hat', 'Roku', 'SUSE', 'Sailfish OS', 'SmartTV', 'Solaris',
    'Solus', 'SolusOS', 'SteamOS', 'SunOS', 'Symbian', 'Tails', 'Tizen', 'Trisquel',
    'Ubuntu', 'Unix', 'Void Linux', 'WebTV', 'Windows 10 Mobile', 'Windows 11',
    'Windows 2000', 'Windows 3.11', 'Windows 7', 'Windows 8', 'Windows 8.1', 'Windows 95',
    'Windows 98', 'Windows CE', 'Windows ME', 'Windows Millennium', 'Windows Mobile',
    'Windows NT 10.0', 'Windows NT 3.1', 'Windows NT 3.5', 'Windows NT 3.51',
    'Windows NT 4.0', 'Windows NT 5.0', 'Windows NT 5.1', 'Windows NT 5.2',
    'Windows NT 6.0', 'Windows NT 6.1', 'Windows NT 6.2', 'Windows NT 6.3',
    'Windows Phone', 'Windows RT', 'Windows Server', 'Windows Vista', 'Windows XP',
    'Xbox', 'Xubuntu', 'Zorin OS', 'iOS', 'iPadOS', 'macOS', 'macOS Big Sur',
    'macOS Catalina', 'macOS High Sierra', 'macOS Mojave', 'macOS Monterey',
    'macOS Sierra', 'macOS Ventura', 'openSUSE', 'tvOS', 'watchOS', 'webOS'
]
Core code Log_format_csv.py:
import csv
import re
import sys
import time as tm

import requests

DEFAULT_API_URL = "https://qifu.baidu.com/ip/geo/v1/district?ip="


def query_ip_location(ip):
    """Query the geo API for one IP; return its data dict, or None on failure."""
    url = DEFAULT_API_URL + ip
    try:
        # A timeout keeps one stuck request from hanging the whole run.
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            data = response.json()
            if data.get("code") == "Success":
                tm.sleep(0.07)  # brief pause so the API is not hammered
                return data["data"]
    except Exception as e:
        print(f"查询 IP 时出错 {ip}: {e}")
    return None


def parse_log_line(line):
    pattern = (r'(?P<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}).*?\[(?P<time>.*?)\]\s*'
               r'"(?P<method>\w+)\s*(?P<path>.*?)\s*(?P<protocol>.*?)"\s*'
               r'(?P<status>\d+)\s*(?P<size>\d+)\s*"(?P<referrer>.*?)"\s*"(?P<user_agent>.*?)"')
    match = re.match(pattern, line)
    if not match:
        return None
    groups = match.groupdict()
    # The lazy 'path' group matches empty, so the rest of the request line
    # lands in 'protocol'; split it back into resource path and HTTP version.
    request_rest = groups['protocol'].split()
    path = request_rest[0]
    protocol = request_rest[-1]
    return (groups['ip'], groups['time'], groups['method'], path, protocol,
            groups['status'], groups['size'], groups['referrer'], groups['user_agent'])


def write_to_csv(log_data, filename):
    headers = ["ip", "大洲", "国家", "城市", "城市编码", "经度", "纬度", "所有者", "运营商",
               "请求时间", "请求方法", "请求行", "请求的资源路径", "HTTP协议版本",
               "状态码", "响应大小", "引用页", "用户代理信息"]
    with open(filename, mode='w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file)
        writer.writerow(headers)
        writer.writerows(log_data)


def row_count(file):
    # Count lines, then let the user optionally skip ahead to a start line.
    line_count = sum(1 for _ in file)
    choice = input(f"该日志共有{line_count}行,任意键全部处理, 或输入数字从指定行开始处理: ")
    file.seek(0)
    if choice.isdigit():
        for _ in range(int(choice)):
            next(file)


GEO_KEYS = ("continent", "country", "city", "areacode", "lng", "lat", "owner", "isp")


def main(log_filename):
    ip_locations = {}  # cache: each distinct IP is queried at most once
    log_data = []
    insert_values = []
    query_ip = input("是否启用ip查询?(y/n): ").lower() == 'y'
    start_time = tm.time()
    with open(log_filename, mode='r', encoding='utf-8') as file:
        row_count(file)
        loading_symbols = ['| ', '- ', '/ ', '\\ ']
        loading_index = 0
        for line in file:
            parsed_line = parse_log_line(line)
            if not parsed_line:
                continue
            ip, time, method, path, xie_yi, status, size, referrer, user_agent = parsed_line
            # Geo fields default to "null"; the original code dropped rows whose
            # lookup failed and re-queried failing IPs, both fixed here.
            geo = ["null"] * len(GEO_KEYS)
            if query_ip:
                if ip not in ip_locations:
                    ip_locations[ip] = query_ip_location(ip)
                location = ip_locations[ip]
                if location:
                    geo = [location.get(key, "null") for key in GEO_KEYS]
            insert_values.append([ip, *geo, time, method,
                                  f"{method} {path} {xie_yi}", path, xie_yi,
                                  status, size, referrer, user_agent])
            sys.stdout.write(f"\r格式化中 {loading_symbols[loading_index]}")
            loading_index = (loading_index + 1) % len(loading_symbols)
            sys.stdout.flush()
            if len(insert_values) == 100:  # move rows out in batches of 100
                log_data.extend(insert_values)
                insert_values = []
    if insert_values:
        log_data.extend(insert_values)
    write_to_csv(log_data, f"{log_filename}已处理.csv")
    end_time = tm.time()
    print(f"\n日志格式化完成,TIME: {end_time - start_time:.1f}S,文件保存在{log_filename}已处理.csv")
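As a quick sanity check, this is what parse_log_line returns for a typical combined-format line; the trailing comment sketches the response shape the Baidu endpoint is assumed to have, inferred only from the keys main() reads, not verified against the live API:

from inquire_tools.Log_format_csv import parse_log_line

sample = ('127.0.0.1 - - [10/Oct/2023:13:55:36 +0800] '
          '"GET /index.html HTTP/1.1" 200 2326 "-" "Mozilla/5.0"')
print(parse_log_line(sample))
# ('127.0.0.1', '10/Oct/2023:13:55:36 +0800', 'GET', '/index.html',
#  'HTTP/1.1', '200', '2326', '-', 'Mozilla/5.0')

# Assumed shape of data["data"] from the geo API (keys taken from main()):
# {"continent": ..., "country": ..., "city": ..., "areacode": ...,
#  "lng": ..., "lat": ..., "owner": ..., "isp": ...}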