''' Benjamin Langley Usage: python3 R4CapStatement_Maker.py [xlsx file] Dependecies: fhirclient pandas xlrd stringcase To install all dependencies: pip3 install -r requirements.txt NOTE: this requires the r4models to be installed in the fhirclient pip site-package Modified from: https://github.com/Healthedata1/MyNotebooks/blob/master/CapStatement/R4CapStatement_Maker.ipynb ''' import sys import fhirclient.r4models.capabilitystatement as CS import fhirclient.r4models.codeableconcept as CC import fhirclient.r4models.fhirdate as D import fhirclient.r4models.extension as X import fhirclient.r4models.contactdetail as CD import fhirclient.r4models.narrative as N import fhirclient.r4models.bundle as B import re import tarfile # import fhirclient.r4models.narrative as N from json import dumps, loads from requests import post from pathlib import Path from collections import namedtuple from pandas import * from datetime import datetime, date from stringcase import snakecase, titlecase from jinja2 import Environment, FileSystemLoader, select_autoescape from commonmark import commonmark from lxml import etree # GLOBALS fhir_base_url = 'http://hl7.org/fhir/' f_jurisdiction = CC.CodeableConcept({ "coding": [ { "system": "urn:iso:std:iso:3166", "code": "US" } ] }) conf_url = 'http://hl7.org/fhir/StructureDefinition/capabilitystatement-expectation' combo_url = 'http://hl7.org/fhir/StructureDefinition/capabilitystatement-search-parameter-combination' # dict to for SP to get right canonicals, may use spreadsheet or package file in future. sp_specials = { 'us-core-includeprovenance': 'http://hl7.org/fhir/us/core/SearchParameter/us-core-includeprovenance'} none_list = ['', ' ', 'none', 'n/a', 'N/A', 'N', 'False'] sep_list = (',', ';', ' ', ', ', '; ') f_now = D.FHIRDate(str(date.today())) def markdown(text, *args, **kwargs): return commonmark(text, *args, **kwargs) def main(): if (len(sys.argv) < 2): print( "Error: missing xlsx file - correct usage is:\n\tpython3 R4CapStatement_Maker.py [xlsx file]") return xls = sys.argv[1] print('....creating CapabilityStatement.....') # Read the config sheet from the spreadsheet # use the index_col = 0 for setting the first row as the index config_df = read_excel(xls, 'config', na_filter=False, index_col=0) # --------- ig specific variable ------------------- pre = config_df.Value.pre # for Titles - not sure this is actually used canon = config_df.Value.canon # don't forget the slash - fix using os.join or path publisher = config_df.Value.publisher restinteraction = config_df.Value.rest publisher_endpoint = dict( system=config_df.Value.publishersystem, value=config_df.Value.publishervalue, ) definitions_file = config_df.Value.definitions_file #source of spec.internal file manually extracted from downloaded spec # Read the meta sheet from the spreadsheet meta_df = read_excel(xls, 'meta', na_filter=False) meta_dict = dict(zip(meta_df.Element, meta_df.Value)) meta = namedtuple("Meta", meta_dict.keys())(*meta_dict.values()) # Create the CapabilityStatement cs = create_capabilitystatement( meta, canon, publisher, publisher_endpoint, xls) rest = CS.CapabilityStatementRest(dict( mode=meta.mode, documentation=meta.documentation, security=dict( description=meta.security ) if meta.security else None, interaction=get_rest_ints(xls) if restinteraction else None, operation=get_sys_op(xls) )) cs.rest = [rest] df_profiles = read_excel(xls, 'profiles', na_filter=False) df_profiles = df_profiles[df_profiles.Profile.str[0] != '!'] resources_df = read_excel(xls, 'resources', na_filter=False) resources_df = resources_df[resources_df.type.str[0] != '!'] df_i = read_excel(xls, 'interactions', na_filter=False) df_sp = read_excel(xls, 'sps', na_filter=False) df_combos = read_excel(xls, 'sp_combos', na_filter=False) df_op = read_excel(xls, 'ops', na_filter=False) rest.resource = [] for r in resources_df.itertuples(index=True): supported_profile = [p.Profile for p in df_profiles.itertuples( index=True) if p.Type == r.type] res = CS.CapabilityStatementRestResource( dict( type=r.type, documentation=r.documentation if r.documentation not in none_list else None, versioning=r.versioning if r.versioning not in none_list else None, readHistory=r.readHistory if r.readHistory not in none_list else None, updateCreate=r.updateCreate if r.updateCreate not in none_list else None, conditionalCreate=r.conditionalCreate if r.conditionalCreate not in none_list else None, conditionalRead=r.conditionalRead if r.conditionalRead not in none_list else None, conditionalUpdate=r.conditionalUpdate if r.conditionalUpdate not in none_list else None, conditionalDelete=r.conditionalDelete if r.conditionalDelete not in none_list else None, referencePolicy=[re.sub('\s+','',x) for x in r.referencePolicy.split(",") if x], searchInclude=[re.sub('\s+','',x) for x in r.shall_include.split( ",") + r.should_include.split(",") if x], searchRevInclude=[re.sub('\s+','',x) for x in r.shall_revinclude.split( ",") + r.should_revinclude.split(",") if x], interaction=get_i(r.type, df_i), searchParam=get_sp(r.type, df_sp, pre, canon), operation=get_op(r.type, df_op), supportedProfile=supported_profile ) ) res.extension = get_conf(r.conformance) combos = {(i.combo, i.combo_conf) for i in df_combos.itertuples(index=True) if i.base == r.type} # convert list to lst of combo extension res.extension = res.extension + get_combo_ext(r.type, combos) rest.resource.append(res) rest.resource = sorted( rest.resource, key=lambda x: x.type) # sort resources cs.rest = [rest] # add in conformance expectations for primitives # convert to dict since model can't handle primitive extensions resttype_dict = res.as_json() for i in ['Include', 'RevInclude']: element = f'_search{i}' resttype_dict[element] = [] for expectation in ['should', 'shall']: # list all should includes first sp_attr = f'{expectation}_{i.lower()}' includes = getattr(r, sp_attr).split(',') for include in includes: if include not in none_list: conf = get_conf(expectation.upper(), as_dict=True) resttype_dict[element].append(conf) if not resttype_dict[element]: del(resttype_dict[element]) print(resttype_dict) print(dumps(cs.as_json(), indent=3)) # %% [markdown] print('.............validating..............') r = validate(cs) if (r.status_code != 200): print("Error: Unable to validate - status code {}".format(r.status_code)) path = Path.cwd() / 'validation.html' path.write_text( f'

Validation output

Status Code = {r.status_code}

{r.json()["text"]["div"]}') print(f"HTML webpage of validation saved to:\n\t {path}") # get from package (json) file in local .fhir directory si = get_si2(definitions_file) path_map = si['paths'] path_map in_path = '' in_file = 'R4capabilitystatement-server.j2' env = Environment( loader=FileSystemLoader(searchpath=in_path), autoescape=select_autoescape(['html', 'xml', 'xhtml', 'j2', 'md']) ) env.filters['markdown'] = markdown template = env.get_template(in_file) sp_map = {sp.code: sp.type for sp in df_sp.itertuples(index=True)} pname_map = {p.Profile: p.Name for p in df_profiles.itertuples(index=True)} print(pname_map) rendered = template.render(cs=cs, path_map=path_map, pname_map=pname_map, sp_map=sp_map) # print(HTML(rendered)) parser = etree.XMLParser(remove_blank_text=True) root = etree.fromstring(rendered, parser=parser) div = (etree.tostring(root[1][0], encoding='unicode', method='html')) narr = N.Narrative() narr.status = 'generated' narr.div = div cs.text = narr # save to file print('...........saving to file............') # path = Path.cwd() / f'capabilitystatement-{cs.id.lower()}.json' path = Path.cwd() / f'capabilitystatement-{meta.title.lower()}.json' path.write_text(dumps(cs.as_json(), indent=4)) print(f"CapabilityStatement saved to:\n\t {path}") def get_conf(conf='MAY', as_dict=False): if as_dict: return [X.Extension(dict( url=conf_url, valueCode=conf )).as_json()] else: return [X.Extension(dict( url=conf_url, valueCode=conf ))] def get_addin_ext(py_ext, json_ext): if json_ext: # ie not '' addin_ext = X.Extension(loads(json_ext)) py_ext.append(addin_ext) # add in other extensions return py_ext def validate(r): #fhir_test_server = 'http://test.fhir.org/r4' #fhir_test_server = 'http://hapi.fhir.org/baseR4' fhir_test_server = 'http://wildfhir4.aegis.net/fhir4-0-1' headers = { 'Accept': 'application/fhir+json', 'Content-Type': 'application/fhir+json' } # profile = 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient' # The official URL for this profile is: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient params = dict( # profile = 'http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient' # The official URL for this profile is: http://hl7.org/fhir/us/core/StructureDefinition/us-core-patient ) r = post(f'{fhir_test_server}/CapabilityStatement/$validate', params=params, headers=headers, data=dumps(r.as_json())) return r def get_sys_op(xls): op_list = [] df_op = read_excel(xls, 'ops', na_filter=False) for i in df_op.itertuples(index=True): if i.type == 'system': op = CS.CapabilityStatementRestResourceOperation() op.name = i.name op.definition = i.definition op.extension = get_conf(i.conf) op_list.append(op.as_json()) return op_list def get_rest_ints(xls): ri_list = [] df_ri = read_excel(xls, 'rest_interactions', na_filter=False) for i in df_ri.itertuples(index=True): ri = CS.CapabilityStatementRestInteraction() ri.code = i.code ri.documentation = i.doc if i.doc not in none_list else None ri.extension = get_conf(i.conf) ri_list.append(ri.as_json()) return ri_list def get_igs(xls): ig_list = [] df_igs = read_excel(xls, 'igs', na_filter=False) for ig in df_igs.itertuples(index=True): ig_list.append(ig.uri) return ig_list # TODO add conformance to this def kebab_to_pascal(word): return ''.join(x.capitalize() for x in word.split('-')) def create_capabilitystatement(meta, canon, publisher, publisher_endpoint, xls): cs = CS.CapabilityStatement() # cs.id = meta.id cs.id = meta.title.lower() cs.url = f'{canon}CapabilityStatement/{meta.title.lower()}' cs.version = meta.version cs.name = f'{kebab_to_pascal(meta.id)}{cs.resource_type}' # cs.title = f'{titlecase(meta.id).replace("Us ", "US ")} {cs.resource_type}' cs.title = f'{meta.title} {cs.resource_type}' cs.status = 'active' cs.experimental = False cs.date = f_now # as FHIRDate cs.publisher = publisher cs.contact = [CD.ContactDetail({"telecom": [publisher_endpoint]})] cs.description = meta.description cs.jurisdiction = [f_jurisdiction] cs.kind = 'requirements' cs.fhirVersion = meta.fhirVersion cs.acceptUnknown = 'both' cs.format = [ "xml", "json" ] cs.patchFormat = [ "application/json-patch+json", ] cs.implementationGuide = meta.ig.split(",") + get_igs(xls) return cs def get_i(type, df_i): int_list = [] for i in df_i.itertuples(index=True): if getattr(i, f'conf_{type}') not in none_list: int = CS.CapabilityStatementRestResourceInteraction() int.code = i.code try: int.documentation = getattr(i, f'doc_{type}') if getattr( i, f'doc_{type}') not in none_list else None except AttributeError: pass int.extension = get_conf(getattr(i, f'conf_{type}')) int_list.append(int.as_json()) return int_list def get_sp(r_type, df_sp, pre, canon): sp_list = [] for i in df_sp.itertuples(index=True): if i.base == r_type: sp = CS.CapabilityStatementRestResourceSearchParam() sp.name = i.code # TODO need to fix this to reference the package file to reconcile definition to names #if i.code in sp_specials: # special case temp fix for us-core # sp.definition = sp_specials[i.code] #el if i.update == 'Y' or i.exists == 'N': # sp.definition = f'{canon}SearchParameter/{pre.lower()}-{i.base.lower()}-{i.code.split("_")[-1]}' sp.definition = f'{canon}SearchParameter/{i.base.lower()}-{i.code.split("_")[-1]}' else: # use base definition # removes the '_' for things like _id #sp.definition = f'{fhir_base_url}SearchParameter/{i.base.lower()}-{i.code.split("_")[-1]}' sp.definition = f'{fhir_base_url}SearchParameter/Resource-{i.code.split("_")[-1]}' sp.type = i.type sp.extension = get_conf(i.base_conf) sp_list.append(sp.as_json()) return sp_list def get_combo_ext(r_type, combos): x_list = [] for combo in combos: # convert to extension combo_ext = X.Extension() combo_ext.url = combo_url combo_conf_ext = get_conf(combo[1]) combo_ext.extension = combo_conf_ext for param in combo[0].split(','): req_combo = X.Extension( dict( url='required', valueString=param # http://hl7.org/fhir/us/core/SearchParameter/us-core-patient-family ) ) combo_ext.extension.append(req_combo) x_list.append(combo_ext) return x_list def get_op(r_type, df_op): op_list = [] for i in df_op.itertuples(index=True): if i.type == r_type: op = CS.CapabilityStatementRestResourceOperation() op.name = i.name op.definition = i.definition op.documentation = i.documentation if i.documentation not in none_list else None op.extension = get_conf(i.conf) try: op.extension = get_addin_ext(op.extension, i.ext) except AttributeError: print("---- no addin extensions found-----") op_list.append(op.as_json()) return op_list def markdown(text, *args, **kwargs): return commonmark(text, *args, **kwargs) def get_si(path): with tarfile.open(f'{path}/package.tgz', mode='r') as tf: #pprint(tf.getnames()) f = tf.extractfile('other/spec.internals') r = f.read() return(loads(r)) def get_si2(path): with open(f'{path}', 'r', encoding='utf-8-sig') as f: r = f.read() return(loads(r, encoding = 'utf-8')) main()