Skip to content

Commit

Permalink
Merge pull request #134 from PDOK/pdok-16629-validate-constraints-and…
Browse files Browse the repository at this point in the history
…-indices

[PDOK-16629] validate constraints and indices
  • Loading branch information
roelarents authored Nov 18, 2024
2 parents 291fe11 + 7789d7c commit 51b9199
Show file tree
Hide file tree
Showing 14 changed files with 681 additions and 82 deletions.
42 changes: 27 additions & 15 deletions geopackage_validator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
"""Main CLI entry for the Geopackage validator tool."""
# Setup logging before package imports.
import logging
from datetime import datetime
from pathlib import Path
import sys
import time
from datetime import datetime
from pathlib import Path

import click
import click_log
Expand Down Expand Up @@ -302,6 +302,13 @@ def geopackage_validator_command(
is_flag=True,
help="Output yaml",
)
@click.option(
"--with-indexes-and-fks",
default=False,
required=False,
is_flag=True,
help="Include indexes (and unique constraints) and foreign keys in the definitions",
)
@click.option(
"--s3-endpoint-no-protocol",
envvar="S3_ENDPOINT_NO_PROTOCOL",
Expand Down Expand Up @@ -367,17 +374,18 @@ def geopackage_validator_command(
)
@click_log.simple_verbosity_option(logger)
def geopackage_validator_command_generate_table_definitions(
gpkg_path,
yaml,
s3_endpoint_no_protocol,
s3_access_key,
s3_secret_key,
s3_bucket,
s3_key,
s3_secure,
s3_virtual_hosting,
s3_signing_region,
s3_no_sign_request,
gpkg_path: Path,
yaml: bool,
with_indexes_and_fks: bool,
s3_endpoint_no_protocol: str,
s3_access_key: str,
s3_secret_key: str,
s3_bucket: str,
s3_key: str,
s3_secure: bool,
s3_virtual_hosting: bool,
s3_signing_region: str,
s3_no_sign_request: bool,
):
gpkg_path_not_exists = s3_endpoint_no_protocol is None and (
gpkg_path is None
Expand All @@ -399,7 +407,9 @@ def geopackage_validator_command_generate_table_definitions(
s3_signing_region=s3_signing_region,
s3_no_sign_request=s3_no_sign_request,
)
definitionlist = generate.generate_definitions_for_path(gpkg_path)
definitionlist = generate.generate_definitions_for_path(
gpkg_path, with_indexes_and_fks
)
else:
with s3.minio_resource(
s3_endpoint_no_protocol,
Expand All @@ -409,7 +419,9 @@ def geopackage_validator_command_generate_table_definitions(
s3_key,
s3_secure,
) as localfilename:
definitionlist = generate.generate_definitions_for_path(localfilename)
definitionlist = generate.generate_definitions_for_path(
localfilename, with_indexes_and_fks
)
output.print_output(definitionlist, yaml)
except Exception:
logger.exception("Error while generating table definitions")
Expand Down
115 changes: 106 additions & 9 deletions geopackage_validator/generate.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
import logging
from typing import List
from typing import List, Optional, Dict

from osgeo.ogr import DataSource
from osgeo.ogr import DataSource, Layer

from geopackage_validator import __version__
from geopackage_validator import utils
from geopackage_validator.models import (
ColumnDefinition,
ColumnMapping,
ForeignKeyDefinition,
IndexDefinition,
TableDefinition,
TablesDefinition,
)
from geopackage_validator.utils import group_by

logger = logging.getLogger(__name__)


def columns_definition(table, geometry_column) -> List[ColumnDefinition]:
def column_definitions(table, geometry_column) -> List[ColumnDefinition]:
layer_definition = table.GetLayerDefn()

assert layer_definition, f'Invalid Layer {"" if not table else table.GetName()}'
Expand All @@ -40,8 +44,85 @@ def fid_column_definition(table) -> List[ColumnDefinition]:
return [ColumnDefinition(name=name, type="INTEGER")]


def get_index_definitions(
dataset: DataSource, table_name: str
) -> List[IndexDefinition]:
index_definitions: List[IndexDefinition] = []
index_list = dataset.ExecuteSQL(
f"select name, \"unique\", origin from pragma_index_list('{table_name}');"
)
pk_in_index_list = False
for index_listing in index_list:
pk_in_index_list = pk_in_index_list or index_listing["origin"] == "pk"
index_definitions.append(
IndexDefinition(
columns=tuple(get_index_column_names(dataset, index_listing["name"])),
unique=bool(int(index_listing["unique"])),
)
)
dataset.ReleaseResultSet(index_list)
index_definitions = sorted(index_definitions, key=lambda d: d.columns)

if not pk_in_index_list:
pk_index = get_pk_index(dataset, table_name)
if pk_index is not None:
index_definitions.insert(0, pk_index)

return index_definitions


def get_pk_index(dataset: DataSource, table_name: str) -> Optional[IndexDefinition]:
pk_columns = dataset.ExecuteSQL(
f"select name from pragma_table_info('{table_name}') where pk;"
)
column_names = tuple(r["name"] for r in pk_columns)
if len(column_names) == 0:
return None
return IndexDefinition(columns=column_names, unique=True)


def get_index_column_names(dataset: DataSource, index_name: str) -> List[str]:
index_info = dataset.ExecuteSQL(
f"select name from pragma_index_info('{index_name}');"
)
column_names: List[str] = [r["name"] for r in index_info]
dataset.ReleaseResultSet(index_info)
return column_names

def generate_table_definitions(dataset: DataSource) -> TablesDefinition:

def get_foreign_key_definitions(dataset, table_name) -> List[ForeignKeyDefinition]:
foreign_key_list = dataset.ExecuteSQL(
f'select id, seq, "table", "from", "to" from pragma_foreign_key_list(\'{table_name}\');'
)
foreign_key_definitions: List[ForeignKeyDefinition] = []
for foreign_key_listing in group_by(foreign_key_list, lambda r: r["id"]):
table: str = ""
columns: Dict[str, str] = {}
for column_reference in foreign_key_listing:
table = column_reference["table"]
to = column_reference["to"]
if to is None:
pk_index = get_pk_index(dataset, column_reference["table"])
to = pk_index.columns[int(column_reference["seq"])]
columns[column_reference["from"]] = to
foreign_key_definitions.append(
ForeignKeyDefinition(
table=table,
columns=tuple(
ColumnMapping(src=c[0], dst=c[1]) for c in columns.items()
),
)
)
foreign_key_definitions = sorted(
foreign_key_definitions, key=lambda fk: (fk.table, (c.src for c in fk.columns))
)
dataset.ReleaseResultSet(foreign_key_list)
return foreign_key_definitions


def generate_table_definitions(
dataset: DataSource, with_indexes_and_fks: bool = False
) -> TablesDefinition:
projections = set()
table_geometry_types = {
table_name: geometry_type_name
Expand All @@ -50,6 +131,7 @@ def generate_table_definitions(dataset: DataSource) -> TablesDefinition:

table_list: List[TableDefinition] = []
for table in dataset:
table: Layer
geo_column_name = table.GetGeometryColumn()
if geo_column_name == "":
continue
Expand All @@ -59,11 +141,21 @@ def generate_table_definitions(dataset: DataSource) -> TablesDefinition:
"name": geo_column_name,
"type": table_geometry_types[table_name],
}
columns = tuple(column_definitions(table, geometry_column))

indexes = None
foreign_keys = None
if with_indexes_and_fks:
indexes = tuple(get_index_definitions(dataset, table_name))
foreign_keys = tuple(get_foreign_key_definitions(dataset, table_name))

table_list.append(
TableDefinition(
name=table_name,
geometry_column=geo_column_name,
columns=columns_definition(table, geometry_column),
columns=columns,
indexes=indexes,
foreign_keys=foreign_keys,
)
)

Expand All @@ -74,16 +166,21 @@ def generate_table_definitions(dataset: DataSource) -> TablesDefinition:
result = TablesDefinition(
geopackage_validator_version=__version__,
projection=int(projections.pop()),
tables=table_list,
tables=tuple(sorted(table_list, key=lambda t: t.name)),
)

return result


def generate_definitions_for_path(gpkg_path: str) -> TablesDefinition:
def get_datasource_for_path(gpkg_path: str, error_handler=None) -> DataSource:
"""Starts the geopackage validation."""
utils.check_gdal_version()
return utils.open_dataset(gpkg_path, error_handler)

dataset = utils.open_dataset(gpkg_path)

return generate_table_definitions(dataset)
def generate_definitions_for_path(
gpkg_path: str, with_indexes_and_fks: bool = False
) -> TablesDefinition:
return generate_table_definitions(
get_datasource_for_path(gpkg_path), with_indexes_and_fks
)
92 changes: 70 additions & 22 deletions geopackage_validator/models.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,95 @@
import copy
from typing import List, Optional
from typing import Optional, Tuple

from pydantic import BaseModel
from pydantic import BaseModel, Field, field_validator, ConfigDict
from semver import Version


class Named(BaseModel):
name: str


class ColumnDefinition(Named):
model_config = ConfigDict(frozen=True)

type: str


class IndexDefinition(BaseModel):
model_config = ConfigDict(frozen=True)

columns: Tuple[str, ...] = Field(min_length=1)
unique: bool = False


class ColumnMapping(BaseModel):
model_config = ConfigDict(frozen=True)

src: str
dst: str


class ForeignKeyDefinition(BaseModel):
model_config = ConfigDict(frozen=True)

@field_validator("columns")
@classmethod
def unique_src_columns(
cls, v: Tuple[ColumnMapping, ...]
) -> Tuple[ColumnMapping, ...]:
src_columns = set()
for c in v:
if c.src in src_columns:
raise ValueError(f"Duplicate src column detected: {c.src}")
src_columns.add(c.src)
return v

table: str = Field(min_length=1)
columns: Tuple[ColumnMapping, ...] = Field(min_length=1)


class TableDefinition(Named):
model_config = ConfigDict(frozen=True)

geometry_column: str = "geom"
columns: List[ColumnDefinition] = []
columns: Tuple[ColumnDefinition, ...] = tuple()
"""Ordered as in the table (left to right), but with FID and geometry columns always first.
(This order is not validated.)"""
indexes: Optional[Tuple[IndexDefinition, ...]] = None
"""None means: don't validate. Empty list means: there should be no indexes."""
foreign_keys: Optional[Tuple[ForeignKeyDefinition, ...]] = None
"""None means: don't validate. Empty list means: there should be no foreign keys."""


class TablesDefinition(BaseModel):
model_config = ConfigDict(frozen=True)

geopackage_validator_version: str = "0"
projection: Optional[int]
tables: List[TableDefinition]
tables: Tuple[TableDefinition, ...]
"""Ordered by table name"""

def with_indexes_and_fks(self) -> bool:
for table in self.tables:
if table.indexes is not None or table.foreign_keys is not None:
return True
return False


def migrate_tables_definition(old: dict) -> dict:
def migrate_tables_definition(original: dict) -> dict:
"""Migrate a possibly old tables definition to new schema/model"""
version = old.get("geopackage_validator_version", "0")
# older versions where not versioned (?), so assuming "0" if there is no version
version_tuple = tuple(int(v) for v in version.split("."))
if version_tuple == (0, 0, 0, "-dev") or version_tuple > (
0,
5,
8,
): # no changes after 0.5.8
return old
new = copy.deepcopy(old)
if version_tuple <= (
0,
5,
8,
): # until 0.5.8, column's "type" property was named "data_type"
for t in new.get("tables", []):
# older versions were not versioned (?), so assuming "0.0.0" if there is no version
version = Version.parse(original.get("geopackage_validator_version", "0.0.0"))
if version == Version(0, 0, 0, "dev"):
return original
# nothing changed after v0.5.8
if version > Version(0, 5, 8):
return original
migrated = copy.deepcopy(original)
# until and including 0.5.8, column's "type" property was named "data_type"
if version <= Version(0, 5, 8):
for t in migrated.get("tables", []):
for c in t.get("columns", []):
c["type"] = c["data_type"]
del c["data_type"]
return new
return migrated
2 changes: 1 addition & 1 deletion geopackage_validator/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def print_output(python_object, as_yaml, yaml_indent=2):


def print_output_pydantic(model: BaseModel, as_yaml: bool, yaml_indent=2):
content = model.model_dump_json(indent=4)
content = model.model_dump_json(indent=4, exclude_none=True)
if as_yaml:
python_object = yaml.safe_load(content)
content = yaml.dump(python_object, indent=yaml_indent, sort_keys=False)
Expand Down
Loading

0 comments on commit 51b9199

Please sign in to comment.