generated from oracle/template-repo
-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: add in new metadata-based heuristic to pypi malware analyzer
- Loading branch information
Showing
5 changed files
with
603 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
223 changes: 223 additions & 0 deletions
223
src/macaron/malware_analyzer/pypi_heuristics/metadata/anomalistic_version.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
# Copyright (c) 2024 - 2025, Oracle and/or its affiliates. All rights reserved. | ||
# Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/. | ||
|
||
"""The heuristic analyzer to check for an anomalistic package version.""" | ||
|
||
import logging | ||
from enum import Enum | ||
|
||
from packaging.version import InvalidVersion, parse | ||
|
||
from macaron.config.defaults import defaults | ||
from macaron.errors import HeuristicAnalyzerValueError | ||
from macaron.json_tools import JsonType, json_extract | ||
from macaron.malware_analyzer.datetime_parser import parse_datetime | ||
from macaron.malware_analyzer.pypi_heuristics.base_analyzer import BaseHeuristicAnalyzer | ||
from macaron.malware_analyzer.pypi_heuristics.heuristics import HeuristicResult, Heuristics | ||
from macaron.slsa_analyzer.package_registry.pypi_registry import PyPIPackageJsonAsset | ||
|
||
# Module-level logger named after this module, used for debug output by the heuristic below.
logger: logging.Logger = logging.getLogger(__name__)
|
||
|
||
class AnomalisticVersionAnalyzer(BaseHeuristicAnalyzer):
    """
    Analyze the version number (if there is only a single release) to detect if it is anomalistic.

    A version number is anomalistic if it is above the thresholds for an epoch, major, or minor value.
    If the version does not adhere to PyPI standards (PEP 440, as per the 'packaging' module), this heuristic
    cannot analyze it.

    Calendar versioning is detected as version numbers with the year, month and day present in the following
    combinations (using the example 11th October 2016):

    - YYYY.MM.DD, e.g. 2016.10.11
    - YYYY.DD.MM, e.g. 2016.11.10
    - YY.DD.MM, e.g. 16.11.10
    - YY.MM.DD, e.g. 16.10.11
    - MM.DD.YYYY, e.g. 10.11.2016
    - DD.MM.YYYY, e.g. 11.10.2016
    - DD.MM.YY, e.g. 11.10.16
    - MM.DD.YY, e.g. 10.11.16
    - YYYYMMDD, e.g. 20161011
    - YYYYDDMM, e.g. 20161110
    - YYDDMM, e.g. 161110
    - YYMMDD, e.g. 161011
    - MMDDYYYY, e.g. 10112016
    - DDMMYYYY, e.g. 11102016
    - DDMMYY, e.g. 111016
    - MMDDYY, e.g. 101116

    This may be followed by further versioning (e.g. 2016.10.11.5.6.2). This type of versioning is detected based on
    the date of the upload time for the release within a threshold of a number of days (in the defaults file).

    Calendar-semantic versioning is detected as version numbers with the major value as the year (either yyyy or yy),
    and any other series of numbers following it:

    - 2016.7.1 would be version 7.1 of 2016
    - 16.1.4 would be version 1.4 of 2016

    This type of versioning is detected based on the exact year of the upload time for the release.

    All other versionings are detected as semantic versioning.
    """

    # Key under which the detected versioning scheme is reported in the detail information.
    DETAIL_INFO_KEY: str = "versioning"
    # strptime-style formats tried when the whole major value may be a date written as one integer.
    DIGIT_DATE_FORMATS: list[str] = ["%Y%m%d", "%Y%d%m", "%d%m%Y", "%m%d%Y", "%y%m%d", "%y%d%m", "%d%m%y", "%m%d%y"]

    # Values used when defaults.ini has no "heuristic.pypi" section, or the section lacks an option.
    DEFAULT_MAJOR_THRESHOLD: int = 20
    DEFAULT_EPOCH_THRESHOLD: int = 3
    DEFAULT_DAY_PUBLISH_ERROR: int = 4

    def __init__(self) -> None:
        super().__init__(
            name="anomalistic_version_analyzer",
            heuristic=Heuristics.ANOMALISTIC_VERSION,
            depends_on=[(Heuristics.ONE_RELEASE, HeuristicResult.FAIL)],
        )
        self.major_threshold, self.epoch_threshold, self.day_publish_error = self._load_defaults()

    def _load_defaults(self) -> tuple[int, int, int]:
        """Load default settings from defaults.ini.

        Returns
        -------
        tuple[int, int, int]
            The major threshold, the epoch threshold and the day publish error, in that order.
        """
        section_name = "heuristic.pypi"
        if defaults.has_section(section_name):
            section = defaults[section_name]
            # Without a fallback, getint() returns None for a missing option, which would break the
            # integer comparisons performed in analyze().
            return (
                section.getint("major_threshold", fallback=self.DEFAULT_MAJOR_THRESHOLD),
                section.getint("epoch_threshold", fallback=self.DEFAULT_EPOCH_THRESHOLD),
                section.getint("day_publish_error", fallback=self.DEFAULT_DAY_PUBLISH_ERROR),
            )
        # Major threshold, Epoch threshold, Day publish error
        return self.DEFAULT_MAJOR_THRESHOLD, self.DEFAULT_EPOCH_THRESHOLD, self.DEFAULT_DAY_PUBLISH_ERROR

    def analyze(self, pypi_package_json: PyPIPackageJsonAsset) -> tuple[HeuristicResult, dict[str, JsonType]]:
        """Analyze the package.

        Parameters
        ----------
        pypi_package_json: PyPIPackageJsonAsset
            The PyPI package JSON asset object.

        Returns
        -------
        tuple[HeuristicResult, dict[str, JsonType]]:
            The result and related information collected during the analysis. The result is SKIP when the
            version cannot be parsed by the 'packaging' module.

        Raises
        ------
        HeuristicAnalyzerValueError
            If there is no release information available, there is not exactly one release, the latest
            version is missing from the releases, the release has no distributions, or an upload time is
            missing or cannot be parsed.
        """
        releases = pypi_package_json.get_releases()
        if releases is None:  # no release information
            error_msg = "There is no information for any release of this package."
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        if len(releases) != 1:
            error_msg = (
                "This heuristic depends on a single release, but somehow there are multiple when the one release"
                " heuristic failed."
            )
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        # Since there is only one release, the latest version should be that release
        release = pypi_package_json.get_latest_version()
        if release is None:
            error_msg = "No latest version information available"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        try:
            release_metadata = releases[release]
        except KeyError as release_error:
            error_msg = "The latest release is not available in the list of releases"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg) from release_error

        try:
            version = parse(release)
        except InvalidVersion:
            return HeuristicResult.SKIP, {self.DETAIL_INFO_KEY: Versioning.INVALID.value}

        # A release without any distribution has no upload time to compare against; without this guard the
        # min()/max() calls below would raise a bare ValueError on an empty list.
        if not release_metadata:
            error_msg = "There are no distributions available for the latest release"
            logger.debug(error_msg)
            raise HeuristicAnalyzerValueError(error_msg)

        years = []
        months = []
        publish_days = []

        for distribution in release_metadata:
            upload_time = json_extract(distribution, ["upload_time"], str)
            if upload_time is None:
                error_msg = "Missing upload time from release information"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            parsed_time = parse_datetime(upload_time)
            if parsed_time is None:
                error_msg = "Upload time is not of the expected PyPI format"
                logger.debug(error_msg)
                raise HeuristicAnalyzerValueError(error_msg)

            years.append(parsed_time.year)
            years.append(parsed_time.year % 100)  # last 2 digits
            months.append(parsed_time.month)
            publish_days.append(parsed_time.day)

        # Accept any day within day_publish_error days of the earliest/latest upload day.
        # NOTE(review): this window is plain integer arithmetic on the day of the month, so it does not wrap
        # across month boundaries (it can contain values below 1 or above 31) - confirm this is intended.
        days = list(range(min(publish_days) - self.day_publish_error, max(publish_days) + self.day_publish_error + 1))

        calendar = False
        calendar_semantic = False

        # check for year YY[YY]...
        if version.major in years:
            # calendar versioning: YY[YY].(M[M].D[D])(D[D].M[M])...
            if (version.minor in months and version.micro in days) or (
                version.minor in days and version.micro in months
            ):
                calendar = True
            else:
                calendar_semantic = True
        # check for calendar versioning: M[M].D[D].YY[YY]... or D[D].M[M].YY[YY]... or the whole digit
        # representing a datetime
        elif (
            ((version.major in months and version.minor in days) or (version.major in days and version.minor in months))
            and version.micro in years
        ) or self.__integer_date(version.major, years, months, days):
            # must include day and year for this to be calendar
            calendar = True

        if calendar:  # just check epoch
            versioning = Versioning.CALENDAR
            anomalistic = version.epoch > self.epoch_threshold
        elif calendar_semantic:  # check minor (as major) and epoch
            versioning = Versioning.CALENDAR_SEMANTIC
            anomalistic = version.epoch > self.epoch_threshold or version.minor > self.major_threshold
        else:  # semantic versioning
            versioning = Versioning.SEMANTIC
            anomalistic = version.epoch > self.epoch_threshold or version.major > self.major_threshold

        detail_info: dict[str, JsonType] = {self.DETAIL_INFO_KEY: versioning.value}
        return (HeuristicResult.FAIL if anomalistic else HeuristicResult.PASS), detail_info

    def __integer_date(self, value: int, years: list[int], months: list[int], days: list[int]) -> bool:
        """Check whether an integer, read as a run of digits, encodes a date near the upload date.

        Parameters
        ----------
        value: int
            The integer to interpret (the major component of the version).
        years: list[int]
            The accepted years (both the four digit and two digit forms of the upload years).
        months: list[int]
            The accepted months.
        days: list[int]
            The accepted days of the month.

        Returns
        -------
        bool
            True if, under any format in DIGIT_DATE_FORMATS, the value parses to a date whose year, month
            and day are all in the accepted lists; False otherwise.
        """
        for date_format in self.DIGIT_DATE_FORMATS:
            # parse_datetime yields None when the digits do not fit this format; try the next one.
            if (date := parse_datetime(str(value), date_format)) is None:
                continue

            if date.year in years and date.month in months and date.day in days:
                return True

        return False
|
||
|
||
class Versioning(Enum):
    """Versioning schemes that a release version can be classified as."""

    # The version string could not be parsed as a valid version.
    INVALID = "invalid"
    # The version encodes a date (year, month and day).
    CALENDAR = "calendar"
    # The version starts with a year and is followed by ordinary version numbers.
    CALENDAR_SEMANTIC = "calendar_semantic"
    # Any other valid version.
    SEMANTIC = "semantic"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.