|
| 1 | +import json |
| 2 | +from datetime import datetime, timezone |
| 3 | +from io import StringIO |
| 4 | +from pathlib import Path |
| 5 | +from typing import Any |
| 6 | +from xml.etree import ElementTree as ET |
| 7 | + |
| 8 | +from rich.console import Console |
| 9 | + |
| 10 | +from ..core.result import CheckResult, ScenarioResult, SuiteResult |
| 11 | + |
| 12 | + |
| 13 | +def _seconds(duration_ms: int) -> str: |
| 14 | + return f"{duration_ms / 1000:.6f}" |
| 15 | + |
| 16 | + |
| 17 | +def _to_json(value: Any) -> str: |
| 18 | + if hasattr(value, "model_dump"): |
| 19 | + payload = value.model_dump(mode="json") |
| 20 | + else: |
| 21 | + payload = value |
| 22 | + return json.dumps(payload, ensure_ascii=False, default=str) |
| 23 | + |
| 24 | + |
| 25 | +def _check_label(result: CheckResult, fallback: str) -> str: |
| 26 | + if isinstance(result.details, dict): |
| 27 | + return str( |
| 28 | + result.details.get("check_name") |
| 29 | + or result.details.get("check_kind") |
| 30 | + or result.details.get("name") |
| 31 | + or fallback |
| 32 | + ) |
| 33 | + return fallback |
| 34 | + |
| 35 | + |
| 36 | +def _iter_checks(scenario: ScenarioResult[Any]): |
| 37 | + for step_index, step in enumerate(scenario.steps, start=1): |
| 38 | + for check_index, check in enumerate(step.results, start=1): |
| 39 | + yield step_index, check_index, check |
| 40 | + |
| 41 | + |
| 42 | +def _scenario_assertions(scenario: ScenarioResult[Any]) -> int: |
| 43 | + return sum(len(step.results) for step in scenario.steps) |
| 44 | + |
| 45 | + |
| 46 | +def _suite_assertions(result: SuiteResult) -> int: |
| 47 | + return sum(_scenario_assertions(scenario) for scenario in result.results) |
| 48 | + |
| 49 | + |
| 50 | +def _suite_counts(result: SuiteResult) -> tuple[int, int, int, int]: |
| 51 | + tests = len(result.results) |
| 52 | + failures = sum(1 for scenario in result.results if scenario.failed) |
| 53 | + errors = sum(1 for scenario in result.results if scenario.errored) |
| 54 | + skipped = sum(1 for scenario in result.results if scenario.skipped) |
| 55 | + return tests, failures, errors, skipped |
| 56 | + |
| 57 | + |
| 58 | +def _matching_checks( |
| 59 | + scenario: ScenarioResult[Any], |
| 60 | + *, |
| 61 | + failed: bool = False, |
| 62 | + errored: bool = False, |
| 63 | + skipped: bool = False, |
| 64 | +) -> list[tuple[int, int, CheckResult]]: |
| 65 | + matches: list[tuple[int, int, CheckResult]] = [] |
| 66 | + for step_index, check_index, check in _iter_checks(scenario): |
| 67 | + if failed and check.failed: |
| 68 | + matches.append((step_index, check_index, check)) |
| 69 | + elif errored and check.errored: |
| 70 | + matches.append((step_index, check_index, check)) |
| 71 | + elif skipped and check.skipped: |
| 72 | + matches.append((step_index, check_index, check)) |
| 73 | + return matches |
| 74 | + |
| 75 | + |
| 76 | +def _build_detail_text( |
| 77 | + scenario: ScenarioResult[Any], |
| 78 | + matches: list[tuple[int, int, CheckResult]], |
| 79 | +) -> str: |
| 80 | + lines = [ |
| 81 | + f"scenario={scenario.scenario_name}", |
| 82 | + "See testcase properties for final_trace and step payloads.", |
| 83 | + ] |
| 84 | + |
| 85 | + for step_index, check_index, check in matches: |
| 86 | + label = _check_label(check, f"check_{check_index}") |
| 87 | + status = check.status.value.upper() |
| 88 | + message = check.message or "" |
| 89 | + lines.append(f"[{status}] step_{step_index}.{label}: {message}") |
| 90 | + if check.details: |
| 91 | + lines.append(f"details={_to_json(check.details)}") |
| 92 | + |
| 93 | + return "\n".join(lines) |
| 94 | + |
| 95 | + |
| 96 | +def _append_properties(testcase_el: ET.Element, scenario: ScenarioResult[Any]) -> None: |
| 97 | + properties_el = ET.SubElement(testcase_el, "properties") |
| 98 | + |
| 99 | + ET.SubElement( |
| 100 | + properties_el, |
| 101 | + "property", |
| 102 | + {"name": "final_trace", "value": _to_json(scenario.final_trace)}, |
| 103 | + ) |
| 104 | + |
| 105 | + for step_index, step in enumerate(scenario.steps, start=1): |
| 106 | + ET.SubElement( |
| 107 | + properties_el, |
| 108 | + "property", |
| 109 | + {"name": f"step_{step_index}", "value": _to_json(step)}, |
| 110 | + ) |
| 111 | + |
| 112 | + for check_index, check in enumerate(step.results, start=1): |
| 113 | + for metric in check.metrics: |
| 114 | + ET.SubElement( |
| 115 | + properties_el, |
| 116 | + "property", |
| 117 | + { |
| 118 | + "name": f"step_{step_index}.check_{check_index}.{metric.name}", |
| 119 | + "value": str(metric.value), |
| 120 | + }, |
| 121 | + ) |
| 122 | + |
| 123 | + |
| 124 | +def _append_status_node(testcase_el: ET.Element, scenario: ScenarioResult[Any]) -> None: |
| 125 | + if scenario.errored: |
| 126 | + matches = _matching_checks(scenario, errored=True) |
| 127 | + first = matches[0][2] if matches else None |
| 128 | + node = ET.SubElement( |
| 129 | + testcase_el, |
| 130 | + "error", |
| 131 | + { |
| 132 | + "type": _check_label(first, "error") if first else "error", |
| 133 | + "message": first.message |
| 134 | + if first and first.message |
| 135 | + else "Scenario errored.", |
| 136 | + }, |
| 137 | + ) |
| 138 | + node.text = _build_detail_text(scenario, matches) |
| 139 | + return |
| 140 | + |
| 141 | + if scenario.failed: |
| 142 | + matches = _matching_checks(scenario, failed=True) |
| 143 | + first = matches[0][2] if matches else None |
| 144 | + node = ET.SubElement( |
| 145 | + testcase_el, |
| 146 | + "failure", |
| 147 | + { |
| 148 | + "type": _check_label(first, "failure") if first else "failure", |
| 149 | + "message": first.message |
| 150 | + if first and first.message |
| 151 | + else "Scenario failed.", |
| 152 | + }, |
| 153 | + ) |
| 154 | + node.text = _build_detail_text(scenario, matches) |
| 155 | + return |
| 156 | + |
| 157 | + if scenario.skipped: |
| 158 | + matches = _matching_checks(scenario, skipped=True) |
| 159 | + node = ET.SubElement(testcase_el, "skipped") |
| 160 | + node.text = _build_detail_text(scenario, matches) |
| 161 | + |
| 162 | + |
| 163 | +def _render_scenario_report(scenario: ScenarioResult[Any]) -> str: |
| 164 | + buffer = StringIO() |
| 165 | + console = Console( |
| 166 | + file=buffer, |
| 167 | + force_terminal=False, |
| 168 | + no_color=True, |
| 169 | + width=120, |
| 170 | + ) |
| 171 | + scenario.print_report(console) |
| 172 | + return buffer.getvalue().rstrip() |
| 173 | + |
| 174 | + |
| 175 | +def _append_system_out(testcase_el: ET.Element, scenario: ScenarioResult[Any]) -> None: |
| 176 | + report = _render_scenario_report(scenario) |
| 177 | + if not report: |
| 178 | + return |
| 179 | + |
| 180 | + system_out = ET.SubElement(testcase_el, "system-out") |
| 181 | + system_out.text = report |
| 182 | + |
| 183 | + |
| 184 | +def to_junit_xml(result: SuiteResult, path: str | Path | None = None) -> str: |
| 185 | + tests, failures, errors, skipped = _suite_counts(result) |
| 186 | + |
| 187 | + root = ET.Element( |
| 188 | + "testsuite", |
| 189 | + { |
| 190 | + "name": "Test run", |
| 191 | + "tests": str(tests), |
| 192 | + "failures": str(failures), |
| 193 | + "errors": str(errors), |
| 194 | + "skipped": str(skipped), |
| 195 | + "assertions": str(_suite_assertions(result)), |
| 196 | + "time": _seconds(result.duration_ms), |
| 197 | + "timestamp": datetime.now(timezone.utc) |
| 198 | + .isoformat(timespec="seconds") |
| 199 | + .replace("+00:00", "Z"), |
| 200 | + }, |
| 201 | + ) |
| 202 | + |
| 203 | + for scenario in result.results: |
| 204 | + testcase_el = ET.SubElement( |
| 205 | + root, |
| 206 | + "testcase", |
| 207 | + { |
| 208 | + "name": scenario.scenario_name, |
| 209 | + "assertions": str(_scenario_assertions(scenario)), |
| 210 | + "time": _seconds(scenario.duration_ms), |
| 211 | + }, |
| 212 | + ) |
| 213 | + |
| 214 | + _append_properties(testcase_el, scenario) |
| 215 | + _append_status_node(testcase_el, scenario) |
| 216 | + _append_system_out(testcase_el, scenario) |
| 217 | + |
| 218 | + tree = ET.ElementTree(root) |
| 219 | + ET.indent(tree, space=" ") |
| 220 | + |
| 221 | + if path is not None: |
| 222 | + output_path = Path(path) |
| 223 | + output_path.parent.mkdir(parents=True, exist_ok=True) |
| 224 | + tree.write(output_path, encoding="utf-8", xml_declaration=True) |
| 225 | + |
| 226 | + return ET.tostring(root, encoding="unicode") |
0 commit comments