diff --git a/pr_agent/algo/utils.py b/pr_agent/algo/utils.py
index 3e8576753f..748d526911 100644
--- a/pr_agent/algo/utils.py
+++ b/pr_agent/algo/utils.py
@@ -1,5 +1,4 @@
from __future__ import annotations
-
import ast
import copy
import difflib
@@ -153,6 +152,9 @@ def convert_to_markdown_v2(output_data: dict,
"Estimated effort to review [1-5]": "⏱️",
"Contribution time cost estimate": "⏳",
"Ticket compliance check": "🎫",
+ "Risk level": "⚠️",
+ "Merge recommendation": "✅",
+ "Review priority files": "📂",
}
markdown_text = ""
if not incremental_review:
@@ -172,7 +174,7 @@ def convert_to_markdown_v2(output_data: dict,
todo_summary = output_data['review'].pop('todo_summary', '')
for key, value in output_data['review'].items():
if value is None or value == '' or value == {} or value == []:
- if key.lower() not in ['can_be_split', 'key_issues_to_review']:
+ if key.lower() not in ['can_be_split', 'key_issues_to_review', 'review_priority_files']:
continue
key_nice = key.replace('_', ' ').capitalize()
emoji = emojis.get(key_nice, "")
@@ -236,6 +238,37 @@ def convert_to_markdown_v2(output_data: dict,
markdown_text += f"### {emoji} Security concerns\n\n"
value = emphasize_header(value.strip(), only_markdown=True)
markdown_text += f"{value}\n\n"
+ elif "risk level" in key_nice.lower():
+ risk_value = str(value).strip().lower().replace("_", " ")
+ risk_display = risk_value.capitalize() if risk_value else "Unknown"
+ markdown_text += f"### {emoji} Risk level: {risk_display}\n\n"
+
+ elif "merge recommendation" in key_nice.lower():
+ recommendation = str(value).strip().replace("_", " ")
+ recommendation_display = (
+ recommendation.capitalize() if recommendation else "Unknown"
+ )
+ markdown_text += (
+ f"### {emoji} Merge recommendation: {recommendation_display}\n\n"
+ )
+
+ elif "review priority files" in key_nice.lower():
+ priority_files = []
+ if isinstance(value, list):
+ priority_files = [
+ str(priority_file).strip()
+ for priority_file in value
+ if str(priority_file).strip()
+ ]
+
+ if not priority_files:
+ markdown_text += f"### {emoji} Priority files: None\n\n"
+ else:
+ markdown_text += f"### {emoji} Priority files\n\n"
+ for priority_file in priority_files:
+ markdown_text += f"- {priority_file}\n"
+ markdown_text += "\n"
+
elif 'todo sections' in key_nice.lower():
if gfm_supported:
markdown_text += "
| "
diff --git a/pr_agent/git_providers/bitbucket_provider.py b/pr_agent/git_providers/bitbucket_provider.py
index 6944e41fa2..135f66aee3 100644
--- a/pr_agent/git_providers/bitbucket_provider.py
+++ b/pr_agent/git_providers/bitbucket_provider.py
@@ -489,6 +489,9 @@ def get_languages(self):
def get_pr_branch(self):
return self.pr.source_branch
+ def get_pr_base_ref(self) -> str:
+ return self.pr.destination_branch or ""
+
# This function attempts to get the default branch of the repository. As a fallback, uses the PR destination branch.
# Note: Must be running from a PR context.
def get_repo_default_branch(self):
diff --git a/pr_agent/git_providers/bitbucket_server_provider.py b/pr_agent/git_providers/bitbucket_server_provider.py
index c929221af9..ff99c00a1d 100644
--- a/pr_agent/git_providers/bitbucket_server_provider.py
+++ b/pr_agent/git_providers/bitbucket_server_provider.py
@@ -417,6 +417,18 @@ def get_languages(self):
def get_pr_branch(self):
return self.pr.fromRef['displayId']
+ def get_pr_base_ref(self) -> str:
+ return self.pr.toRef.get("latestCommit") or self.pr.toRef.get("displayId") or ""
+
+ def get_pr_file_content(self, file_path: str, branch: str) -> str:
+ try:
+ return self.get_file(file_path, branch)
+ except Exception as e:
+ get_logger().warning(
+ f"Error retrieving file {file_path} from ref {branch}: {e}"
+ )
+ return ""
+
def get_pr_owner_id(self) -> str | None:
return self.workspace_slug
diff --git a/pr_agent/git_providers/git_provider.py b/pr_agent/git_providers/git_provider.py
index 631e189c04..1f09fc5aeb 100644
--- a/pr_agent/git_providers/git_provider.py
+++ b/pr_agent/git_providers/git_provider.py
@@ -181,6 +181,10 @@ def get_languages(self):
def get_pr_branch(self):
pass
+ def get_pr_base_ref(self) -> str:
+ get_logger().warning("Not implemented! Returning empty base ref")
+ return ""
+
@abstractmethod
def get_user_id(self):
pass
diff --git a/pr_agent/git_providers/gitea_provider.py b/pr_agent/git_providers/gitea_provider.py
index 89a6248e9b..c60b966705 100644
--- a/pr_agent/git_providers/gitea_provider.py
+++ b/pr_agent/git_providers/gitea_provider.py
@@ -578,6 +578,23 @@ def get_pr_branch(self) -> str:
return self.pr.head.ref if self.pr.head.ref else ""
+ def get_pr_base_ref(self) -> str:
+ return self.base_sha or self.base_ref or ""
+
+ def get_pr_file_content(self, file_path: str, branch: str) -> str:
+ try:
+ return self.repo_api.get_file_content(
+ owner=self.owner,
+ repo=self.repo,
+ commit_sha=branch,
+ filepath=file_path,
+ )
+ except Exception as e:
+ self.logger.warning(
+ f"Error retrieving file {file_path} from ref {branch}: {e}"
+ )
+ return ""
+
def get_pr_description_full(self) -> str:
"""Get full PR description with metadata"""
if not self.pr:
diff --git a/pr_agent/git_providers/github_provider.py b/pr_agent/git_providers/github_provider.py
index fa52b7dc05..4d001b06d8 100644
--- a/pr_agent/git_providers/github_provider.py
+++ b/pr_agent/git_providers/github_provider.py
@@ -701,6 +701,9 @@ def get_languages(self):
def get_pr_branch(self):
return self.pr.head.ref
+ def get_pr_base_ref(self) -> str:
+ return self.pr.base.sha or self.pr.base.ref or ""
+
def get_pr_owner_id(self) -> str | None:
if not self.repo:
return None
diff --git a/pr_agent/git_providers/gitlab_provider.py b/pr_agent/git_providers/gitlab_provider.py
index b3f54920d0..be1ac29c26 100644
--- a/pr_agent/git_providers/gitlab_provider.py
+++ b/pr_agent/git_providers/gitlab_provider.py
@@ -774,6 +774,9 @@ def get_languages(self):
def get_pr_branch(self):
return self.mr.source_branch
+ def get_pr_base_ref(self) -> str:
+ return getattr(self.mr, "target_branch", "") or ""
+
def get_pr_owner_id(self) -> str | None:
if not self.gitlab_url or 'gitlab.com' in self.gitlab_url:
if not self.id_project:
diff --git a/pr_agent/settings/configuration.toml b/pr_agent/settings/configuration.toml
index c2533b54ec..9142cf1808 100644
--- a/pr_agent/settings/configuration.toml
+++ b/pr_agent/settings/configuration.toml
@@ -81,6 +81,12 @@ require_security_review=true
require_estimate_contribution_time_cost=false
require_todo_scan=false
require_ticket_analysis_review=true
+require_risk_assessment=false
+require_merge_recommendation=false
+require_priority_files=false
+enable_review_rules=true
+review_rules_paths=[".pr_agent/review_rules.md",".github/review_rules.md","docs/review_rules.md"]
+max_review_rules_tokens=1200
# general options
publish_output_no_suggestions=true # Set to "false" if you only need the reviewer's remarks (not labels, not "security audit", etc.) and want to avoid noisy "No major issues detected" comments.
persistent_comment=true
diff --git a/pr_agent/settings/pr_reviewer_prompts.toml b/pr_agent/settings/pr_reviewer_prompts.toml
index bbe6c6d04c..2477bb28ed 100644
--- a/pr_agent/settings/pr_reviewer_prompts.toml
+++ b/pr_agent/settings/pr_reviewer_prompts.toml
@@ -68,7 +68,16 @@ Extra instructions from the user:
{{ extra_instructions }}
======
{% endif %}
+{%- if review_rules %}
+Repository/team review rules:
+======
+{{ review_rules }}
+======
+- Apply these repository rules in addition to the general review criteria.
+- When a repository rule conflicts with a generic style preference, prioritize the repository rule.
+- Use the repository rules to decide what deserves extra attention, but do not invent violations that are not supported by the diff.
+{% endif %}
The output must be a YAML object equivalent to type $PRReview, according to the following Pydantic definitions:
=====
@@ -117,6 +126,15 @@ class Review(BaseModel):
{%- if require_estimate_effort_to_review %}
estimated_effort_to_review_[1-5]: int = Field(description="Estimate, on a scale of 1-5 (inclusive), the time and effort required to review this PR by an experienced and knowledgeable developer. 1 means short and easy review, 5 means long and hard review. Take into account the size, complexity, quality, and the needed changes of the PR code diff.")
{%- endif %}
+{%- if require_risk_assessment %}
+ risk_level: str = Field(description="Overall risk level of this PR. Answer with exactly one of: low, medium, high. Use high only when the PR introduces a clear bug, security concern, or major logic risk. Use medium when the PR is not clearly broken but contains non-trivial areas that require careful human verification. Use low when the PR is small, low-impact, and no important issues are identified.")
+{%- endif %}
+{%- if require_merge_recommendation %}
+ merge_recommendation: str = Field(description="Overall merge recommendation for this PR. Answer with exactly one of: safe_to_merge, merge_with_caution, changes_required. Use changes_required when there are clear issues that should be fixed before merge. Use merge_with_caution when the PR seems acceptable but still deserves focused reviewer attention. Use safe_to_merge when no important blockers or risks are identified.")
+{%- endif %}
+{%- if require_priority_files %}
+ review_priority_files: List[str] = Field(description="A short list of the most important files a human reviewer should inspect first. Return an empty list if the PR is too small or no file deserves special attention.")
+{%- endif %}
{%- if require_estimate_contribution_time_cost %}
contribution_time_cost_estimate: ContributionTimeCostEstimate = Field(description="An estimate of the time required to implement the changes, based on the quantity, quality, and complexity of the contribution, as well as the context from the PR description and commit messages.")
{%- endif %}
@@ -165,6 +183,19 @@ review:
estimated_effort_to_review_[1-5]: |
3
{%- endif %}
+{%- if require_risk_assessment %}
+ risk_level: |
+ low
+{%- endif %}
+{%- if require_merge_recommendation %}
+ merge_recommendation: |
+ safe_to_merge
+{%- endif %}
+{%- if require_priority_files %}
+ review_priority_files:
+ - |
+ src/example.py
+{%- endif %}
{%- if require_score %}
score: 89
{%- endif %}
@@ -303,6 +334,19 @@ review:
estimated_effort_to_review_[1-5]: |
3
{%- endif %}
+{%- if require_risk_assessment %}
+ risk_level: |
+ low
+{%- endif %}
+{%- if require_merge_recommendation %}
+ merge_recommendation: |
+ safe_to_merge
+{%- endif %}
+{%- if require_priority_files %}
+ review_priority_files:
+ - |
+ src/example.py
+{%- endif %}
{%- if require_score %}
score: 89
{%- endif %}
diff --git a/pr_agent/tools/pr_reviewer.py b/pr_agent/tools/pr_reviewer.py
index c4917f3597..66f1d78b04 100644
--- a/pr_agent/tools/pr_reviewer.py
+++ b/pr_agent/tools/pr_reviewer.py
@@ -1,6 +1,7 @@
import copy
import datetime
import traceback
+
from collections import OrderedDict
from functools import partial
from typing import List, Tuple
@@ -13,7 +14,7 @@
get_pr_diff,
retry_with_fallback_models)
from pr_agent.algo.token_handler import TokenHandler
-from pr_agent.algo.utils import (ModelType, PRReviewHeader,
+from pr_agent.algo.utils import (ModelType, PRReviewHeader, clip_tokens,
convert_to_markdown_v2, github_action_output,
load_yaml, show_relevant_configurations)
from pr_agent.config_loader import get_settings
@@ -66,6 +67,8 @@ def __init__(self, pr_url: str, is_answer: bool = False, is_auto: bool = False,
answer_str, question_str = self._get_user_answers()
self.pr_description, self.pr_description_files = (
self.git_provider.get_pr_description(split_changes_walkthrough=True))
+ self.review_rules = self._get_review_rules()
+
if (self.pr_description_files and get_settings().get("config.is_auto_command", False) and
get_settings().get("config.enable_ai_metadata", False)):
add_ai_metadata_to_diff_files(self.git_provider, self.pr_description_files)
@@ -85,6 +88,9 @@ def __init__(self, pr_url: str, is_answer: bool = False, is_auto: bool = False,
"require_score": get_settings().pr_reviewer.require_score_review,
"require_tests": get_settings().pr_reviewer.require_tests_review,
"require_estimate_effort_to_review": get_settings().pr_reviewer.require_estimate_effort_to_review,
+ "require_risk_assessment": get_settings().pr_reviewer.get("require_risk_assessment", False),
+ "require_merge_recommendation": get_settings().pr_reviewer.get("require_merge_recommendation", False),
+ "require_priority_files": get_settings().pr_reviewer.get("require_priority_files", False),
"require_estimate_contribution_time_cost": get_settings().pr_reviewer.require_estimate_contribution_time_cost,
'require_can_be_split_review': get_settings().pr_reviewer.require_can_be_split_review,
'require_security_review': get_settings().pr_reviewer.require_security_review,
@@ -99,6 +105,8 @@ def __init__(self, pr_url: str, is_answer: bool = False, is_auto: bool = False,
"related_tickets": get_settings().get('related_tickets', []),
'duplicate_prompt_examples': get_settings().config.get('duplicate_prompt_examples', False),
"date": datetime.datetime.now().strftime('%Y-%m-%d'),
+ "review_rules": self.review_rules,
+
}
self.token_handler = TokenHandler(
@@ -117,6 +125,63 @@ def parse_incremental(self, args: List[str]):
incremental = IncrementalPR(is_incremental)
return incremental
+ def _get_review_rules(self) -> str:
+ if not get_settings().pr_reviewer.get("enable_review_rules", False):
+ return ""
+
+ rule_paths = get_settings().pr_reviewer.get("review_rules_paths", []) or []
+ if isinstance(rule_paths, str):
+ rule_paths = [rule_paths]
+ elif isinstance(rule_paths, (list, tuple)):
+ rule_paths = [
+ str(rule_path).strip()
+ for rule_path in rule_paths
+ if str(rule_path).strip()
+ ]
+ else:
+ get_logger().warning(
+ "Invalid review_rules_paths value; expected string or list of strings",
+ artifacts={"review_rules_paths": rule_paths},
+ )
+ rule_paths = []
+
+ ref = self.git_provider.get_pr_base_ref()
+ if not ref:
+ get_logger().warning("Could not resolve a trusted base ref for review rules")
+ return ""
+ loaded_rules = []
+ loaded_rule_paths = []
+
+ for rule_path in rule_paths:
+ try:
+ rule_content = self.git_provider.get_pr_file_content(rule_path, ref)
+ except Exception:
+ continue
+
+ if rule_content and rule_content.strip():
+ loaded_rule_paths.append(rule_path)
+ loaded_rules.append(f"File: `{rule_path}`\n{rule_content.strip()}")
+
+ if not loaded_rules:
+ get_logger().info("No review rules file found for this PR")
+ return ""
+
+ review_rules = "\n\n---\n\n".join(loaded_rules)
+ max_tokens = get_settings().pr_reviewer.get("max_review_rules_tokens", 0)
+ try:
+ max_tokens = int(max_tokens)
+ except (TypeError, ValueError):
+ get_logger().warning(
+ "Invalid max_review_rules_tokens value; skipping token clipping",
+ artifacts={"max_review_rules_tokens": max_tokens},
+ )
+ max_tokens = 0
+
+ if max_tokens > 0:
+ review_rules = clip_tokens(review_rules, max_tokens)
+ get_logger().info("Loaded review rules for this PR", artifacts={"rule_files": loaded_rule_paths})
+ return review_rules
+
async def run(self) -> None:
try:
if not self.git_provider.get_files():
@@ -173,7 +238,7 @@ async def run(self) -> None:
if get_settings().pr_reviewer.persistent_comment and not self.incremental.is_incremental:
final_update_message = get_settings().pr_reviewer.final_update_message
self.git_provider.publish_persistent_comment(pr_review,
- initial_header=f"{PRReviewHeader.REGULAR.value} 🔍",
+ initial_header=f"{PRReviewHeader.REGULAR.value}",
update_header=True,
final_update_message=final_update_message, )
else:
@@ -231,17 +296,32 @@ def _prepare_pr_review(self) -> str:
Prepare the PR review by processing the AI prediction and generating a markdown-formatted text that summarizes
the feedback.
"""
- first_key = 'review'
- last_key = 'security_concerns'
- data = load_yaml(self.prediction.strip(),
- keys_fix_yaml=["ticket_compliance_check", "estimated_effort_to_review_[1-5]:", "security_concerns:", "key_issues_to_review:",
- "relevant_file:", "relevant_line:", "suggestion:"],
- first_key=first_key, last_key=last_key)
+ first_key = "review"
+ last_key = "security_concerns"
+ data = load_yaml(
+ self.prediction.strip(),
+ keys_fix_yaml=[
+ "ticket_compliance_check:",
+ "estimated_effort_to_review_[1-5]:",
+ "risk_level:",
+ "merge_recommendation:",
+ "security_concerns:",
+ "key_issues_to_review:",
+ "relevant_file:",
+ "relevant_line:",
+ "suggestion:",
+ ],
+ first_key=first_key,
+ last_key=last_key,
+ )
github_action_output(data, 'review')
if 'review' not in data:
get_logger().exception("Failed to parse review data", artifact={"data": data})
return ""
+ get_logger().info(f"Risk level: {data.get('review', {}).get('risk_level')}")
+ get_logger().info(f"Merge recommendation: {data.get('review', {}).get('merge_recommendation')}")
+ get_logger().info(f"Priority files: {data.get('review', {}).get('review_priority_files')}")
# move data['review'] 'key_issues_to_review' key to the end of the dictionary
if 'key_issues_to_review' in data['review']:
@@ -273,6 +353,7 @@ def _prepare_pr_review(self) -> str:
# Add custom labels from the review prediction (effort, security)
self.set_review_labels(data)
+
if markdown_text == None or len(markdown_text) == 0:
markdown_text = ""
diff --git a/tests/unittest/test_convert_to_markdown.py b/tests/unittest/test_convert_to_markdown.py
index 0d18e03cec..aa254e1a69 100644
--- a/tests/unittest/test_convert_to_markdown.py
+++ b/tests/unittest/test_convert_to_markdown.py
@@ -255,6 +255,49 @@ def test_contribution_time_cost_estimate(self):
""")
assert convert_to_markdown_v2(input_data, gfm_supported=False).strip() == expected_output_no_gfm.strip()
+ def test_structured_review_fields_render(self):
+ input_data = {
+ "review": {
+ "risk_level": "medium",
+ "merge_recommendation": "merge_with_caution",
+ "review_priority_files": ["gui_app.py", "app.py"],
+ }
+ }
+
+ markdown = convert_to_markdown_v2(input_data, gfm_supported=False)
+
+ assert "Risk level: Medium" in markdown
+ assert "Merge recommendation: Merge with caution" in markdown
+ assert "Priority files" in markdown
+ assert "- gui_app.py" in markdown
+ assert "- app.py" in markdown
+
+ def test_structured_review_fields_render_empty_priority_files(self):
+ input_data = {
+ "review": {
+ "risk_level": "low",
+ "merge_recommendation": "safe_to_merge",
+ "review_priority_files": [],
+ }
+ }
+
+ markdown = convert_to_markdown_v2(input_data, gfm_supported=False)
+
+ assert "Risk level: Low" in markdown
+ assert "Merge recommendation: Safe to merge" in markdown
+ assert "Priority files: None" in markdown
+
+ def test_structured_review_fields_ignore_invalid_priority_files_type(self):
+ input_data = {
+ "review": {
+ "review_priority_files": {"file": "gui_app.py"},
+ }
+ }
+
+ markdown = convert_to_markdown_v2(input_data, gfm_supported=False)
+
+ assert "Priority files: None" in markdown
+
# Tests that the function works correctly with an empty dictionary input
def test_empty_dictionary_input(self):
diff --git a/tests/unittest/test_pr_reviewer.py b/tests/unittest/test_pr_reviewer.py
new file mode 100644
index 0000000000..7a4c9b8f75
--- /dev/null
+++ b/tests/unittest/test_pr_reviewer.py
@@ -0,0 +1,82 @@
+from types import SimpleNamespace
+from unittest.mock import MagicMock, patch
+
+from pr_agent.tools.pr_reviewer import PRReviewer
+
+
+def _make_reviewer():
+ with patch.object(PRReviewer, "__init__", lambda self, *a, **kw: None):
+ reviewer = PRReviewer.__new__(PRReviewer)
+ reviewer.git_provider = MagicMock()
+ return reviewer
+
+
+class TestPRReviewerReviewRules:
+ @patch("pr_agent.tools.pr_reviewer.get_settings")
+ def test_get_review_rules_disabled_returns_empty(self, mock_get_settings):
+ mock_get_settings.return_value = SimpleNamespace(
+ pr_reviewer={"enable_review_rules": False}
+ )
+ reviewer = _make_reviewer()
+
+ assert reviewer._get_review_rules() == ""
+
+ @patch("pr_agent.tools.pr_reviewer.get_settings")
+ def test_get_review_rules_invalid_paths_type_returns_empty(self, mock_get_settings):
+ mock_get_settings.return_value = SimpleNamespace(
+ pr_reviewer={
+ "enable_review_rules": True,
+ "review_rules_paths": 123,
+ "max_review_rules_tokens": 0,
+ }
+ )
+ reviewer = _make_reviewer()
+ reviewer.git_provider.get_pr_base_ref.return_value = "main"
+
+ assert reviewer._get_review_rules() == ""
+ reviewer.git_provider.get_pr_file_content.assert_not_called()
+
+ @patch("pr_agent.tools.pr_reviewer.get_settings")
+ def test_get_review_rules_missing_base_ref_returns_empty(self, mock_get_settings):
+ mock_get_settings.return_value = SimpleNamespace(
+ pr_reviewer={
+ "enable_review_rules": True,
+ "review_rules_paths": [".pr_agent/review_rules.md"],
+ "max_review_rules_tokens": 0,
+ }
+ )
+ reviewer = _make_reviewer()
+ reviewer.git_provider.get_pr_base_ref.return_value = ""
+
+ assert reviewer._get_review_rules() == ""
+ reviewer.git_provider.get_pr_file_content.assert_not_called()
+
+ @patch("pr_agent.tools.pr_reviewer.get_settings")
+ def test_get_review_rules_loads_and_concatenates_files(self, mock_get_settings):
+ mock_get_settings.return_value = SimpleNamespace(
+ pr_reviewer={
+ "enable_review_rules": True,
+ "review_rules_paths": [
+ ".pr_agent/review_rules.md",
+ ".github/review_rules.md",
+ ],
+ "max_review_rules_tokens": 0,
+ }
+ )
+ reviewer = _make_reviewer()
+ reviewer.git_provider.get_pr_base_ref.return_value = "main"
+
+ def _get_file_content(path, ref):
+ if path == ".pr_agent/review_rules.md":
+ return "Rule A"
+ if path == ".github/review_rules.md":
+ return "Rule B"
+ return ""
+
+ reviewer.git_provider.get_pr_file_content.side_effect = _get_file_content
+
+ review_rules = reviewer._get_review_rules()
+
+ assert "File: `.pr_agent/review_rules.md`\nRule A" in review_rules
+ assert "File: `.github/review_rules.md`\nRule B" in review_rules
+ assert "\n\n---\n\n" in review_rules
|