Skip to content
PYPython 工程实践

Python 生成并插入百万数据到 MySQL 的最佳实践

本文介绍如何使用 Python 高效生成百万级测试数据并批量插入 MySQL,涵盖 Mimesis 数据生成、executemany 批量插入、事务优化等关键技巧,适用于 API 性能测试场景。

J
Jasper Labs· 2026年3月19日

数据生成

在插入数据之前,首先需要生成大量数据。Python提供了多种库和方法来生成数据,以下是几种常见的方法。

1.使用 mimesis 生成数据

Mimesis可以参考我这篇文章: Python构造数据神器-mimesis

2.使用 faker 库生成真实数据

Faker

官方文档:faker

3.自定义数据生成

示例:

python
def generate_random_data(num, data_type):
    """
    Generate `num` random asset records.

    :param num: number of records to generate
    :param data_type: asset kind (0: IP, 1: URL)
    :return: list of asset dicts; "fingerprints" is a JSON-encoded string
    """
    # Pre-allocate the result list and fill it by index.
    data_list = [None] * num
    # NOTE(review): `ip` keeps the value from the previous iteration, but the
    # "ip" field below is forced to None for URL records, so this is harmless.
    ip = None
    for i in range(num):
        if data_type == 0:
            # IP asset: the generated IP doubles as the asset name.
            ip = custom_random.random_ip_v4()
            name = ip
            fingerprints = {
                "basic_info": {
                    "company": custom_random.random_company(),
                    "hostname": custom_random.random_hostname(),
                    "mac": custom_random.random_mac(),
                    "middleware": custom_random.random_middleware(),
                    "name": custom_random.random_asset_name(),
                    "os": custom_random.random_os(),
                },
                "port": generate_ports()
            }
        else:
            # URL asset: richer fingerprint with 1-3 random entries per category.
            url = custom_random.random_url()
            name = url
            fingerprints = {
                "basic_info": {
                    "company": custom_random.random_company(),
                    "hostname": custom_random.random_hostname(),
                    "mac": custom_random.random_mac(),
                    "title": custom_random.random_title(),
                    "middleware": [custom_random.random_middleware()
                                   for _ in range(np.random.randint(1, 4))],
                    "language": [custom_random.random_language()
                                 for _ in range(np.random.randint(1, 4))],
                    "database": [custom_random.random_database()
                                 for _ in range(np.random.randint(1, 4))],
                    "application": [custom_random.random_application()
                                    for _ in range(np.random.randint(1, 4))],
                    "framework": [custom_random.random_framework()
                                  for _ in range(np.random.randint(1, 4))],
                    "name": custom_random.random_asset_name(),
                },
                "port": generate_ports()
            }

        asset = {
            "name": name,
            "type": data_type,
            "create_time": datetime.now(),
            "is_alive": random.choice([0, 1]),
            "ip": ip if data_type == 0 else None,
            # Serialize the nested dict so it survives the CSV round-trip.
            "fingerprints": json.dumps(fingerprints),
        }

        data_list[i] = asset
    return data_list

4.使用多进程并行生成数据

示例:

python
def generate_data_parallel(num, data_type):
    """
    Generate `num` random asset records in parallel, one worker per CPU core.

    :param num: total number of records to generate
    :param data_type: asset kind (0: IP, 1: URL)
    :return: flat list of exactly `num` asset dicts
    """
    num_processes = multiprocessing.cpu_count()
    # Split `num` across workers; spread the remainder over the first
    # `num % num_processes` workers. The original `num // num_processes`
    # per worker silently dropped up to `num_processes - 1` records
    # whenever `num` was not divisible by the core count.
    base, remainder = divmod(num, num_processes)
    chunk_sizes = [base + 1 if i < remainder else base
                   for i in range(num_processes)]
    with multiprocessing.Pool(num_processes) as pool:
        results = pool.starmap(generate_random_data,
                               [(size, data_type) for size in chunk_sizes])

    # Flatten the per-worker lists into one result list.
    return [item for sublist in results for item in sublist]

数据入库

插入数据也有多种方式,这里采用先写入 CSV 文件、再通过 LOAD DATA INFILE 批量入库的方案。

python
def write_csv(csv_file, data):
    """
    Write asset records to a CSV file.

    :param csv_file: destination CSV path
    :param data: iterable of asset dicts (one per row)
    :return: None
    """
    # A single DataFrame.to_csv call writes the header and all rows at once.
    pd.DataFrame(data).to_csv(csv_file, index=False, encoding='utf-8')

LOAD DATA INFILE

写入 CSV 文件后,就可以使用 LOAD DATA INFILE 直接入库了。

示例:

python
def load_data_infile(csv_file, table_name):
    """
    Bulk-load a CSV file into MySQL via `LOAD DATA INFILE`.

    :param csv_file: CSV file path. NOTE(review): non-LOCAL LOAD DATA INFILE
                     is read by the MySQL *server*, so the path must be
                     visible on the server host — confirm deployment layout.
    :param table_name: target table name
    :return: None
    """
    # NOTE(review): csv_file/table_name are interpolated with an f-string —
    # acceptable for an internal test-data script, but this is SQL injection
    # if ever fed untrusted input.
    try:
        # IGNORE skips rows rejected by key conflicts; IGNORE 1 ROWS skips
        # the CSV header. The '\n' below is expanded by Python to a real
        # newline inside the SQL text; MySQL accepts that, though the
        # escaped form '\\n' is the conventional spelling — confirm intent.
        db.cursor.execute(f"""
            LOAD DATA INFILE '{csv_file}' IGNORE INTO TABLE {table_name}
            FIELDS TERMINATED BY ','
            ENCLOSED BY '"'
            LINES TERMINATED BY '\n'
            IGNORE 1 ROWS
            (name,type,create_time,is_alive,ip,fingerprints);
        """)
        db.conn.commit()
        print(f"成功从文件 {csv_file} 加载数据到表 {table_name}")
    except pymysql.MySQLError as e:
        # Roll back so a failed load leaves no partial transaction state.
        print(f"加载数据时发生错误: {e}")
        db.conn.rollback()

总结

  1. 使用 mimesis / faker 或自定义函数生成测试数据;
  2. 利用 multiprocessing 多进程并行生成,缩短数据构造耗时;
  3. 先写入 CSV 文件,再通过 LOAD DATA INFILE 批量导入 MySQL。

python
import os
import multiprocessing
import time
import json
from datetime import datetime
import random

import pymysql
import pandas as pd
import numpy as np

from scripts.build_data.utils import random as custom_random
from scripts.build_data.db import db
from scripts.build_data.db import reset_and_truncate_table


def get_device_type():
    """
    Fetch all (parent category, child category) name pairs from AssetCategory.

    :return: every row of the self-join below; row shape (tuple vs dict)
             depends on the cursor class configured in `db` — TODO confirm
    """
    sql = """
        SELECT
        p.NAME AS parent_name,
        c.NAME AS child_name 
    FROM
        AssetCategory p
        JOIN AssetCategory c ON p.classify_id = c.parent_id 
    """
    db.cursor.execute(sql)

    # Fetch the whole result set at once (category table assumed small).
    return db.cursor.fetchall()


# Cached once at import time. NOTE(review): this runs a DB query as a
# module-import side effect — confirm a live connection always exists here.
device_type = get_device_type()


def generate_ports():
    """
    Generate random port information for a single asset.

    :return: list of 1-2 port entries, each shaped
             [protocol, port, service, software, "", "", []]
    """
    num_ports = np.random.randint(1, 3)  # upper bound exclusive: 1 or 2 ports
    protocols = [custom_random.random_protocol() for _ in range(num_ports)]
    ports = [custom_random.random_port() for _ in range(num_ports)]
    services = [custom_random.random_service() for _ in range(num_ports)]
    softwares = [custom_random.random_software() for _ in range(num_ports)]

    # Combine the parallel lists into one entry per port. The original
    # comment promised zip but indexed by position; zip expresses the same
    # pairing directly. The trailing "", "", [] are placeholder fields
    # expected by the fingerprint schema.
    port_info = [
        [protocol, port, service, software, "", "", []]
        for protocol, port, service, software
        in zip(protocols, ports, services, softwares)
    ]

    return port_info


def generate_random_data(num, data_type):
    """
    Build a list of `num` random asset records.

    :param num: how many records to produce
    :param data_type: asset kind (0: IP, 1: URL)
    :return: list of asset dicts; "fingerprints" is a JSON-encoded string
    """
    records = []
    last_ip = None
    for _ in range(num):
        if data_type == 0:
            # IP asset: the generated IP doubles as the asset name.
            last_ip = custom_random.random_ip_v4()
            asset_name = last_ip
            prints = {
                "basic_info": {
                    "company": custom_random.random_company(),
                    "hostname": custom_random.random_hostname(),
                    "mac": custom_random.random_mac(),
                    "middleware": custom_random.random_middleware(),
                    "name": custom_random.random_asset_name(),
                    "os": custom_random.random_os(),
                },
                "port": generate_ports()
            }
        else:
            # URL asset: richer fingerprint with 1-3 entries per category.
            asset_name = custom_random.random_url()
            prints = {
                "basic_info": {
                    "company": custom_random.random_company(),
                    "hostname": custom_random.random_hostname(),
                    "mac": custom_random.random_mac(),
                    "title": custom_random.random_title(),
                    "middleware": [custom_random.random_middleware()
                                   for _ in range(np.random.randint(1, 4))],
                    "language": [custom_random.random_language()
                                 for _ in range(np.random.randint(1, 4))],
                    "database": [custom_random.random_database()
                                 for _ in range(np.random.randint(1, 4))],
                    "application": [custom_random.random_application()
                                    for _ in range(np.random.randint(1, 4))],
                    "framework": [custom_random.random_framework()
                                  for _ in range(np.random.randint(1, 4))],
                    "name": custom_random.random_asset_name(),
                },
                "port": generate_ports()
            }

        records.append({
            "name": asset_name,
            "type": data_type,
            "create_time": datetime.now(),
            "is_alive": random.choice([0, 1]),
            "ip": last_ip if data_type == 0 else None,
            # Serialize the nested dict so it survives the CSV round-trip.
            "fingerprints": json.dumps(prints),
        })
    return records


def generate_data_parallel(num, data_type):
    """
    Generate `num` random asset records in parallel, one worker per CPU core.

    :param num: total number of records to generate
    :param data_type: asset kind (0: IP, 1: URL)
    :return: flat list of exactly `num` asset dicts
    """
    num_processes = multiprocessing.cpu_count()
    # Split `num` across workers; spread the remainder over the first
    # `num % num_processes` workers. The original `num // num_processes`
    # per worker silently dropped up to `num_processes - 1` records
    # whenever `num` was not divisible by the core count.
    base, remainder = divmod(num, num_processes)
    chunk_sizes = [base + 1 if i < remainder else base
                   for i in range(num_processes)]
    with multiprocessing.Pool(num_processes) as pool:
        results = pool.starmap(generate_random_data,
                               [(size, data_type) for size in chunk_sizes])

    # Flatten the per-worker lists into one result list.
    return [item for sublist in results for item in sublist]


def write_csv(csv_file, data):
    """
    Dump asset records into a CSV file via pandas.

    :param csv_file: destination CSV path
    :param data: iterable of asset dicts (one per row)
    :return: None
    """
    # One bulk to_csv call writes the header (dict keys) plus all rows.
    frame = pd.DataFrame(data)
    frame.to_csv(csv_file, index=False, encoding='utf-8')


def load_data_infile(csv_file, table_name):
    """
    Bulk-load a CSV file into MySQL via `LOAD DATA INFILE`.

    :param csv_file: CSV file path. NOTE(review): non-LOCAL LOAD DATA INFILE
                     is read by the MySQL *server*, so the path must be
                     visible on the server host — confirm deployment layout.
    :param table_name: target table name
    :return: None
    """
    # NOTE(review): csv_file/table_name are interpolated with an f-string —
    # acceptable for an internal test-data script, but this is SQL injection
    # if ever fed untrusted input.
    try:
        # IGNORE skips rows rejected by key conflicts; IGNORE 1 ROWS skips
        # the CSV header. The '\n' below is expanded by Python to a real
        # newline inside the SQL text; MySQL accepts that, though the
        # escaped form '\\n' is the conventional spelling — confirm intent.
        db.cursor.execute(f"""
            LOAD DATA INFILE '{csv_file}' IGNORE INTO TABLE {table_name}
            FIELDS TERMINATED BY ','
            ENCLOSED BY '"'
            LINES TERMINATED BY '\n'
            IGNORE 1 ROWS
            (name,type,create_time,is_alive,ip,fingerprints);
        """)
        db.conn.commit()
        print(f"成功从文件 {csv_file} 加载数据到表 {table_name}")
    except pymysql.MySQLError as e:
        # Roll back so a failed load leaves no partial transaction state.
        print(f"加载数据时发生错误: {e}")
        db.conn.rollback()


def main():
    """
    Generate test assets, export them to CSV, and bulk-load them into MySQL.

    :return: None
    """
    # Generate 1.5M IP records and 0.5M URL records.
    start_time = time.time()
    ip_data = generate_data_parallel(1500000, 0)
    url_data = generate_data_parallel(500000, 1)
    print(f"生成数据耗时: {time.time() - start_time}秒")

    # Write the CSVs next to this script so they are the same files that
    # load_data_infile reads below. The original wrote 'asset.csv' into the
    # current working directory but loaded from the script directory, which
    # breaks whenever the script is launched from anywhere else.
    _script_path = os.path.dirname(os.path.abspath(__file__))
    ip_csv = os.path.join(_script_path, "asset.csv")
    url_csv = os.path.join(_script_path, "asset_url.csv")
    start_time = time.time()
    write_csv(ip_csv, ip_data)
    write_csv(url_csv, url_data)
    print(f"写入CSV文件耗时: {time.time() - start_time}秒")

    # Truncate the target table and reset its auto-increment ID.
    # NOTE(review): this truncates "VulnAsset" while the data is loaded into
    # "Asset" — confirm which table name is intended.
    reset_and_truncate_table("VulnAsset")

    # Bulk-load both CSVs into the Asset table.
    start_time = time.time()
    load_data_infile(ip_csv, "Asset")
    load_data_infile(url_csv, "Asset")
    print(f"写入数据库耗时: {time.time() - start_time}秒")


# Script entry point: generate data, export to CSV, bulk-load into MySQL.
if __name__ == '__main__':
    main()
DISCUSS

评论与讨论

如果这篇文章对你有帮助,或你对实现细节有不同判断,可以直接在这里继续讨论。