分类任务中的类别不平衡问题(下):实践

在上一篇《分类任务中的类别不平衡问题(上):理论》中,我们介绍了几种常用的过采样法 (SMOTE、ADASYN 等)与欠采样法(EasyEnsemble、NearMiss 等)。正所谓“纸上得来终觉浅,绝知此事要躬行”,说了这么多,我们也该亲自上手编写代码来实践一下了。

下面我们使用之前介绍的 imbalanced-learn 库来进行实验。

准备阶段

为了方便地进行实验,我们首先通过 sklearn 提供的 make_classification 函数来构建随机数据集:

from sklearn.datasets import make_classification

def create_dataset(
    n_samples=1000,
    weights=(0.01, 0.01, 0.98),
    n_classes=3,
    class_sep=0.8,
    n_clusters=1,
):
    return make_classification(
        n_samples=n_samples,
        n_features=2,
        n_informative=2,
        n_redundant=0,
        n_repeated=0,
        n_classes=n_classes,
        n_clusters_per_class=n_clusters,
        weights=list(weights),
        class_sep=class_sep,
        random_state=0,
    )

其中,n_samples 表示样本个数,weights 表示样本的分布比例,n_classes 表示样本类别数,class_sep 表示乘以超立方体大小的因子(较大的值分散了簇/类,并使分类任务更容易),n_clusters 表示每一个类别是由几个簇构成。这里为了方便展示,我们通过 n_features=2 设定每个样本都是二维向量。
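在继续之前,不妨先用一个小例子(仅作演示,变量名为自拟)检查一下 weights 参数的效果,直观感受生成数据集的类别不平衡程度:

from collections import Counter

# 使用默认参数生成数据集,并统计各类别的样本数
X_demo, y_demo = create_dataset()
print(Counter(y_demo))
# 由于 weights=(0.01, 0.01, 0.98),类别 2 的样本会远多于类别 0 和 1,
# 大致为 {2: ~980, 0: ~10, 1: ~10}(具体数字由 make_classification 的取整方式决定)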

然后,为了后续直观地展示不同采样法的效果,我们运用经典的绘图库 matplotlib(以及 Seaborn)编写两个函数,分别展示采样后的样本分布和分类器的决策函数(分类超平面):

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

sns.set_context("poster")

# The following function will be used to plot the sample space after resampling
# to illustrate the specificities of an algorithm.
def plot_resampling(X, y, sampler, ax, title=None):
    X_res, y_res = sampler.fit_resample(X, y)
    ax.scatter(X_res[:, 0], X_res[:, 1], c=y_res, alpha=0.8, edgecolor="k")
    if title is None:
        title = f"Resampling with {sampler.__class__.__name__}"
    ax.set_title(title)
    sns.despine(ax=ax, offset=10)
   
# The following function will be used to plot the decision function of a
# classifier given some data.
def plot_decision_function(X, y, clf, ax, title=None):
    plot_step = 0.02
    x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
    y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
    xx, yy = np.meshgrid(
        np.arange(x_min, x_max, plot_step), np.arange(y_min, y_max, plot_step)
    )

    Z = clf.predict(np.c_[xx.ravel(), yy.ravel()])
    Z = Z.reshape(xx.shape)
    ax.contourf(xx, yy, Z, alpha=0.4)
    ax.scatter(X[:, 0], X[:, 1], alpha=0.8, c=y, edgecolor="k")
    if title is not None:
        ax.set_title(title)

其中 X, y 分别表示样本和对应的类别。特别地,plot_decision_function 函数还需要传入一个分类器,为了方便,后面所有的实验均使用 LogisticRegression 分类器:

from sklearn.linear_model import LogisticRegression

clf = LogisticRegression()

过采样法的比较

Random over-sampling

随机过采样 (Random over-sampling) 即随机地重复采样正例,imbalanced-learn 库通过 RandomOverSampler 类来实现。

在 imbalanced-learn 库中,大部分采样方法都可以使用 make_pipeline 将采样方法和分类模型连接起来,但是两种集成方法 EasyEnsemble 和 BalanceCascade 无法使用 make_pipeline,因为它们本质上是集成了好几个分类模型,所以需要自定义方法。

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import RandomOverSampler

X, y = create_dataset(n_samples=100, weights=(0.05, 0.25, 0.7))

fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(15, 7))

clf.fit(X, y)
plot_decision_function(X, y, clf, axs[0], title="Without resampling")

sampler = RandomOverSampler(random_state=0)
model = make_pipeline(sampler, clf).fit(X, y)
plot_decision_function(X, y, model, axs[1], f"Using {model[0].__class__.__name__}")

fig.suptitle(f"Decision function of {clf.__class__.__name__}")
fig.tight_layout()
plt.show()

可以看到,即使通过这种最简单的方法,决策的边界也发生了明显的变化,已经较少偏向多数类了。默认情况下,随机采样生成的样本均严格地来自于原始正例集合,可以通过 shrinkage 来向采样生成的样本中添加小的扰动,从而生成一个平滑的数据集:

from imblearn.over_sampling import RandomOverSampler

X, y = create_dataset(n_samples=100, weights=(0.05, 0.25, 0.7))

sampler = RandomOverSampler(random_state=0)

fig, axs = plt.subplots(nrows=1, ncols=2, figsize=(15, 7))

sampler.set_params(shrinkage=None)
plot_resampling(X, y, sampler, ax=axs[0], title="Normal bootstrap")

sampler.set_params(shrinkage=0.3)
plot_resampling(X, y, sampler, ax=axs[1], title="Smoothed bootstrap")

fig.suptitle(f"Resampling with {sampler.__class__.__name__}")
fig.tight_layout()
plt.show()

可以看到,使用平滑方式随机采样得到的数据集看上去包含了更多的样本,这实际上是因为生成的样本没有与原始样本重叠。
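如果想定量验证这一点,可以统计重采样结果中不重复样本的个数。下面是一个小示例(仅作演示,变量名为自拟):

import numpy as np
from imblearn.over_sampling import RandomOverSampler

# 与上面相同的数据集
X, y = create_dataset(n_samples=100, weights=(0.05, 0.25, 0.7))

for shrinkage in (None, 0.3):
    sampler = RandomOverSampler(random_state=0, shrinkage=shrinkage)
    X_res, y_res = sampler.fit_resample(X, y)
    # shrinkage=None 时新样本是原样本的精确复制,会出现大量重复行;
    # shrinkage=0.3 时新样本带有扰动,几乎所有行互不相同
    n_unique = np.unique(X_res, axis=0).shape[0]
    print(f"shrinkage={shrinkage}: 共 {X_res.shape[0]} 个样本,其中不重复的有 {n_unique} 个")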

ADASYN 和 SMOTE

正如我们在上篇中介绍的那样,随机过采样容易造成严重的过拟合,因此更为常用的是 ADASYN 和 SMOTE 这两种过采样法:它们不是简单地重复采样或向生成的数据添加扰动,而是采用特定的启发式方法来合成新样本。在 imbalanced-learn 库中,它们分别通过 ADASYN 和 SMOTE 类来实现。

from imblearn import FunctionSampler  # to use an identity sampler
from imblearn.over_sampling import RandomOverSampler
from imblearn.over_sampling import SMOTE, ADASYN

X, y = create_dataset(n_samples=150, weights=(0.1, 0.2, 0.7))

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(20, 15))

samplers = [
    FunctionSampler(),
    RandomOverSampler(random_state=0),
    SMOTE(random_state=0),
    ADASYN(random_state=0),
]

for ax, sampler in zip(axs.ravel(), samplers):
    title = "Original dataset" if isinstance(sampler, FunctionSampler) else None
    plot_resampling(X, y, sampler, ax, title=title)
fig.tight_layout()

plt.show()

同样地,在随机过采样、SMOTE 和 ADASYN 得到的数据集上,分类器学到的决策函数也会有所不同。SMOTE 法对所有正例“一视同仁”,为每个正例样本合成相同数量的新样本;而 ADASYN 法更关注边界附近的正例样本(即周围负例更多、更难通过近邻规则正确分类的那些正例):

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import SMOTE, ADASYN

X, y = create_dataset(n_samples=150, weights=(0.05, 0.25, 0.7))

fig, axs = plt.subplots(nrows=1, ncols=3, figsize=(20, 6))

models = {
    "Without sampler": clf,
    "ADASYN sampler": make_pipeline(ADASYN(random_state=0), clf),
    "SMOTE sampler": make_pipeline(SMOTE(random_state=0), clf),
}

for ax, (title, model) in zip(axs, models.items()):
    model.fit(X, y)
    plot_decision_function(X, y, model, ax=ax, title=title)

fig.suptitle(f"Decision function using a {clf.__class__.__name__}")
fig.tight_layout()

plt.show()

尤其是在样本数较多的情况下,这两种采样法的差异会更加明显:SMOTE 合成的样本分布比较均匀,而 ADASYN 生成的样本大都集中在原来与负例较为接近的那些正例附近;如果这些边界样本本身是噪声,也可能带来新的问题:

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import SMOTE, ADASYN

X, y = create_dataset(n_samples=5000, weights=(0.01, 0.05, 0.94), class_sep=0.8)

samplers = [SMOTE(random_state=0), ADASYN(random_state=0)]

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(15, 15))
for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, clf, ax[0], title=f"Decision function with {sampler.__class__.__name__}"
    )
    plot_resampling(X, y, sampler, ax[1])

fig.suptitle("Particularities of over-sampling with SMOTE and ADASYN")
fig.tight_layout()

plt.show()

正如我们在上篇中介绍的那样,SMOTE 算法根据对正例样本的选择还有一些变体,例如 Border-line SMOTE 会选择那些在边界上的正例。除此以外,常用的还有

  • 使用 SVM 的支持向量来生成新样本的 SVMSMOTE;
  • 先用 K-Means 聚类,再根据每个簇内的样本密度独立生成新样本的 KMeansSMOTE。

在 imbalanced-learn 库中,它们分别通过 BorderlineSMOTE、SVMSMOTE 和 KMeansSMOTE 类来实现。

from imblearn.pipeline import make_pipeline
from imblearn.over_sampling import SMOTE, BorderlineSMOTE, KMeansSMOTE, SVMSMOTE

X, y = create_dataset(n_samples=5000, weights=(0.01, 0.05, 0.94), class_sep=0.8)

fig, axs = plt.subplots(5, 2, figsize=(15, 30))

samplers = [
    SMOTE(random_state=0),
    BorderlineSMOTE(random_state=0, kind="borderline-1"),
    BorderlineSMOTE(random_state=0, kind="borderline-2"),
    KMeansSMOTE(random_state=0),
    SVMSMOTE(random_state=0),
]

for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, clf, ax[0], title=f"Decision function for {sampler.__class__.__name__}"
    )
    plot_resampling(X, y, sampler, ax[1])

fig.suptitle("Decision function and resampling using SMOTE variants")
fig.tight_layout()

plt.show()

混合特征与离散特征

在某些情况下,样本同时包含了连续和离散特征(混合特征),这时只能使用 SMOTENC 法来处理。例如,我们构建一个模拟数据集,样本包含三个维度,其中第 1、3 维为离散特征,第 2 维为连续特征:

from collections import Counter
from imblearn.over_sampling import SMOTENC

rng = np.random.RandomState(42)
n_samples = 50
# Create a dataset of a mix of numerical and categorical data
X = np.empty((n_samples, 3), dtype=object)
X[:, 0] = rng.choice(["A", "B", "C"], size=n_samples).astype(object)
X[:, 1] = rng.randn(n_samples)
X[:, 2] = rng.randint(3, size=n_samples)
y = np.array([0] * 20 + [1] * 30)

print("The original imbalanced dataset")
print(sorted(Counter(y).items()))
print()
print("The first and last columns are containing categorical features:")
print(X[:5])
print()

smote_nc = SMOTENC(categorical_features=[0, 2], random_state=0)
X_resampled, y_resampled = smote_nc.fit_resample(X, y)
print("Dataset after resampling:")
print(sorted(Counter(y_resampled).items()))
print()
print("SMOTE-NC will generate categories for the categorical features:")
print(X_resampled[-5:])
print()
输出:

The original imbalanced dataset
[(0, 20), (1, 30)]

The first and last columns are containing categorical features:
[['C' -0.14021849735700803 2]
 ['A' -0.033193400066544886 2]
 ['C' -0.7490765234433554 1]
 ['C' -0.7783820070908942 2]
 ['A' 0.948842857719016 2]]

Dataset after resampling:
[(0, 30), (1, 30)]

SMOTE-NC will generate categories for the categorical features:
[['A' 0.5246469549655818 2]
 ['B' -0.3657680728116921 2]
 ['B' 0.9344237230779993 2]
 ['B' 0.3710891618824609 2]
 ['B' 0.3327240726719727 2]]

可以看到,只需要通过 categorical_features 参数指定离散特征的维度,SMOTENC 采样法就可以生成同时包含离散和连续特征的样本,使得数据集类别平衡。但是,如果数据集只包含离散特征,那么应该直接使用 SMOTEN 过采样法:

from collections import Counter
from imblearn.over_sampling import SMOTEN

# Generate only categorical data
X = np.array(["A"] * 10 + ["B"] * 20 + ["C"] * 30, dtype=object).reshape(-1, 1)
y = np.array([0] * 20 + [1] * 40, dtype=np.int32)

print(f"Original class counts: {Counter(y)}")
print()
print(X[:5])
print()

sampler = SMOTEN(random_state=0)
X_res, y_res = sampler.fit_resample(X, y)
print(f"Class counts after resampling {Counter(y_res)}")
print()
print(X_res[-5:])
print()
输出:

Original class counts: Counter({1: 40, 0: 20})

[['A']
 ['A']
 ['A']
 ['A']
 ['A']]

Class counts after resampling Counter({0: 40, 1: 40})

[['B']
 ['B']
 ['A']
 ['B']
 ['A']]

欠采样法的比较

欠采样法大致可以分为两类:可控欠采样法 (controlled under-sampling) 和数据清洗方法 (cleaning under-sampling)。对于可控欠采样法来说,采样得到的负例样本数是可以被预先设定的,例如最简单的随机欠采样,如下面的小示例所示。
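“可以被预先设定”主要体现在 sampling_strategy 参数上:除了 'auto' 等字符串策略,还可以直接传入一个 {类别: 目标样本数} 的字典。下面是一个小示例(仅作演示,目标数量为自拟):

from collections import Counter
from imblearn.under_sampling import RandomUnderSampler

X, y = create_dataset(n_samples=1000, weights=(0.05, 0.15, 0.8))
print(Counter(y))  # 欠采样前的类别分布

# 用字典显式指定多数类(类别 2)欠采样后保留的样本数,
# 未出现在字典中的类别保持原样
sampler = RandomUnderSampler(sampling_strategy={2: 100}, random_state=0)
X_res, y_res = sampler.fit_resample(X, y)
print(Counter(y_res))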

Random under-sampling

随机欠采样法 (Random under-sampling) 思想非常简单,即随机地从负例集合中采样指定数量的样本。imbalanced-learn 库通过 RandomUnderSampler 类来实现。

from imblearn import FunctionSampler
from imblearn.pipeline import make_pipeline
from imblearn.under_sampling import RandomUnderSampler

X, y = create_dataset(n_samples=400, weights=(0.05, 0.15, 0.8), class_sep=0.8)

samplers = [
    FunctionSampler(),  # identity resampler
    RandomUnderSampler(random_state=0),
]

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(15, 12))
for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, model, ax[0], title=f"Decision function with {sampler.__class__.__name__}"
    )
    plot_resampling(X, y, sampler, ax[1])

fig.tight_layout()

plt.show()

NearMiss

NearMiss 采样法通过一些启发式的规则来选择负例:NearMiss-1 选择到最近的 $K$ 个正例样本平均距离最近的负例样本;NearMiss-2 选择到最远的 $K$ 个正例样本平均距离最近的负例样本;NearMiss-3 则是一个两阶段算法,首先对于每一个正例样本,它们的 $m$ 个最近的邻居会被保留,然后选择那些与 $K$ 个最近邻的平均距离最大的负例样本。

from imblearn.pipeline import make_pipeline
from imblearn.under_sampling import NearMiss

X, y = create_dataset(n_samples=1000, weights=(0.05, 0.15, 0.8), class_sep=1.5)

samplers = [NearMiss(version=1), NearMiss(version=2), NearMiss(version=3)]

fig, axs = plt.subplots(nrows=3, ncols=2, figsize=(15, 20))
for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, model, ax[0],
        title=f"Decision function for {sampler.__class__.__name__}-{sampler.version}",
    )
    plot_resampling(
        X, y, sampler, ax[1],
        title=f"Resampling using {sampler.__class__.__name__}-{sampler.version}",
    )
fig.tight_layout()

plt.show()

数据清洗方法

Edited Nearest Neighbours (ENN)

Edited Nearest Neighbours (ENN) 欠采样法会删除那些与其近邻类别“不够一致”的负例样本,并且这个过程可以不断重复,即 Repeated Edited Nearest Neighbours。与 Repeated Edited Nearest Neighbours 在内部最近邻算法中采用固定的 $K$ 不同,AllKNN 法采用可变的 $K$,并在每次迭代时增大 $K$。

from imblearn.pipeline import make_pipeline
from imblearn.under_sampling import (
    EditedNearestNeighbours,
    RepeatedEditedNearestNeighbours,
    AllKNN,
)

X, y = create_dataset(n_samples=500, weights=(0.2, 0.3, 0.5), class_sep=0.8)

samplers = [
    EditedNearestNeighbours(),
    RepeatedEditedNearestNeighbours(),
    AllKNN(allow_minority=True),
]

fig, axs = plt.subplots(3, 2, figsize=(15, 20))
for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, clf, ax[0], title=f"Decision function for \n{sampler.__class__.__name__}"
    )
    plot_resampling(
        X, y, sampler, ax[1], title=f"Resampling using \n{sampler.__class__.__name__}"
    )

fig.tight_layout()

plt.show()

Tomek Link

Tomek Link 表示不同类别之间距离最近的一对样本,即两个样本互为最近邻且分属不同类别。移除 Tomek Link 就能“清洗掉”类间重叠样本,使得互为最近邻的样本皆属于同一类别。在 imbalanced-learn 库中,TomekLinks 类直接实现了这一清洗操作,而 OneSidedSelection 类则在移除 Tomek Link 的基础上进一步结合了最近邻规则;类似的还有 NeighbourhoodCleaningRule 算法,它借助 Edited Nearest Neighbours 来移除负例样本。
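在对比 OneSidedSelection 与 NeighbourhoodCleaningRule 之前,可以先单独看一下 TomekLinks 的清洗效果。下面是一个小示例(仅作演示),统计移除 Tomek Link 前后的类别分布:

from collections import Counter
from imblearn.under_sampling import TomekLinks

X, y = create_dataset(n_samples=500, weights=(0.2, 0.3, 0.5), class_sep=0.8)
print(Counter(y))  # 清洗前的类别分布

# 默认 sampling_strategy='auto' 不会删除少数类样本,
# 只清洗其余类别中构成 Tomek Link 的样本
tl = TomekLinks()
X_res, y_res = tl.fit_resample(X, y)
print(Counter(y_res))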

from imblearn.pipeline import make_pipeline
from imblearn.under_sampling import (
    OneSidedSelection,
    NeighbourhoodCleaningRule,
)

X, y = create_dataset(n_samples=500, weights=(0.2, 0.3, 0.5), class_sep=0.8)

fig, axs = plt.subplots(nrows=2, ncols=2, figsize=(20, 15))

samplers = [
    OneSidedSelection(random_state=0),
    NeighbourhoodCleaningRule(),
]

for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, clf, ax[0], title=f"Decision function for \n{sampler.__class__.__name__}"
    )
    plot_resampling(
        X, y, sampler, ax[1], title=f"Resampling using \n{sampler.__class__.__name__}"
    )
fig.tight_layout()

plt.show()

集成学习方法

在上篇中我们也介绍过,EasyEnsemble 和 BalanceCascade 采用集成学习机制来缓解随机欠采样带来的信息丢失问题。在集成分类器中,bagging 方法在随机选取的不同数据子集上构建多个基学习器,scikit-learn 中通过 BaggingClassifier 类来实现。但是,它不会对每个数据子集做类别平衡,因此会偏向多数类:

from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.metrics import balanced_accuracy_score
from sklearn.ensemble import BaggingClassifier
from sklearn.tree import DecisionTreeClassifier

X, y = make_classification(n_samples=10000, n_features=2, n_informative=2,
                           n_redundant=0, n_repeated=0, n_classes=3,
                           n_clusters_per_class=1,
                           weights=[0.01, 0.05, 0.94],
                           class_sep=0.8, random_state=0)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)

bc = BaggingClassifier(base_estimator=DecisionTreeClassifier(), random_state=0)
bc.fit(X_train, y_train)
y_pred = bc.predict(X_test)
print(balanced_accuracy_score(y_test, y_pred))
0.7739629664028289

很显然,可以通过前面介绍的采样方法来平衡每个数据子集,imbalanced-learn 中通过 BalancedBaggingClassifier 类来实现。具体的采样方式由 sampler 参数(或者 sampling_strategy 与 replacement 这两个参数)控制,例如使用随机欠采样 (Random under-sampling):

from imblearn.ensemble import BalancedBaggingClassifier
bbc = BalancedBaggingClassifier(base_estimator=DecisionTreeClassifier(),
                                sampling_strategy='auto',
                                replacement=False,
                                random_state=0)
bbc.fit(X_train, y_train)
y_pred = bbc.predict(X_test)
print(balanced_accuracy_score(y_test, y_pred))
0.8251353587264241

可以看到在经过采样使得数据集平衡后,分类的准确率得到了提升。

EasyEnsemble 法可以理解为一种特定的 bagging 方法:它将多个在类别平衡的 bootstrap 子集上训练的 AdaBoostClassifier 学习器集成起来。与 BalancedBaggingClassifier 的用法相似,imbalanced-learn 中通过 EasyEnsembleClassifier 类来实现:

from imblearn.ensemble import EasyEnsembleClassifier
eec = EasyEnsembleClassifier(random_state=0)
eec.fit(X_train, y_train) 
y_pred = eec.predict(X_test)
print(balanced_accuracy_score(y_test, y_pred))
0.6248477859302602

结合过采样和欠采样

我们之前已经说过,SMOTE 算法的问题是可能会在边缘异常值和内部值之间插值生成新样本,从而引入噪声,而这个问题可以通过对过采样后的样本空间进行清洗来缓解,即把过采样和欠采样结合起来:先过采样,再做数据清洗。常用的方法是 SMOTE + Edited Nearest Neighbours (ENN) 和 SMOTE + Tomek Link,在 imbalanced-learn 库中分别通过 SMOTEENN 和 SMOTETomek 类来实现:

from imblearn.pipeline import make_pipeline
from imblearn import FunctionSampler
from imblearn.over_sampling import SMOTE
from imblearn.combine import SMOTEENN, SMOTETomek

X, y = make_classification(n_samples=5000, n_features=2, n_informative=2,
                           n_redundant=0, n_repeated=0, n_classes=3,
                           n_clusters_per_class=1,
                           weights=[0.01, 0.05, 0.94],
                           class_sep=0.8, random_state=0)

fig, axs = plt.subplots(4, 2, figsize=(15, 25))

samplers = [
    FunctionSampler(),
    SMOTE(random_state=0),
    SMOTEENN(random_state=0),
    SMOTETomek(random_state=0)
]

for ax, sampler in zip(axs, samplers):
    model = make_pipeline(sampler, clf).fit(X, y)
    plot_decision_function(
        X, y, clf, ax[0], title=f"Decision function for {sampler.__class__.__name__}"
    )
    plot_resampling(X, y, sampler, ax[1])

fig.tight_layout()

plt.show()

可以看到,相比于 SMOTE + Tomek Link 法,SMOTE + Edited Nearest Neighbours (ENN) 通常能清除更多的噪声样本。
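如果想从数量上感受这种差别,可以直接比较二者重采样后的类别分布。下面是一个小示例(仅作演示,沿用上一段代码生成的 X, y):

from collections import Counter
from imblearn.combine import SMOTEENN, SMOTETomek

for sampler in (SMOTEENN(random_state=0), SMOTETomek(random_state=0)):
    X_res, y_res = sampler.fit_resample(X, y)
    # SMOTE 过采样之后,SMOTEENN 额外用 ENN 清洗,通常会比 SMOTETomek 删除更多样本
    print(f"{sampler.__class__.__name__}: {sorted(Counter(y_res).items())}")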
