Skip to content

Commit

Permalink
Remove the pyspark summary tests from test_xgboost_summary.py and put…
Browse files Browse the repository at this point in the history
… them in test_spark_local.py
  • Loading branch information
a.cherkaoui committed Jan 10, 2025
1 parent 3e60eec commit 92d7cec
Show file tree
Hide file tree
Showing 2 changed files with 180 additions and 242 deletions.
189 changes: 180 additions & 9 deletions tests/test_distributed/test_with_spark/test_spark_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ def spark() -> Generator[SparkSession, None, None]:
"reg_df_test_with_eval_weight",
"reg_with_eval_best_score",
"reg_with_eval_and_weight_best_score",
"reg_expected_evals_result_train",
"reg_expected_evals_result_validation",
),
)

Expand Down Expand Up @@ -128,6 +130,14 @@ def reg_with_weight(
predt3 = reg3.predict(X)
best_score3 = reg3.best_score

reg4 = XGBRegressor(eval_metric="rmse")
reg4.fit(X_train, y_train, eval_set=[(X_train, y_train), (X_val, y_val)])
reg_expected_evals_result = reg4.evals_result()
reg_expected_evals_result_train = reg_expected_evals_result["validation_0"]["rmse"]
reg_expected_evals_result_validation = reg_expected_evals_result["validation_1"][
"rmse"
]

reg_df_train_with_eval_weight = spark.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
Expand Down Expand Up @@ -166,6 +176,8 @@ def reg_with_weight(
reg_df_test_with_eval_weight,
best_score2,
best_score3,
reg_expected_evals_result_train,
reg_expected_evals_result_validation,
)


Expand Down Expand Up @@ -288,6 +300,8 @@ def multi_clf_data(spark: SparkSession) -> Generator[MultiClfData, None, None]:
"cls_df_test_with_eval_weight",
"cls_with_eval_best_score",
"cls_with_eval_and_weight_best_score",
"cls_expected_evals_result_train",
"cls_expected_evals_result_validation",
),
)

Expand Down Expand Up @@ -326,6 +340,16 @@ def clf_with_weight(
sample_weight_eval_set=[w_val],
)

cls4 = XGBClassifier(eval_metric="logloss")
cls4.fit(
X_train,
y_train,
eval_set=[(X_train, y_train), (X_val, y_val)],
)
cls4_evals_result = cls4.evals_result()
cls_expected_evals_result_train = cls4_evals_result["validation_0"]["logloss"]
cls_expected_evals_result_validation = cls4_evals_result["validation_1"]["logloss"]

cls_df_train_with_eval_weight = spark.createDataFrame(
[
(Vectors.dense(1.0, 2.0, 3.0), 0, False, 1.0),
Expand Down Expand Up @@ -364,6 +388,8 @@ def clf_with_weight(
cls_df_test_with_eval_weight,
cls_with_eval_best_score,
cls_with_eval_and_weight_best_score,
cls_expected_evals_result_train,
cls_expected_evals_result_validation,
)


Expand Down Expand Up @@ -1211,6 +1237,76 @@ def test_empty_train_data(self, spark: SparkSession, tree_method: str) -> None:
for row in pred_result:
assert row.prediction == 1.0

def test_regressor_xgb_summary(self, reg_with_weight: RegWithWeight) -> None:
    """Check the regressor's training summary when no validation set is used.

    The model is fitted on the rows of the fixture DataFrame whose ``isVal``
    flag is false.  The recorded ``rmse`` train history must match the
    expected values carried by the fixture (absolute tolerance 1e-3) and the
    validation history must be an empty dict.
    """
    full_df = reg_with_weight.reg_df_train_with_eval_weight
    # ``== False`` is applied to a Column object to build the filter
    # condition; it is intentionally not written as ``not``/``is False``.
    train_only_df = full_df.filter(spark_sql_func.col("isVal") == False)

    regressor = SparkXGBRegressor(eval_metric="rmse")
    fitted = regressor.fit(train_only_df)

    expected_train = reg_with_weight.reg_expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["rmse"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    assert fitted.training_summary.validation_objective_history == {}

def test_regressor_xgb_summary_with_validation(
    self, reg_with_weight: RegWithWeight
) -> None:
    """Check the regressor's training summary when a validation set is used.

    The estimator is configured with ``validation_indicator_col="isVal"`` and
    fitted on the full fixture DataFrame.  Both the train and the validation
    ``rmse`` histories must match the expected values carried by the fixture
    (absolute tolerance 1e-3).
    """
    regressor = SparkXGBRegressor(
        eval_metric="rmse", validation_indicator_col="isVal"
    )
    fitted = regressor.fit(reg_with_weight.reg_df_train_with_eval_weight)

    # Train history first, then validation history, each against its own
    # expected series.
    expected_train = reg_with_weight.reg_expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["rmse"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    expected_val = reg_with_weight.reg_expected_evals_result_validation
    observed_val = fitted.training_summary.validation_objective_history["rmse"]
    np.testing.assert_allclose(expected_val, observed_val, atol=1e-3)

def test_classifier_xgb_summary(self, clf_with_weight: ClfWithWeight) -> None:
    """Check the classifier's training summary when no validation set is used.

    The model is fitted on the rows of the fixture DataFrame whose ``isVal``
    flag is false.  The recorded ``logloss`` train history must match the
    expected values carried by the fixture (absolute tolerance 1e-3) and the
    validation history must be an empty dict.
    """
    full_df = clf_with_weight.cls_df_train_with_eval_weight
    # ``== False`` is applied to a Column object to build the filter
    # condition; it is intentionally not written as ``not``/``is False``.
    train_only_df = full_df.filter(spark_sql_func.col("isVal") == False)

    classifier = SparkXGBClassifier(eval_metric="logloss")
    fitted = classifier.fit(train_only_df)

    expected_train = clf_with_weight.cls_expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["logloss"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    assert fitted.training_summary.validation_objective_history == {}

def test_classifier_xgb_summary_with_validation(
    self, clf_with_weight: ClfWithWeight
) -> None:
    """Check the classifier's training summary when a validation set is used.

    The estimator is configured with ``validation_indicator_col="isVal"`` and
    fitted on the full fixture DataFrame.  Both the train and the validation
    ``logloss`` histories must match the expected values carried by the
    fixture (absolute tolerance 1e-3).
    """
    classifier = SparkXGBClassifier(
        eval_metric="logloss", validation_indicator_col="isVal"
    )
    fitted = classifier.fit(clf_with_weight.cls_df_train_with_eval_weight)

    # Train history first, then validation history, each against its own
    # expected series.
    expected_train = clf_with_weight.cls_expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["logloss"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    expected_val = clf_with_weight.cls_expected_evals_result_validation
    observed_val = fitted.training_summary.validation_objective_history["logloss"]
    np.testing.assert_allclose(expected_val, observed_val, atol=1e-3)


class XgboostLocalTest(SparkTestCase):
def setUp(self):
Expand Down Expand Up @@ -1701,7 +1797,17 @@ def check_conf(conf: Config) -> None:
check_conf(loaded_model.getOrDefault(loaded_model.coll_cfg))


LTRData = namedtuple("LTRData", ("df_train", "df_test", "df_train_1"))
# Record type yielded by the ``ltr_data`` fixture for the ranking tests.
LTRData = namedtuple(
    "LTRData",
    [
        # Spark DataFrames used for fitting and prediction checks.
        "df_train",
        "df_test",
        "df_train_1",
        # Train and test rows combined into one DataFrame, distinguished by
        # an ``isVal`` column.
        "ranker_df_merged",
        # Expected per-iteration metric histories for the train set and the
        # validation set respectively.
        "expected_evals_result_train",
        "expected_evals_result_validation",
    ],
)


@pytest.fixture
Expand Down Expand Up @@ -1741,22 +1847,47 @@ def ltr_data(spark: SparkSession) -> Generator[LTRData, None, None]:
[np.nan, 8.0, 10.5],
]
)
qid_test = np.array([0, 0, 0, 1, 1, 1])
y_test = np.array([1, 0, 2, 1, 1, 2])

ltr = xgb.XGBRanker(tree_method="approx", objective="rank:pairwise")
ltr.fit(X_train, y_train, qid=qid_train)
predt = ltr.predict(X_test)

ltr2 = xgb.XGBRanker(tree_method="approx", objective="rank:pairwise")
ltr2.fit(
X_train,
y_train,
qid=qid_train,
eval_set=[(X_train, y_train), (X_test, y_test)],
eval_qid=[qid_train, qid_test],
)
evals_result = ltr2.evals_result()
expected_evals_result_train = evals_result["validation_0"]["ndcg@32"]
expected_evals_result_validation = evals_result["validation_1"]["ndcg@32"]

ranker_df_test = spark.createDataFrame(
[
(Vectors.dense(1.5, 2.0, 3.0), 0, float(predt[0])),
(Vectors.dense(4.5, 5.0, 6.0), 0, float(predt[1])),
(Vectors.dense(9.0, 4.5, 8.0), 0, float(predt[2])),
(Vectors.sparse(3, {1: 1.0, 2: 6.0}), 1, float(predt[3])),
(Vectors.sparse(3, {1: 6.0, 2: 7.0}), 1, float(predt[4])),
(Vectors.sparse(3, {1: 8.0, 2: 10.5}), 1, float(predt[5])),
(Vectors.dense(1.5, 2.0, 3.0), 0, float(predt[0]), 1),
(Vectors.dense(4.5, 5.0, 6.0), 0, float(predt[1]), 0),
(Vectors.dense(9.0, 4.5, 8.0), 0, float(predt[2]), 2),
(Vectors.sparse(3, {1: 1.0, 2: 6.0}), 1, float(predt[3]), 1),
(Vectors.sparse(3, {1: 6.0, 2: 7.0}), 1, float(predt[4]), 1),
(Vectors.sparse(3, {1: 8.0, 2: 10.5}), 1, float(predt[5]), 2),
],
["features", "qid", "expected_prediction"],
["features", "qid", "expected_prediction", "label"],
)

ranker_df_merged = (
ranker_df_train.select(["features", "label", "qid"])
.withColumn("isVal", spark_sql_func.lit(False))
.union(
ranker_df_test.select(["features", "label", "qid"]).withColumn(
"isVal", spark_sql_func.lit(True)
)
)
)

ranker_df_train_1 = spark.createDataFrame(
[
(Vectors.sparse(3, {1: 1.0, 2: 5.5}), 0, 9),
Expand All @@ -1775,7 +1906,14 @@ def ltr_data(spark: SparkSession) -> Generator[LTRData, None, None]:
* 4,
["features", "label", "qid"],
)
yield LTRData(ranker_df_train, ranker_df_test, ranker_df_train_1)
yield LTRData(
ranker_df_train,
ranker_df_test,
ranker_df_train_1,
ranker_df_merged,
expected_evals_result_train,
expected_evals_result_validation,
)


class TestPySparkLocalLETOR:
Expand Down Expand Up @@ -1805,3 +1943,36 @@ def f(iterator: Iterable) -> List[int]:
for row in rows:
assert len(row) == 1
assert row[0].qid in [6, 7, 8, 9]

def test_ranker_xgb_summary(self, ltr_data: LTRData) -> None:
    """Check the ranker's training summary when no validation set is used.

    The ranker is fitted on ``ltr_data.df_train`` only.  The recorded
    ``ndcg@32`` train history must match the expected values carried by the
    fixture (absolute tolerance 1e-3) and the validation history must be an
    empty dict.
    """
    ranker = SparkXGBRanker(
        tree_method="approx", qid_col="qid", objective="rank:pairwise"
    )
    fitted = ranker.fit(ltr_data.df_train)

    expected_train = ltr_data.expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["ndcg@32"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    assert fitted.training_summary.validation_objective_history == {}

def test_ranker_xgb_summary_with_validation(self, ltr_data: LTRData) -> None:
    """Check the ranker's training summary when a validation set is used.

    The ranker is configured with ``validation_indicator_col="isVal"`` and
    fitted on ``ltr_data.ranker_df_merged``.  Both the train and the
    validation ``ndcg@32`` histories must match the expected values carried
    by the fixture (absolute tolerance 1e-3).
    """
    ranker = SparkXGBRanker(
        tree_method="approx",
        qid_col="qid",
        objective="rank:pairwise",
        validation_indicator_col="isVal",
    )
    fitted = ranker.fit(ltr_data.ranker_df_merged)

    # Train history first, then validation history, each against its own
    # expected series.
    expected_train = ltr_data.expected_evals_result_train
    observed_train = fitted.training_summary.train_objective_history["ndcg@32"]
    np.testing.assert_allclose(expected_train, observed_train, atol=1e-3)

    expected_val = ltr_data.expected_evals_result_validation
    observed_val = fitted.training_summary.validation_objective_history["ndcg@32"]
    np.testing.assert_allclose(expected_val, observed_val, atol=1e-3)
Loading

0 comments on commit 92d7cec

Please sign in to comment.