Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: require run in InterfaceBase.publish_partial_history #9264

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions tests/system_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ def publish_util_helper(
for m in metrics:
interface._publish_metric(m)
for h in history:
if "run" not in h:
h["run"] = run
interface.publish_history(**h)
for a in artifacts:
interface.publish_artifact(**a)
Expand Down
10 changes: 2 additions & 8 deletions wandb/sdk/interface/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@

import gzip
import logging
import os
import time
from abc import abstractmethod
from pathlib import Path
Expand Down Expand Up @@ -100,8 +99,6 @@ def __init__(self) -> None:

def _hack_set_run(self, run: "Run") -> None:
self._run = run
current_pid = os.getpid()
self._run._set_iface_pid(current_pid)

def publish_header(self) -> None:
header = pb.HeaderRecord()
Expand Down Expand Up @@ -653,15 +650,13 @@ def _publish_telemetry(self, telem: tpb.TelemetryRecord) -> None:

def publish_partial_history(
self,
run: "Run",
data: dict,
user_step: int,
step: Optional[int] = None,
flush: Optional[bool] = None,
publish_step: bool = True,
run: Optional["Run"] = None,
) -> None:
run = run or self._run

data = history_dict_to_json(run, data, step=user_step, ignore_copy_err=True)
data.pop("_step", None)

Expand All @@ -688,12 +683,11 @@ def _publish_partial_history(self, history: pb.PartialHistoryRequest) -> None:

def publish_history(
self,
run: "Run",
data: dict,
step: Optional[int] = None,
run: Optional["Run"] = None,
publish_step: bool = True,
) -> None:
run = run or self._run
data = history_dict_to_json(run, data, step=step)
history = pb.HistoryRecord()
if publish_step:
Expand Down
4 changes: 3 additions & 1 deletion wandb/sdk/internal/tb_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ def _save_row(self, row: "HistoryDict") -> None:
row[chart.spec.table_key] = chart.table

self._tbwatcher._interface.publish_history(
row, run=self._internal_run, publish_step=False
self._internal_run,
row,
publish_step=False,
)


Expand Down
13 changes: 1 addition & 12 deletions wandb/sdk/wandb_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -547,8 +547,6 @@ class Run:

_init_pid: int
_attach_pid: int
_iface_pid: int | None
_iface_port: int | None

_attach_id: str | None
_is_attached: bool
Expand Down Expand Up @@ -707,10 +705,6 @@ def _init(
if launch_trace_id:
self._config[wandb_key]["launch_trace_id"] = launch_trace_id

# interface pid and port configured when backend is configured (See _hack_set_run)
# TODO: using pid isn't the best for windows as pid reuse can happen more often than unix
self._iface_pid = None
self._iface_port = None
self._attach_id = None
self._is_attached = False
self._is_finished = False
Expand All @@ -721,12 +715,6 @@ def _init(
if not self._settings.x_disable_service:
self._attach_id = self._settings.run_id

def _set_iface_pid(self, iface_pid: int) -> None:
self._iface_pid = iface_pid

def _set_iface_port(self, iface_port: int) -> None:
self._iface_port = iface_port

def _handle_launch_artifact_overrides(self) -> None:
if self._settings.launch and (os.environ.get("WANDB_ARTIFACTS") is not None):
try:
Expand Down Expand Up @@ -1442,6 +1430,7 @@ def _partial_history_callback(

not_using_tensorboard = len(wandb.patched["tensorboard"]) == 0
self._backend.interface.publish_partial_history(
self,
data,
user_step=self._step,
step=step,
Expand Down
Loading