Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Sweetnow committed Jan 9, 2025
1 parent 18c581a commit f74112e
Show file tree
Hide file tree
Showing 3 changed files with 239 additions and 60 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "python-moss"
version = "1.2.2"
version = "1.2.3"
description = "MObility Simulation System"
authors = [
{ name = "Jun Zhang", email = "[email protected]" },
Expand Down
125 changes: 94 additions & 31 deletions python/src/moss/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,25 @@ def __init__(
"""
The interval of speed statistics. Set to `0` to disable speed statistics.
"""
# check parameters

if step_interval <= 0:
raise ValueError("step_interval should be greater than 0")
if step_interval > 1:
warn("step_interval is greater than 1, the simulation may not be accurate")
if person_limit < -1:
raise ValueError("person_limit should be greater than -1, -1 means no limit")
if junction_yellow_time < 0:
raise ValueError("junction_yellow_time should be greater than 0")
if phase_pressure_coeff <= 0:
raise ValueError("phase_pressure_coeff should be greater than 0")
if speed_stat_interval < 0:
raise ValueError("Cannot set speed_stat_interval to be less than 0")
if out_xmin > out_xmax:
raise ValueError("out_xmin should be less than out_xmax")
if out_ymin > out_ymax:
raise ValueError("out_ymin should be less than out_ymax")

self._e = _moss.Engine(
name,
map_file,
Expand Down Expand Up @@ -283,10 +300,13 @@ def get_current_time(self) -> float:
"""
return self._e.get_current_time()

def fetch_persons(self) -> Dict[str, NDArray]:
def fetch_persons(self, fields: List[str] = []) -> Dict[str, NDArray]:
"""
Fetch the persons' information.
Args:
- fields: The fields to fetch, should be a subset of ["id", "enable", "status", "lane_id", "lane_parent_id", "s", "aoi_id", "v", "shadow_lane_id", "shadow_s", "lc_yaw", "lc_completed_ratio", "is_forward", "x", "y", "dir", "schedule_index", "trip_index", "departure_time", "traveling_time", "total_distance"]. If empty, fetch all fields.
The result values is a dictionary with the following keys:
- id: The id of the person
- enable: Whether the person is enabled
Expand All @@ -312,7 +332,33 @@ def fetch_persons(self) -> Dict[str, NDArray]:
We strongly recommend using `pd.DataFrame(e.fetch_persons())` to convert the result to a DataFrame for better visualization and analysis.
"""
if self._fetched_persons is None:
if len(fields) == 0:
fields = [
"id",
"enable",
"status",
"lane_id",
"lane_parent_id",
"s",
"aoi_id",
"v",
"shadow_lane_id",
"shadow_s",
"lc_yaw",
"lc_completed_ratio",
"is_forward",
"x",
"y",
"dir",
"schedule_index",
"trip_index",
"departure_time",
"traveling_time",
"total_distance",
]
has_fields = set() if self._fetched_persons is None else set(self._fetched_persons.keys())
delta_fields = set(fields) - has_fields
if len(delta_fields) > 0:
(
ids,
enables,
Expand All @@ -335,8 +381,8 @@ def fetch_persons(self) -> Dict[str, NDArray]:
departure_times,
traveling_times,
total_distances,
) = self._e.fetch_persons()
self._fetched_persons = {
) = self._e.fetch_persons(list(delta_fields))
new_fetch = {
"id": ids,
"enable": enables,
"status": statuses,
Expand All @@ -359,6 +405,11 @@ def fetch_persons(self) -> Dict[str, NDArray]:
"traveling_time": traveling_times,
"total_distance": total_distances,
}
# add new fields to the fetched persons
if self._fetched_persons is None:
self._fetched_persons = {}
for k in delta_fields:
self._fetched_persons[k] = new_fetch[k]
return self._fetched_persons

def fetch_lanes(self) -> Dict[str, NDArray]:
Expand All @@ -385,9 +436,10 @@ def get_running_person_count(self) -> int:
"""
Get the total number of running persons (including driving and walking)
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "status"])
enable = persons["enable"] # type: NDArray[np.uint8]
status: NDArray[np.uint8] = persons["status"]
return ((status == DRIVING) | (status == WALKING)).sum()
return ((enable == 1) & ((status == DRIVING) | (status == WALKING))).sum()

def get_lane_statuses(self) -> NDArray[np.int8]:
"""
Expand All @@ -399,41 +451,42 @@ def get_lane_statuses(self) -> NDArray[np.int8]:

def get_lane_waiting_vehicle_counts(
self, speed_threshold: float = 0.1
) -> Dict[int, int]:
) -> Tuple[NDArray[np.int32], NDArray[np.int32]]:
"""
Get the number of vehicles of each lane with speed lower than `speed_threshold`
Returns:
- Dict: lane id -> number of vehicles
"""

persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "lane_id", "status", "v"])
enable = persons["enable"]
lane_id = persons["lane_id"]
status = persons["status"]
v = persons["v"]
filter = (status == DRIVING) & (v < speed_threshold)
filter = (enable == 1) & (status == DRIVING) & (v < speed_threshold)
filtered_lane_id = lane_id[filter]
# count for the lane id
unique, counts = np.unique(filtered_lane_id, return_counts=True)

return dict(zip(unique, counts))
return unique, counts

def get_lane_waiting_at_end_vehicle_counts(
self, speed_threshold: float = 0.1, distance_to_end: float = 100
) -> Dict[int, int]:
) -> Tuple[NDArray[np.int32], NDArray[np.int32]]:
"""
Get the number of vehicles of each lane with speed lower than `speed_threshold` and distance to end lower than `distance_to_end`
Returns:
- Dict: lane id -> number of vehicles
"""

persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "lane_id", "status", "v", "s"])
enable = persons["enable"]
lane_id = persons["lane_id"]
status = persons["status"]
v = persons["v"]
s = persons["s"]
filter = (status == DRIVING) & (v < speed_threshold)
filter = (enable == 1) & (status == DRIVING) & (v < speed_threshold)
filtered_lane_id = lane_id[filter]
filtered_s = s[filter]
# find the distance to the end of the lane
Expand All @@ -443,7 +496,7 @@ def get_lane_waiting_at_end_vehicle_counts(
lane_ids_for_count.append(i)
# count for the lane id
unique, counts = np.unique(lane_ids_for_count, return_counts=True)
return dict(zip(unique, counts))
return unique, counts

def get_lane_ids(self) -> NDArray[np.int32]:
"""
Expand Down Expand Up @@ -531,82 +584,92 @@ def get_finished_person_count(self) -> int:
"""
Get the number of the finished persons
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "status"])
enable = persons["enable"]
status: NDArray[np.uint8] = persons["status"]
return (status == FINISHED).sum()
return ((enable == 1) & (status == FINISHED)).sum()

def get_finished_person_average_traveling_time(self) -> float:
"""
Get the average traveling time of the finished persons
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "status", "traveling_time"])
enable = persons["enable"]
status: NDArray[np.uint8] = persons["status"]
filter = (enable == 1) & (status == FINISHED)
traveling_time = persons["traveling_time"]
return traveling_time[FINISHED].mean()
return traveling_time[filter].mean()

def get_running_person_average_traveling_time(self) -> float:
"""
Get the average traveling time of the running persons
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "status", "traveling_time"])
enable = persons["enable"]
status: NDArray[np.uint8] = persons["status"]
filter = (enable == 1) & ((status == DRIVING) | (status == WALKING))
traveling_time = persons["traveling_time"]
return traveling_time[status == DRIVING].mean()
return traveling_time[filter].mean()

def get_departed_person_average_traveling_time(self) -> float:
"""
Get the average traveling time of the departed persons (running+finished)
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "status", "traveling_time"])
enable = persons["enable"]
status: NDArray[np.uint8] = persons["status"]
filter = (enable == 1) & (status != SLEEP)
traveling_time = persons["traveling_time"]
return traveling_time[status != SLEEP].mean()
return traveling_time[filter].mean()

def get_road_lane_plan_index(self, road_index: int) -> int:
"""
Get the lane plan of road `road_index`
"""
return self._e.get_road_lane_plan_index(road_index)

def get_road_vehicle_counts(self) -> Dict[int, int]:
def get_road_vehicle_counts(self) -> Tuple[NDArray[np.int32], NDArray[np.int32]]:
"""
Get the number of vehicles of each road
Returns:
- Dict: road id -> number of vehicles
"""
persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "lane_parent_id", "status"])
enable = persons["enable"]
road_id = persons["lane_parent_id"]
status = persons["status"]
filter = status == DRIVING
filter = (enable == 1) & (status == DRIVING)
filtered_road_id = road_id[filter]
# count for the road id
unique, counts = np.unique(filtered_road_id, return_counts=True)
return dict(zip(unique, counts))
return unique, counts

def get_road_waiting_vehicle_counts(
self, speed_threshold: float = 0.1
) -> Dict[int, int]:
) -> Tuple[NDArray[np.int32], NDArray[np.int32]]:
"""
Get the number of vehicles with speed lower than `speed_threshold` of each road
Returns:
- Dict: road id -> number of vehicles
"""

persons = self.fetch_persons()
persons = self.fetch_persons(["enable", "lane_parent_id", "status", "v"])
enable = persons["enable"]
road_id = persons["lane_parent_id"]
status = persons["status"]
v = persons["v"]
filter = (
(status == DRIVING)
(enable == 1)
& (status == DRIVING)
& (v < speed_threshold)
& (road_id < 3_0000_0000) # the road id ranges [2_0000_0000, 3_0000_0000)
)
filtered_road_id = road_id[filter]
# count for the road id
unique, counts = np.unique(filtered_road_id, return_counts=True)
return dict(zip(unique, counts))
return unique, counts

def set_person_enable(self, person_index: int, enable: bool):
"""
Expand Down
Loading

0 comments on commit f74112e

Please sign in to comment.