#!/usr/bin/env python3
"""
DAK API Documentation Hub Generator
This script post-processes the IG-generated HTML files to inject DAK API content.
It works by:
1. Detecting existing JSON schemas (ValueSet and Logical Model schemas)
2. Creating minimal OpenAPI 3.0 wrappers for each JSON schema
3. Generating schema documentation content
4. Post-processing the dak-api.html file to replace content at the placeholder div (id="dak-api-content-placeholder")
5. Creating individual schema documentation pages using dak-api.html as template
The script is designed to work with a single IG publisher run, post-processing
the generated HTML files instead of creating markdown that requires a second run.
Usage:
python generate_dak_api_hub.py [output_dir] [openapi_dir]
Author: SMART Guidelines Team
"""
import json
import os
import sys
import logging
import re
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
from urllib.parse import urlparse
from datetime import datetime
def setup_logging() -> logging.Logger:
"""Configure logging for the script."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
return logging.getLogger(__name__)
class QAReporter:
"""Handles QA reporting for post-processing steps and merging with FHIR IG publisher QA."""
def __init__(self, phase: str = "postprocessing"):
self.phase = phase
self.timestamp = datetime.now().isoformat()
self.report = {
"phase": phase,
"timestamp": self.timestamp,
"status": "running",
"summary": {},
"details": {
"successes": [],
"warnings": [],
"errors": [],
"files_processed": [],
"files_expected": [],
"files_missing": []
}
}
# Store existing IG publisher QA data if present
self.ig_publisher_qa = None
def load_existing_ig_qa(self, qa_file_path: str):
"""Load existing FHIR IG publisher QA file to preserve its structure."""
try:
if os.path.exists(qa_file_path):
with open(qa_file_path, 'r', encoding='utf-8') as f:
self.ig_publisher_qa = json.load(f)
print(f"Loaded existing IG publisher QA file: {qa_file_path}")
return True
else:
print(f"No existing IG publisher QA file found at: {qa_file_path}")
return False
except Exception as e:
print(f"Error loading existing IG publisher QA file: {e}")
return False
def add_success(self, message: str, details: Optional[Dict] = None):
"""Add a success entry to the QA report."""
entry = {"message": message, "timestamp": datetime.now().isoformat()}
if details:
entry["details"] = details
self.report["details"]["successes"].append(entry)
def add_warning(self, message: str, details: Optional[Dict] = None):
"""Add a warning entry to the QA report."""
entry = {"message": message, "timestamp": datetime.now().isoformat()}
if details:
entry["details"] = details
self.report["details"]["warnings"].append(entry)
def add_error(self, message: str, details: Optional[Dict] = None):
"""Add an error entry to the QA report."""
entry = {"message": message, "timestamp": datetime.now().isoformat()}
if details:
entry["details"] = details
self.report["details"]["errors"].append(entry)
def add_file_processed(self, file_path: str, status: str = "success", details: Optional[Dict] = None):
"""Record a file that was processed."""
entry = {
"file": file_path,
"status": status,
"timestamp": datetime.now().isoformat()
}
if details:
entry["details"] = details
self.report["details"]["files_processed"].append(entry)
def add_file_expected(self, file_path: str, found: bool = False):
"""Record a file that was expected."""
self.report["details"]["files_expected"].append(file_path)
if not found:
self.report["details"]["files_missing"].append(file_path)
def finalize_report(self, status: str = "completed"):
"""Finalize the QA report with summary statistics and merge with IG publisher QA if available."""
self.report["status"] = status
self.report["summary"] = {
"total_successes": len(self.report["details"]["successes"]),
"total_warnings": len(self.report["details"]["warnings"]),
"total_errors": len(self.report["details"]["errors"]),
"files_processed_count": len(self.report["details"]["files_processed"]),
"files_expected_count": len(self.report["details"]["files_expected"]),
"files_missing_count": len(self.report["details"]["files_missing"]),
"completion_timestamp": datetime.now().isoformat()
}
# If we have IG publisher QA data, merge it with our report
if self.ig_publisher_qa:
return self.merge_with_ig_publisher_qa()
return self.report
def merge_with_ig_publisher_qa(self):
"""Merge our QA report with the existing FHIR IG publisher QA structure."""
try:
# Create a merged report that preserves the IG publisher structure
merged_report = dict(self.ig_publisher_qa)
# Add our component reports as a new section
if "dak_api_processing" not in merged_report:
merged_report["dak_api_processing"] = {}
# Include any stored preprocessing reports
preprocessing_reports = {}
if hasattr(self, '_stored_preprocessing_reports'):
for i, report in enumerate(self._stored_preprocessing_reports):
component_name = report.get("component", report.get("phase", f"component_{i}"))
preprocessing_reports[component_name] = report
# Add our preprocessing and postprocessing reports
merged_report["dak_api_processing"] = {
"preprocessing_reports": preprocessing_reports,
"postprocessing": self.report,
"summary": {
"total_dak_api_successes": self.report["summary"]["total_successes"],
"total_dak_api_warnings": self.report["summary"]["total_warnings"],
"total_dak_api_errors": self.report["summary"]["total_errors"],
"dak_api_completion_timestamp": self.report["summary"]["completion_timestamp"]
}
}
return merged_report
except Exception as e:
print(f"Error merging with IG publisher QA: {e}")
# Fall back to our report only
return self.report
def merge_preprocessing_report(self, preprocessing_report: Dict):
"""Merge a preprocessing report into this post-processing report."""
if "details" in preprocessing_report:
# Add preprocessing entries with a prefix
component_name = preprocessing_report.get("component", preprocessing_report.get("phase", "Unknown"))
for success in preprocessing_report["details"].get("successes", []):
self.add_success(f"[{component_name}] {success['message']}", success.get("details"))
for warning in preprocessing_report["details"].get("warnings", []):
self.add_warning(f"[{component_name}] {warning['message']}", warning.get("details"))
for error in preprocessing_report["details"].get("errors", []):
self.add_error(f"[{component_name}] {error['message']}", error.get("details"))
for file_proc in preprocessing_report["details"].get("files_processed", []):
self.add_file_processed(f"[{component_name}] {file_proc['file']}", file_proc.get("status", "unknown"), file_proc.get("details"))
# Merge schemas_generated if available (for component reports)
for schema in preprocessing_report["details"].get("schemas_generated", []):
schema_with_component = dict(schema)
schema_with_component["component"] = component_name
self.add_success(f"[{component_name}] Generated schema", schema_with_component)
# Store preprocessing report in the final merged structure
if self.ig_publisher_qa and "dak_api_processing" in self.ig_publisher_qa:
self.ig_publisher_qa["dak_api_processing"]["preprocessing"] = preprocessing_report
else:
# Store for later merging
if not hasattr(self, '_stored_preprocessing_reports'):
self._stored_preprocessing_reports = []
self._stored_preprocessing_reports.append(preprocessing_report)
def save_to_file(self, output_path: str):
"""Save QA report to a JSON file."""
try:
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(self.report, f, indent=2, ensure_ascii=False)
return True
except Exception as e:
print(f"Error saving QA report to {output_path}: {e}")
return False
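# Minimal usage sketch for QAReporter (file paths here are illustrative only):
#
#   reporter = QAReporter(phase="postprocessing")
#   reporter.load_existing_ig_qa("output/qa.json")   # optional: merge with IG publisher QA
#   reporter.add_success("Generated dak-api.html")
#   merged = reporter.finalize_report(status="completed")
#   reporter.save_to_file("output/qa-dak-api.json")
#
# Note that save_to_file() persists self.report only; callers that want the structure
# merged with the IG publisher QA should persist the value returned by finalize_report().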
class SchemaDetector:
"""Detects and categorizes schema files in the output directory."""
def __init__(self, logger: logging.Logger):
self.logger = logger
def find_schema_files(self, output_dir: str) -> Dict[str, List[str]]:
"""
Find all schema files in the output directory.
Returns:
Dictionary with categories 'valueset' and 'logical_model'
containing lists of schema file paths
"""
schemas = {
'valueset': [],
'logical_model': [],
'other': []
}
if not os.path.exists(output_dir):
self.logger.warning(f"Output directory does not exist: {output_dir}")
return schemas
self.logger.info(f"Scanning directory for schema files: {output_dir}")
all_files = os.listdir(output_dir)
schema_count = 0
for file in all_files:
if file.endswith('.schema.json'):
schema_count += 1
file_path = os.path.join(output_dir, file)
self.logger.info(f"Found schema file: {file}")
if file.startswith('ValueSet-'):
schemas['valueset'].append(file_path)
self.logger.info(f" -> Categorized as ValueSet schema")
elif file in ['ValueSets.schema.json', 'LogicalModels.schema.json']:
# These are enumeration schemas, categorize appropriately
if file == 'ValueSets.schema.json':
schemas['valueset'].append(file_path)
self.logger.info(f" -> Categorized as ValueSet enumeration schema")
else:
schemas['logical_model'].append(file_path)
self.logger.info(f" -> Categorized as Logical Model enumeration schema")
elif not file.startswith('ValueSet-') and not file.startswith('CodeSystem-'):
# Assume logical model if not ValueSet or CodeSystem
schemas['logical_model'].append(file_path)
self.logger.info(f" -> Categorized as Logical Model schema")
else:
schemas['other'].append(file_path)
self.logger.info(f" -> Categorized as other schema")
self.logger.info(f"Schema detection summary:")
self.logger.info(f" Total schema files found: {schema_count}")
self.logger.info(f" ValueSet schemas: {len(schemas['valueset'])}")
self.logger.info(f" Logical Model schemas: {len(schemas['logical_model'])}")
self.logger.info(f" Other schemas: {len(schemas['other'])}")
if schema_count == 0:
self.logger.warning(f"No .schema.json files found in {output_dir}")
self.logger.info(f"Directory contents: {', '.join(all_files)}")
return schemas
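# Example categorization performed by find_schema_files() (file names are illustrative):
#   ValueSet-anc-care-codes.schema.json   -> 'valueset'
#   ValueSets.schema.json                 -> 'valueset' (enumeration schema)
#   LogicalModels.schema.json             -> 'logical_model' (enumeration schema)
#   ANCContact.schema.json                -> 'logical_model'
#   CodeSystem-anc-care-codes.schema.json -> 'other'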
def find_jsonld_files(self, output_dir: str) -> List[str]:
"""
Find all JSON-LD vocabulary files in the output directory.
Returns:
List of JSON-LD file paths
"""
jsonld_files = []
if not os.path.exists(output_dir):
self.logger.warning(f"Output directory does not exist: {output_dir}")
return jsonld_files
self.logger.info(f"Scanning directory for JSON-LD files: {output_dir}")
all_files = os.listdir(output_dir)
jsonld_count = 0
for file in all_files:
if file.endswith('.jsonld'):
jsonld_count += 1
file_path = os.path.join(output_dir, file)
self.logger.info(f"Found JSON-LD file: {file}")
if file.startswith('ValueSet-'):
jsonld_files.append(file_path)
self.logger.info(f" -> Added ValueSet JSON-LD vocabulary")
else:
self.logger.info(f" -> Skipped non-ValueSet JSON-LD file")
self.logger.info(f"JSON-LD detection summary:")
self.logger.info(f" Total JSON-LD files found: {jsonld_count}")
self.logger.info(f" ValueSet JSON-LD vocabularies: {len(jsonld_files)}")
if jsonld_count == 0:
self.logger.info(f"No .jsonld files found in {output_dir}")
return jsonld_files
class OpenAPIDetector:
"""Detects existing OpenAPI/Swagger files."""
def __init__(self, logger: logging.Logger):
self.logger = logger
def find_openapi_files(self, openapi_dir: str) -> List[str]:
"""Find OpenAPI/Swagger files in the given directory, including generated .openapi.json files."""
openapi_files = []
if not os.path.exists(openapi_dir):
self.logger.info(f"OpenAPI directory does not exist: {openapi_dir}")
return openapi_files
self.logger.info(f"Scanning for OpenAPI files in: {openapi_dir}")
for root, dirs, files in os.walk(openapi_dir):
for file in files:
# Include OpenAPI/Swagger files with more lenient matching for existing files
# Exclude index.html as it's handled separately for content extraction
if (file.endswith(('.json', '.yaml', '.yml')) and
file.lower() != 'index.html' and
('openapi' in file.lower() or 'swagger' in file.lower() or
'api' in file.lower() or # More lenient for existing API files
file.endswith('.openapi.json') or file.endswith('.openapi.yaml'))):
full_path = os.path.join(root, file)
openapi_files.append(full_path)
self.logger.info(f"Found OpenAPI file: {file}")
self.logger.info(f"Found {len(openapi_files)} OpenAPI/Swagger files total")
return openapi_files
def find_existing_html_content(self, openapi_dir: str) -> Optional[str]:
"""Find and extract content from existing index.html in OpenAPI directory."""
index_html_path = os.path.join(openapi_dir, "index.html")
if not os.path.exists(index_html_path):
self.logger.info(f"No existing index.html found in: {openapi_dir}")
return None
try:
from bs4 import BeautifulSoup
self.logger.info(f"Found existing OpenAPI HTML content at: {index_html_path}")
with open(index_html_path, 'r', encoding='utf-8') as f:
html_content = f.read()
# Parse the HTML using BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# Extract the body content
body = soup.find('body')
if not body:
self.logger.warning("No
tag found in existing index.html")
return None
# Remove script tags and other non-content elements
for script in body.find_all(['script', 'noscript']):
script.decompose()
# Look for the main content container
# Try common container patterns
content_container = (
body.find('div', class_=lambda x: x and 'container' in x.lower()) or
body.find('div', class_=lambda x: x and 'content' in x.lower()) or
body.find('main') or
body.find('div', id=lambda x: x and 'content' in x.lower()) or
body
)
if content_container:
# Get the inner HTML content
extracted_content = str(content_container)
# If we got the whole body, remove the body tags
if content_container == body:
# Remove body tag but keep its content
extracted_content = extracted_content.replace('<body>', '').replace('</body>', '')
# Clean up any attributes from body tag
if extracted_content.startswith('<body'):
    end_tag = extracted_content.find('>')
    if end_tag != -1:
        extracted_content = extracted_content[end_tag + 1:]
self.logger.info(f"Extracted {len(extracted_content)} characters of HTML content from existing index.html")
return extracted_content.strip()
else:
self.logger.warning("Could not find suitable content container in existing index.html")
return None
except ImportError:
self.logger.error("BeautifulSoup not available. Please install beautifulsoup4: pip install beautifulsoup4")
return None
except Exception as e:
self.logger.error(f"Error parsing existing HTML content: {e}")
return None
class OpenAPIWrapper:
"""Creates OpenAPI 3.0 wrappers for JSON schemas."""
def __init__(self, logger: logging.Logger, canonical_base: str = "http://smart.who.int/base"):
self.logger = logger
self.canonical_base = canonical_base
def create_wrapper_for_schema(self, schema_path: str, schema_type: str, output_dir: str) -> Optional[str]:
"""
Create an OpenAPI 3.0 wrapper for a JSON schema.
Args:
schema_path: Path to the JSON schema file
schema_type: Type of schema ('valueset' or 'logical_model')
output_dir: Directory to save the OpenAPI wrapper
Returns:
Path to the generated OpenAPI wrapper file, or None if failed
"""
try:
# Load the schema
with open(schema_path, 'r', encoding='utf-8') as f:
schema = json.load(f)
schema_filename = os.path.basename(schema_path)
schema_name = schema_filename.replace('.schema.json', '')
# Determine the endpoint path and description
if schema_type == 'valueset':
endpoint_path = f"/{schema_filename}"
summary = f"JSON Schema definition for the enumeration {schema_name}"
description = f"This endpoint serves the JSON Schema definition for the enumeration {schema_name}."
media_type = "application/schema+json"
else: # logical_model
endpoint_path = f"/{schema_filename}"
summary = f"JSON Schema definition for the Logical Model {schema_name}"
description = f"This endpoint serves the JSON Schema definition for the Logical Model {schema_name}."
media_type = "application/schema+json"
# Create OpenAPI wrapper
openapi_spec = {
"openapi": "3.0.3",
"info": {
"title": f"{schema.get('title', schema_name)} API",
"description": schema.get('description', f"API for {schema_name} schema"),
"version": "1.0.0"
},
"paths": {
endpoint_path: {
"get": {
"summary": summary,
"description": description,
"responses": {
"200": {
"description": f"The JSON Schema for {schema_name}",
"content": {
media_type: {
"schema": {
"$ref": f"./{schema_filename}"
}
}
}
}
}
}
}
},
"components": {
"schemas": {
schema_name: schema
}
}
}
# Save OpenAPI wrapper
wrapper_filename = f"{schema_name}.openapi.json"
wrapper_path = os.path.join(output_dir, wrapper_filename)
with open(wrapper_path, 'w', encoding='utf-8') as f:
json.dump(openapi_spec, f, indent=2, ensure_ascii=False)
self.logger.info(f"Created OpenAPI wrapper: {wrapper_path}")
return wrapper_path
except Exception as e:
self.logger.error(f"Error creating OpenAPI wrapper for {schema_path}: {e}")
return None
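# Shape of the wrapper written by create_wrapper_for_schema(), trimmed for illustration
# (titles and file names depend on the input schema):
#
#   {
#     "openapi": "3.0.3",
#     "info": {"title": "<schema title> API", "version": "1.0.0"},
#     "paths": {"/<name>.schema.json": {"get": {"responses": {"200": {...}}}}},
#     "components": {"schemas": {"<name>": {...full JSON schema...}}}
#   }
#
# The wrapper is saved alongside the schema as <name>.openapi.json.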
def create_enumeration_wrapper(self, enum_schema_path: str, schema_type: str, output_dir: str) -> Optional[str]:
"""
Create an OpenAPI 3.0 wrapper for an enumeration schema.
Args:
enum_schema_path: Path to the enumeration schema file
schema_type: Type of schema ('valueset' or 'logical_model')
output_dir: Directory to save the OpenAPI wrapper
Returns:
Path to the generated OpenAPI wrapper file, or None if failed
"""
try:
# Load the enumeration schema
with open(enum_schema_path, 'r', encoding='utf-8') as f:
enum_schema = json.load(f)
enum_filename = os.path.basename(enum_schema_path)
# Determine the endpoint details
if schema_type == 'valueset':
endpoint_path = "/ValueSets.schema.json"
api_title = "ValueSets Enumeration API"
api_description = "API endpoint providing an enumeration of all available ValueSet schemas"
summary = "Get enumeration of all ValueSet schemas"
description = "Returns a list of all available ValueSet schemas with metadata"
else: # logical_model
endpoint_path = "/LogicalModels.schema.json"
api_title = "LogicalModels Enumeration API"
api_description = "API endpoint providing an enumeration of all available Logical Model schemas"
summary = "Get enumeration of all Logical Model schemas"
description = "Returns a list of all available Logical Model schemas with metadata"
# Create OpenAPI wrapper for the enumeration
openapi_spec = {
"openapi": "3.0.3",
"info": {
"title": api_title,
"description": api_description,
"version": "1.0.0"
},
"paths": {
endpoint_path: {
"get": {
"summary": summary,
"description": description,
"responses": {
"200": {
"description": f"Successfully retrieved {schema_type} enumeration",
"content": {
"application/json": {
"schema": {
"$ref": f"#/components/schemas/EnumerationResponse"
},
"example": enum_schema.get('example', {})
}
}
}
}
}
}
},
"components": {
"schemas": {
"EnumerationResponse": enum_schema
}
}
}
# Save OpenAPI wrapper
if schema_type == 'valueset':
wrapper_filename = "ValueSets-enumeration.openapi.json"
else:
wrapper_filename = "LogicalModels-enumeration.openapi.json"
wrapper_path = os.path.join(output_dir, wrapper_filename)
with open(wrapper_path, 'w', encoding='utf-8') as f:
json.dump(openapi_spec, f, indent=2, ensure_ascii=False)
self.logger.info(f"Created enumeration OpenAPI wrapper: {wrapper_path}")
return wrapper_path
except Exception as e:
self.logger.error(f"Error creating enumeration OpenAPI wrapper for {enum_schema_path}: {e}")
return None
class HTMLProcessor:
"""Post-processes HTML files to inject DAK API content."""
def __init__(self, logger: logging.Logger, output_dir: str):
self.logger = logger
self.output_dir = output_dir
def create_html_template_from_existing(self, template_html_path: str, title: str, content: str) -> str:
"""
Create a new HTML file using an existing file as template.
Args:
template_html_path: Path to the template HTML file (e.g., dak-api.html)
title: Title for the new page
content: HTML content to inject at the placeholder div
Returns:
The new HTML content as a string
"""
try:
with open(template_html_path, 'r', encoding='utf-8') as f:
template_html = f.read()
# Update the title
title_pattern = r'<title>([^<]*)</title>'
match = re.search(title_pattern, template_html)
if match:
    current_title = match.group(1)
    # Preserve any suffix from the original title
    if ' - ' in current_title:
        suffix = ' - ' + current_title.split(' - ', 1)[1]
    else:
        suffix = ''
    new_title = title + suffix
    template_html = re.sub(title_pattern, f'<title>{new_title}</title>', template_html)
# Replace the placeholder div with actual content
# Look for either the div placeholder or the old comment marker
div_placeholder = '<div id="dak-api-content-placeholder">'
comment_marker = '<!-- DAK_API_CONTENT -->'
if div_placeholder in template_html:
    # Find the closing </div> and replace the entire placeholder div
    placeholder_pattern = r'<div id="dak-api-content-placeholder">.*?</div>'
    template_html = re.sub(placeholder_pattern, content, template_html, flags=re.DOTALL)
elif comment_marker in template_html:
    template_html = template_html.replace(comment_marker, content)
else:
    self.logger.warning("No placeholder marker found in template")
return template_html
except Exception as e:
self.logger.error(f"Error creating HTML template: {e}")
return ""
def inject_content_at_comment_marker(self, html_file_path: str, content: str) -> bool:
"""
Inject content into an HTML file at the DAK API placeholder div.
Args:
html_file_path: Path to the HTML file to modify
content: HTML content to inject
Returns:
True if successful, False otherwise
"""
try:
self.logger.info(f"🔍 Starting content injection into: {html_file_path}")
self.logger.info(f"📏 Content to inject length: {len(content)} characters")
if not os.path.exists(html_file_path):
self.logger.error(f"❌ HTML file does not exist: {html_file_path}")
return False
with open(html_file_path, 'r', encoding='utf-8') as f:
html_content = f.read()
self.logger.info(f"📖 Read HTML file, original length: {len(html_content)} characters")
# Look for the placeholder div (survives HTML conversion from markdown)
placeholder_pattern = r'<div id="dak-api-content-placeholder"[^>]*>.*?</div>'
if not re.search(placeholder_pattern, html_content, re.DOTALL):
    # Fallback: try the old comment marker for backwards compatibility
    comment_marker = '<!-- DAK_API_CONTENT -->'
    if comment_marker in html_content:
        self.logger.info("✅ Found legacy DAK_API_CONTENT comment marker")
        new_html_content = html_content.replace(comment_marker, content)
    else:
        self.logger.error(f"❌ DAK API placeholder div not found in {html_file_path}")
        self.logger.info('Looking for: <div id="dak-api-content-placeholder">')
        self.logger.info("Available content sample for debugging:")
        # Show a sample to help debug
        sample_content = html_content[:1000] if len(html_content) > 1000 else html_content
        self.logger.info(f"Sample content: {sample_content}")
        return False
else:
    self.logger.info("✅ Found DAK API placeholder div")
    # Replace the placeholder div with the actual content
    new_html_content = re.sub(placeholder_pattern, content, html_content, flags=re.DOTALL)
self.logger.info(f"📏 Content replacement: original={len(html_content)}, new={len(new_html_content)}")
# Write the modified HTML back to the file
with open(html_file_path, 'w', encoding='utf-8') as f:
f.write(new_html_content)
size_increase = len(new_html_content) - len(html_content)
self.logger.info(f"💾 Successfully wrote modified HTML back to {html_file_path}")
self.logger.info(f"📏 Final HTML file size: {len(new_html_content)} characters (increased by {size_increase})")
if size_increase > 100: # If we added substantial content
self.logger.info(f"✅ Content injection appears successful (substantial size increase)")
return True
else:
self.logger.warning(f"⚠️ Content injection may have failed (minimal size increase: {size_increase})")
return False
except Exception as e:
self.logger.error(f"❌ Error injecting content into {html_file_path}: {e}")
import traceback
self.logger.error(f"🔍 Traceback: {traceback.format_exc()}")
return False
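# Typical call pattern for HTMLProcessor (dak-api.html is the hub page named in the
# module docstring; the hub_html variable is illustrative):
#
#   processor = HTMLProcessor(logger, output_dir)
#   processor.inject_content_at_comment_marker(
#       os.path.join(output_dir, "dak-api.html"), hub_html)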
class SchemaDocumentationRenderer:
"""Generates HTML documentation content for schemas."""
def __init__(self, logger: logging.Logger):
self.logger = logger
def get_codesystem_anchors(self, codesystem_url: str, output_dir: str) -> Dict[str, str]:
"""
Attempt to find anchor mappings for codes in a CodeSystem HTML file.
Args:
codesystem_url: The canonical URL of the CodeSystem
output_dir: Directory where HTML files are located
Returns:
Dictionary mapping codes to their anchor names
"""
anchor_map = {}
try:
# Extract CodeSystem ID from URL
if '/CodeSystem/' in codesystem_url:
codesystem_id = codesystem_url.split('/CodeSystem/')[-1]
html_filename = f"CodeSystem-{codesystem_id}.html"
html_path = os.path.join(output_dir, html_filename)
if os.path.exists(html_path):
with open(html_path, 'r', encoding='utf-8') as f:
html_content = f.read()
# Look for anchor patterns in code definition tables
# Pattern 1: id="CodeSystem-ID-code" (simple case)
# Pattern 2: id="ID-code" (common IG Publisher pattern)
# Pattern 3: code-specific patterns based on the actual HTML structure
import re
# Try different anchor patterns that might be used by IG Publisher
patterns = [
    rf'id="({codesystem_id}-[^"]+)"',  # CodeSystem-ID-code
    rf'id="([^"]*-[0-9.]+[^"]*)"',  # Any ID with numeric codes
]
for pattern in patterns:
    for anchor_id in re.findall(pattern, html_content):
        # Map the trailing code portion of the anchor id to the full anchor
        anchor_map[anchor_id.split(f"{codesystem_id}-", 1)[-1]] = anchor_id
# Fallback: scan code cells in the definition table and build best-guess anchors
# for codes that did not match an explicit anchor id
td_pattern = r'<td[^>]*><code[^>]*>([^<]+)</code>'
for code in re.findall(td_pattern, html_content):
    # Create a best-guess anchor
    anchor_map.setdefault(code, f"{codesystem_id}-{code}")
self.logger.info(f"Found {len(anchor_map)} anchor mappings for CodeSystem {codesystem_id}")
if anchor_map:
# Log a few examples for debugging
sample_mappings = list(anchor_map.items())[:3]
self.logger.info(f"Sample mappings: {sample_mappings}")
except Exception as e:
self.logger.warning(f"Could not load CodeSystem anchors for {codesystem_url}: {e}")
return anchor_map
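# Example of the mapping returned by get_codesystem_anchors() (values are hypothetical;
# the real anchors depend on how the IG publisher renders the CodeSystem page):
#
#   {"DE1": "anccodes-DE1"}   # code -> anchor, linked as CodeSystem-anccodes.html#anccodes-DE1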
def generate_schema_documentation_html(self, schema_path: str, schema_type: str, output_dir: str) -> str:
"""
Generate HTML documentation content for a schema.
Args:
schema_path: Path to the schema file
schema_type: Type of schema ('valueset' or 'logical_model')
output_dir: Output directory for reference files
Returns:
HTML content as a string
"""
try:
schema_filename = os.path.basename(schema_path)
spec_name = schema_filename.replace('.schema.json', '')
# Load the schema
with open(schema_path, 'r', encoding='utf-8') as f:
schema_data = json.load(f)
title = schema_data.get('title', f"{spec_name} Schema Documentation")
# Generate HTML content
html_content = f"""
<h2>{title}</h2>
<p>{schema_data.get('description', '')}</p>
"""
# Add schema ID as link if available
schema_id = schema_data.get('$id', '')
if schema_id:
# Determine the correct FHIR page link based on schema type
if schema_filename == 'ValueSets.schema.json':
fhir_url = "artifacts.html#terminology-value-sets"
elif schema_filename == 'LogicalModels.schema.json':
fhir_url = "artifacts.html#structures-logical-models"
else:
# Individual schemas link to their specific HTML files
fhir_url = schema_filename.replace('.schema.json', '.html')
html_content += f"""
"""
# Handle enum values for ValueSets
if 'enum' in schema_data:
# Sort enum values alphabetically and truncate after 40 entries
enum_values = sorted(schema_data['enum'])
displayed_values = enum_values[:40]
truncated = len(enum_values) > 40
# Check if we can link to CodeSystem definitions
codesystem_anchors = {}
if schema_type == 'valueset' and schema_id:
# Try to load system mapping to find CodeSystem
try:
if '/' in schema_id:
base_url = '/'.join(schema_id.split('/')[:-1])
system_filename = f"{spec_name}.system.json"
system_path = os.path.join(output_dir, system_filename)
if os.path.exists(system_path):
with open(system_path, 'r', encoding='utf-8') as f:
system_data = json.load(f)
# Get system URIs for codes
fhir_systems = system_data.get('fhir:systems', {})
if fhir_systems:
# Check if any system is from the same IG
for code, system_uri in fhir_systems.items():
if system_uri and base_url in system_uri:
# This is a local CodeSystem, try to get anchors
codesystem_anchors = self.get_codesystem_anchors(system_uri, output_dir)
break
except Exception as e:
self.logger.warning(f"Could not load system mappings for {spec_name}: {e}")
html_content += """
Allowed Values
"""
for enum_value in displayed_values:
if enum_value in codesystem_anchors:
# Create link to CodeSystem anchor
anchor = codesystem_anchors[enum_value]
codesystem_id = anchor.split('-')[0]
link_url = f"CodeSystem-{codesystem_id}.html#{anchor}"
html_content += f' {enum_value}\n'
else:
html_content += f' {enum_value}\n'
if truncated:
remaining_count = len(enum_values) - 40
html_content += f'
... and {remaining_count} more values
\n'
html_content += """
"""
# Handle object properties for Logical Models
if 'properties' in schema_data:
    html_content += """
<h3>Properties</h3>
<ul>
"""
    for prop_name, prop_def in schema_data['properties'].items():
        prop_type = prop_def.get('type', 'unknown')
        html_content += f'  <li><code>{prop_name}</code> ({prop_type}): {prop_def.get("description", "No description")}</li>\n'
    html_content += """
</ul>
"""
# Show required fields
required = schema_data.get('required', [])
if required:
    html_content += f"""
<p><strong>Required fields:</strong> {', '.join(required)}</p>
"""
# Show full schema as collapsible JSON
schema_json_str = json.dumps(schema_data, indent=2)
html_content += f"""
Full Schema (JSON)
{schema_json_str}
This documentation is automatically generated from the schema definition.
"""
return html_content
except Exception as e:
self.logger.error(f"Error generating schema documentation for {schema_path}: {e}")
return ""
def _find_injection_point(self, html_content: str, schema_type: str) -> Optional[int]:
"""
Find the appropriate injection point in FHIR IG generated HTML content.
For StructureDefinition pages: after "Formal Views of Profile Content"
For ValueSet pages: after "Expansion" section (last IG publisher content)
Args:
html_content: The HTML content to search
schema_type: Type of schema ('valueset', 'logical_model', etc.)
Returns:
Index position for injection, or None if not found
"""
try:
# For logical models (StructureDefinition pages), look for "Formal Views" section
if schema_type == 'logical_model':
# Look for the end of the "Formal Views of Profile Content" section
formal_views_patterns = [
    r'<h2[^>]*>Formal Views of Profile Content</h2>.*?</div>\s*',
    r'<h3[^>]*>Formal Views of Profile Content</h3>.*?</div>\s*',
    r'<h2[^>]*>Formal Views</h2>.*?</div>\s*',
    r'<h3[^>]*>Formal Views</h3>.*?</div>\s*'
]
for pattern in formal_views_patterns:
import re
match = re.search(pattern, html_content, re.DOTALL | re.IGNORECASE)
if match:
self.logger.info(f"Found 'Formal Views' section for injection point")
return match.end()
# For ValueSet pages, look for "Expansion" section
elif schema_type == 'valueset':
# Look for the end of the "Expansion" section
expansion_patterns = [
    r'<h2[^>]*>Expansion</h2>.*?</div>\s*',
    r'<h3[^>]*>Expansion</h3>.*?</div>\s*',
    r'<h4[^>]*>Expansion</h4>.*?</div>\s*'
]
for pattern in expansion_patterns:
import re
match = re.search(pattern, html_content, re.DOTALL | re.IGNORECASE)
if match:
self.logger.info(f"Found 'Expansion' section for injection point")
return match.end()
# Fallback: look for common FHIR IG content structures
fallback_patterns = [
# Look for the end of main content div
r'<div[^>]*class="[^"]*col-12[^"]*"[^>]*>.*?</div>\s*(?=<div[^>]*class="[^"]*col-12[^"]*"|$)',
# Look for navigation or footer sections to inject before
r'(?=