Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ci: elasticsearch: Upload script improvements #76889

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
325 changes: 297 additions & 28 deletions scripts/ci/upload_test_results_es.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,179 @@
#!/usr/bin/env python3

# Copyright (c) 2022 Intel Corporation
# Copyright (c) 2022-2024 Intel Corporation
# SPDX-License-Identifier: Apache-2.0

"""
This script uploads ``twister.json`` file to Elasticsearch index for reporting and analysis.
see https://kibana.zephyrproject.io/

# This script upload test ci results to the zephyr ES instance for reporting and analysis.
# see https://kibana.zephyrproject.io/
The script expects two environment variables with the Elasticsearch server connection parameters:
`ELASTICSEARCH_SERVER`
`ELASTICSEARCH_KEY`
"""

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import bulk, BulkIndexError
import sys
import os
import json
import argparse
import re

def gendata(f, index, run_date=None, run_id=None, run_attempt=None):

def flatten(name, value, name_sep="_", names_dict=None, parent_name=None, escape_sep=""):
    """
    Flatten ``value`` into a plain dictionary.

    :param name: the flattened name of ``value``, used as a name prefix for all its items.
    :param value: object to flatten, for example, a dictionary:
        {
          "ROM":{
            "symbols":{
               "name":"Root",
               "size":4320,
               "identifier":"root",
               "address":0,
               "children":[
                  {
                     "name":"(no paths)",
                     "size":2222,
                     "identifier":":",
                     "address":0,
                     "children":[
                        {
                           "name":"var1",
                           "size":20,
                           "identifier":":/var1",
                           "address":1234
                        }, ...
                     ]
                  } ...
               ]
            }
          } ...
        }
    :param name_sep: string to separate flattened name components; occurrences of it
                     already present inside a name are prefixed with ``escape_sep``.
    :param names_dict: An optional dictionary with 'foo':'bar' items to flatten 'foo' list
                       properties where each item should be a dictionary with the 'bar' item
                       storing a unique name, so it will be taken as a part of the flattened
                       item's name instead of the item's index in its parent list.
    :param parent_name: the short, single-level, name of ``value``.
    :param escape_sep: string prepended to pre-existing ``name_sep`` occurrences in names.

    :return: the ``value`` flattened to a plain dictionary where each key is concatenated
        from names of its initially nested items separated by ``name_sep``,
        for the above example (with ``name_sep='/'`` and ``names_dict={'children':'name'}``):
        {
          "ROM/symbols/name": "Root",
          "ROM/symbols/size": 4320,
          "ROM/symbols/identifier": "root",
          "ROM/symbols/address": 0,
          "ROM/symbols/(no paths)/size": 2222,
          "ROM/symbols/(no paths)/identifier": ":",
          "ROM/symbols/(no paths)/address": 0,
          "ROM/symbols/(no paths)/var1/size": 20,
          "ROM/symbols/(no paths)/var1/identifier": ":/var1",
          "ROM/symbols/(no paths)/var1/address": 1234,
        }
    """
    res_dict = {}
    name_prefix = name + name_sep if name else ''
    if isinstance(value, list) and value:
        # Invariant over all list items: should items be named by their 'names_dict' key
        # instead of their positional index?
        use_item_names = isinstance(names_dict, dict) and parent_name in names_dict
        for idx, val in enumerate(value):
            if use_item_names and isinstance(val, dict):
                name_key = names_dict[parent_name]
                # Escape pre-existing separators so the name value cannot fake nesting.
                flat_name = name_prefix + str(val[name_key]).replace(name_sep, escape_sep + name_sep)
                val_copy = val.copy()
                val_copy.pop(name_key)  # the name is consumed into the key, not kept as data
                flat_item = flatten(flat_name, val_copy, name_sep, names_dict, parent_name, escape_sep)
            else:
                flat_item = flatten(name_prefix + str(idx), val, name_sep, names_dict, parent_name, escape_sep)
            res_dict.update(flat_item)
    elif isinstance(value, dict) and value:
        for key, val in value.items():
            if names_dict and key in names_dict:
                # A named-list container: drop its own name from the path; items bring theirs.
                name_k = name
            else:
                name_k = name_prefix + str(key).replace(name_sep, escape_sep + name_sep)
            res_dict.update(flatten(name_k, val, name_sep, names_dict, key, escape_sep))
    elif name:
        # Scalar (or empty container) leaf: store it under its flattened name.
        res_dict[name] = value
    return res_dict

def unflatten(src_dict, name_sep):
    """
    Unflatten ``src_dict`` at its deepest level, splitting keys with ``name_sep``
    and using the rightmost chunk to name properties.

    :param src_dict: a dictionary to unflatten, for example:
        {
          "ROM/symbols/name": "Root",
          "ROM/symbols/size": 4320,
          "ROM/symbols/identifier": "root",
          "ROM/symbols/address": 0,
          "ROM/symbols/(no paths)/size": 2222,
          "ROM/symbols/(no paths)/identifier": ":",
          "ROM/symbols/(no paths)/address": 0,
          "ROM/symbols/(no paths)/var1/size": 20,
          "ROM/symbols/(no paths)/var1/identifier": ":/var1",
          "ROM/symbols/(no paths)/var1/address": 1234,
        }

    :param name_sep: string to split the dictionary keys.
    :return: the unflattened dictionary, for the above example:
        {
          "ROM/symbols": {
              "name": "Root",
              "size": 4320,
              "identifier": "root",
              "address": 0
          },
          "ROM/symbols/(no paths)": {
              "size": 2222,
              "identifier": ":",
              "address": 0
          },
          "ROM/symbols/(no paths)/var1": {
              "size": 20,
              "identifier": ":/var1",
              "address": 1234
          }
        }
    """
    res_dict = {}
    for key, value in src_dict.items():
        # Split on the LAST separator: everything before it groups the record,
        # the rightmost chunk becomes the property name.
        group_name, _, prop_name = key.rpartition(name_sep)
        group = res_dict.setdefault(group_name, {})
        if prop_name in group:
            # Collision on the same property name: collect values into a list.
            if not isinstance(group[prop_name], list):
                group[prop_name] = [group[prop_name]]
            group[prop_name].append(value)
        else:
            group[prop_name] = value
    return res_dict


def transform(t, args):
    """
    Post-process one index record in place according to command-line options.

    If ``args.transform`` is set, it is parsed as a JSON dictionary mapping property
    names to regular expressions; each regexp is matched against the corresponding
    string property of ``t`` and the named groups of a successful match are merged
    into ``t`` (unmatched groups become empty strings).
    Properties listed in ``args.exclude`` are then removed.

    :param t: the record dictionary to transform (mutated in place).
    :param args: parsed command-line arguments with ``transform`` and ``exclude``.
    :return: the transformed record ``t``.
    """
    if args.transform:
        spec = str(args.transform)
        # Parse the rules only once per distinct spec: transform() runs per record,
        # so re-parsing the same JSON for every record would be wasted work.
        cache = getattr(transform, "_rules_cache", None)
        if cache is None:
            cache = transform._rules_cache = {}
        rules = cache.get(spec)
        if rules is None:
            # Accept single-quoted pseudo-JSON from the command line and re-escape
            # backslashes so regexp patterns survive json.loads().
            rules = json.loads(spec.replace("'", "\"").replace("\\", "\\\\"))
            cache[spec] = rules
        for property_name, rule in rules.items():
            if property_name in t:
                match = re.match(rule, t[property_name])
                if match:
                    t.update(match.groupdict(default=""))

    for excl_item in args.exclude:
        t.pop(excl_item, None)  # remove if present; ignore otherwise

    return t

def gendata(f, args):
with open(f, "r") as j:
data = json.load(j)
for t in data['testsuites']:
Expand All @@ -23,34 +182,84 @@ def gendata(f, index, run_date=None, run_id=None, run_attempt=None):
main_group = _grouping.split(".")[0]
sub_group = _grouping.split(".")[1]
env = data['environment']
if run_date:
env['run_date'] = run_date
if run_id:
env['run_id'] = run_id
if run_attempt:
env['run_attempt'] = run_attempt
if args.run_date:
env['run_date'] = args.run_date
if args.run_id:
env['run_id'] = args.run_id
if args.run_attempt:
env['run_attempt'] = args.run_attempt
if args.run_branch:
env['run_branch'] = args.run_branch
if args.run_workflow:
env['run_workflow'] = args.run_workflow
t['environment'] = env
t['component'] = main_group
t['sub_component'] = sub_group
yield {
"_index": index,
"_source": t

yield_records = 0
# If the flattered property is a dictionary, convert it to a plain list
# where each item is a flat dictionaly.
if args.flatten and args.flatten in t and isinstance(t[args.flatten], dict):
flat = t.pop(args.flatten)
flat_list_dict = {}
if args.flatten_list_names:
flat_list_dict = json.loads(str(args.flatten_list_names).replace("'", "\"").replace("\\", "\\\\"))
#
# Normalize flattening to a plain dictionary.
flat = flatten('', flat, args.transpose_separator, flat_list_dict, str(args.escape_separator))
# Unflat one, the deepest level, expecting similar set of property names there.
flat = unflatten(flat, args.transpose_separator)
# Keep dictionary names as their properties and flatten the dictionary to a list of dictionaries.
as_name = args.flatten_dict_name
if len(as_name):
flat_list = []
for k,v in flat.items():
v[as_name] = k + args.transpose_separator + v[as_name] if as_name in v else k
v[as_name + '_depth'] = v[as_name].count(args.transpose_separator)
flat_list.append(v)
t[args.flatten] = flat_list
else:
t[args.flatten] = flat

# Flatten lists or dictionaries cloning the records with the rest of their items and
# rename them composing the flattened property name with the item's name or index respectively.
if args.flatten and args.flatten in t and isinstance(t[args.flatten], list):
flat = t.pop(args.flatten)
for flat_item in flat:
t_clone = t.copy()
if isinstance(flat_item, dict):
t_clone.update({ args.flatten + args.flatten_separator + k : v for k,v in flat_item.items() })
elif isinstance(flat_item, list):
t_clone.update({ args.flatten + args.flatten_separator + str(idx) : v for idx,v in enumerate(flat_item) })
yield {
"_index": args.index,
"_source": transform(t_clone, args)
}
yield_records += 1

if not yield_records: # also yields a record without an empty flat object.
yield {
"_index": args.index,
"_source": transform(t, args)
}


def main():
args = parse_args()

if args.index:
index_name = args.index
else:
index_name = 'tests-zephyr-1'

settings = {
"index": {
"number_of_shards": 4
}
}
mappings = {

mappings = {}

if args.map_file:
with open(args.map_file, "rt") as json_map:
mappings = json.load(json_map)
else:
mappings = {
"properties": {
"execution_time": {"type": "float"},
"retries": {"type": "integer"},
Expand All @@ -61,9 +270,9 @@ def main():
if args.dry_run:
xx = None
for f in args.files:
xx = gendata(f, index_name, args.run_date, args.run_id, args.run_attempt)
for x in xx:
print(x)
xx = gendata(f, args)
for x in xx:
print(json.dumps(x, indent=4))
sys.exit(0)

es = Elasticsearch(
Expand All @@ -73,24 +282,84 @@ def main():
)

if args.create_index:
es.indices.create(index=index_name, mappings=mappings, settings=settings)
es.indices.create(index=args.index, mappings=mappings, settings=settings)
else:
if args.run_date:
print(f"Setting run date from command line: {args.run_date}")
for f in args.files:
bulk(es, gendata(f, index_name, args.run_date, args.run_id, args.run_attempt))

for f in args.files:
print(f"Process: '{f}'")
try:
bulk(es, gendata(f, args), request_timeout=args.bulk_timeout)
except BulkIndexError as e:
print(f"ERROR adding '{f}' exception: {e}")
error_0 = e.errors[0].get("index", {}).get("error", {})
reason_0 = error_0.get('reason')
print(f"ERROR reason: {reason_0}")
raise e
#
#
#

def parse_args():
parser = argparse.ArgumentParser(allow_abbrev=False)
parser = argparse.ArgumentParser(allow_abbrev=False,
formatter_class=argparse.RawTextHelpFormatter,
description=__doc__)
parser.add_argument('-y','--dry-run', action="store_true", help='Dry run.')
parser.add_argument('-c','--create-index', action="store_true", help='Create index.')
parser.add_argument('-i', '--index', help='index to push to.', required=True)
parser.add_argument('-m', '--map-file', required=False,
help='JSON map file with Elasticsearch index structure and data types.')
parser.add_argument('-i', '--index', required=True, default='tests-zephyr-1',
help='Elasticsearch index to push to.')
parser.add_argument('-r', '--run-date', help='Run date in ISO format', required=False)
parser.add_argument('--flatten', required=False, default=None,
metavar='TESTSUITE_PROPERTY',
help="Flatten one of the test suite's properties:\n"
"it will be converted to a list where each list item becomes a separate index record\n"
"with all other properties of the test suite object duplicated and the flattened\n"
"property name used as a prefix for all its items, e.g.\n"
"'recording.cycles' becomes 'recording_cycles'.")
parser.add_argument('--flatten-dict-name', required=False, default="name",
metavar='PROPERTY_NAME',
help="For dictionaries flattened into a list, use this name for additional property\n"
"to store the item's flat concatenated name. One more property with that name\n"
"and'_depth' suffix will be added for number of `--transpose_separator`s in the name.\n"
"Default: '%(default)s'. Set empty string to disable.")
parser.add_argument('--flatten-list-names', required=False, default=None,
metavar='DICT',
help="An optional string with json dictionary like {'children':'name', ...}\n"
"to use it for flattening lists of dictionaries named 'children' which should\n"
"contain keys 'name' with unique string value as an actual name for the item.\n"
"This name value will be composed instead of the container's name 'children' and\n"
"the item's numeric index.")
parser.add_argument('--flatten-separator', required=False, default="_",
help="Separator to use it for the flattened property names. Default: '%(default)s'")
parser.add_argument('--transpose-separator', required=False, default="/",
help="Separator to use it for the transposed dictionary names stored in\n"
"`flatten-dict-name` properties. Default: '%(default)s'")
parser.add_argument('--escape-separator', required=False, default='',
help="Prepend name separators with the escape string if already present in names. "
"Default: '%(default)s'.")
parser.add_argument('--transform', required=False,
metavar='RULE',
help="Apply regexp group parsing to selected string properties after flattening.\n"
"The string is a json dictionary with property names and regexp strings to apply\n"
"on them to extract values, for example:\n"
r"\"{ 'recording_metric': '(?P<object>[^\.]+)\.(?P<action>[^\.]+)\.' }\"")
parser.add_argument('--exclude', required=False, nargs='*', default=[],
metavar='TESTSUITE_PROPERTY',
help="Don't store these properties in the Elasticsearch index.")
parser.add_argument('--run-workflow', required=False,
help="Source workflow identificator, e.g. the workflow short name "
"and its triggering event name.")
parser.add_argument('--run-branch', required=False,
help="Source branch identificator.")
parser.add_argument('--run-id', required=False,
help="unique run-id (e.g. from github.run_id context)")
parser.add_argument('--run-attempt', required=False,
help="unique run attempt number (e.g. from github.run_attempt context)")
parser.add_argument('--bulk-timeout', required=False, type=int, default=60,
help="Elasticsearch bulk request timeout, seconds. Default %(default)s.")
parser.add_argument('files', metavar='FILE', nargs='+', help='file with test data.')

args = parser.parse_args()
Expand Down
Loading