#!/usr/bin/env python3
"""
FHIR Logical Model JSON Schema Generator

This script processes JSON StructureDefinition files generated by the FHIR IG
Publisher for FHIR Logical Models and generates JSON schemas for each Logical
Model. It handles FHIR-specific features including ValueSet bindings and
references to ValueSet schemas generated by the ValueSet schema generator.

The script is intended to be run after the IG publisher runs and processes
the JSON StructureDefinition files from the output directory to create
schemas that can be used for validation of data against the Logical Models.

Usage:
    python generate_logical_model_schemas.py [output_dir] [schema_output_dir]

Author: SMART Guidelines Team
"""

import json
import os
import sys
import logging
import re
from typing import Dict, List, Optional, Any, Tuple
from pathlib import Path
from datetime import datetime


def setup_logging() -> logging.Logger:
    """Configure logging for the script."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
    )
    return logging.getLogger(__name__)


class QAReporter:
    """Handles QA reporting for Logical Model schema generation.

    The reporter accumulates successes, warnings, errors and file/schema
    records in ``self.report`` and can serialise that report to disk.
    """

    def __init__(self, component: str = "logical_model_schemas"):
        self.component = component
        self.timestamp = datetime.now().isoformat()
        self.report = {
            "component": component,
            "timestamp": self.timestamp,
            "status": "running",
            "summary": {},
            "details": {
                "successes": [],
                "warnings": [],
                "errors": [],
                "files_processed": [],
                "files_expected": [],
                "files_missing": [],
                "schemas_generated": []
            }
        }

    @staticmethod
    def _message_entry(message: str, details: Optional[Dict]) -> Dict[str, Any]:
        """Build a timestamped message entry; ``details`` is attached only when non-empty."""
        entry = {"message": message, "timestamp": datetime.now().isoformat()}
        if details:
            entry["details"] = details
        return entry

    def add_success(self, message: str, details: Optional[Dict] = None):
        """Add a success entry to the QA report."""
        self.report["details"]["successes"].append(self._message_entry(message, details))

    def add_warning(self, message: str, details: Optional[Dict] = None):
        """Add a warning entry to the QA report."""
        self.report["details"]["warnings"].append(self._message_entry(message, details))

    def add_error(self, message: str, details: Optional[Dict] = None):
        """Add an error entry to the QA report."""
        self.report["details"]["errors"].append(self._message_entry(message, details))

    def add_file_processed(self, file_path: str, status: str = "success",
                           details: Optional[Dict] = None):
        """Record a file that was processed."""
        entry = {
            "file": file_path,
            "status": status,
            "timestamp": datetime.now().isoformat()
        }
        if details:
            entry["details"] = details
        self.report["details"]["files_processed"].append(entry)

    def add_file_expected(self, file_path: str, found: bool = False):
        """Record a file that was expected; it is also listed as missing when ``found`` is false."""
        self.report["details"]["files_expected"].append(file_path)
        if not found:
            self.report["details"]["files_missing"].append(file_path)

    def add_schema_generated(self, schema_info: Dict):
        """Record a schema that was generated.

        A timestamped copy is stored so the caller's dictionary is not mutated.
        """
        record = dict(schema_info)
        record["timestamp"] = datetime.now().isoformat()
        self.report["details"]["schemas_generated"].append(record)

    def finalize_report(self, status: str = "completed"):
        """Finalize the QA report with summary statistics and return it."""
        details = self.report["details"]
        self.report["status"] = status
        self.report["summary"] = {
            "total_successes": len(details["successes"]),
            "total_warnings": len(details["warnings"]),
            "total_errors": len(details["errors"]),
            "files_processed_count": len(details["files_processed"]),
            "files_expected_count": len(details["files_expected"]),
            "files_missing_count": len(details["files_missing"]),
            "schemas_generated_count": len(details["schemas_generated"]),
            "completion_timestamp": datetime.now().isoformat()
        }
        return self.report

    @staticmethod
    def _write_report(report: Dict[str, Any], path: str) -> None:
        """Write ``report`` as JSON to ``path``, creating the parent directory if needed."""
        parent_dir = os.path.dirname(path)
        if parent_dir:
            Path(parent_dir).mkdir(parents=True, exist_ok=True)
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(report, f, indent=2, ensure_ascii=False)

    def save_report(self, output_path: str, backup_path: Optional[str] = None):
        """Save QA report to protected location and backup.

        The primary and backup writes are attempted independently, so a
        failure of one never prevents (or needlessly repeats) the other.
        Saving is best-effort: failures are printed, never raised.

        Args:
            output_path: Primary location of the report.
            backup_path: Optional secondary location; it doubles as the
                fallback when the primary write fails.

        Returns:
            The finalized report dictionary.
        """
        report = self.finalize_report()

        primary_saved = False
        try:
            self._write_report(report, output_path)
            primary_saved = True
            print(f"QA report saved to protected location: {output_path}")
        except Exception as e:  # best-effort boundary: report and carry on
            print(f"Error saving QA report: {e}")

        # Writing the same path twice would add nothing, whether or not the
        # first attempt succeeded.
        if backup_path and backup_path != output_path:
            try:
                self._write_report(report, backup_path)
                if primary_saved:
                    print(f"QA report backup saved to: {backup_path}")
                else:
                    print(f"QA report saved to fallback location: {backup_path}")
            except Exception as e:  # best-effort boundary
                if primary_saved:
                    print(f"Error saving QA report: {e}")
                else:
                    print(f"Error saving QA report to fallback: {e}")

        return report


class StructureDefinitionParser:
    """Parser for JSON StructureDefinition files to extract Logical Model definitions."""

    def __init__(self, logger: logging.Logger):
        self.logger = logger

    def find_structure_definition_files(self, directory: str) -> List[str]:
        """Find all StructureDefinition JSON files below the given directory.

        The result is sorted so that processing order (and therefore the QA
        report) is reproducible regardless of filesystem enumeration order.
        """
        json_files = [
            os.path.join(root, file)
            for root, _dirs, files in os.walk(directory)
            for file in files
            if file.startswith('StructureDefinition-') and file.endswith('.json')
        ]
        return sorted(json_files)

    def parse_logical_models(self, json_files: List[str]) -> List[Dict[str, Any]]:
        """Parse logical models from StructureDefinition JSON files.

        Files that cannot be read or that are not logical models are skipped.
        """
        logical_models = []
        for json_file in json_files:
            model = self.extract_logical_model_from_file(json_file)
            if model:
                logical_models.append(model)
        return logical_models

    def extract_logical_model_from_file(self, file_path: str) -> Optional[Dict[str, Any]]:
        """Extract logical model from a single StructureDefinition JSON file.

        Returns:
            A dictionary with the keys ``name``, ``id``, ``title``,
            ``description``, ``url``, ``parent``, ``elements`` and
            ``file_path``, or ``None`` when the file cannot be read, is not a
            JSON object, or its ``kind`` is not ``logical``.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                structure_def = json.load(f)
        except (OSError, ValueError) as e:
            # ValueError covers both JSONDecodeError and UnicodeDecodeError.
            self.logger.error(f"Error reading file {file_path}: {e}")
            return None

        # A JSON array/scalar has no .get(); one such file must not abort the
        # whole batch.
        if not isinstance(structure_def, dict):
            return None

        # Check if this is a logical model
        if structure_def.get('kind') != 'logical':
            return None

        self.logger.info(
            f"Found logical model: {structure_def.get('name', 'Unknown')} in {file_path}"
        )

        model = {
            'name': structure_def.get('name', ''),
            'id': structure_def.get('id', ''),
            'title': structure_def.get('title', ''),
            'description': structure_def.get('description', ''),
            'url': structure_def.get('url', ''),
            'parent': structure_def.get('baseDefinition', ''),
            'elements': [],
            'file_path': file_path
        }

        # Extract elements from snapshot, falling back to differential
        elements = []
        if 'snapshot' in structure_def and 'element' in structure_def['snapshot']:
            elements = structure_def['snapshot']['element']
        elif 'differential' in structure_def and 'element' in structure_def['differential']:
            elements = structure_def['differential']['element']

        for element in elements:
            parsed_element = self.parse_element(element, model['name'])
            if parsed_element:
                model['elements'].append(parsed_element)

        return model

    def parse_element(self, element: Dict[str, Any], model_name: str) -> Optional[Dict[str, Any]]:
        """Parse an element from StructureDefinition.

        Returns:
            A dictionary describing the element (``name``, ``path``,
            ``cardinality``, ``type``, ``valueset``, ``short``,
            ``definition`` and, when the element declares types, ``choice``),
            or ``None`` for the root element and for extension elements.
        """
        path = element.get('path', '')

        # Skip the root element. Its path has no '.', and it does not always
        # equal the model name (the id/type may differ from the name), so
        # both conditions are checked.
        if path == model_name or '.' not in path:
            return None

        # The element name is the last path segment.
        # NOTE(review): nested children (e.g. "Model.a.b") are therefore
        # flattened to their leaf name.
        element_name = path.rsplit('.', 1)[-1]

        # Skip extension elements for now
        if element_name.startswith('extension'):
            return None

        parsed_element = {
            'name': element_name,
            'path': path,
            'cardinality': f"{element.get('min', 0)}..{element.get('max', '*')}",
            'type': '',
            'valueset': '',
            'short': element.get('short', ''),
            'definition': element.get('definition', '')
        }

        # Extract type information; several types mean a choice element and
        # are joined with " or ".
        types = element.get('type') or []
        if types:
            type_codes = [t.get('code', '') for t in types]
            parsed_element['choice'] = len(type_codes) > 1
            parsed_element['type'] = ' or '.join(type_codes)

        # Extract ValueSet binding
        binding = element.get('binding') or {}
        valueset_url = binding.get('valueSet', '')
        if isinstance(valueset_url, str) and '/' in valueset_url:
            # Canonical references may carry a "|version" suffix, which is
            # not part of the ValueSet name.
            unversioned_url = valueset_url.split('|', 1)[0]
            parsed_element['valueset'] = unversioned_url.split('/')[-1]

        return parsed_element


class SchemaGenerator:
    """Generates JSON schemas from parsed logical models."""

    def __init__(self, logger: logging.Logger, canonical_base: str = "http://smart.who.int/base"):
        self.logger = logger
        self.canonical_base = canonical_base

        # Determine the actual base URL for schema files - use GitHub Pages URL if available.
        # GitHub environment variables decide which URL applies.
        github_repository = os.environ.get('GITHUB_REPOSITORY', '').lower()
        github_ref_name = os.environ.get('GITHUB_REF_NAME', '')
        is_default_branch = os.environ.get('IS_DEFAULT_BRANCH', 'false').lower() == 'true'

        if github_repository == 'worldhealthorganization/smart-base':
            if is_default_branch:
                # Main branch deploys to root
                self.schema_base_url = "https://worldhealthorganization.github.io/smart-base"
            elif github_ref_name:
                # Other branches deploy to branches subdirectory
                self.schema_base_url = (
                    f"https://worldhealthorganization.github.io/smart-base/branches/{github_ref_name}"
                )
            else:
                # Fallback to canonical base
                self.schema_base_url = canonical_base
        else:
            # For other repositories or local development, use canonical base
            self.schema_base_url = canonical_base

        # FHIR datatype to JSON Schema type mapping
        self.type_mapping = {
            'string': {'type': 'string'},
            'boolean': {'type': 'boolean'},
            'integer': {'type': 'integer'},
            'decimal': {'type': 'number'},
            'date': {'type': 'string', 'format': 'date'},
            'dateTime': {'type': 'string', 'format': 'date-time'},
            'time': {'type': 'string', 'format': 'time'},
            'instant': {'type': 'string', 'format': 'date-time'},
            'uri': {'type': 'string', 'format': 'uri'},
            'url': {'type': 'string', 'format': 'uri'},
            'canonical': {'type': 'string', 'format': 'uri'},
            'oid': {'type': 'string'},
            'id': {'type': 'string'},
            'code': {'type': 'string'},
            'uuid': {'type': 'string', 'format': 'uuid'},
            'base64Binary': {'type': 'string'},
            'markdown': {'type': 'string'},
            'unsignedInt': {'type': 'integer', 'minimum': 0},
            'positiveInt': {'type': 'integer', 'minimum': 1},
            # Complex types
            'Coding': {'type': 'object'},
            'CodeableConcept': {'type': 'object'},
            'Identifier': {'type': 'object'},
            'Quantity': {'type': 'object'},
            'Range': {'type': 'object'},
            'Period': {'type': 'object'},
            'Attachment': {'type': 'object'},
            'ContactPoint': {'type': 'object'},
            'HumanName': {'type': 'object'},
            'Address': {'type': 'object'},
        }

    def generate_schema(self, logical_model: Dict[str, Any]) -> Dict[str, Any]:
        """Generate JSON schema for a logical model.

        Args:
            logical_model: Model dictionary as produced by
                ``StructureDefinitionParser.extract_logical_model_from_file``.

        Returns:
            A JSON Schema (draft 2020-12) dictionary. When any element has a
            ValueSet binding, JSON-LD ``@context``/``@type`` properties and
            ``jsonld:*`` metadata are included as well.
        """
        model_name = logical_model['name']
        model_url = logical_model.get('url', '')

        # The schema is published next to the IG output (schema base URL),
        # not under the canonical StructureDefinition URL.
        schema_id = f"{self.schema_base_url}/StructureDefinition-{model_name}.schema.json"

        # Collect ValueSets used in this logical model for JSON-LD context.
        # Sorted so the generated file is identical from run to run.
        valuesets_used = sorted({
            element['valueset']
            for element in logical_model['elements']
            if element.get('valueset')
        })

        # The parser stores '' for a missing title/description, so fall back
        # on falsy values rather than only on missing keys.
        title = logical_model.get('title') or model_name
        description = logical_model.get('description') or (
            f"JSON Schema for {model_name} Logical Model. Generated from StructureDefinition. "
            "Supports both FHIR and JSON-LD representations."
        )

        schema = {
            "$schema": "https://json-schema.org/draft/2020-12/schema",
            "$id": schema_id,
            "title": title,
            "description": description,
            "type": "object",
            "properties": {
                "resourceType": {
                    "type": "string",
                    "const": logical_model['id'],
                    "description": f"Resource type identifier for {model_name} logical model"
                }
            },
            "required": ["resourceType"]
        }

        # Add JSON-LD context support if ValueSets are used
        jsonld_context = None
        if valuesets_used:
            jsonld_context = {
                "@version": 1.1,
                "fhir": "http://hl7.org/fhir/"
            }
            # Add ValueSet context entries
            for vs in valuesets_used:
                jsonld_context[vs] = f"{self.canonical_base}/ValueSet-{vs}.jsonld"

            # Add JSON-LD context properties to schema
            schema["properties"]["@context"] = {
                "description": "JSON-LD context for this logical model with ValueSet vocabularies",
                "anyOf": [
                    {
                        "type": "string",
                        "format": "uri",
                        "description": "URI reference to external JSON-LD context"
                    },
                    {
                        "type": "object",
                        "description": "Inline JSON-LD context",
                        "properties": {
                            "@version": {"type": "number", "const": 1.1},
                            "fhir": {"type": "string", "const": "http://hl7.org/fhir/"}
                        },
                        "additionalProperties": {
                            "type": "string",
                            "format": "uri"
                        }
                    },
                    {
                        "type": "array",
                        "description": "Array of JSON-LD context objects/URIs",
                        "items": {
                            "anyOf": [
                                {"type": "string", "format": "uri"},
                                {"type": "object"}
                            ]
                        }
                    }
                ]
            }

            schema["properties"]["@type"] = {
                "type": "string",
                "description": f"JSON-LD type identifier for {model_name} logical model",
                "examples": [f"LogicalModel-{model_name}"]
            }

        # Add metadata including canonical URI using resourceDefinition
        schema["resourceDefinition"] = (
            model_url if model_url else f"{self.canonical_base}/StructureDefinition/{model_name}"
        )

        if logical_model.get('parent'):
            schema["fhir:parent"] = logical_model['parent']

        # Add JSON-LD support metadata
        if valuesets_used:
            schema["jsonld:valuesets"] = list(valuesets_used)
            schema["jsonld:contextTemplate"] = jsonld_context

        # Process elements
        for element in logical_model['elements']:
            self.add_element_to_schema(schema, element)

        return schema

    @staticmethod
    def _parse_cardinality(cardinality: str) -> Tuple[int, Optional[int]]:
        """Split a ``"min..max"`` string into ``(min, max)``; ``max`` is ``None`` when unbounded.

        Unparseable bounds fall back to ``0`` (min) and ``1`` (max), i.e. an
        optional scalar. ``*`` and ``n`` both mean "unbounded".
        """
        text = (cardinality or '').strip()
        lower_text, separator, upper_text = text.partition('..')
        if not separator:
            # A single token such as "1" or "*" serves as both bounds.
            upper_text = lower_text

        try:
            lower = max(int(lower_text), 0)
        except ValueError:
            lower = 0

        upper_text = upper_text.strip()
        if upper_text in ('*', 'n'):
            return lower, None
        try:
            return lower, int(upper_text)
        except ValueError:
            return lower, 1

    def add_element_to_schema(self, schema: Dict[str, Any], element: Dict[str, Any]):
        """Add an element to the JSON schema.

        The element becomes a property of ``schema``. It is listed in
        ``required`` when its minimum cardinality is at least 1, and wrapped
        in an array schema when its maximum cardinality exceeds 1 or is
        unbounded.

        Args:
            schema: Schema under construction; must contain ``properties``
                and ``required``. It is modified in place.
            element: Parsed element as produced by
                ``StructureDefinitionParser.parse_element``.
        """
        element_name = element['name']
        element_type = element.get('type', '')
        valueset = element.get('valueset', '')
        lower, upper = self._parse_cardinality(element.get('cardinality', ''))

        # Any minimum >= 1 makes the element mandatory. JSON Schema requires
        # the entries of "required" to be unique.
        if lower >= 1 and element_name not in schema['required']:
            schema['required'].append(element_name)

        if element.get('choice', False):
            # Choice types: accept any of the alternatives. "anyOf" is used
            # because alternatives may overlap (e.g. string and markdown both
            # map to a JSON string), which would make "oneOf" reject every
            # value. Identical alternatives are collapsed.
            alternatives = []
            for type_option in element_type.split(' or '):
                type_schema = self.get_type_schema(type_option.strip(), valueset)
                if type_schema not in alternatives:
                    alternatives.append(type_schema)
            if len(alternatives) == 1:
                element_schema = alternatives[0]
            else:
                element_schema = {"anyOf": alternatives}
        else:
            # Regular element
            element_schema = self.get_type_schema(element_type, valueset)

        # Repeating elements become arrays with bounds taken from the cardinality.
        if upper is None or upper > 1:
            element_schema = {
                "type": "array",
                "items": element_schema
            }
            if lower >= 1:
                element_schema["minItems"] = lower
            if upper is not None:
                element_schema["maxItems"] = upper

        # Add description if available
        if element.get('definition'):
            element_schema["description"] = element['definition']
        elif element.get('short'):
            element_schema["description"] = element['short']

        schema['properties'][element_name] = element_schema

    @staticmethod
    def _jsonld_coded_value_schema(label: str, valueset: str, valueset_jsonld_iri: str) -> Dict[str, Any]:
        """Build the JSON-LD ``{@type, @id}`` alternative for a ValueSet-bound value."""
        return {
            "type": "object",
            "description": f"{label} from ValueSet {valueset} (JSON-LD structure)",
            "properties": {
                "@type": {
                    "type": "string",
                    "const": valueset_jsonld_iri,
                    "description": f"JSON-LD type reference to ValueSet {valueset}"
                },
                "@id": {
                    "type": "string",
                    "format": "uri",
                    "description": f"Full IRI of the code from ValueSet {valueset}"
                }
            },
            "required": ["@type", "@id"],
            "additionalProperties": False
        }

    def get_type_schema(self, fhir_type: str, valueset: str = '') -> Dict[str, Any]:
        """Get JSON schema for a FHIR type.

        Resolution order: ``Reference(...)`` types, StructureDefinition URLs
        (emitted as ``$ref`` to the other model's schema), ValueSet-bound
        ``code``/``Coding``/``CodeableConcept``, the primitive/complex type
        mapping, and finally a generic object schema for unknown types.

        Returns:
            A new dictionary that the caller may modify freely.
        """
        # Handle Reference types
        if fhir_type.startswith('Reference('):
            return {
                "type": "object",
                "description": f"Reference to {fhir_type}"
            }

        # Handle StructureDefinition URLs - these should reference other logical model schemas
        if fhir_type.startswith('http') and '/StructureDefinition/' in fhir_type:
            # Extract model name and create reference using StructureDefinition-{name} pattern
            model_name = fhir_type.split('/StructureDefinition/')[-1]
            return {
                "$ref": f"{self.schema_base_url}/StructureDefinition-{model_name}.schema.json"
            }

        # Handle ValueSet bindings with JSON-LD support
        if valueset:
            # Construct the ValueSet JSON-LD IRI
            valueset_jsonld_iri = f"{self.canonical_base}/ValueSet-{valueset}.jsonld"

            if fhir_type == 'code':
                # For code fields, support both plain string and JSON-LD structure
                return {
                    "oneOf": [
                        {
                            "type": "string",
                            "description": f"Code from ValueSet {valueset} (plain string)"
                        },
                        self._jsonld_coded_value_schema("Code", valueset, valueset_jsonld_iri)
                    ],
                    "description": (
                        f"Code from ValueSet {valueset}. Can be a plain string or "
                        "JSON-LD structure with @type and @id."
                    )
                }

            if fhir_type in ('Coding', 'CodeableConcept'):
                # For Coding/CodeableConcept types, support both FHIR structure and JSON-LD structure
                return {
                    "oneOf": [
                        {
                            "type": "object",
                            "description": f"FHIR {fhir_type} from ValueSet {valueset}",
                            "properties": {
                                "system": {"type": "string", "description": "Code system URI"},
                                "code": {"type": "string", "description": "Code value"},
                                "display": {
                                    "type": "string",
                                    "description": "Human readable display text"
                                }
                            },
                            "required": ["system", "code"]
                        },
                        self._jsonld_coded_value_schema(fhir_type, valueset, valueset_jsonld_iri)
                    ],
                    "description": (
                        f"{fhir_type} from ValueSet {valueset}. Supports both FHIR Coding "
                        "structure and JSON-LD structure."
                    )
                }

        # Use type mapping; copy so callers can add keys without touching the table
        if fhir_type in self.type_mapping:
            return self.type_mapping[fhir_type].copy()

        # Default for unknown types
        return {
            "type": "object",
            "description": f"FHIR {fhir_type}"
        }

    def save_schema(self, schema: Dict[str, Any], output_dir: str, model_name: str) -> Optional[str]:
        """Save a JSON schema to a file.

        Returns:
            The path of the written file, or ``None`` when writing failed
            (the failure is logged).
        """
        try:
            # Ensure output directory exists
            Path(output_dir).mkdir(parents=True, exist_ok=True)

            # Create filename with StructureDefinition- prefix to match FHIR canonicals
            filename = f"StructureDefinition-{model_name}.schema.json"
            filepath = os.path.join(output_dir, filename)

            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(schema, f, indent=2, ensure_ascii=False)

            self.logger.info(f"Saved schema for Logical Model {model_name} to {filepath}")
            return filepath

        except (OSError, TypeError, ValueError) as e:
            self.logger.error(f"Error saving schema for Logical Model {model_name}: {e}")
            return None


def process_logical_models(structure_definition_dir: str, output_dir: str,
                           qa_reporter: QAReporter) -> int:
    """Process StructureDefinition JSON files and generate JSON schemas for logical models.

    Args:
        structure_definition_dir: Directory searched (recursively) for
            ``StructureDefinition-*.json`` files.
        output_dir: Directory that receives the generated schema files.
        qa_reporter: Reporter that records every step, success and failure.

    Returns:
        The number of schemas written. Errors are recorded on
        ``qa_reporter`` rather than raised; ``0`` is returned when discovery
        or parsing fails.
    """
    logger = logging.getLogger(__name__)

    try:
        # Initialize parser and generator
        parser = StructureDefinitionParser(logger)
        generator = SchemaGenerator(logger)

        # Find StructureDefinition JSON files
        try:
            json_files = parser.find_structure_definition_files(structure_definition_dir)
            qa_reporter.add_success(f"Found {len(json_files)} StructureDefinition files", {
                "directory": structure_definition_dir,
                "file_count": len(json_files)
            })
            logger.info(f"Found {len(json_files)} StructureDefinition files to process")
        except Exception as e:
            qa_reporter.add_error(f"Error finding StructureDefinition files: {e}", {
                "directory": structure_definition_dir,
                "exception": str(e)
            })
            return 0

        # Record expected files
        for file_path in json_files:
            qa_reporter.add_file_expected(file_path, found=True)

        # Parse logical models
        try:
            logical_models = parser.parse_logical_models(json_files)
            qa_reporter.add_success(f"Found {len(logical_models)} logical models", {
                "models_found": len(logical_models),
                "model_names": [model['name'] for model in logical_models]
            })
            logger.info(f"Found {len(logical_models)} logical models")
        except Exception as e:
            qa_reporter.add_error(f"Error parsing logical models: {e}", {
                "exception": str(e)
            })
            return 0

        # Generate schemas; a failure on one model must not stop the others
        schemas_generated = 0
        schema_files = []

        for model in logical_models:
            model_name = model['name']
            logger.info(f"Generating schema for logical model: {model_name}")

            try:
                schema = generator.generate_schema(model)
                qa_reporter.add_success(f"Generated schema for model {model_name}")

                schema_path = generator.save_schema(schema, output_dir, model_name)
                if not schema_path:
                    qa_reporter.add_error(f"Failed to save schema for model {model_name}")
                    continue

                schemas_generated += 1
                schema_files.append(schema_path)
                qa_reporter.add_file_processed(schema_path, "success", {
                    "model_name": model_name,
                    "schema_size": len(json.dumps(schema))
                })
                qa_reporter.add_schema_generated({
                    "model_name": model_name,
                    "schema_file": schema_path,
                    "properties_count": len(schema.get("properties", {})),
                    "required_fields": schema.get("required", [])
                })

            except Exception as e:
                qa_reporter.add_error(f"Error processing logical model {model_name}: {e}", {
                    "model_name": model_name,
                    "exception": str(e)
                })
                continue

        qa_reporter.add_success(f"Generated {schemas_generated} Logical Model schemas", {
            "schemas_generated": schemas_generated,
            "schema_files": schema_files
        })
        logger.info(f"Generated {schemas_generated} Logical Model schemas")

        return schemas_generated

    except Exception as e:
        qa_reporter.add_error(f"Unexpected error in process_logical_models: {e}", {
            "exception": str(e)
        })
        return 0


def main():
    """Main entry point for the script.

    Always writes the QA report and exits with status 0 so that the calling
    workflow is never failed by this step.
    """
    logger = setup_logging()
    logger.info("Starting 04_generate_logical_model_schemas.py")

    # Initialize QA reporter
    qa_reporter = QAReporter("logical_model_schemas")

    try:
        # Parse command line arguments
        # When run from template: first arg is ig_root directory
        # When run standalone: first arg is structure_definition_dir, second is output_dir
        if len(sys.argv) > 2:
            # Two arguments: standalone mode
            structure_definition_dir = sys.argv[1]
            output_dir = sys.argv[2]
        elif len(sys.argv) > 1:
            # Single argument: treat as ig_root (template execution mode)
            ig_root = Path(sys.argv[1])
            structure_definition_dir = str(ig_root / "output")
            output_dir = str(ig_root / "output")
        else:
            # No arguments: default paths
            structure_definition_dir = "output"
            output_dir = "output"

        logger.info(f"Processing StructureDefinition files from: {structure_definition_dir}")
        logger.info(f"Schema output directory: {output_dir}")

        qa_reporter.add_success("Script started", {
            "input_directory": structure_definition_dir,
            "output_directory": output_dir
        })

        if not os.path.exists(structure_definition_dir):
            # Nothing can be processed; record only the real cause instead of
            # also emitting a misleading "no logical models found" warning.
            error_msg = f"StructureDefinition directory does not exist: {structure_definition_dir}"
            logger.error(error_msg)
            qa_reporter.add_error(error_msg, {
                "directory": structure_definition_dir
            })
        else:
            qa_reporter.add_success("Input directory found", {
                "directory": structure_definition_dir
            })

            # Process logical models
            schemas_generated = process_logical_models(
                structure_definition_dir, output_dir, qa_reporter
            )

            if schemas_generated > 0:
                success_msg = f"Successfully generated {schemas_generated} logical model schemas"
                logger.info(success_msg)
                qa_reporter.add_success(success_msg, {
                    "schemas_generated": schemas_generated
                })
            else:
                warning_msg = "No logical model schemas were generated (no logical models found)"
                logger.info(warning_msg)
                qa_reporter.add_warning(warning_msg)

    except Exception as e:
        error_msg = f"Unexpected error in main: {e}"
        logger.error(error_msg)
        qa_reporter.add_error(error_msg, {
            "exception": str(e)
        })

    finally:
        # Always save QA report regardless of success/failure
        try:
            # Save to protected location that won't be overwritten by IG publisher
            protected_path = "input/temp/qa_logical_model_schemas.json"
            backup_path = "/tmp/qa_logical_model_schemas.json"
            qa_reporter.save_report(protected_path, backup_path)
        except Exception as e:
            logger.error(f"Error saving QA report: {e}")

        # Exit with 0 to avoid failing the workflow
        sys.exit(0)


if __name__ == "__main__":
    main()