diff --git a/apps/transport/lib/registry/engine.ex b/apps/transport/lib/registry/engine.ex new file mode 100644 index 0000000000..17198d3e9c --- /dev/null +++ b/apps/transport/lib/registry/engine.ex @@ -0,0 +1,101 @@ +defmodule Transport.Registry.Engine do + @moduledoc """ + Stream eligible resources and run extractors to produce a raw registry at the end. + """ + + alias Transport.Registry.Extractor + alias Transport.Registry.GTFS + alias Transport.Registry.Model.Stop + + import Ecto.Query + + require Logger + + @spec execute(output_file :: Path.t(), list()) :: :ok + def execute(output_file, opts \\ []) do + limit = Keyword.get(opts, :limit, 1_000_000) + formats = Keyword.get(opts, :formats, ~w(GTFS NeTEx)) + + create_empty_csv_with_headers(output_file) + + enumerate_gtfs_resources(limit, formats) + |> Extractor.traverse(&prepare_extractor/1) + |> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000) + # one for Task.async_stream + |> Extractor.keep_results() + # one for download/1 + |> Extractor.keep_results() + |> Extractor.traverse(&extract_from_archive/1) + |> dump_to_csv(output_file) + end + + def create_empty_csv_with_headers(output_file) do + headers = NimbleCSV.RFC4180.dump_to_iodata([Stop.csv_headers()]) + File.write(output_file, headers) + end + + def enumerate_gtfs_resources(limit, formats) do + DB.Resource.base_query() + |> DB.ResourceHistory.join_resource_with_latest_resource_history() + |> where([resource: r], r.format in ^formats) + |> preload([resource_history: rh], resource_history: rh) + |> limit(^limit) + |> DB.Repo.all() + end + + def prepare_extractor(%DB.Resource{} = resource) do + case resource.format do + "GTFS" -> {:ok, {GTFS, resource.url}} + _ -> {:error, "Unsupported format"} + end + end + + def download({extractor, url}) do + Logger.debug("download #{extractor} #{url}") + tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat") + + error_result = fn msg -> + File.rm(tmp_path) + {:error, msg} + end + + http_result = + Transport.HTTPClient.get(url, + decode_body: false, + compressed: false, + into: File.stream!(tmp_path) + ) + + case http_result do + {:error, error} -> + error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}") + + {:ok, %{status: status}} -> + cond do + status >= 200 && status < 300 -> + {:ok, {extractor, tmp_path}} + + status > 400 -> + error_result.("Error #{status} while downloading the resource from #{url}") + + true -> + error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}") + end + end + end + + @spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()]) + def extract_from_archive({extractor, file}) do + Logger.debug("extract_from_archive #{extractor} #{file}") + extractor.extract_from_archive(file) + end + + def dump_to_csv(enumerable, output_file) do + enumerable + |> Stream.concat() + |> Stream.map(&Stop.to_csv/1) + |> NimbleCSV.RFC4180.dump_to_stream() + |> Stream.into(File.stream!(output_file, [:append, :utf8])) + |> Stream.run() + end +end diff --git a/apps/transport/lib/registry/extractor.ex b/apps/transport/lib/registry/extractor.ex index b26eba4930..a0f8a7c52d 100644 --- a/apps/transport/lib/registry/extractor.ex +++ b/apps/transport/lib/registry/extractor.ex @@ -1,7 +1,24 @@ defmodule Transport.Registry.Extractor do + @moduledoc """ + Interface and utilities for stops extractors. + """ + + require Logger + alias Transport.Registry.Model.Stop @type result(positive) :: {:ok, positive} | {:error, binary()} @callback extract_from_archive(path :: Path.t()) :: result([Stop.t()]) + + def keep_results(enumerable), do: Stream.flat_map(enumerable, &keep_result/1) + + defp keep_result({:ok, result}), do: [result] + defp keep_result(_), do: [] + + def traverse(enumerable, mapper) do + enumerable + |> Stream.map(mapper) + |> keep_results() + end end diff --git a/apps/transport/lib/registry/gtfs.ex b/apps/transport/lib/registry/gtfs.ex index c7cedcc8d1..9e3a3c000e 100644 --- a/apps/transport/lib/registry/gtfs.ex +++ b/apps/transport/lib/registry/gtfs.ex @@ -1,28 +1,34 @@ defmodule Transport.Registry.GTFS do - alias Transport.Registry.Model.DataSource + @moduledoc """ + Implementation of a stop extractor for GTFS resources. + """ + alias Transport.Registry.Model.Stop alias Transport.Registry.Model.StopIdentifier + require Logger + @behaviour Transport.Registry.Extractor @doc """ Extract stops from GTFS ressource. """ def extract_from_archive(archive) do - archive - |> file_stream!() - |> to_stream_of_maps() - |> Stream.map(fn r -> - %Stop{ - main_id: %StopIdentifier{id: Map.fetch!(r, "stop_id"), type: :main}, - display_name: Map.fetch!(r, "stop_name"), - latitude: Map.fetch!(r, "stop_lat") |> convert_text_to_float(), - longitude: Map.fetch!(r, "stop_lon") |> convert_text_to_float(), - projection: :utm_wgs84, - stop_type: r |> csv_get_with_default!("location_type", "0", false) |> to_stop_type() - } - end) + case file_stream(archive) do + {:error, error} -> + Logger.error(error) + {:error, error} + + {:ok, content} -> + Logger.debug("Valid Zip archive") - {:ok, []} + stops = + content + |> to_stream_of_maps() + |> Stream.flat_map(&handle_stop/1) + |> Enum.to_list() + + {:ok, stops} + end end @doc """ @@ -46,9 +52,35 @@ defmodule Transport.Registry.GTFS do end) end + defp handle_stop(record) do + latitude = fetch_position(record, "stop_lat") + longitude = fetch_position(record, "stop_lon") + + if latitude != nil && longitude != nil do + [ + %Stop{ + main_id: %StopIdentifier{id: Map.fetch!(record, "stop_id"), type: :main}, + display_name: Map.fetch!(record, "stop_name"), + latitude: latitude, + longitude: longitude, + projection: :utm_wgs84, + stop_type: record |> csv_get_with_default!("location_type", "0") |> to_stop_type() + } + ] + else + [] + end + end + + defp fetch_position(record, field) do + Map.fetch!(record, field) |> convert_text_to_float() + end + @doc """ Convert textual values to float. + iex> convert_text_to_float("") + nil iex> convert_text_to_float("0") 0.0 iex> convert_text_to_float("0.0") @@ -61,23 +93,44 @@ defmodule Transport.Registry.GTFS do -48.7 """ def convert_text_to_float(input) do - input |> String.trim() |> Decimal.new() |> Decimal.to_float() + if input |> String.trim() != "" do + input |> String.trim() |> Decimal.new() |> Decimal.to_float() + else + nil + end end defp to_stop_type("0"), do: :quay defp to_stop_type("1"), do: :stop defp to_stop_type(_), do: :other - defp file_stream!(archive) do + defp file_stream(archive) do zip_file = Unzip.LocalFile.open(archive) - {:ok, unzip} = Unzip.new(zip_file) + case Unzip.new(zip_file) do + {:ok, unzip} -> + if has_stops?(unzip) do + {:ok, Unzip.file_stream!(unzip, "stops.txt")} + else + {:error, "Missing stops.txt in #{archive}"} + end + + {:error, error} -> + {:error, "Error while unzipping archive #{archive}: #{error}"} + end + end + + defp has_stops?(unzip) do + Unzip.list_entries(unzip) + |> Enum.any?(&entry_of_name?("stops.txt", &1)) + end - Unzip.file_stream!(unzip, "stops.txt") + defp entry_of_name?(name, %Unzip.Entry{file_name: file_name}) do + file_name == name end - defp csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do - value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field) + defp csv_get_with_default!(map, field, default_value) do + value = Map.get(map, field) case value do nil -> default_value diff --git a/apps/transport/lib/registry/model/stop.ex b/apps/transport/lib/registry/model/stop.ex index 50c069818f..767283ab83 100644 --- a/apps/transport/lib/registry/model/stop.ex +++ b/apps/transport/lib/registry/model/stop.ex @@ -14,6 +14,10 @@ defmodule Transport.Registry.Model.StopIdentifier do } @type identifier_type :: :main | :private_code | :stop_code | :other + + def to_field(%__MODULE__{id: id, type: type}) do + "#{type}:#{id}" + end end defmodule Transport.Registry.Model.Stop do @@ -54,4 +58,36 @@ defmodule Transport.Registry.Model.Stop do @type stop_type :: :stop | :quay | :other @type projection :: :utm_wgs84 | :lambert93_rgf93 + + def csv_headers do + ~w( + main_id + display_name + data_source_id + data_source_format + parent_id + latitude + longitude + projection + stop_type + ) + end + + def to_csv(%__MODULE__{} = stop) do + [ + StopIdentifier.to_field(stop.main_id), + stop.display_name, + stop.data_source_id, + stop.data_source_format, + maybe(stop.parent_id, &StopIdentifier.to_field/1, ""), + stop.latitude, + stop.longitude, + stop.projection, + stop.stop_type + ] + end + + @spec maybe(value :: any() | nil, mapper :: (any() -> any()), defaultValue :: any()) :: any() | nil + def maybe(nil, _, defaultValue), do: defaultValue + def maybe(value, mapper, _), do: mapper.(value) end diff --git a/scripts/registre-arrets.exs b/scripts/registre-arrets.exs new file mode 100644 index 0000000000..d02e89cdce --- /dev/null +++ b/scripts/registre-arrets.exs @@ -0,0 +1 @@ +Transport.Registry.Engine.execute("./registre-arrets.csv")