diff --git a/pyxform/entities/entity_declaration.py b/pyxform/entities/entity_declaration.py
index 49222f4f..3c9e240e 100644
--- a/pyxform/entities/entity_declaration.py
+++ b/pyxform/entities/entity_declaration.py
@@ -46,7 +46,7 @@ def __init__(self, name: str, type: str, parameters: dict, **kwargs):
super().__init__(name=name, **kwargs)
def xml_instance(self, **kwargs):
- parameters = self.get(const.PARAMETERS, {})
+ parameters = self.parameters
attributes = {
EC.DATASET.value: parameters.get(EC.DATASET, ""),
@@ -75,7 +75,7 @@ def xml_bindings(self, survey: "Survey"):
"""
See the class comment for an explanation of the logic for generating bindings.
"""
- parameters = self.get(const.PARAMETERS, {})
+ parameters = self.parameters
entity_id_expression = parameters.get(EC.ENTITY_ID, None)
create_condition = parameters.get(EC.CREATE_IF, None)
update_condition = parameters.get(EC.UPDATE_IF, None)
diff --git a/pyxform/parsing/expression.py b/pyxform/parsing/expression.py
index 2c80b74f..d94c0c94 100644
--- a/pyxform/parsing/expression.py
+++ b/pyxform/parsing/expression.py
@@ -1,12 +1,8 @@
import re
-from collections.abc import Iterable
from functools import lru_cache
-def get_expression_lexer(name_only: bool = False) -> re.Scanner:
- """
- Get a expression lexer (scanner) for parsing.
- """
+def get_lexer_rules():
# ncname regex adapted from eulxml https://github.com/emory-libraries/eulxml/blob/2e1a9f71ffd1fd455bd8326ec82125e333b352e0/eulxml/xpath/lexrules.py
# (C) 2010,2011 Emory University Libraries [Apache v2.0 License]
# They in turn adapted it from https://www.w3.org/TR/REC-xml/#NT-NameStartChar
@@ -29,7 +25,7 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner:
date_time_regex = date_regex + "T" + time_regex
# Rule order is significant - match priority runs top to bottom.
- lexer_rules = {
+ return {
# https://www.w3.org/TR/xmlschema-2/#dateTime
"DATETIME": date_time_regex,
"DATE": date_regex,
@@ -49,7 +45,7 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner:
"SYSTEM_LITERAL": r""""[^"]*"|'[^']*'""",
"COMMA": r",",
"WHITESPACE": r"\s+",
- "PYXFORM_REF": r"\$\{" + ncname_regex + r"(#" + ncname_regex + r")?" + r"\}",
+ "PYXFORM_REF": r"\$\{(last-saved#)?" + ncname_regex + r"\}",
"FUNC_CALL": ncname_regex + r"\(",
"XPATH_PRED_START": ncname_regex + r"\[",
"XPATH_PRED_END": r"\]",
@@ -60,15 +56,21 @@ def get_expression_lexer(name_only: bool = False) -> re.Scanner:
"OTHER": r".+?", # Catch any other character so that parsing doesn't stop.
}
+
+LEXER_RULES = get_lexer_rules()
+RE_ONLY_NCNAME = re.compile(rf"""^{LEXER_RULES["NAME"]}$""")
+RE_ONLY_PYXFORM_REF = re.compile(rf"""^{LEXER_RULES["PYXFORM_REF"]}$""")
+RE_ANY_PYXFORM_REF = re.compile(LEXER_RULES["PYXFORM_REF"])
+
+
+ def get_expression_lexer() -> re.Scanner:
+     """
+     Get an expression lexer (scanner) for parsing.
+
+     Every rule in LEXER_RULES is paired with a callback that wraps the matched text
+     in an ExpLexerToken. The rules are added in LEXER_RULES iteration order.
+     """
    def get_tokenizer(name):
-         def tokenizer(scan, value) -> ExpLexerToken | str:
-             if name_only:
-                 return name
+         # The name-only mode is gone, so the callback always returns a token object.
+         def tokenizer(scan, value) -> ExpLexerToken:
            return ExpLexerToken(name, value, scan.match.start(), scan.match.end())
        return tokenizer
-     lexicon = [(v, get_tokenizer(k)) for k, v in lexer_rules.items()]
+     lexicon = [(v, get_tokenizer(k)) for k, v in LEXER_RULES.items()]
    # re.Scanner is undocumented but has been around since at least 2003
    # https://mail.python.org/pipermail/python-dev/2003-April/035075.html
    return re.Scanner(lexicon)
@@ -84,9 +86,8 @@ def __init__(self, name: str, value: str, start: int, end: int) -> None:
self.end: int = end
-# Scanner takes a few 100ms to compile so use these shared instances.
+# Scanner takes a few 100ms to compile so use the shared instance.
_EXPRESSION_LEXER = get_expression_lexer()
-_TOKEN_NAME_LEXER = get_expression_lexer(name_only=True)
@lru_cache(maxsize=128)
@@ -103,32 +104,29 @@ def parse_expression(text: str) -> tuple[list[ExpLexerToken], str]:
return tokens, remainder
-def is_single_token_expression(expression: str, token_types: Iterable[str]) -> bool:
- """
- Does the expression contain single token of one of the provided token types?
- """
- if not expression:
- return False
- tokens, _ = _TOKEN_NAME_LEXER.scan(expression.strip())
- if 1 == len(tokens) and tokens[0] in token_types:
- return True
- else:
- return False
-
-
def is_pyxform_reference(value: str) -> bool:
    """
    Does the input string contain only a valid Pyxform reference? e.g. ${my_question}
    """
-     if not value or len(value) <= 3:  # Needs 3 characters for "${}", plus a name inside.
-         return False
-     return is_single_token_expression(expression=value, token_types=("PYXFORM_REF",))
+     # Needs 3 characters for "${}", plus a name inside. The whole chain is wrapped in
+     # bool() so that a falsy input ("" or None) gives False rather than the input itself.
+     return bool(value and len(value) > 3 and RE_ONLY_PYXFORM_REF.match(value))
def is_xml_tag(value: str) -> bool:
    """
    Does the input string contain only a valid XML tag / element name?
    """
-     if not value:
-         return False
-     return is_single_token_expression(expression=value, token_types=("NAME",))
+     # The whole chain is wrapped in bool() so that a falsy input ("" or None) gives
+     # False rather than the input itself.
+     return bool(value and RE_ONLY_NCNAME.match(value))
+
+
+ def has_last_saved(value: str) -> bool:
+     """
+     Does the input string contain a valid 'last-saved#' Pyxform reference? e.g. ${last-saved#my_question}
+
+     :param value: The expression text to check. Empty or None gives False.
+     :return: True only if at least one reference found by RE_ANY_PYXFORM_REF carries
+       the 'last-saved#' prefix.
+     """
+     # Needs 14 characters for "${last-saved#}", plus a name inside. The length and
+     # substring checks are cheap guards that skip the regex for most inputs.
+     if not value or len(value) <= 14 or "${last-saved#" not in value:
+         return False
+     # Group 1 of the reference pattern is the optional "last-saved#" prefix. Checking it
+     # on each match avoids a false positive when the prefix text is malformed but some
+     # other plain ${reference} is present in the same string.
+     return any(m.group(1) for m in RE_ANY_PYXFORM_REF.finditer(value))
diff --git a/pyxform/parsing/instance_expression.py b/pyxform/parsing/instance_expression.py
index 7ab5fbb2..3c43d53e 100644
--- a/pyxform/parsing/instance_expression.py
+++ b/pyxform/parsing/instance_expression.py
@@ -21,11 +21,13 @@ def find_boundaries(xml_text: str) -> list[tuple[int, int]]:
:param xml_text: XML text that may contain an instance expression.
:return: Tokens in instance expression, and the string position boundaries.
"""
+ tokens, _ = parse_expression(xml_text)
+ if not tokens:
+ return []
instance_enter = False
path_enter = False
pred_enter = False
last_token = None
- tokens, _ = parse_expression(xml_text)
boundaries = []
for t in tokens:
@@ -96,8 +98,11 @@ def replace_with_output(xml_text: str, context: "SurveyElement", survey: "Survey
:param survey: The Survey that the context is in.
:return: The possibly modified string.
"""
+ # 9 = len("instance(")
+ if 9 >= len(xml_text):
+ return xml_text
boundaries = find_boundaries(xml_text=xml_text)
- if 0 < len(boundaries):
+ if boundaries:
new_strings = []
for start, end in boundaries:
old_str = xml_text[start:end]
@@ -116,6 +121,6 @@ def replace_with_output(xml_text: str, context: "SurveyElement", survey: "Survey
# expression positions due to incremental replacement.
offset = 0
for s, e, o, n in new_strings:
- xml_text = xml_text[: s + offset] + n + xml_text[e + offset :]
+ xml_text = f"{xml_text[: s + offset]}{n}{xml_text[e + offset :]}"
offset += len(n) - len(o)
return xml_text
diff --git a/pyxform/question.py b/pyxform/question.py
index 211626d8..4c7f2329 100644
--- a/pyxform/question.py
+++ b/pyxform/question.py
@@ -3,7 +3,7 @@
"""
import os.path
-from collections.abc import Iterable
+from collections.abc import Callable, Generator, Iterable
from itertools import chain
from typing import TYPE_CHECKING
@@ -21,6 +21,7 @@
from pyxform.utils import (
PYXFORM_REFERENCE_REGEX,
DetachableElement,
+ coalesce,
combine_lists,
default_is_dynamic,
node,
@@ -45,7 +46,6 @@
"trigger",
constants.BIND,
constants.CHOICE_FILTER,
- constants.COMPACT_TAG, # used for compact (sms) representation
constants.CONTROL,
constants.HINT,
constants.MEDIA,
@@ -106,7 +106,6 @@ def __init__(self, fields: tuple[str, ...] | None = None, **kwargs):
self.trigger: str | None = None
# SMS / compact settings
- self.compact_tag: str | None = None
self.sms_field: str | None = None
qtd = kwargs.pop("question_type_dictionary", QUESTION_TYPE_DICT)
@@ -149,27 +148,20 @@ def validate(self):
raise PyXFormError(f"Unknown question type '{self.type}'.")
def xml_instance(self, survey: "Survey", **kwargs):
- attributes = self.get("instance")
+ attributes = self.instance
if attributes is None:
attributes = {}
else:
for key, value in attributes.items():
attributes[key] = survey.insert_xpaths(value, self)
- if self.get("default") and not default_is_dynamic(self.default, self.type):
- return node(self.name, str(self.get("default")), **attributes)
+ if self.default and not default_is_dynamic(self.default, self.type):
+ return node(self.name, str(self.default), **attributes)
return node(self.name, **attributes)
def xml_control(self, survey: "Survey"):
if self.type == "calculate" or (
- (
- (
- hasattr(self, "bind")
- and self.bind is not None
- and "calculate" in self.bind
- )
- or self.trigger
- )
+ (self.bind is not None and "calculate" in self.bind or self.trigger)
and not (self.label or self.hint)
):
nested_setvalues = survey.get_trigger_values_for_question_name(
@@ -268,13 +260,13 @@ def build_xml(self, survey: "Survey"):
result.appendChild(element)
# Input types are used for selects with external choices sheets.
- if self["query"]:
- choice_filter = self.get(constants.CHOICE_FILTER)
+ if self.query:
+ choice_filter = self.choice_filter
if choice_filter is not None:
pred = survey.insert_xpaths(choice_filter, self, True)
- query = f"""instance('{self["query"]}')/root/item[{pred}]"""
+ query = f"""instance('{self.query}')/root/item[{pred}]"""
else:
- query = f"""instance('{self["query"]}')/root/item"""
+ query = f"""instance('{self.query}')/root/item"""
result.setAttribute("query", query)
return result
@@ -376,12 +368,14 @@ def __init__(
# I'm going to try to stick to just choices.
# Aliases in the json format will make it more difficult
# to use going forward.
- choices = combine_lists(
- a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None)
- )
- if choices:
+ kw_choices = kwargs.pop(constants.CHOICES, None)
+ kw_children = kwargs.pop(constants.CHILDREN, None)
+ choices = coalesce(kw_choices, kw_children)
+ if isinstance(choices, tuple) and isinstance(next(iter(choices)), Option):
+ self.children = choices
+ elif choices:
self.children = tuple(
- c if isinstance(c, Option) else Option(**c) for c in choices
+ Option(**c) for c in combine_lists(kw_choices, kw_children)
)
super().__init__(**kwargs)
@@ -391,6 +385,22 @@ def validate(self):
for child in self.children:
child.validate()
+ def iter_descendants(
+     self,
+     condition: Callable[["SurveyElement"], bool] | None = None,
+     iter_into_section_items: bool = False,
+ ) -> Generator["SurveyElement", None, None]:
+     """
+     Yield this element, then optionally each of its children.
+
+     :param condition: If provided, an element is only yielded when this callable
+       evaluates to True for it.
+     :param iter_into_section_items: If True, also iterate into self.children.
+     """
+     if condition is None or condition(self):
+         yield self
+     if not (iter_into_section_items and self.children):
+         return
+     for child in self.children:
+         yield from child.iter_descendants(
+             condition=condition,
+             iter_into_section_items=iter_into_section_items,
+         )
+
def build_xml(self, survey: "Survey"):
if self.bind["type"] not in {"string", "odk:rank"}:
raise PyXFormError("""Invalid value for `self.bind["type"]`.""")
@@ -408,8 +418,8 @@ def build_xml(self, survey: "Survey"):
# itemset are only supposed to be strings,
# check to prevent the rare dicts that show up
- if self["itemset"] and isinstance(self["itemset"], str):
- itemset, file_extension = os.path.splitext(self["itemset"])
+ if self.itemset and isinstance(self.itemset, str):
+ itemset, file_extension = os.path.splitext(self.itemset)
if file_extension == ".geojson":
itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE_GEOJSON
@@ -417,33 +427,31 @@ def build_xml(self, survey: "Survey"):
else:
itemset_value_ref = EXTERNAL_CHOICES_ITEMSET_REF_VALUE
itemset_label_ref = EXTERNAL_CHOICES_ITEMSET_REF_LABEL
- if hasattr(self, "parameters") and self.parameters is not None:
+ if self.parameters is not None:
itemset_value_ref = self.parameters.get("value", itemset_value_ref)
itemset_label_ref = self.parameters.get("label", itemset_label_ref)
- multi_language = self.get("_itemset_multi_language", False)
- has_media = self.get("_itemset_has_media", False)
- has_dyn_label = self.get("_itemset_dyn_label", False)
- is_previous_question = bool(
- PYXFORM_REFERENCE_REGEX.search(self.get("itemset"))
- )
+ multi_language = self._itemset_multi_language
+ has_media = self._itemset_has_media
+ has_dyn_label = self._itemset_dyn_label
+ is_previous_question = bool(PYXFORM_REFERENCE_REGEX.search(self.itemset))
if file_extension in EXTERNAL_INSTANCE_EXTENSIONS:
pass
elif not multi_language and not has_media and not has_dyn_label:
- itemset = self["itemset"]
+ itemset = self.itemset
else:
- itemset = self["itemset"]
+ itemset = self.itemset
itemset_label_ref = "jr:itext(itextId)"
- choice_filter = self.get(constants.CHOICE_FILTER)
+ choice_filter = self.choice_filter
if choice_filter is not None:
choice_filter = survey.insert_xpaths(
choice_filter, self, True, is_previous_question
)
if is_previous_question:
path = (
- survey.insert_xpaths(self["itemset"], self, reference_parent=True)
+ survey.insert_xpaths(self.itemset, self, reference_parent=True)
.strip()
.split("/")
)
@@ -452,7 +460,7 @@ def build_xml(self, survey: "Survey"):
itemset_label_ref = path[-1]
if choice_filter:
choice_filter = choice_filter.replace(
- "current()/" + nodeset, "."
+ f"current()/{nodeset}", "."
).replace(nodeset, ".")
else:
# Choices must have a value. Filter out repeat instances without
@@ -465,21 +473,18 @@ def build_xml(self, survey: "Survey"):
if choice_filter:
nodeset += f"[{choice_filter}]"
- if self["parameters"]:
- params = self["parameters"]
+ if self.parameters:
+ params = self.parameters
if "randomize" in params and params["randomize"] == "true":
- nodeset = "randomize(" + nodeset
+ nodeset = f"randomize({nodeset}"
if "seed" in params:
if params["seed"].startswith("${"):
- nodeset = (
- nodeset
- + ", "
- + survey.insert_xpaths(params["seed"], self).strip()
- )
+ seed = survey.insert_xpaths(params["seed"], self).strip()
+ nodeset = f"{nodeset}, {seed}"
else:
- nodeset = nodeset + ", " + params["seed"]
+ nodeset = f"""{nodeset}, {params["seed"]}"""
nodeset += ")"
@@ -505,15 +510,33 @@ def get_slot_names() -> tuple[str, ...]:
def __init__(self, name: str, label: str | dict | None = None, **kwargs):
    self.children: tuple[Option, ...] | None = None
-     choices = combine_lists(
-         a=kwargs.pop(constants.CHOICES, None), b=kwargs.pop(constants.CHILDREN, None)
-     )
-     if choices:
+     kw_choices = kwargs.pop(constants.CHOICES, None)
+     kw_children = kwargs.pop(constants.CHILDREN, None)
+     choices = coalesce(kw_choices, kw_children)
+     # Fast path: a tuple of ready-made Options is stored as-is. next() is given a
+     # default so that an empty tuple falls through instead of raising StopIteration.
+     if isinstance(choices, tuple) and isinstance(next(iter(choices), None), Option):
+         self.children = choices
+     elif choices:
        self.children = tuple(
-             c if isinstance(c, Option) else Option(**c) for c in choices
+             # Keep accepting pre-built Options mixed in with dicts, as before.
+             c if isinstance(c, Option) else Option(**c)
+             for c in combine_lists(kw_choices, kw_children)
        )
    super().__init__(name=name, label=label, **kwargs)
+ def iter_descendants(
+     self,
+     condition: Callable[["SurveyElement"], bool] | None = None,
+     iter_into_section_items: bool = False,
+ ) -> Generator["SurveyElement", None, None]:
+     """
+     Yield this element, then optionally each of its children.
+
+     :param condition: If provided, an element is only yielded when this callable
+       evaluates to True for it.
+     :param iter_into_section_items: If True, also iterate into self.children.
+     """
+     if condition is None or condition(self):
+         yield self
+     if not (iter_into_section_items and self.children):
+         return
+     for child in self.children:
+         yield from child.iter_descendants(
+             condition=condition,
+             iter_into_section_items=iter_into_section_items,
+         )
+
def xml(self, survey: "Survey"):
result = node("tag", key=self.name)
result.appendChild(self.xml_label(survey=survey))
@@ -548,6 +571,22 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
+ def iter_descendants(
+     self,
+     condition: Callable[["SurveyElement"], bool] | None = None,
+     iter_into_section_items: bool = False,
+ ) -> Generator["SurveyElement", None, None]:
+     """
+     Yield this element, then optionally each of its children.
+
+     :param condition: If provided, an element is only yielded when this callable
+       evaluates to True for it.
+     :param iter_into_section_items: If True, also iterate into self.children.
+     """
+     if condition is None or condition(self):
+         yield self
+     if not (iter_into_section_items and self.children):
+         return
+     for child in self.children:
+         yield from child.iter_descendants(
+             condition=condition,
+             iter_into_section_items=iter_into_section_items,
+         )
+
def build_xml(self, survey: "Survey"):
control_dict = self.control
control_dict["ref"] = self.get_xpath()
@@ -569,8 +608,9 @@ def build_xml(self, survey: "Survey"):
for key, value in control_dict.items():
control_dict[key] = survey.insert_xpaths(value, self)
control_dict["ref"] = self.get_xpath()
- params = self.get("parameters", {})
- control_dict.update(params)
+ params = self.parameters
+ if params:
+ control_dict.update(params)
result = node(**control_dict)
if label_and_hint:
for element in self.xml_label_and_hint(survey=survey):
diff --git a/pyxform/section.py b/pyxform/section.py
index 2111980c..e806726c 100644
--- a/pyxform/section.py
+++ b/pyxform/section.py
@@ -2,7 +2,7 @@
Section survey element module.
"""
-from collections.abc import Generator, Iterable
+from collections.abc import Callable, Generator, Iterable
from itertools import chain
from typing import TYPE_CHECKING
@@ -78,6 +78,22 @@ def validate(self):
element.validate()
self._validate_uniqueness_of_element_names()
+ def iter_descendants(
+     self,
+     condition: Callable[["SurveyElement"], bool] | None = None,
+     iter_into_section_items: bool = False,
+ ) -> Generator["SurveyElement", None, None]:
+     """
+     Yield this section, then recurse into each of its children.
+
+     :param condition: If provided, an element is only yielded when this callable
+       evaluates to True for it.
+     :param iter_into_section_items: Not used by the section itself; passed on
+       unchanged to each child's iter_descendants.
+     """
+     if condition is None or condition(self):
+         yield self
+     if not self.children:
+         return
+     for child in self.children:
+         yield from child.iter_descendants(
+             condition=condition,
+             iter_into_section_items=iter_into_section_items,
+         )
+
# there's a stronger test of this when creating the xpath
# dictionary for a survey.
def _validate_uniqueness_of_element_names(self):
@@ -246,10 +262,10 @@ def xml_control(self, survey: "Survey"):
else:
attributes = {}
- if not self.get("flat"):
+ if not self.flat:
attributes["ref"] = self.get_xpath()
- if "label" in self and self.label is not None and len(self["label"]) > 0:
+ if self.label:
children.append(self.xml_label(survey=survey))
for n in Section.xml_control(self, survey=survey):
children.append(n)
diff --git a/pyxform/survey.py b/pyxform/survey.py
index 75ddf3cd..50ecafe9 100644
--- a/pyxform/survey.py
+++ b/pyxform/survey.py
@@ -15,20 +15,19 @@
from pyxform import aliases, constants
from pyxform.constants import EXTERNAL_INSTANCE_EXTENSIONS, NSMAP
-from pyxform.entities.entity_declaration import EntityDeclaration
from pyxform.errors import PyXFormError, ValidationError
from pyxform.external_instance import ExternalInstance
from pyxform.instance import SurveyInstance
-from pyxform.parsing import instance_expression
+from pyxform.parsing.expression import has_last_saved
+from pyxform.parsing.instance_expression import replace_with_output
from pyxform.question import MultipleChoiceQuestion, Option, Question, Tag
from pyxform.section import SECTION_EXTRA_FIELDS, Section
from pyxform.survey_element import SURVEY_ELEMENT_FIELDS, SurveyElement
from pyxform.utils import (
BRACKETED_TAG_REGEX,
LAST_SAVED_INSTANCE_NAME,
- LAST_SAVED_REGEX,
DetachableElement,
- PatchedText,
+ escape_text_for_xml,
has_dynamic_label,
node,
)
@@ -132,7 +131,7 @@ def _get_steps_and_target_xpath(context_parent, xpath_parent, include_parent=Fal
steps = len(context_parts[index - 1 :])
parts = xpath_parts[index - 1 :]
break
- return (steps, "/" + "/".join(parts) if parts else remainder_xpath)
+ return (steps, f"""/{"/".join(parts)}""" if parts else remainder_xpath)
context_parent = is_parent_a_repeat(survey, context_xpath)
xpath_parent = is_parent_a_repeat(survey, xpath)
@@ -239,7 +238,7 @@ def __init__(self, **kwargs):
self._created: datetime.now = datetime.now()
self._search_lists: set = set()
self._translations: recursive_dict = recursive_dict()
- self._xpath: dict[str, SurveyElement | None] = {}
+ self._xpath: dict[str, Section | Question | None] = {}
# Structure
# attribute is for custom instance attrs from settings e.g. attribute::abc:xyz
@@ -335,13 +334,12 @@ def get_nsmap(self):
for ns in self.namespaces.split()
if len(ns.split("=")) == 2 and ns.split("=")[0] != ""
]
- xmlns = "xmlns:"
nsmap = NSMAP.copy()
nsmap.update(
{
- xmlns + k: v.replace('"', "").replace("'", "")
+ f"xmlns:{k}": v.replace('"', "").replace("'", "")
for k, v in nslist
- if xmlns + k not in nsmap
+ if f"xmlns:{k}" not in nsmap
}
)
return nsmap
@@ -570,26 +568,22 @@ def _generate_from_file_instances(element: SurveyElement) -> InstanceInfo | None
return None
@staticmethod
- def _generate_last_saved_instance(element) -> bool:
+ def _generate_last_saved_instance(element: SurveyElement) -> bool:
    """
    True if a last-saved instance should be generated, false otherwise.
    """
-     if not hasattr(element, "bind") or element.bind is None:
+     # Only Questions are inspected for last-saved references in default,
+     # choice_filter and bind.
+     if not isinstance(element, Question):
        return False
-     for expression_type in constants.EXTERNAL_INSTANCES:
-         last_saved_expression = re.search(
-             LAST_SAVED_REGEX, str(element["bind"].get(expression_type))
-         )
-         if last_saved_expression:
-             return True
-     return bool(
-         hasattr(element, constants.CHOICE_FILTER)
-         and element.choice_filter is not None
-         and re.search(LAST_SAVED_REGEX, str(element.choice_filter))
-         or hasattr(element, "default")
-         and element.default is not None
-         and re.search(LAST_SAVED_REGEX, str(element.default))
-     )
+     if has_last_saved(element.default):
+         return True
+     if has_last_saved(element.choice_filter):
+         return True
+     if element.bind:
+         # Assuming average len(bind) < 10 and len(EXTERNAL_INSTANCES) = 5 and the
+         # current has_last_saved implementation, iterating bind keys is fastest.
+         for k, v in element.bind.items():
+             if k in constants.EXTERNAL_INSTANCES and has_last_saved(v):
+                 return True
+     # Nothing referenced last-saved. Return False explicitly so the annotated bool
+     # contract holds instead of falling off the end with None.
+     return False
@staticmethod
def _get_last_saved_instance() -> InstanceInfo:
@@ -999,7 +993,7 @@ def _set_up_media_translations(media_dict, translation_key):
for media_type, possibly_localized_media in media_dict.items():
if media_type not in constants.SUPPORTED_MEDIA_TYPES:
- raise PyXFormError("Media type: " + media_type + " not supported")
+ raise PyXFormError(f"Media type: {media_type} not supported")
if isinstance(possibly_localized_media, dict):
# media is localized
@@ -1027,17 +1021,15 @@ def _set_up_media_translations(media_dict, translation_key):
translations_trans_key[media_type] = media
- for survey_element in self.iter_descendants(
- condition=lambda i: not isinstance(
- i, Survey | EntityDeclaration | ExternalInstance | Tag | Option
- )
+ for item in self.iter_descendants(
+ condition=lambda i: isinstance(i, Section | Question)
):
# Skip set up of media for choices in selects. Translations for their media
# content should have been set up in _setup_translations, with one copy of
# each choice translation per language (after _add_empty_translations).
- media_dict = survey_element.get("media")
- if isinstance(media_dict, dict) and 0 < len(media_dict):
- translation_key = survey_element.get_xpath() + ":label"
+ media_dict = item.media
+ if isinstance(media_dict, dict) and media_dict:
+ translation_key = f"{item.get_xpath()}:label"
_set_up_media_translations(media_dict, translation_key)
def itext(self) -> DetachableElement:
@@ -1099,7 +1091,7 @@ def itext(self) -> DetachableElement:
itext_nodes.append(
node(
"value",
- "jr://images/" + value,
+ f"jr://images/{value}",
form=media_type,
toParseString=output_inserted,
)
@@ -1108,7 +1100,7 @@ def itext(self) -> DetachableElement:
itext_nodes.append(
node(
"value",
- "jr://" + media_type + "/" + value,
+ f"jr://{media_type}/{value}",
form=media_type,
toParseString=output_inserted,
)
@@ -1123,11 +1115,11 @@ def date_stamp(self):
return self._created.strftime("%Y_%m_%d")
def _to_ugly_xml(self) -> str:
-     return '<?xml version="1.0"?>' + self.xml().toxml()
+     """Get the XForm without pretty-print formatting, prefixed by the XML declaration."""
+     # NOTE(review): the declaration literal appears to have been stripped from this
+     # patch text (an empty-string prefix is left behind) - confirm against the repo.
+     return f"""<?xml version="1.0"?>{self.xml().toxml()}"""
def _to_pretty_xml(self) -> str:
    """Get the XForm with human readable formatting."""
-     return '<?xml version="1.0"?>\n' + self.xml().toprettyxml(indent=" ")
+     # NOTE(review): the declaration literal appears to have been stripped from this
+     # patch text (only the newline is left behind) - confirm against the repo.
+     return f"""<?xml version="1.0"?>\n{self.xml().toprettyxml(indent=" ")}"""
def __repr__(self):
return self.__unicode__()
@@ -1137,10 +1129,11 @@ def __unicode__(self):
def _setup_xpath_dictionary(self):
for element in self.iter_descendants(lambda i: isinstance(i, Question | Section)):
- if element.name in self._xpath:
- self._xpath[element.name] = None
+ element_name = element.name
+ if element_name in self._xpath:
+ self._xpath[element_name] = None
else:
- self._xpath[element.name] = element
+ self._xpath[element_name] = element
def _var_repl_function(
self, matchobj, context, use_current=False, reference_parent=False
@@ -1194,7 +1187,7 @@ def _relative_path(ref_name: str, _use_current: bool) -> str | None:
if steps:
ref_path = ref_path if ref_path.endswith(ref_name) else f"/{name}"
prefix = " current()/" if _use_current else " "
- return_path = prefix + "/".join([".."] * steps) + ref_path + " "
+ return_path = f"""{prefix}{"/".join(".." for _ in range(steps))}{ref_path} """
return return_path
@@ -1263,9 +1256,9 @@ def _is_return_relative_path() -> bool:
return relative_path
last_saved_prefix = (
- "instance('" + LAST_SAVED_INSTANCE_NAME + "')" if last_saved else ""
+ f"instance('{LAST_SAVED_INSTANCE_NAME}')" if last_saved else ""
)
- return " " + last_saved_prefix + self._xpath[name].get_xpath() + " "
+ return f" {last_saved_prefix}{self._xpath[name].get_xpath()} "
def insert_xpaths(
self,
@@ -1291,7 +1284,7 @@ def _var_repl_output_function(self, matchobj, context):
A regex substitution function that will replace
${varname} with an output element that has the xpath to varname.
"""
- return ''
+ return f""""""
def insert_output_values(
self,
@@ -1307,6 +1300,8 @@ def insert_output_values(
:param context: The document node that the text belongs to.
:return: The output text, and a flag indicating whether any changes were made.
"""
+ if text == "-":
+ return text, False
def _var_repl_output_function(matchobj):
return self._var_repl_output_function(matchobj, context)
@@ -1316,14 +1311,12 @@ def _var_repl_output_function(matchobj):
# For exampke, `${name} < 3` causes an error but `< 3` does not.
# This is my hacky fix for it, which does string escaping prior to
# variable replacement:
- text_node = PatchedText()
- text_node.data = text
- original_xml = text_node.toxml()
+ original_xml = escape_text_for_xml(text=text)
# need to make sure we have reason to replace
# since at this point < is <,
# the net effect < gets translated again to <
- xml_text = instance_expression.replace_with_output(original_xml, context, self)
+ xml_text = replace_with_output(original_xml, context, self)
if "{" in xml_text:
xml_text = re.sub(BRACKETED_TAG_REGEX, _var_repl_output_function, xml_text)
changed = xml_text != original_xml
@@ -1342,7 +1335,7 @@ def print_xform_to_file(
if warnings is None:
warnings = []
if not path:
- path = self.id_string + ".xml"
+ path = f"{self.id_string}.xml"
if pretty_print:
xml = self._to_pretty_xml()
else:
diff --git a/pyxform/survey_element.py b/pyxform/survey_element.py
index f72d4f74..b6c851b1 100644
--- a/pyxform/survey_element.py
+++ b/pyxform/survey_element.py
@@ -141,24 +141,24 @@ def validate(self):
f"The name '{self.name}' contains an invalid character '{invalid_char.group(0)}'. Names {const.XML_IDENTIFIER_ERROR_MESSAGE}"
)
- # TODO: Make sure renaming this doesn't cause any problems
def iter_descendants(
-     self, condition: Callable[["SurveyElement"], bool] | None = None
+     self,
+     condition: Callable[["SurveyElement"], bool] | None = None,
+     iter_into_section_items: bool = False,
) -> Generator["SurveyElement", None, None]:
    """
-     Get each of self.children.
+     Iterate the object, and its children (if applicable).
+
+     This base implementation yields only the object itself and does not iterate
+     into any children.
-     :param condition: If this evaluates to True, yield the element.
+     :param condition: If provided, the element will only be returned if this callable
+       evaluates to True. Can be used to filter by class/type or other properties.
+     :param iter_into_section_items: If False, only iterate into the children of
+       sections (survey or group), e.g. to get Sections, Questions, etc. If True, also
+       iterate into the children of those children, e.g. to get Options and Tags.
    """
-     # it really seems like this method should not yield self
-     if condition is not None:
-         if condition(self):
-             yield self
-     else:
+     if condition is None or condition(self):
        yield self
-     if hasattr(self, const.CHILDREN) and self.children is not None:
-         for e in self.children:
-             yield from e.iter_descendants(condition=condition)
def iter_ancestors(
self, condition: Callable[["SurveyElement"], bool] | None = None
@@ -318,7 +318,7 @@ def to_json(self):
def json_dump(self, path=""):
if not path:
- path = self.name + ".json"
+ path = f"{self.name}.json"
print_pyobj_to_json(self.to_json_dict(), path)
def __eq__(self, y):
@@ -330,14 +330,14 @@ def __eq__(self, y):
def _translation_path(self, display_element: str) -> str:
"""Get an itextId based on the element XPath and display type."""
- return self.get_xpath() + ":" + display_element
+ return f"{self.get_xpath()}:{display_element}"
def get_translations(self, default_language):
"""
Returns translations used by this element so they can be included in
the block. @see survey._setup_translations
"""
- bind_dict = self.get("bind")
+ bind_dict = self.bind
if bind_dict and isinstance(bind_dict, dict):
constraint_msg = bind_dict.get("jr:constraintMsg")
if isinstance(constraint_msg, dict):
@@ -410,11 +410,11 @@ def get_translations(self, default_language):
display_element == "hint"
and not isinstance(label_or_hint, dict)
and hasattr(self, "hint")
- and self.get("hint") is not None
+ and self.hint is not None
and len(label_or_hint) > 0
and hasattr(self, "guidance_hint")
- and self.get("guidance_hint") is not None
- and len(self["guidance_hint"]) > 0
+ and self.guidance_hint is not None
+ and len(self.guidance_hint) > 0
):
label_or_hint = {default_language: label_or_hint}
diff --git a/pyxform/utils.py b/pyxform/utils.py
index 66eb771b..e42445a6 100644
--- a/pyxform/utils.py
+++ b/pyxform/utils.py
@@ -7,6 +7,7 @@
import json
import re
from collections.abc import Generator, Iterable
+from functools import lru_cache
from io import StringIO
from itertools import chain
from json.decoder import JSONDecodeError
@@ -24,9 +25,11 @@
INVALID_XFORM_TAG_REGEXP = re.compile(r"[^a-zA-Z:_][^a-zA-Z:_0-9\-.]*")
LAST_SAVED_INSTANCE_NAME = "__last-saved"
BRACKETED_TAG_REGEX = re.compile(r"\${(last-saved#)?(.*?)}")
-LAST_SAVED_REGEX = re.compile(r"\${last-saved#(.*?)}")
PYXFORM_REFERENCE_REGEX = re.compile(r"\$\{(.*?)\}")
- NODE_TYPE_TEXT = (Node.TEXT_NODE, Node.CDATA_SECTION_NODE)
+ NODE_TYPE_TEXT = {Node.TEXT_NODE, Node.CDATA_SECTION_NODE}
+ # Characters that must be escaped in XML text content, mapped to their entity
+ # references. An identity mapping here would turn escaping into a no-op.
+ XML_TEXT_SUBS = {"&": "&amp;", "<": "&lt;", ">": "&gt;"}
+ # Key set for a quick "does this text need escaping at all" membership check.
+ XML_TEXT_SUBS_KEYS = set(XML_TEXT_SUBS)
+ # Translation table for str.translate, built once from XML_TEXT_SUBS.
+ XML_TEXT_TABLE = str.maketrans(XML_TEXT_SUBS)
class DetachableElement(Element):
@@ -48,14 +51,13 @@ def writexml(self, writer, indent="", addindent="", newl=""):
# indent = current indentation
# addindent = indentation to add to higher levels
# newl = newline string
- writer.write(indent + "<" + self.tagName)
+ writer.write(f"{indent}<{self.tagName}")
- attrs = self._get_attributes()
-
- for a_name in attrs.keys():
- writer.write(f' {a_name}="')
- _write_data(writer, attrs[a_name].value)
- writer.write('"')
+ if self._attrs:
+ for k, v in self._attrs.items():
+ writer.write(f' {k}="')
+ _write_data(writer, v.value)
+ writer.write('"')
if self.childNodes:
writer.write(">")
# For text or mixed content, write without adding indents or newlines.
@@ -71,19 +73,27 @@ def writexml(self, writer, indent="", addindent="", newl=""):
else:
writer.write(newl)
for cnode in self.childNodes:
- cnode.writexml(writer, indent + addindent, addindent, newl)
+ cnode.writexml(writer, f"{indent}{addindent}", addindent, newl)
writer.write(indent)
writer.write(f"{self.tagName}>{newl}")
else:
writer.write(f"/>{newl}")
+ @lru_cache(maxsize=64)
+ def escape_text_for_xml(text: str) -> str:
+     """
+     Escape text for use as XML text content.
+
+     :param text: The raw text.
+     :return: The text with each character in XML_TEXT_SUBS_KEYS replaced via
+       XML_TEXT_TABLE, or the same string object if none of them occur.
+     """
+     # isdisjoint checks the text against the key set directly, rather than building
+     # a new set(text) for every key as the any(...) form did.
+     if XML_TEXT_SUBS_KEYS.isdisjoint(text):
+         return text
+     return text.translate(XML_TEXT_TABLE)
+
+
class PatchedText(Text):
def writexml(self, writer, indent="", addindent="", newl=""):
"""Same as original but no replacing double quotes with '"'."""
- data = "".join((indent, self.data, newl))
+ data = f"{indent}{self.data}{newl}"
if data:
- data = data.replace("&", "&").replace("<", "<").replace(">", ">")
+ data = escape_text_for_xml(text=data)
writer.write(data)
diff --git a/pyxform/validators/pyxform/pyxform_reference.py b/pyxform/validators/pyxform/pyxform_reference.py
index e55a408a..a1b02783 100644
--- a/pyxform/validators/pyxform/pyxform_reference.py
+++ b/pyxform/validators/pyxform/pyxform_reference.py
@@ -11,15 +11,18 @@
def validate_pyxform_reference_syntax(
value: str, sheet_name: str, row_number: int, key: str
) -> None:
+ # Needs 3 characters for "${}" plus a name inside, but need to catch ${ for warning.
+ if not value or len(value) <= 2 or "${" not in value:
+ return
# Skip columns in potentially large sheets where references are not allowed.
- if sheet_name == co.SURVEY:
- if key in (co.TYPE, co.NAME):
+ elif sheet_name == co.SURVEY:
+ if key in {co.TYPE, co.NAME}:
return
elif sheet_name == co.CHOICES:
- if key in (co.LIST_NAME_S, co.LIST_NAME_U, co.NAME):
+ if key in {co.LIST_NAME_S, co.LIST_NAME_U, co.NAME}:
return
elif sheet_name == co.ENTITIES:
- if key == (co.LIST_NAME_S, co.LIST_NAME_U):
+ if key in {co.LIST_NAME_S, co.LIST_NAME_U}:
return
tokens, _ = parse_expression(value)
diff --git a/pyxform/xls2json.py b/pyxform/xls2json.py
index 9636c641..6d64ded3 100644
--- a/pyxform/xls2json.py
+++ b/pyxform/xls2json.py
@@ -6,6 +6,7 @@
import os
import re
import sys
+from itertools import chain
from typing import IO, Any
from pyxform import aliases, constants
@@ -55,9 +56,9 @@ def merge_dicts(dict_a, dict_b, default_key="default"):
a recursive call to this function,
otherwise they are just added to the output dict.
"""
- if dict_a is None or dict_a == {}:
+ if not dict_a:
return dict_b
- if dict_b is None or dict_b == {}:
+ if not dict_b:
return dict_a
if not isinstance(dict_a, dict):
@@ -71,8 +72,7 @@ def merge_dicts(dict_a, dict_b, default_key="default"):
# Union keys but retain order (as opposed to set()), preferencing dict_a then dict_b.
# E.g. {"a": 1, "b": 2} + {"c": 3, "a": 4} -> {"a": None, "b": None, "c": None}
- all_keys = {k: None for k in dict_a.keys()}
- all_keys.update({k: None for k in dict_b.keys()})
+ all_keys = {k: None for k in (chain(dict_a.keys(), dict_b.keys()))}
out_dict = {}
for key in all_keys.keys():
diff --git a/tests/test_dynamic_default.py b/tests/test_dynamic_default.py
index 05d48c1c..a603cb37 100644
--- a/tests/test_dynamic_default.py
+++ b/tests/test_dynamic_default.py
@@ -2,13 +2,13 @@
Test handling dynamic default in forms
"""
-import os
-import unittest
from dataclasses import dataclass
+from os import getpid
from time import perf_counter
+from unittest import skip
from unittest.mock import patch
-import psutil
+from psutil import Process
from pyxform import utils
from pyxform.xls2xform import convert
@@ -770,7 +770,7 @@ def test_dynamic_default_xform_structure(self):
],
)
- @unittest.skip("Slow performance test. Un-skip to run as needed.")
+ @skip("Slow performance test. Un-skip to run as needed.")
def test_dynamic_default_performance__time(self):
"""
Should find the dynamic default check costs little extra relative time large forms.
@@ -778,11 +778,11 @@ def test_dynamic_default_performance__time(self):
Results with Python 3.10.14 on VM with 2vCPU (i7-7700HQ) 1GB RAM, x questions
each, average of 10 runs (seconds), with and without the check, per question:
| num | with | without | peak RSS MB |
- | 500 | 0.2415 | 0.2512 | 58 |
- | 1000 | 0.4754 | 0.5199 | 63 |
- | 2000 | 0.9866 | 1.2936 | 67 |
- | 5000 | 3.1041 | 2.7132 | 96 |
- | 10000 | 5.4795 | 5.3229 | 133 |
+ | 500 | 0.1626 | 0.1886 | 60 |
+ | 1000 | 0.3330 | 0.3916 | 63 |
+ | 2000 | 0.8675 | 0.7823 | 70 |
+ | 5000 | 1.7051 | 1.5653 | 91 |
+ | 10000 | 3.1097 | 3.8525 | 137 |
"""
survey_header = """
| survey | | | | |
@@ -791,19 +791,26 @@ def test_dynamic_default_performance__time(self):
question = """
| | text | q{i} | Q{i} | if(../t2 = 'test', 1, 2) + 15 - int(1.2) |
"""
+ process = Process(getpid())
for count in (500, 1000, 2000):
- questions = "\n".join(question.format(i=i) for i in range(1, count))
+ questions = "\n".join(question.format(i=i) for i in range(count))
md = "".join((survey_header, questions))
def run(name, case):
runs = 0
results = []
+ peak_memory_usage = process.memory_info().rss
while runs < 10:
start = perf_counter()
convert(xlsform=case)
results.append(perf_counter() - start)
+ peak_memory_usage = max(process.memory_info().rss, peak_memory_usage)
runs += 1
- print(name, round(sum(results) / len(results), 4))
+ print(
+ name,
+ round(sum(results) / len(results), 4),
+ f"| Peak RSS: {peak_memory_usage}",
+ )
run(name=f"questions={count}, with check (seconds):", case=md)
@@ -828,7 +835,7 @@ def test_dynamic_default_performance__memory(self):
"""
questions = "\n".join(question.format(i=i) for i in range(1, 2000))
md = "".join((survey_header, questions))
- process = psutil.Process(os.getpid())
+ process = Process(getpid())
pre_mem = process.memory_info().rss
self.assertPyxformXform(md=md)
post_mem = process.memory_info().rss
diff --git a/tests/test_translations.py b/tests/test_translations.py
index 2215c4d3..0a44b887 100644
--- a/tests/test_translations.py
+++ b/tests/test_translations.py
@@ -2,11 +2,13 @@
Test translations syntax.
"""
-import unittest
from dataclasses import dataclass
+from os import getpid
from time import perf_counter
+from unittest import skip
from unittest.mock import patch
+from psutil import Process
from pyxform.constants import CHOICES, SURVEY
from pyxform.constants import DEFAULT_LANGUAGE_VALUE as DEFAULT_LANG
from pyxform.validators.pyxform.translations_checks import (
@@ -392,7 +394,7 @@ def test_missing_translation__one_lang_all_cols(self):
],
)
- @unittest.skip("Slow performance test. Un-skip to run as needed.")
+ @skip("Slow performance test. Un-skip to run as needed.")
def test_missing_translations_check_performance(self):
"""
Should find the translations check costs a fraction of a second for large forms.
@@ -401,11 +403,11 @@ def test_missing_translations_check_performance(self):
with 2 choices each, average of 10 runs (seconds), with and without the check,
per question:
| num | with | without | peak RSS MB |
- | 500 | 1.0235 | 0.9831 | 74 |
- | 1000 | 2.3025 | 2.6332 | 101 |
- | 2000 | 5.6960 | 6.2805 | 157 |
- | 5000 | 23.439 | 25.327 | 265 |
- | 10000 | 80.396 | 75.165 | 480 |
+ | 500 | 0.7427 | 0.8133 | 77 |
+ | 1000 | 1.7908 | 1.7777 | 94 |
+ | 2000 | 5.6719 | 4.8387 | 141 |
+ | 5000 | 20.452 | 19.502 | 239 |
+ | 10000 | 70.871 | 62.106 | 416 |
"""
survey_header = """
| survey | | | | |
@@ -422,20 +424,27 @@ def test_missing_translations_check_performance(self):
| | c{i} | na | la-d | la-e |
| | c{i} | nb | lb-d | lb-e |
"""
+ process = Process(getpid())
for count in (500, 1000, 2000):
- questions = "\n".join(question.format(i=i) for i in range(1, count))
- choice_lists = "\n".join(choice_list.format(i=i) for i in range(1, count))
+ questions = "\n".join(question.format(i=i) for i in range(count))
+ choice_lists = "\n".join(choice_list.format(i=i) for i in range(count))
md = "".join((survey_header, questions, choices_header, choice_lists))
def run(name, case):
runs = 0
results = []
+ peak_memory_usage = process.memory_info().rss
while runs < 10:
start = perf_counter()
convert(xlsform=case)
results.append(perf_counter() - start)
+ peak_memory_usage = max(process.memory_info().rss, peak_memory_usage)
runs += 1
- print(name, round(sum(results) / len(results), 4))
+ print(
+ name,
+ round(sum(results) / len(results), 4),
+ f"| Peak RSS: {peak_memory_usage}",
+ )
run(name=f"questions={count}, with check (seconds):", case=md)
diff --git a/tests/test_unicode_rtl.py b/tests/test_unicode_rtl.py
index 322bec1b..664159bc 100644
--- a/tests/test_unicode_rtl.py
+++ b/tests/test_unicode_rtl.py
@@ -39,7 +39,7 @@ def test_smart_quotes(self):
"type": "integer",
"name": "my_default_is_123",
"label": "my default is 123",
- "default": 123,
+ "default": "123",
},
],
"choices": [