数据生成
在插入数据之前,首先需要生成大量数据。Python提供了多种库和方法来生成数据,以下是几种常见的方法。
1.使用 mimesis 生成数据
Mimesis可以参考我这篇文章: Python构造数据神器-mimesis
2.使用 faker 库生成真实数据
Faker
官方文档:faker
3.自定义数据生成
示例:
python
def generate_random_data(num, data_type):
"""
生成随机数据
:param num: 数据数量
:param data_type: 数据类型(0: IP, 1: URL)
:return:
"""
# 预分配内存
data_list = [None] * num
ip = None
for i in range(num):
if data_type == 0:
ip = custom_random.random_ip_v4()
name = ip
fingerprints = {
"basic_info": {
"company": custom_random.random_company(),
"hostname": custom_random.random_hostname(),
"mac": custom_random.random_mac(),
"middleware": custom_random.random_middleware(),
"name": custom_random.random_asset_name(),
"os": custom_random.random_os(),
},
"port": generate_ports()
}
else:
url = custom_random.random_url()
name = url
fingerprints = {
"basic_info": {
"company": custom_random.random_company(),
"hostname": custom_random.random_hostname(),
"mac": custom_random.random_mac(),
"title": custom_random.random_title(),
"middleware": [custom_random.random_middleware()
for _ in range(np.random.randint(1, 4))],
"language": [custom_random.random_language()
for _ in range(np.random.randint(1, 4))],
"database": [custom_random.random_database()
for _ in range(np.random.randint(1, 4))],
"application": [custom_random.random_application()
for _ in range(np.random.randint(1, 4))],
"framework": [custom_random.random_framework()
for _ in range(np.random.randint(1, 4))],
"name": custom_random.random_asset_name(),
},
"port": generate_ports()
}
asset = {
"name": name,
"type": data_type,
"create_time": datetime.now(),
"is_alive": random.choice([0, 1]),
"ip": ip if data_type == 0 else None,
"fingerprints": json.dumps(fingerprints),
}
data_list[i] = asset
    return data_list
4.使用多进程并行生成数据
示例:
python
def generate_data_parallel(num, data_type):
"""
生成数据
:param num: 数据数量
:param data_type: 数据类型(0: IP, 1: URL)
:return:
"""
num_processes = multiprocessing.cpu_count()
records_per_process = num // num_processes
with multiprocessing.Pool(num_processes) as pool:
results = pool.starmap(generate_random_data,
[(records_per_process, data_type) for _ in
range(num_processes)])
    return [item for sublist in results for item in sublist]
数据入库
插入数据也有多种方式,这里选用效率较高的 LOAD DATA INFILE,先将数据批量写入 CSV 文件:
python
def write_csv(csv_file, data):
"""
生成数据并写入CSV文件
:param csv_file: CSV文件路径
:param data: 数据生成器
:return:
"""
# 使用 `pandas` 批量写入 CSV 文件
df = pd.DataFrame(data)
    df.to_csv(csv_file, index=False, encoding='utf-8')
LOAD DATA INFILE
写入 CSV 文件后,就可以使用 LOAD DATA INFILE 直接入库了。
示例:
python
def load_data_infile(csv_file, table_name):
"""
使用`LOAD DATA INFILE`插入数据
:param csv_file: CSV文件路径
:param table_name: 目标表名
:return:
"""
try:
db.cursor.execute(f"""
LOAD DATA INFILE '{csv_file}' IGNORE INTO TABLE {table_name}
FIELDS TERMINATED BY ','
ENCLOSED BY '"'
LINES TERMINATED BY '\n'
IGNORE 1 ROWS
(name,type,create_time,is_alive,ip,fingerprints);
""")
db.conn.commit()
print(f"成功从文件 {csv_file} 加载数据到表 {table_name}")
except pymysql.MySQLError as e:
print(f"加载数据时发生错误: {e}")
        db.conn.rollback()
总结
- ````
- ``
- ``
python
import os
import multiprocessing
import time
import json
from datetime import datetime
import random
import pymysql
import pandas as pd
import numpy as np
from scripts.build_data.utils import random as custom_random
from scripts.build_data.db import db
from scripts.build_data.db import reset_and_truncate_table
def get_device_type():
    """Fetch (parent_name, child_name) category pairs from AssetCategory.

    Self-joins the category table so each parent category is paired with
    every one of its direct children.

    :return: all rows produced by the query, as returned by the cursor
    """
    query = """
        SELECT
            p.NAME AS parent_name,
            c.NAME AS child_name
        FROM
            AssetCategory p
            JOIN AssetCategory c ON p.classify_id = c.parent_id
    """
    db.cursor.execute(query)
    # Materialize the whole result set at once.
    return db.cursor.fetchall()
# Category pairs cached at import time — note this runs a DB query as a
# module-level side effect (and is re-executed in every spawn-based
# multiprocessing worker that imports this module).
# NOTE(review): not referenced anywhere in the visible code — confirm it is
# used elsewhere before removing.
device_type = get_device_type()
def generate_ports():
    """Build a random list of open-port entries for a single asset.

    Each entry has the shape ``[protocol, port, service, software, "", "", []]``.

    :return: list of 1 to 2 port-info lists
    """
    count = np.random.randint(1, 3)  # randint(1, 3) yields 1 or 2 entries
    # Draw each column as a batch (protocols first, then ports, ...).
    protocols = [custom_random.random_protocol() for _ in range(count)]
    ports = [custom_random.random_port() for _ in range(count)]
    services = [custom_random.random_service() for _ in range(count)]
    softwares = [custom_random.random_software() for _ in range(count)]
    # Stitch the columns together row-wise; the three trailing fields
    # (two empty strings and an empty list) are fixed placeholders.
    return [
        [proto, port, svc, soft, "", "", []]
        for proto, port, svc, soft in zip(protocols, ports, services, softwares)
    ]
def generate_random_data(num, data_type):
    """Generate ``num`` random asset records of the given type.

    :param num: number of records to generate
    :param data_type: asset kind (0: IP, 1: URL)
    :return: list of asset dicts with a JSON-encoded ``fingerprints`` field
    """

    def _pick_some(factory):
        # Draw between 1 and 3 values from the given random generator.
        return [factory() for _ in range(np.random.randint(1, 4))]

    def _ip_fingerprints():
        # Host (IP) assets: single-valued basic info plus port entries.
        return {
            "basic_info": {
                "company": custom_random.random_company(),
                "hostname": custom_random.random_hostname(),
                "mac": custom_random.random_mac(),
                "middleware": custom_random.random_middleware(),
                "name": custom_random.random_asset_name(),
                "os": custom_random.random_os(),
            },
            "port": generate_ports(),
        }

    def _url_fingerprints():
        # Web (URL) assets: multi-valued tech-stack fields plus port entries.
        return {
            "basic_info": {
                "company": custom_random.random_company(),
                "hostname": custom_random.random_hostname(),
                "mac": custom_random.random_mac(),
                "title": custom_random.random_title(),
                "middleware": _pick_some(custom_random.random_middleware),
                "language": _pick_some(custom_random.random_language),
                "database": _pick_some(custom_random.random_database),
                "application": _pick_some(custom_random.random_application),
                "framework": _pick_some(custom_random.random_framework),
                "name": custom_random.random_asset_name(),
            },
            "port": generate_ports(),
        }

    records = []
    for _ in range(num):
        if data_type == 0:
            identity = custom_random.random_ip_v4()
            fingerprints = _ip_fingerprints()
        else:
            identity = custom_random.random_url()
            fingerprints = _url_fingerprints()
        records.append({
            "name": identity,
            "type": data_type,
            "create_time": datetime.now(),
            "is_alive": random.choice([0, 1]),
            # Only IP assets carry the ip column; URL assets store NULL.
            "ip": identity if data_type == 0 else None,
            "fingerprints": json.dumps(fingerprints),
        })
    return records
def generate_data_parallel(num, data_type):
    """Generate asset records in parallel across all CPU cores.

    Splits the workload over ``cpu_count()`` worker processes and flattens
    their results into a single list.

    :param num: total number of records to generate
    :param data_type: 数据类型(0: IP, 1: URL)
    :return: list of exactly ``num`` asset dicts
    """
    num_processes = multiprocessing.cpu_count()
    # Bug fix: plain integer division (num // num_processes) silently drops
    # the remainder, producing fewer than ``num`` records whenever num is
    # not a multiple of the core count. Spread the remainder over the first
    # workers so the per-worker counts sum to exactly ``num``.
    base, remainder = divmod(num, num_processes)
    counts = [base + (1 if i < remainder else 0) for i in range(num_processes)]
    with multiprocessing.Pool(num_processes) as pool:
        results = pool.starmap(
            generate_random_data,
            [(count, data_type) for count in counts if count > 0],
        )
    return [item for sublist in results for item in sublist]
def write_csv(csv_file, data):
    """Dump generated asset records to a CSV file.

    :param csv_file: destination CSV path
    :param data: iterable of asset dicts, one per row
    :return:
    """
    # pandas writes the whole frame in one shot and handles the quoting
    # and escaping of the embedded JSON column for us.
    pd.DataFrame(data).to_csv(csv_file, index=False, encoding='utf-8')
def load_data_infile(csv_file, table_name):
    """Bulk-load a CSV file into a MySQL table via ``LOAD DATA INFILE``.

    Expects a CSV as produced by ``write_csv``: comma-separated,
    double-quote enclosed, with one header row (skipped via IGNORE 1 ROWS).
    Commits on success; rolls back on any MySQL error.

    :param csv_file: path to the CSV file — must be readable by the MySQL
        server process and allowed by ``secure_file_priv``
    :param table_name: target table name
    :return:
    """
    # SECURITY: csv_file and table_name are interpolated directly into the
    # SQL text — LOAD DATA cannot take them as bind parameters — so this
    # must only ever be called with trusted, programmer-controlled values.
    try:
        # Fix: the original wrote '\n' inside an f-string, which Python
        # converted to a literal newline inside the SQL string literal; it
        # worked by accident. '\\n' sends the two-character escape that
        # MySQL itself interprets as the line terminator.
        db.cursor.execute(f"""
            LOAD DATA INFILE '{csv_file}' IGNORE INTO TABLE {table_name}
            FIELDS TERMINATED BY ','
            ENCLOSED BY '"'
            LINES TERMINATED BY '\\n'
            IGNORE 1 ROWS
            (name,type,create_time,is_alive,ip,fingerprints);
        """)
        db.conn.commit()
        print(f"成功从文件 {csv_file} 加载数据到表 {table_name}")
    except pymysql.MySQLError as e:
        print(f"加载数据时发生错误: {e}")
        db.conn.rollback()
def main():
    """End-to-end driver: generate data, write CSVs, bulk-load into MySQL.

    :return:
    """
    # Build 1.5M IP assets and 0.5M URL assets in parallel.
    start_time = time.time()
    ip_data = generate_data_parallel(1500000, 0)
    url_data = generate_data_parallel(500000, 1)
    print(f"生成数据耗时: {time.time() - start_time}秒")

    # Dump both datasets to CSV in the current working directory.
    start_time = time.time()
    write_csv('asset.csv', ip_data)
    write_csv('asset_url.csv', url_data)
    print(f"写入CSV文件耗时: {time.time() - start_time}秒")

    # Empty the target table and reset its auto-increment counter.
    # NOTE(review): truncates "VulnAsset" but loads into "Asset" below —
    # confirm this mismatch is intentional.
    reset_and_truncate_table("VulnAsset")

    # Bulk-load both CSVs via LOAD DATA INFILE, using absolute paths
    # anchored at this script's directory.
    start_time = time.time()
    _script_path = os.path.dirname(os.path.abspath(__file__))
    load_data_infile(os.path.join(_script_path, "asset.csv"), "Asset")
    load_data_infile(os.path.join(_script_path, "asset_url.csv"), "Asset")
    print(f"写入数据库耗时: {time.time() - start_time}秒")


if __name__ == '__main__':
    main()
评论与讨论
如果这篇文章对你有帮助,或你对实现细节有不同判断,可以直接在这里继续讨论。