diff --git a/pyxform/entities/entity_declaration.py b/pyxform/entities/entity_declaration.py index 49222f4f..3c9e240e 100644 --- a/pyxform/entities/entity_declaration.py +++ b/pyxform/entities/entity_declaration.py @@ -46,7 +46,7 @@ def __init__(self, name: str, type: str, parameters: dict, **kwargs): super().__init__(name=name, **kwargs) def xml_instance(self, **kwargs): - parameters = self.get(const.PARAMETERS, {}) + parameters = self.parameters attributes = { EC.DATASET.value: parameters.get(EC.DATASET, ""), @@ -75,7 +75,7 @@ def xml_bindings(self, survey: "Survey"): """ See the class comment for an explanation of the logic for generating bindings. """ - parameters = self.get(const.PARAMETERS, {}) + parameters = self.parameters entity_id_expression = parameters.get(EC.ENTITY_ID, None) create_condition = parameters.get(EC.CREATE_IF, None) update_condition = parameters.get(EC.UPDATE_IF, None) diff --git a/pyxform/parsing/expression.py b/pyxform/parsing/expression.py index 2c80b74f..d94c0c94 100644 --- a/pyxform/parsing/expression.py +++ b/pyxform/parsing/expression.py @@ -1,12 +1,8 @@ import re -from collections.abc import Iterable from functools import lru_cache -def get_expression_lexer(name_only: bool = False) -> re.Scanner: - """ - Get a expression lexer (scanner) for parsing. - """ +def get_lexer_rules(): # ncname regex adapted from eulxml https://github.com/emory-libraries/eulxml/blob/2e1a9f71ffd1fd455bd8326ec82125e333b352e0/eulxml/xpath/lexrules.py # (C) 2010,2011 Emory University Libraries [Apache v2.0 License] # They in turn adapted it from https://www.w3.org/TR/REC-xml/#NT-NameStartChar @@ -29,7 +25,7 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner: date_time_regex = date_regex + "T" + time_regex # Rule order is significant - match priority runs top to bottom. 
- lexer_rules = { + return { # https://www.w3.org/TR/xmlschema-2/#dateTime "DATETIME": date_time_regex, "DATE": date_regex, @@ -49,7 +45,7 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner: "SYSTEM_LITERAL": r""""[^"]*"|'[^']*'""", "COMMA": r",", "WHITESPACE": r"\s+", - "PYXFORM_REF": r"\$\{" + ncname_regex + r"(#" + ncname_regex + r")?" + r"\}", + "PYXFORM_REF": r"\$\{(last-saved#)?" + ncname_regex + r"\}", "FUNC_CALL": ncname_regex + r"\(", "XPATH_PRED_START": ncname_regex + r"\[", "XPATH_PRED_END": r"\]", @@ -60,15 +56,21 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner: "OTHER": r".+?", # Catch any other character so that parsing doesn't stop. } + +LEXER_RULES = get_lexer_rules() +RE_ONLY_NCNAME = re.compile(rf"""^{LEXER_RULES["NAME"]}$""") +RE_ONLY_PYXFORM_REF = re.compile(rf"""^{LEXER_RULES["PYXFORM_REF"]}$""") +RE_ANY_PYXFORM_REF = re.compile(LEXER_RULES["PYXFORM_REF"]) + + +def get_expression_lexer() -> re.Scanner: def get_tokenizer(name): def tokenizer(scan, value) -> ExpLexerToken | str: - if name_only: - return name return ExpLexerToken(name, value, scan.match.start(), scan.match.end()) return tokenizer - lexicon = [(v, get_tokenizer(k)) for k, v in lexer_rules.items()] + lexicon = [(v, get_tokenizer(k)) for k, v in LEXER_RULES.items()] # re.Scanner is undocumented but has been around since at least 2003 # https://mail.python.org/pipermail/python-dev/2003-April/035075.html return re.Scanner(lexicon) @@ -84,9 +86,8 @@ def __init__(self, name: str, value: str, start: int, end: int) -> None: self.end: int = end -# Scanner takes a few 100ms to compile so use these shared instances. +# Scanner takes a few 100ms to compile so use the shared instance. 
_EXPRESSION_LEXER = get_expression_lexer() -_TOKEN_NAME_LEXER = get_expression_lexer(name_only=True) @lru_cache(maxsize=128) @@ -103,32 +104,29 @@ def parse_expression(text: str) -> tuple[list[ExpLexerToken], str]: return tokens, remainder -def is_single_token_expression(expression: str, token_types: Iterable[str]) -> bool: - """ - Does the expression contain single token of one of the provided token types? - """ - if not expression: - return False - tokens, _ = _TOKEN_NAME_LEXER.scan(expression.strip()) - if 1 == len(tokens) and tokens[0] in token_types: - return True - else: - return False - - def is_pyxform_reference(value: str) -> bool: """ Does the input string contain only a valid Pyxform reference? e.g. ${my_question} """ - if not value or len(value) <= 3: # Needs 3 characters for "${}", plus a name inside. - return False - return is_single_token_expression(expression=value, token_types=("PYXFORM_REF",)) + # Needs 3 characters for "${}", plus a name inside. + return value and len(value) > 3 and bool(RE_ONLY_PYXFORM_REF.match(value)) def is_xml_tag(value: str) -> bool: """ Does the input string contain only a valid XML tag / element name? """ - if not value: - return False - return is_single_token_expression(expression=value, token_types=("NAME",)) + return value and bool(RE_ONLY_NCNAME.match(value)) + + +def has_last_saved(value: str) -> bool: + """ + Does the input string contain a valid '#last-saved' Pyxform reference? e.g. ${last-saved#my_question} + """ + # Needs 14 characters for "${last-saved#}", plus a name inside. 
+ return ( + value + and len(value) > 14 + and "${last-saved#" in value + and RE_ANY_PYXFORM_REF.search(value) + ) diff --git a/pyxform/parsing/instance_expression.py b/pyxform/parsing/instance_expression.py index 7ab5fbb2..3c43d53e 100644 --- a/pyxform/parsing/instance_expression.py +++ b/pyxform/parsing/instance_expression.py @@ -21,11 +21,13 @@ def find_boundaries(xml_text: str) -> list[tuple[int, int]]: :param xml_text: XML text that may contain an instance expression. :return: Tokens in instance expression, and the string position boundaries. """ + tokens, _ = parse_expression(xml_text) + if not tokens: + return [] instance_enter = False path_enter = False pred_enter = False last_token = None - tokens, _ = parse_expression(xml_text) boundaries = [] for t in tokens: @@ -96,8 +98,11 @@ def replace_with_output(xml_text: str, context: "SurveyElement", survey: "Survey :param survey: The Survey that the context is in. :return: The possibly modified string. """ + # 9 = len("instance(") + if 9 >= len(xml_text): + return xml_text boundaries = find_boundaries(xml_text=xml_text) - if 0 < len(boundaries): + if boundaries: new_strings = [] for start, end in boundaries: old_str = xml_text[start:end] @@ -116,6 +121,6 @@ def replace_with_output(xml_text: str, context: "SurveyElement", survey: "Survey # expression positions due to incremental replacement. 
offset = 0 for s, e, o, n in new_strings: - xml_text = xml_text[: s + offset] + n + xml_text[e + offset :] + xml_text = f"{xml_text[: s + offset]}{n}{xml_text[e + offset :]}" offset += len(n) - len(o) return xml_text diff --git a/pyxform/question.py b/pyxform/question.py index 211626d8..4c7f2329 100644 --- a/pyxform/question.py +++ b/pyxform/question.py @@ -3,7 +3,7 @@ """ import os.path -from collections.abc import Iterable +from collections.abc import Callable, Generator, Iterable from itertools import chain from typing import TYPE_CHECKING @@ -21,6 +21,7 @@ from pyxform.utils import ( PYXFORM_REFERENCE_REGEX, DetachableElement, + coalesce, combine_lists, default_is_dynamic, node, @@ -45,7 +46,6 @@ "trigger", constants.BIND, constants.CHOICE_FILTER, - constants.COMPACT_TAG, # used for compact (sms) representation constants.CONTROL, constants.HINT, constants.MEDIA, @@ -106,7 +106,6 @@ def __init__(self, fields: tuple[str, ...] | None = None, **kwargs): self.trigger: str | None = None # SMS / compact settings - self.compact_tag: str | None = None self.sms_field: str | None = None qtd = kwargs.pop("question_type_dictionary", QUESTION_TYPE_DICT) @@ -149,27 +148,20 @@ def validate(self): raise PyXFormError(f"Unknown question type '{self.type}'.") def xml_instance(self, survey: "Survey", **kwargs): - attributes = self.get("instance") + attributes = self.instance if attributes is None: attributes = {} else: for key, value in attributes.items(): attributes[key] = survey.insert_xpaths(value, self) - if self.get("default") and not default_is_dynamic(self.default, self.type): - return node(self.name, str(self.get("default")), **attributes) + if self.default and not default_is_dynamic(self.default, self.type): + return node(self.name, str(self.default), **attributes) return node(self.name, **attributes) def xml_control(self, survey: "Survey"): if self.type == "calculate" or ( - ( - ( - hasattr(self, "bind") - and self.bind is not None - and "calculate" in self.bind - ) - or 
self.trigger - ) + (self.bind is not None and "calculate" in self.bind or self.trigger) and not (self.label or self.hint) ): nested_setvalues = survey.get_trigger_values_for_question_name( @@ -268,13 +260,13 @@ def build_xml(self, survey: "Survey"): result.appendChild(element) # Input types are used for selects with external choices sheets. - if self["query"]: - choice_filter = self.get(constants.CHOICE_FILTER) + if self.query: + choice_filter = self.choice_filter if choice_filter is not None: pred = survey.insert_xpaths(choice_filter, self, True) - query = f"""instance('{self["query"]}')/root/item[{pred}]""" + query = f"""instance('{self.query}')/root/item[{pred}]""" else: - query = f"""instance('{self["query"]}')/root/item""" + query = f"""instance('{self.query}')/root/item""" result.setAttribute("query", query) return result @@ -376,12 +368,14 @@ def __init__( # I'm going to try to stick to just choices. # Aliases in the json format will make it more difficult # to use going forward. 
- choices = combine_lists( - a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None) - ) - if choices: + kw_choices = kwargs.pop(constants.CHOICES, None) + kw_children = kwargs.pop(constants.CHILDREN, None) + choices = coalesce(kw_choices, kw_children) + if isinstance(choices, tuple) and isinstance(next(iter(choices)), Option): + self.children = choices + elif choices: self.children = tuple( - c if isinstance(c, Option) else Option(**c) for c in choices + Option(**c) for c in combine_lists(kw_choices, kw_children) ) super().__init__(**kwargs) @@ -391,6 +385,22 @@ def validate(self): for child in self.children: child.validate() + def iter_descendants( + self, + condition: Callable[["SurveyElement"], bool] | None = None, + iter_into_section_items: bool = False, + ) -> Generator["SurveyElement", None, None]: + if condition is None: + yield self + elif condition(self): + yield self + if iter_into_section_items and self.children: + for e in self.children: + yield from e.iter_descendants( + condition=condition, + iter_into_section_items=iter_into_section_items, + ) + def build_xml(self, survey: "Survey"): if self.bind["type"] not in {"string", "odk:rank"}: raise PyXFormError("""Invalid value for `self.bind["type"]`.""") @@ -408,8 +418,8 @@ def build_xml(self, survey: "Survey"): # itemset are only supposed to be strings, # check to prevent the rare dicts that show up - if self["itemset"] and isinstance(self["itemset"], str): - itemset, file_extension = os.path.splitext(self["itemset"]) + if self.itemset and isinstance(self.itemset, str): + itemset, file_extension = os.path.splitext(self.itemset) if file_extension == ".geojson": itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE_GEOJSON @@ -417,33 +427,31 @@ def build_xml(self, survey: "Survey"): else: itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE itemset_label_ref = EXTERNAL_CHOICES_ITEMSET_REF_LABEL - if hasattr(self, "parameters") and self.parameters is not None: + if self.parameters 
is not None: itemset_value_ref = self.parameters.get("value", itemset_value_ref) itemset_label_ref = self.parameters.get("label", itemset_label_ref) - multi_language = self.get("_itemset_multi_language", False) - has_media = self.get("_itemset_has_media", False) - has_dyn_label = self.get("_itemset_dyn_label", False) - is_previous_question = bool( - PYXFORM_REFERENCE_REGEX.search(self.get("itemset")) - ) + multi_language = self._itemset_multi_language + has_media = self._itemset_has_media + has_dyn_label = self._itemset_dyn_label + is_previous_question = bool(PYXFORM_REFERENCE_REGEX.search(self.itemset)) if file_extension in EXTERNAL_INSTANCE_EXTENSIONS: pass elif not multi_language and not has_media and not has_dyn_label: - itemset = self["itemset"] + itemset = self.itemset else: - itemset = self["itemset"] + itemset = self.itemset itemset_label_ref = "jr:itext(itextId)" - choice_filter = self.get(constants.CHOICE_FILTER) + choice_filter = self.choice_filter if choice_filter is not None: choice_filter = survey.insert_xpaths( choice_filter, self, True, is_previous_question ) if is_previous_question: path = ( - survey.insert_xpaths(self["itemset"], self, reference_parent=True) + survey.insert_xpaths(self.itemset, self, reference_parent=True) .strip() .split("/") ) @@ -452,7 +460,7 @@ def build_xml(self, survey: "Survey"): itemset_label_ref = path[-1] if choice_filter: choice_filter = choice_filter.replace( - "current()/" + nodeset, "." + f"current()/{nodeset}", "." ).replace(nodeset, ".") else: # Choices must have a value. 
Filter out repeat instances without @@ -465,21 +473,18 @@ def build_xml(self, survey: "Survey"): if choice_filter: nodeset += f"[{choice_filter}]" - if self["parameters"]: - params = self["parameters"] + if self.parameters: + params = self.parameters if "randomize" in params and params["randomize"] == "true": - nodeset = "randomize(" + nodeset + nodeset = f"randomize({nodeset}" if "seed" in params: if params["seed"].startswith("${"): - nodeset = ( - nodeset - + ", " - + survey.insert_xpaths(params["seed"], self).strip() - ) + seed = survey.insert_xpaths(params["seed"], self).strip() + nodeset = f"{nodeset}, {seed}" else: - nodeset = nodeset + ", " + params["seed"] + nodeset = f"""{nodeset}, {params["seed"]}""" nodeset += ")" @@ -505,15 +510,33 @@ def get_slot_names() -> tuple[str, ...]: def __init__(self, name: str, label: str | dict | None = None, **kwargs): self.children: tuple[Option, ...] | None = None - choices = combine_lists( - a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None) - ) - if choices: + kw_choices = kwargs.pop(constants.CHOICES, None) + kw_children = kwargs.pop(constants.CHILDREN, None) + choices = coalesce(kw_choices, kw_children) + if isinstance(choices, tuple) and isinstance(next(iter(choices)), Option): + self.children = choices + elif choices: self.children = tuple( - c if isinstance(c, Option) else Option(**c) for c in choices + Option(**c) for c in combine_lists(kw_choices, kw_children) ) super().__init__(name=name, label=label, **kwargs) + def iter_descendants( + self, + condition: Callable[["SurveyElement"], bool] | None = None, + iter_into_section_items: bool = False, + ) -> Generator["SurveyElement", None, None]: + if condition is None: + yield self + elif condition(self): + yield self + if iter_into_section_items and self.children: + for e in self.children: + yield from e.iter_descendants( + condition=condition, + iter_into_section_items=iter_into_section_items, + ) + def xml(self, survey: "Survey"): result = 
node("tag", key=self.name) result.appendChild(self.xml_label(survey=survey)) @@ -548,6 +571,22 @@ def __init__(self, **kwargs): super().__init__(**kwargs) + def iter_descendants( + self, + condition: Callable[["SurveyElement"], bool] | None = None, + iter_into_section_items: bool = False, + ) -> Generator["SurveyElement", None, None]: + if condition is None: + yield self + elif condition(self): + yield self + if iter_into_section_items and self.children: + for e in self.children: + yield from e.iter_descendants( + condition=condition, + iter_into_section_items=iter_into_section_items, + ) + def build_xml(self, survey: "Survey"): control_dict = self.control control_dict["ref"] = self.get_xpath() @@ -569,8 +608,9 @@ def build_xml(self, survey: "Survey"): for key, value in control_dict.items(): control_dict[key] = survey.insert_xpaths(value, self) control_dict["ref"] = self.get_xpath() - params = self.get("parameters", {}) - control_dict.update(params) + params = self.parameters + if params: + control_dict.update(params) result = node(**control_dict) if label_and_hint: for element in self.xml_label_and_hint(survey=survey): diff --git a/pyxform/section.py b/pyxform/section.py index 2111980c..e806726c 100644 --- a/pyxform/section.py +++ b/pyxform/section.py @@ -2,7 +2,7 @@ Section survey element module. 
""" -from collections.abc import Generator, Iterable +from collections.abc import Callable, Generator, Iterable from itertools import chain from typing import TYPE_CHECKING @@ -78,6 +78,22 @@ def validate(self): element.validate() self._validate_uniqueness_of_element_names() + def iter_descendants( + self, + condition: Callable[["SurveyElement"], bool] | None = None, + iter_into_section_items: bool = False, + ) -> Generator["SurveyElement", None, None]: + if condition is None: + yield self + elif condition(self): + yield self + if self.children: + for e in self.children: + yield from e.iter_descendants( + condition=condition, + iter_into_section_items=iter_into_section_items, + ) + # there's a stronger test of this when creating the xpath # dictionary for a survey. def _validate_uniqueness_of_element_names(self): @@ -246,10 +262,10 @@ def xml_control(self, survey: "Survey"): else: attributes = {} - if not self.get("flat"): + if not self.flat: attributes["ref"] = self.get_xpath() - if "label" in self and self.label is not None and len(self["label"]) > 0: + if self.label: children.append(self.xml_label(survey=survey)) for n in Section.xml_control(self, survey=survey): children.append(n) diff --git a/pyxform/survey.py b/pyxform/survey.py index 75ddf3cd..50ecafe9 100644 --- a/pyxform/survey.py +++ b/pyxform/survey.py @@ -15,20 +15,19 @@ from pyxform import aliases, constants from pyxform.constants import EXTERNAL_INSTANCE_EXTENSIONS, NSMAP -from pyxform.entities.entity_declaration import EntityDeclaration from pyxform.errors import PyXFormError, ValidationError from pyxform.external_instance import ExternalInstance from pyxform.instance import SurveyInstance -from pyxform.parsing import instance_expression +from pyxform.parsing.expression import has_last_saved +from pyxform.parsing.instance_expression import replace_with_output from pyxform.question import MultipleChoiceQuestion, Option, Question, Tag from pyxform.section import SECTION_EXTRA_FIELDS, Section from 
pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement from pyxform.utils import ( BRACKETED_TAG_REGEX, LAST_SAVED_INSTANCE_NAME, - LAST_SAVED_REGEX, DetachableElement, - PatchedText, + escape_text_for_xml, has_dynamic_label, node, ) @@ -132,7 +131,7 @@ def _get_steps_and_target_xpath(context_parent, xpath_parent, include_parent=Fal steps = len(context_parts[index - 1 :]) parts = xpath_parts[index - 1 :] break - return (steps, "/" + "/".join(parts) if parts else remainder_xpath) + return (steps, f"""/{"/".join(parts)}""" if parts else remainder_xpath) context_parent = is_parent_a_repeat(survey, context_xpath) xpath_parent = is_parent_a_repeat(survey, xpath) @@ -239,7 +238,7 @@ def __init__(self, **kwargs): self._created: datetime.now = datetime.now() self._search_lists: set = set() self._translations: recursive_dict = recursive_dict() - self._xpath: dict[str, SurveyElement | None] = {} + self._xpath: dict[str, Section | Question | None] = {} # Structure # attribute is for custom instance attrs from settings e.g. attribute::abc:xyz @@ -335,13 +334,12 @@ def get_nsmap(self): for ns in self.namespaces.split() if len(ns.split("=")) == 2 and ns.split("=")[0] != "" ] - xmlns = "xmlns:" nsmap = NSMAP.copy() nsmap.update( { - xmlns + k: v.replace('"', "").replace("'", "") + f"xmlns:{k}": v.replace('"', "").replace("'", "") for k, v in nslist - if xmlns + k not in nsmap + if f"xmlns:{k}" not in nsmap } ) return nsmap @@ -570,26 +568,22 @@ def _generate_from_file_instances(element: SurveyElement) -> InstanceInfo | None return None @staticmethod - def _generate_last_saved_instance(element) -> bool: + def _generate_last_saved_instance(element: SurveyElement) -> bool: """ True if a last-saved instance should be generated, false otherwise. 
""" - if not hasattr(element, "bind") or element.bind is None: + if not isinstance(element, Question): return False - for expression_type in constants.EXTERNAL_INSTANCES: - last_saved_expression = re.search( - LAST_SAVED_REGEX, str(element["bind"].get(expression_type)) - ) - if last_saved_expression: - return True - return bool( - hasattr(element, constants.CHOICE_FILTER) - and element.choice_filter is not None - and re.search(LAST_SAVED_REGEX, str(element.choice_filter)) - or hasattr(element, "default") - and element.default is not None - and re.search(LAST_SAVED_REGEX, str(element.default)) - ) + if has_last_saved(element.default): + return True + if has_last_saved(element.choice_filter): + return True + if element.bind: + # Assuming average len(bind) < 10 and len(EXTERNAL_INSTANCES) = 5 and the + # current has_last_saved implementation, iterating bind keys is fastest. + for k, v in element.bind.items(): + if k in constants.EXTERNAL_INSTANCES and has_last_saved(v): + return True @staticmethod def _get_last_saved_instance() -> InstanceInfo: @@ -999,7 +993,7 @@ def _set_up_media_translations(media_dict, translation_key): for media_type, possibly_localized_media in media_dict.items(): if media_type not in constants.SUPPORTED_MEDIA_TYPES: - raise PyXFormError("Media type: " + media_type + " not supported") + raise PyXFormError(f"Media type: {media_type} not supported") if isinstance(possibly_localized_media, dict): # media is localized @@ -1027,17 +1021,15 @@ def _set_up_media_translations(media_dict, translation_key): translations_trans_key[media_type] = media - for survey_element in self.iter_descendants( - condition=lambda i: not isinstance( - i, Survey | EntityDeclaration | ExternalInstance | Tag | Option - ) + for item in self.iter_descendants( + condition=lambda i: isinstance(i, Section | Question) ): # Skip set up of media for choices in selects. 
Translations for their media # content should have been set up in _setup_translations, with one copy of # each choice translation per language (after _add_empty_translations). - media_dict = survey_element.get("media") - if isinstance(media_dict, dict) and 0 < len(media_dict): - translation_key = survey_element.get_xpath() + ":label" + media_dict = item.media + if isinstance(media_dict, dict) and media_dict: + translation_key = f"{item.get_xpath()}:label" _set_up_media_translations(media_dict, translation_key) def itext(self) -> DetachableElement: @@ -1099,7 +1091,7 @@ def itext(self) -> DetachableElement: itext_nodes.append( node( "value", - "jr://images/" + value, + f"jr://images/{value}", form=media_type, toParseString=output_inserted, ) @@ -1108,7 +1100,7 @@ def itext(self) -> DetachableElement: itext_nodes.append( node( "value", - "jr://" + media_type + "/" + value, + f"jr://{media_type}/{value}", form=media_type, toParseString=output_inserted, ) @@ -1123,11 +1115,11 @@ def date_stamp(self): return self._created.strftime("%Y_%m_%d") def _to_ugly_xml(self) -> str: - return '' + self.xml().toxml() + return f"""{self.xml().toxml()}""" def _to_pretty_xml(self) -> str: """Get the XForm with human readable formatting.""" - return '\n' + self.xml().toprettyxml(indent=" ") + return f"""\n{self.xml().toprettyxml(indent=" ")}""" def __repr__(self): return self.__unicode__() @@ -1137,10 +1129,11 @@ def __unicode__(self): def _setup_xpath_dictionary(self): for element in self.iter_descendants(lambda i: isinstance(i, Question | Section)): - if element.name in self._xpath: - self._xpath[element.name] = None + element_name = element.name + if element_name in self._xpath: + self._xpath[element_name] = None else: - self._xpath[element.name] = element + self._xpath[element_name] = element def _var_repl_function( self, matchobj, context, use_current=False, reference_parent=False @@ -1194,7 +1187,7 @@ def _relative_path(ref_name: str, _use_current: bool) -> str | None: if steps: 
ref_path = ref_path if ref_path.endswith(ref_name) else f"/{name}" prefix = " current()/" if _use_current else " " - return_path = prefix + "/".join([".."] * steps) + ref_path + " " + return_path = f"""{prefix}{"/".join(".." for _ in range(steps))}{ref_path} """ return return_path @@ -1263,9 +1256,9 @@ def _is_return_relative_path() -> bool: return relative_path last_saved_prefix = ( - "instance('" + LAST_SAVED_INSTANCE_NAME + "')" if last_saved else "" + f"instance('{LAST_SAVED_INSTANCE_NAME}')" if last_saved else "" ) - return " " + last_saved_prefix + self._xpath[name].get_xpath() + " " + return f" {last_saved_prefix}{self._xpath[name].get_xpath()} " def insert_xpaths( self, @@ -1291,7 +1284,7 @@ def _var_repl_output_function(self, matchobj, context): A regex substitution function that will replace ${varname} with an output element that has the xpath to varname. """ - return '' + return f"""""" def insert_output_values( self, @@ -1307,6 +1300,8 @@ def insert_output_values( :param context: The document node that the text belongs to. :return: The output text, and a flag indicating whether any changes were made. """ + if text == "-": + return text, False def _var_repl_output_function(matchobj): return self._var_repl_output_function(matchobj, context) @@ -1316,14 +1311,12 @@ def _var_repl_output_function(matchobj): # For exampke, `${name} < 3` causes an error but `< 3` does not. 
# This is my hacky fix for it, which does string escaping prior to # variable replacement: - text_node = PatchedText() - text_node.data = text - original_xml = text_node.toxml() + original_xml = escape_text_for_xml(text=text) # need to make sure we have reason to replace # since at this point < is <, # the net effect < gets translated again to &lt; - xml_text = instance_expression.replace_with_output(original_xml, context, self) + xml_text = replace_with_output(original_xml, context, self) if "{" in xml_text: xml_text = re.sub(BRACKETED_TAG_REGEX, _var_repl_output_function, xml_text) changed = xml_text != original_xml @@ -1342,7 +1335,7 @@ def print_xform_to_file( if warnings is None: warnings = [] if not path: - path = self.id_string + ".xml" + path = f"{self.id_string}.xml" if pretty_print: xml = self._to_pretty_xml() else: diff --git a/pyxform/survey_element.py b/pyxform/survey_element.py index f72d4f74..b6c851b1 100644 --- a/pyxform/survey_element.py +++ b/pyxform/survey_element.py @@ -141,24 +141,24 @@ def validate(self): f"The name '{self.name}' contains an invalid character '{invalid_char.group(0)}'. Names {const.XML_IDENTIFIER_ERROR_MESSAGE}" ) - # TODO: Make sure renaming this doesn't cause any problems def iter_descendants( - self, condition: Callable[["SurveyElement"], bool] | None = None + self, + condition: Callable[["SurveyElement"], bool] | None = None, + iter_into_section_items: bool = False, ) -> Generator["SurveyElement", None, None]: """ - Get each of self.children. + Iterate the object, and it's children (if applicable). - :param condition: If this evaluates to True, yield the element. + :param condition: If provided, the element will only be returned if this callable + evaluates to True. Can be used to filter by class/type or other properties. + :param iter_into_section_items: If False, only iterate into the children of + sections (survey or group), e.g. to get Sections, Questions, etc. 
If True, also + iterate into the children of those children, e.g. to get Options and Tags. """ - # it really seems like this method should not yield self - if condition is not None: - if condition(self): - yield self - else: + if condition is None: + yield self + elif condition(self): yield self - if hasattr(self, const.CHILDREN) and self.children is not None: - for e in self.children: - yield from e.iter_descendants(condition=condition) def iter_ancestors( self, condition: Callable[["SurveyElement"], bool] | None = None @@ -318,7 +318,7 @@ def to_json(self): def json_dump(self, path=""): if not path: - path = self.name + ".json" + path = f"{self.name}.json" print_pyobj_to_json(self.to_json_dict(), path) def __eq__(self, y): @@ -330,14 +330,14 @@ def __eq__(self, y): def _translation_path(self, display_element: str) -> str: """Get an itextId based on the element XPath and display type.""" - return self.get_xpath() + ":" + display_element + return f"{self.get_xpath()}:{display_element}" def get_translations(self, default_language): """ Returns translations used by this element so they can be included in the block. 
@see survey._setup_translations """ - bind_dict = self.get("bind") + bind_dict = self.bind if bind_dict and isinstance(bind_dict, dict): constraint_msg = bind_dict.get("jr:constraintMsg") if isinstance(constraint_msg, dict): @@ -410,11 +410,11 @@ def get_translations(self, default_language): display_element == "hint" and not isinstance(label_or_hint, dict) and hasattr(self, "hint") - and self.get("hint") is not None + and self.hint is not None and len(label_or_hint) > 0 and hasattr(self, "guidance_hint") - and self.get("guidance_hint") is not None - and len(self["guidance_hint"]) > 0 + and self.guidance_hint is not None + and len(self.guidance_hint) > 0 ): label_or_hint = {default_language: label_or_hint} diff --git a/pyxform/utils.py b/pyxform/utils.py index 66eb771b..e42445a6 100644 --- a/pyxform/utils.py +++ b/pyxform/utils.py @@ -7,6 +7,7 @@ import json import re from collections.abc import Generator, Iterable +from functools import lru_cache from io import StringIO from itertools import chain from json.decoder import JSONDecodeError @@ -24,9 +25,11 @@ INVALID_XFORM_TAG_REGEXP = re.compile(r"[^a-zA-Z:_][^a-zA-Z:_0-9\-.]*") LAST_SAVED_INSTANCE_NAME = "__last-saved" BRACKETED_TAG_REGEX = re.compile(r"\${(last-saved#)?(.*?)}") -LAST_SAVED_REGEX = re.compile(r"\${last-saved#(.*?)}") PYXFORM_REFERENCE_REGEX = re.compile(r"\$\{(.*?)\}") -NODE_TYPE_TEXT = (Node.TEXT_NODE, Node.CDATA_SECTION_NODE) +NODE_TYPE_TEXT = {Node.TEXT_NODE, Node.CDATA_SECTION_NODE} +XML_TEXT_SUBS = {"&": "&amp;", "<": "&lt;", ">": "&gt;"} +XML_TEXT_SUBS_KEYS = set(XML_TEXT_SUBS) +XML_TEXT_TABLE = str.maketrans(XML_TEXT_SUBS) class DetachableElement(Element): @@ -48,14 +51,13 @@ def writexml(self, writer, indent="", addindent="", newl=""): # indent = current indentation # addindent = indentation to add to higher levels # newl = newline string - writer.write(indent + "<" + self.tagName) + writer.write(f"{indent}<{self.tagName}") - attrs = self._get_attributes() - - for a_name in attrs.keys(): -
writer.write(f' {a_name}="') - _write_data(writer, attrs[a_name].value) - writer.write('"') + if self._attrs: + for k, v in self._attrs.items(): + writer.write(f' {k}="') + _write_data(writer, v.value) + writer.write('"') if self.childNodes: writer.write(">") # For text or mixed content, write without adding indents or newlines. @@ -71,19 +73,27 @@ def writexml(self, writer, indent="", addindent="", newl=""): else: writer.write(newl) for cnode in self.childNodes: - cnode.writexml(writer, indent + addindent, addindent, newl) + cnode.writexml(writer, f"{indent}{addindent}", addindent, newl) writer.write(indent) writer.write(f"</{self.tagName}>{newl}") else: writer.write(f"/>{newl}") +@lru_cache(maxsize=64) +def escape_text_for_xml(text: str) -> str: + if any(c in set(text) for c in XML_TEXT_SUBS_KEYS): + return text.translate(XML_TEXT_TABLE) + else: + return text + + class PatchedText(Text): def writexml(self, writer, indent="", addindent="", newl=""): """Same as original but no replacing double quotes with '&quot;'.""" - data = "".join((indent, self.data, newl)) + data = f"{indent}{self.data}{newl}" if data: - data = data.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;") + data = escape_text_for_xml(text=data) writer.write(data) diff --git a/pyxform/validators/pyxform/pyxform_reference.py b/pyxform/validators/pyxform/pyxform_reference.py index e55a408a..a1b02783 100644 --- a/pyxform/validators/pyxform/pyxform_reference.py +++ b/pyxform/validators/pyxform/pyxform_reference.py @@ -11,15 +11,18 @@ def validate_pyxform_reference_syntax( value: str, sheet_name: str, row_number: int, key: str ) -> None: + # Needs 3 characters for "${}" plus a name inside, but need to catch ${ for warning. + if not value or len(value) <= 2 or "${" not in value: + return # Skip columns in potentially large sheets where references are not allowed.
- if sheet_name == co.SURVEY: - if key in (co.TYPE, co.NAME): + elif sheet_name == co.SURVEY: + if key in {co.TYPE, co.NAME}: return elif sheet_name == co.CHOICES: - if key in (co.LIST_NAME_S, co.LIST_NAME_U, co.NAME): + if key in {co.LIST_NAME_S, co.LIST_NAME_U, co.NAME}: return elif sheet_name == co.ENTITIES: - if key == (co.LIST_NAME_S, co.LIST_NAME_U): + if key in {co.LIST_NAME_S, co.LIST_NAME_U}: return tokens, _ = parse_expression(value) diff --git a/pyxform/xls2json.py b/pyxform/xls2json.py index 9636c641..6d64ded3 100644 --- a/pyxform/xls2json.py +++ b/pyxform/xls2json.py @@ -6,6 +6,7 @@ import os import re import sys +from itertools import chain from typing import IO, Any from pyxform import aliases, constants @@ -55,9 +56,9 @@ def merge_dicts(dict_a, dict_b, default_key="default"): a recursive call to this function, otherwise they are just added to the output dict. """ - if dict_a is None or dict_a == {}: + if not dict_a: return dict_b - if dict_b is None or dict_b == {}: + if not dict_b: return dict_a if not isinstance(dict_a, dict): @@ -71,8 +72,7 @@ def merge_dicts(dict_a, dict_b, default_key="default"): # Union keys but retain order (as opposed to set()), preferencing dict_a then dict_b. # E.g. 
{"a": 1, "b": 2} + {"c": 3, "a": 4} -> {"a": None, "b": None, "c": None} - all_keys = {k: None for k in dict_a.keys()} - all_keys.update({k: None for k in dict_b.keys()}) + all_keys = {k: None for k in (chain(dict_a.keys(), dict_b.keys()))} out_dict = {} for key in all_keys.keys(): diff --git a/tests/test_dynamic_default.py b/tests/test_dynamic_default.py index 05d48c1c..a603cb37 100644 --- a/tests/test_dynamic_default.py +++ b/tests/test_dynamic_default.py @@ -2,13 +2,13 @@ Test handling dynamic default in forms """ -import os -import unittest from dataclasses import dataclass +from os import getpid from time import perf_counter +from unittest import skip from unittest.mock import patch -import psutil +from psutil import Process from pyxform import utils from pyxform.xls2xform import convert @@ -770,7 +770,7 @@ def test_dynamic_default_xform_structure(self): ], ) - @unittest.skip("Slow performance test. Un-skip to run as needed.") + @skip("Slow performance test. Un-skip to run as needed.") def test_dynamic_default_performance__time(self): """ Should find the dynamic default check costs little extra relative time large forms. 
@@ -778,11 +778,11 @@ def test_dynamic_default_performance__time(self): Results with Python 3.10.14 on VM with 2vCPU (i7-7700HQ) 1GB RAM, x questions each, average of 10 runs (seconds), with and without the check, per question: | num | with | without | peak RSS MB | - | 500 | 0.2415 | 0.2512 | 58 | - | 1000 | 0.4754 | 0.5199 | 63 | - | 2000 | 0.9866 | 1.2936 | 67 | - | 5000 | 3.1041 | 2.7132 | 96 | - | 10000 | 5.4795 | 5.3229 | 133 | + | 500 | 0.1626 | 0.1886 | 60 | + | 1000 | 0.3330 | 0.3916 | 63 | + | 2000 | 0.8675 | 0.7823 | 70 | + | 5000 | 1.7051 | 1.5653 | 91 | + | 10000 | 3.1097 | 3.8525 | 137 | """ survey_header = """ | survey | | | | | @@ -791,19 +791,26 @@ def test_dynamic_default_performance__time(self): question = """ | | text | q{i} | Q{i} | if(../t2 = 'test', 1, 2) + 15 - int(1.2) | """ + process = Process(getpid()) for count in (500, 1000, 2000): - questions = "\n".join(question.format(i=i) for i in range(1, count)) + questions = "\n".join(question.format(i=i) for i in range(count)) md = "".join((survey_header, questions)) def run(name, case): runs = 0 results = [] + peak_memory_usage = process.memory_info().rss while runs < 10: start = perf_counter() convert(xlsform=case) results.append(perf_counter() - start) + peak_memory_usage = max(process.memory_info().rss, peak_memory_usage) runs += 1 - print(name, round(sum(results) / len(results), 4)) + print( + name, + round(sum(results) / len(results), 4), + f"| Peak RSS: {peak_memory_usage}", + ) run(name=f"questions={count}, with check (seconds):", case=md) @@ -828,7 +835,7 @@ def test_dynamic_default_performance__memory(self): """ questions = "\n".join(question.format(i=i) for i in range(1, 2000)) md = "".join((survey_header, questions)) - process = psutil.Process(os.getpid()) + process = Process(getpid()) pre_mem = process.memory_info().rss self.assertPyxformXform(md=md) post_mem = process.memory_info().rss diff --git a/tests/test_translations.py b/tests/test_translations.py index 2215c4d3..0a44b887 
100644 --- a/tests/test_translations.py +++ b/tests/test_translations.py @@ -2,11 +2,13 @@ Test translations syntax. """ -import unittest from dataclasses import dataclass +from os import getpid from time import perf_counter +from unittest import skip from unittest.mock import patch +from psutil import Process from pyxform.constants import CHOICES, SURVEY from pyxform.constants import DEFAULT_LANGUAGE_VALUE as DEFAULT_LANG from pyxform.validators.pyxform.translations_checks import ( @@ -392,7 +394,7 @@ def test_missing_translation__one_lang_all_cols(self): ], ) - @unittest.skip("Slow performance test. Un-skip to run as needed.") + @skip("Slow performance test. Un-skip to run as needed.") def test_missing_translations_check_performance(self): """ Should find the translations check costs a fraction of a second for large forms. @@ -401,11 +403,11 @@ def test_missing_translations_check_performance(self): with 2 choices each, average of 10 runs (seconds), with and without the check, per question: | num | with | without | peak RSS MB | - | 500 | 1.0235 | 0.9831 | 74 | - | 1000 | 2.3025 | 2.6332 | 101 | - | 2000 | 5.6960 | 6.2805 | 157 | - | 5000 | 23.439 | 25.327 | 265 | - | 10000 | 80.396 | 75.165 | 480 | + | 500 | 0.7427 | 0.8133 | 77 | + | 1000 | 1.7908 | 1.7777 | 94 | + | 2000 | 5.6719 | 4.8387 | 141 | + | 5000 | 20.452 | 19.502 | 239 | + | 10000 | 70.871 | 62.106 | 416 | """ survey_header = """ | survey | | | | | @@ -422,20 +424,27 @@ def test_missing_translations_check_performance(self): | | c{i} | na | la-d | la-e | | | c{i} | nb | lb-d | lb-e | """ + process = Process(getpid()) for count in (500, 1000, 2000): - questions = "\n".join(question.format(i=i) for i in range(1, count)) - choice_lists = "\n".join(choice_list.format(i=i) for i in range(1, count)) + questions = "\n".join(question.format(i=i) for i in range(count)) + choice_lists = "\n".join(choice_list.format(i=i) for i in range(count)) md = "".join((survey_header, questions, choices_header, choice_lists)) 
def run(name, case): runs = 0 results = [] + peak_memory_usage = process.memory_info().rss while runs < 10: start = perf_counter() convert(xlsform=case) results.append(perf_counter() - start) + peak_memory_usage = max(process.memory_info().rss, peak_memory_usage) runs += 1 - print(name, round(sum(results) / len(results), 4)) + print( + name, + round(sum(results) / len(results), 4), + f"| Peak RSS: {peak_memory_usage}", + ) run(name=f"questions={count}, with check (seconds):", case=md) diff --git a/tests/test_unicode_rtl.py b/tests/test_unicode_rtl.py index 322bec1b..664159bc 100644 --- a/tests/test_unicode_rtl.py +++ b/tests/test_unicode_rtl.py @@ -39,7 +39,7 @@ def test_smart_quotes(self): "type": "integer", "name": "my_default_is_123", "label": "my default is 123", - "default": 123, + "default": "123", }, ], "choices": [