← 返回列表

CSV 数据自动化分析

借助 Pandas 强大的一维和二维数据处理能力,快速读取 CSV 格式的销售报表,计算总销售额并筛选出业绩达标的数据行,是数据分析的入门经典案例。

快速预览:

import pandas as pd
df = pd.read_csv('sales_data.csv')
total_sales = df['amount'].sum()
# 筛选销售额大于 1000 的记录
high_performers = df[df['amount'] > 1000]
print(high_performers.head())

应用场景与价值

CSV(Comma-Separated Values)是最常见的数据交换格式,广泛应用于各种业务场景:

  • 销售分析: 分析月度/季度销售数据,识别高绩效产品和区域
  • 财务报表: 处理收支明细,生成财务汇总和趋势图表
  • 用户行为分析: 分析用户活跃度、留存率等关键指标
  • 库存管理: 追踪库存变化,预警低库存商品
  • 考试成绩统计: 计算班级平均分、排名、及格率等

使用 Python 的 Pandas 库,可以用几行代码完成 Excel 中需要复杂公式和透视表才能实现的分析任务,大幅提升数据处理效率。

基础知识

Pandas 核心概念

Pandas 是 Python 数据分析的核心库,提供了两种主要数据结构:

  1. Series: 一维数据结构,类似于数组或列表
  2. DataFrame: 二维表格数据结构,类似于 Excel 表格

DataFrame 常用操作

import pandas as pd

# 读取数据
df = pd.read_csv('data.csv')
df = pd.read_excel('data.xlsx')

# 查看数据
df.head()           # 前 5 行
df.tail(10)         # 后 10 行
df.info()           # 数据概览
df.describe()       # 统计摘要

# 选择数据
df['column']        # 选择单列
df[['col1', 'col2']]  # 选择多列
df[df['age'] > 30]  # 条件筛选

# 统计计算
df['column'].sum()      # 求和
df['column'].mean()     # 平均值
df['column'].max()      # 最大值
df.groupby('category').sum()  # 分组统计

CSV 文件格式

CSV 是纯文本格式,每行代表一条记录,字段之间用逗号分隔:

name,age,amount
Alice,25,1200
Bob,30,850
Charlie,35,1500

代码详解

让我们逐步分析代码的工作原理:

import pandas as pd

导入 Pandas 库,通常使用别名 pd。如果未安装,使用 uv add pandas 安装。

df = pd.read_csv('sales_data.csv')

读取 CSV 文件到 DataFrame 对象。Pandas 自动推断数据类型,第一行通常作为列名。

total_sales = df['amount'].sum()

选择 'amount' 列(销售额列),使用 sum() 方法计算总和。这一行代码替代了 Excel 中的 SUM() 函数。

high_performers = df[df['amount'] > 1000]

条件筛选:选择销售额大于 1000 的所有记录。df['amount'] > 1000 返回布尔序列,用作索引即可筛选数据。

print(high_performers.head())

显示筛选结果的前 5 行。head() 方法可接受参数指定显示行数。

完整代码

基础版本:销售数据分析

import pandas as pd

def analyze_sales_data(csv_file):
    """
    分析销售数据

    参数:
        csv_file: CSV 文件路径
    """
    try:
        # 读取数据
        df = pd.read_csv(csv_file)

        print("=" * 50)
        print("销售数据分析报告")
        print("=" * 50)

        # 数据概览
        print(f"\n总记录数: {len(df)}")
        print(f"数据列: {', '.join(df.columns)}")

        # 统计分析
        total_sales = df['amount'].sum()
        avg_sales = df['amount'].mean()
        max_sales = df['amount'].max()
        min_sales = df['amount'].min()

        print(f"\n总销售额: ${total_sales:,.2f}")
        print(f"平均销售额: ${avg_sales:,.2f}")
        print(f"最高单笔: ${max_sales:,.2f}")
        print(f"最低单笔: ${min_sales:,.2f}")

        # 筛选高绩效记录
        threshold = 1000
        high_performers = df[df['amount'] > threshold]

        print(f"\n高绩效记录(销售额 > ${threshold}):")
        print(f"数量: {len(high_performers)} 条")
        print(f"占比: {len(high_performers) / len(df) * 100:.1f}%")
        print("\n详细数据:")
        print(high_performers.to_string(index=False))

        return df

    except FileNotFoundError:
        print(f"错误: 文件 '{csv_file}' 不存在")
    except KeyError as e:
        print(f"错误: 数据列 {e} 不存在")
    except Exception as e:
        print(f"发生错误: {e}")


# 使用示例
if __name__ == "__main__":
    analyze_sales_data('sales_data.csv')

进阶版本:多维度分析

import pandas as pd
import numpy as np

def advanced_sales_analysis(csv_file):
    """
    高级销售数据分析
    """
    df = pd.read_csv(csv_file)

    print("高级销售分析报告\n")

    # 1. 按产品分组统计
    if 'product' in df.columns:
        print("=" * 50)
        print("按产品分组统计")
        print("=" * 50)
        product_stats = df.groupby('product').agg({
            'amount': ['sum', 'mean', 'count']
        }).round(2)
        print(product_stats)

    # 2. 按区域分组统计
    if 'region' in df.columns:
        print("\n" + "=" * 50)
        print("按区域分组统计")
        print("=" * 50)
        region_stats = df.groupby('region')['amount'].sum().sort_values(ascending=False)
        print(region_stats)

    # 3. 销售额分布分析
    print("\n" + "=" * 50)
    print("销售额分布")
    print("=" * 50)
    print(df['amount'].describe())

    # 4. 识别异常值(使用四分位数法)
    Q1 = df['amount'].quantile(0.25)
    Q3 = df['amount'].quantile(0.75)
    IQR = Q3 - Q1
    outliers = df[(df['amount'] < Q1 - 1.5 * IQR) | (df['amount'] > Q3 + 1.5 * IQR)]

    print(f"\n异常值检测:")
    print(f"发现 {len(outliers)} 条异常记录")
    if len(outliers) > 0:
        print(outliers[['amount']])

    # 5. 销售趋势(如果有日期列)
    if 'date' in df.columns:
        df['date'] = pd.to_datetime(df['date'])
        daily_sales = df.groupby(df['date'].dt.date)['amount'].sum()
        print("\n" + "=" * 50)
        print("每日销售趋势")
        print("=" * 50)
        print(daily_sales.tail(10))

    return df


# 使用示例
advanced_sales_analysis('sales_data.csv')

高级版本:生成分析报告并导出

import pandas as pd
from datetime import datetime

def generate_sales_report(csv_file, output_file='sales_report.xlsx'):
    """
    生成综合销售报告并导出到 Excel
    """
    df = pd.read_csv(csv_file)

    # 创建 Excel Writer 对象
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        # 原始数据
        df.to_excel(writer, sheet_name='原始数据', index=False)

        # 汇总统计
        summary = pd.DataFrame({
            '指标': ['总销售额', '平均销售额', '最大销售额', '最小销售额', '记录总数'],
            '数值': [
                f"${df['amount'].sum():,.2f}",
                f"${df['amount'].mean():,.2f}",
                f"${df['amount'].max():,.2f}",
                f"${df['amount'].min():,.2f}",
                len(df)
            ]
        })
        summary.to_excel(writer, sheet_name='汇总统计', index=False)

        # 高绩效记录
        high_performers = df[df['amount'] > 1000].sort_values('amount', ascending=False)
        high_performers.to_excel(writer, sheet_name='高绩效记录', index=False)

        # 产品分析(如果有 product 列)
        if 'product' in df.columns:
            product_analysis = df.groupby('product').agg({
                'amount': ['sum', 'mean', 'count']
            }).round(2)
            product_analysis.to_excel(writer, sheet_name='产品分析')

    print(f"分析报告已生成: {output_file}")


# 使用示例
generate_sales_report('sales_data.csv')

运行示例

准备测试数据

import pandas as pd
import numpy as np

# 创建示例 CSV 文件
data = {
    'date': pd.date_range('2026-01-01', periods=20, freq='D'),
    'product': np.random.choice(['Product A', 'Product B', 'Product C'], 20),
    'region': np.random.choice(['North', 'South', 'East', 'West'], 20),
    'amount': np.random.randint(500, 2000, 20)
}

df = pd.DataFrame(data)
df.to_csv('sales_data.csv', index=False)
print("测试数据已创建: sales_data.csv")

安装依赖

uv add pandas openpyxl

执行分析

uv run main.py

预期输出

==================================================
销售数据分析报告
==================================================

总记录数: 20
数据列: date, product, region, amount

总销售额: $25,340.00
平均销售额: $1,267.00
最高单笔: $1,950.00
最低单笔: $520.00

高绩效记录(销售额 > $1000):
数量: 12 条
占比: 60.0%

详细数据:
      date    product region  amount
2026-01-01  Product A  North    1450
2026-01-03  Product B   East    1320
...

扩展思路

1. 数据可视化

使用 Matplotlib 或 Seaborn 创建图表:

import matplotlib.pyplot as plt

# 销售额分布直方图
plt.figure(figsize=(10, 6))
df['amount'].hist(bins=20)
plt.title('销售额分布')
plt.xlabel('销售额')
plt.ylabel('频数')
plt.savefig('sales_distribution.png')

# 按产品分组的柱状图
df.groupby('product')['amount'].sum().plot(kind='bar')
plt.title('各产品总销售额')
plt.savefig('product_sales.png')

2. 时间序列分析

# 移动平均
df['rolling_avg'] = df['amount'].rolling(window=7).mean()

# 同比/环比增长
df['growth_rate'] = df['amount'].pct_change() * 100

3. 缺失值处理

# 检查缺失值
print(df.isnull().sum())

# 填充缺失值
df['amount'].fillna(df['amount'].mean(), inplace=True)

# 删除包含缺失值的行
df.dropna(inplace=True)

4. 数据合并

# 合并多个 CSV 文件
df1 = pd.read_csv('sales_jan.csv')
df2 = pd.read_csv('sales_feb.csv')
combined = pd.concat([df1, df2], ignore_index=True)

# 关联两个数据表
df_sales = pd.read_csv('sales.csv')
df_products = pd.read_csv('products.csv')
merged = pd.merge(df_sales, df_products, on='product_id')

5. 数据透视表

# 创建透视表
pivot = df.pivot_table(
    values='amount',
    index='product',
    columns='region',
    aggfunc='sum'
)
print(pivot)

6. 自定义函数应用

# 对列应用自定义函数
def categorize_sales(amount):
    if amount > 1500:
        return 'High'
    elif amount > 1000:
        return 'Medium'
    else:
        return 'Low'

df['category'] = df['amount'].apply(categorize_sales)

7. 导出为多种格式

# Excel
df.to_excel('output.xlsx', index=False)

# JSON
df.to_json('output.json', orient='records', indent=2)

# HTML 表格
df.to_html('output.html', index=False)

# SQL 数据库
import sqlite3
conn = sqlite3.connect('sales.db')
df.to_sql('sales', conn, if_exists='replace', index=False)

8. 性能优化

处理大型 CSV 文件时的优化技巧:

# 分块读取大文件
chunk_size = 10000
for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
    process(chunk)

# 指定数据类型减少内存占用
df = pd.read_csv('data.csv', dtype={'column1': 'int32', 'column2': 'float32'})

# 只读取需要的列
df = pd.read_csv('data.csv', usecols=['col1', 'col2'])

注意事项

  1. 编码问题: CSV 文件可能使用不同编码(UTF-8、GBK 等),读取时指定 encoding 参数
  2. 数据类型: 注意日期、数值等列的数据类型转换
  3. 内存管理: 处理大文件时注意内存占用,使用分块读取
  4. 数据验证: 分析前检查数据完整性和有效性
  5. 备份原始数据: 在修改数据前保留原始文件副本

通过掌握 Pandas 数据分析技能,你可以高效处理各种结构化数据,从复杂的业务数据中提取有价值的洞察,为决策提供数据支持。