← 返回列表
CSV 数据自动化分析
借助 Pandas 强大的一维和二维数据处理能力,快速读取 CSV 格式的销售报表,计算总销售额并筛选出业绩达标的数据行,是数据分析的入门经典案例。
快速预览:
import pandas as pd
df = pd.read_csv('sales_data.csv')
total_sales = df['amount'].sum()
# 筛选销售额大于 1000 的记录
high_performers = df[df['amount'] > 1000]
print(high_performers.head())
应用场景与价值
CSV(Comma-Separated Values)是最常见的数据交换格式,广泛应用于各种业务场景:
- 销售分析: 分析月度/季度销售数据,识别高绩效产品和区域
- 财务报表: 处理收支明细,生成财务汇总和趋势图表
- 用户行为分析: 分析用户活跃度、留存率等关键指标
- 库存管理: 追踪库存变化,预警低库存商品
- 考试成绩统计: 计算班级平均分、排名、及格率等
使用 Python 的 Pandas 库,可以用几行代码完成 Excel 中需要复杂公式和透视表才能实现的分析任务,大幅提升数据处理效率。
基础知识
Pandas 核心概念
Pandas 是 Python 数据分析的核心库,提供了两种主要数据结构:
- Series: 一维数据结构,类似于数组或列表
- DataFrame: 二维表格数据结构,类似于 Excel 表格
DataFrame 常用操作
import pandas as pd
# 读取数据
df = pd.read_csv('data.csv')
df = pd.read_excel('data.xlsx')
# 查看数据
df.head() # 前 5 行
df.tail(10) # 后 10 行
df.info() # 数据概览
df.describe() # 统计摘要
# 选择数据
df['column'] # 选择单列
df[['col1', 'col2']] # 选择多列
df[df['age'] > 30] # 条件筛选
# 统计计算
df['column'].sum() # 求和
df['column'].mean() # 平均值
df['column'].max() # 最大值
df.groupby('category').sum() # 分组统计
CSV 文件格式
CSV 是纯文本格式,每行代表一条记录,字段之间用逗号分隔:
name,age,amount
Alice,25,1200
Bob,30,850
Charlie,35,1500
代码详解
让我们逐步分析代码的工作原理:
import pandas as pd
导入 Pandas 库,通常使用别名 pd。如果未安装,使用 uv add pandas 安装。
df = pd.read_csv('sales_data.csv')
读取 CSV 文件到 DataFrame 对象。Pandas 自动推断数据类型,第一行通常作为列名。
total_sales = df['amount'].sum()
选择 'amount' 列(销售额列),使用 sum() 方法计算总和。这一行代码替代了 Excel 中的 SUM() 函数。
high_performers = df[df['amount'] > 1000]
条件筛选:选择销售额大于 1000 的所有记录。df['amount'] > 1000 返回布尔序列,用作索引即可筛选数据。
print(high_performers.head())
显示筛选结果的前 5 行。head() 方法可接受参数指定显示行数。
完整代码
基础版本:销售数据分析
import pandas as pd
def analyze_sales_data(csv_file):
"""
分析销售数据
参数:
csv_file: CSV 文件路径
"""
try:
# 读取数据
df = pd.read_csv(csv_file)
print("=" * 50)
print("销售数据分析报告")
print("=" * 50)
# 数据概览
print(f"\n总记录数: {len(df)}")
print(f"数据列: {', '.join(df.columns)}")
# 统计分析
total_sales = df['amount'].sum()
avg_sales = df['amount'].mean()
max_sales = df['amount'].max()
min_sales = df['amount'].min()
print(f"\n总销售额: ${total_sales:,.2f}")
print(f"平均销售额: ${avg_sales:,.2f}")
print(f"最高单笔: ${max_sales:,.2f}")
print(f"最低单笔: ${min_sales:,.2f}")
# 筛选高绩效记录
threshold = 1000
high_performers = df[df['amount'] > threshold]
print(f"\n高绩效记录(销售额 > ${threshold}):")
print(f"数量: {len(high_performers)} 条")
print(f"占比: {len(high_performers) / len(df) * 100:.1f}%")
print("\n详细数据:")
print(high_performers.to_string(index=False))
return df
except FileNotFoundError:
print(f"错误: 文件 '{csv_file}' 不存在")
except KeyError as e:
print(f"错误: 数据列 {e} 不存在")
except Exception as e:
print(f"发生错误: {e}")
# 使用示例
if __name__ == "__main__":
analyze_sales_data('sales_data.csv')
进阶版本:多维度分析
import pandas as pd
import numpy as np
def advanced_sales_analysis(csv_file):
"""
高级销售数据分析
"""
df = pd.read_csv(csv_file)
print("高级销售分析报告\n")
# 1. 按产品分组统计
if 'product' in df.columns:
print("=" * 50)
print("按产品分组统计")
print("=" * 50)
product_stats = df.groupby('product').agg({
'amount': ['sum', 'mean', 'count']
}).round(2)
print(product_stats)
# 2. 按区域分组统计
if 'region' in df.columns:
print("\n" + "=" * 50)
print("按区域分组统计")
print("=" * 50)
region_stats = df.groupby('region')['amount'].sum().sort_values(ascending=False)
print(region_stats)
# 3. 销售额分布分析
print("\n" + "=" * 50)
print("销售额分布")
print("=" * 50)
print(df['amount'].describe())
# 4. 识别异常值(使用四分位数法)
Q1 = df['amount'].quantile(0.25)
Q3 = df['amount'].quantile(0.75)
IQR = Q3 - Q1
outliers = df[(df['amount'] < Q1 - 1.5 * IQR) | (df['amount'] > Q3 + 1.5 * IQR)]
print(f"\n异常值检测:")
print(f"发现 {len(outliers)} 条异常记录")
if len(outliers) > 0:
print(outliers[['amount']])
# 5. 销售趋势(如果有日期列)
if 'date' in df.columns:
df['date'] = pd.to_datetime(df['date'])
daily_sales = df.groupby(df['date'].dt.date)['amount'].sum()
print("\n" + "=" * 50)
print("每日销售趋势")
print("=" * 50)
print(daily_sales.tail(10))
return df
# 使用示例
advanced_sales_analysis('sales_data.csv')
高级版本:生成分析报告并导出
import pandas as pd
from datetime import datetime
def generate_sales_report(csv_file, output_file='sales_report.xlsx'):
"""
生成综合销售报告并导出到 Excel
"""
df = pd.read_csv(csv_file)
# 创建 Excel Writer 对象
with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
# 原始数据
df.to_excel(writer, sheet_name='原始数据', index=False)
# 汇总统计
summary = pd.DataFrame({
'指标': ['总销售额', '平均销售额', '最大销售额', '最小销售额', '记录总数'],
'数值': [
f"${df['amount'].sum():,.2f}",
f"${df['amount'].mean():,.2f}",
f"${df['amount'].max():,.2f}",
f"${df['amount'].min():,.2f}",
len(df)
]
})
summary.to_excel(writer, sheet_name='汇总统计', index=False)
# 高绩效记录
high_performers = df[df['amount'] > 1000].sort_values('amount', ascending=False)
high_performers.to_excel(writer, sheet_name='高绩效记录', index=False)
# 产品分析(如果有 product 列)
if 'product' in df.columns:
product_analysis = df.groupby('product').agg({
'amount': ['sum', 'mean', 'count']
}).round(2)
product_analysis.to_excel(writer, sheet_name='产品分析')
print(f"分析报告已生成: {output_file}")
# 使用示例
generate_sales_report('sales_data.csv')
运行示例
准备测试数据
import pandas as pd
import numpy as np
# 创建示例 CSV 文件
data = {
'date': pd.date_range('2026-01-01', periods=20, freq='D'),
'product': np.random.choice(['Product A', 'Product B', 'Product C'], 20),
'region': np.random.choice(['North', 'South', 'East', 'West'], 20),
'amount': np.random.randint(500, 2000, 20)
}
df = pd.DataFrame(data)
df.to_csv('sales_data.csv', index=False)
print("测试数据已创建: sales_data.csv")
安装依赖
uv add pandas openpyxl
执行分析
uv run main.py
预期输出
==================================================
销售数据分析报告
==================================================
总记录数: 20
数据列: date, product, region, amount
总销售额: $25,340.00
平均销售额: $1,267.00
最高单笔: $1,950.00
最低单笔: $520.00
高绩效记录(销售额 > $1000):
数量: 12 条
占比: 60.0%
详细数据:
date product region amount
2026-01-01 Product A North 1450
2026-01-03 Product B East 1320
...
扩展思路
1. 数据可视化
使用 Matplotlib 或 Seaborn 创建图表:
import matplotlib.pyplot as plt
# 销售额分布直方图
plt.figure(figsize=(10, 6))
df['amount'].hist(bins=20)
plt.title('销售额分布')
plt.xlabel('销售额')
plt.ylabel('频数')
plt.savefig('sales_distribution.png')
# 按产品分组的柱状图
df.groupby('product')['amount'].sum().plot(kind='bar')
plt.title('各产品总销售额')
plt.savefig('product_sales.png')
2. 时间序列分析
# 移动平均
df['rolling_avg'] = df['amount'].rolling(window=7).mean()
# 同比/环比增长
df['growth_rate'] = df['amount'].pct_change() * 100
3. 缺失值处理
# 检查缺失值
print(df.isnull().sum())
# 填充缺失值
df['amount'].fillna(df['amount'].mean(), inplace=True)
# 删除包含缺失值的行
df.dropna(inplace=True)
4. 数据合并
# 合并多个 CSV 文件
df1 = pd.read_csv('sales_jan.csv')
df2 = pd.read_csv('sales_feb.csv')
combined = pd.concat([df1, df2], ignore_index=True)
# 关联两个数据表
df_sales = pd.read_csv('sales.csv')
df_products = pd.read_csv('products.csv')
merged = pd.merge(df_sales, df_products, on='product_id')
5. 数据透视表
# 创建透视表
pivot = df.pivot_table(
values='amount',
index='product',
columns='region',
aggfunc='sum'
)
print(pivot)
6. 自定义函数应用
# 对列应用自定义函数
def categorize_sales(amount):
if amount > 1500:
return 'High'
elif amount > 1000:
return 'Medium'
else:
return 'Low'
df['category'] = df['amount'].apply(categorize_sales)
7. 导出为多种格式
# Excel
df.to_excel('output.xlsx', index=False)
# JSON
df.to_json('output.json', orient='records', indent=2)
# HTML 表格
df.to_html('output.html', index=False)
# SQL 数据库
import sqlite3
conn = sqlite3.connect('sales.db')
df.to_sql('sales', conn, if_exists='replace', index=False)
8. 性能优化
处理大型 CSV 文件时的优化技巧:
# 分块读取大文件
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
process(chunk)
# 指定数据类型减少内存占用
df = pd.read_csv('data.csv', dtype={'column1': 'int32', 'column2': 'float32'})
# 只读取需要的列
df = pd.read_csv('data.csv', usecols=['col1', 'col2'])
注意事项
- 编码问题: CSV 文件可能使用不同编码(UTF-8、GBK 等),读取时指定
encoding参数 - 数据类型: 注意日期、数值等列的数据类型转换
- 内存管理: 处理大文件时注意内存占用,使用分块读取
- 数据验证: 分析前检查数据完整性和有效性
- 备份原始数据: 在修改数据前保留原始文件副本
通过掌握 Pandas 数据分析技能,你可以高效处理各种结构化数据,从复杂的业务数据中提取有价值的洞察,为决策提供数据支持。