Skip to content

Commit

Permalink
Registre d'arrêt : premier script d'export
Browse files Browse the repository at this point in the history
Supporte que les GTFS.
  • Loading branch information
ptitfred committed Dec 17, 2024
1 parent 6244559 commit 7ad20c2
Show file tree
Hide file tree
Showing 5 changed files with 229 additions and 21 deletions.
101 changes: 101 additions & 0 deletions apps/transport/lib/registry/engine.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Transport.Registry.Engine do
@moduledoc """
Stream eligible resources and run extractors to produce a raw registry at the end.
"""

alias Transport.Registry.Extractor
alias Transport.Registry.GTFS
alias Transport.Registry.Model.Stop

import Ecto.Query

require Logger

@spec execute(output_file :: Path.t(), list()) :: :ok
def execute(output_file, opts \\ []) do
limit = Keyword.get(opts, :limit, 1_000_000)
formats = Keyword.get(opts, :formats, ~w(GTFS NeTEx))

create_empty_csv_with_headers(output_file)

enumerate_gtfs_resources(limit, formats)
|> Extractor.traverse(&prepare_extractor/1)
|> Task.async_stream(&download/1, max_concurrency: 10, timeout: 120_000)
# one for Task.async_stream
|> Extractor.keep_results()
# one for download/1
|> Extractor.keep_results()
|> Extractor.traverse(&extract_from_archive/1)
|> dump_to_csv(output_file)
end

def create_empty_csv_with_headers(output_file) do
headers = NimbleCSV.RFC4180.dump_to_iodata([Stop.csv_headers()])
File.write(output_file, headers)
end

def enumerate_gtfs_resources(limit, formats) do
DB.Resource.base_query()
|> DB.ResourceHistory.join_resource_with_latest_resource_history()
|> where([resource: r], r.format in ^formats)
|> preload([resource_history: rh], resource_history: rh)
|> limit(^limit)
|> DB.Repo.all()
end

def prepare_extractor(%DB.Resource{} = resource) do
case resource.format do
"GTFS" -> {:ok, {GTFS, resource.url}}
_ -> {:error, "Unsupported format"}
end
end

def download({extractor, url}) do
Logger.debug("download #{extractor} #{url}")
tmp_path = System.tmp_dir!() |> Path.join("#{Ecto.UUID.generate()}.dat")

error_result = fn msg ->
File.rm(tmp_path)
{:error, msg}
end

http_result =
Transport.HTTPClient.get(url,
decode_body: false,
compressed: false,
into: File.stream!(tmp_path)
)

case http_result do
{:error, error} ->
error_result.("Unexpected error while downloading the resource from #{url}: #{Exception.message(error)}")

{:ok, %{status: status}} ->
cond do
status >= 200 && status < 300 ->
{:ok, {extractor, tmp_path}}

status > 400 ->
error_result.("Error #{status} while downloading the resource from #{url}")

true ->
error_result.("Unexpected HTTP error #{status} while downloading the resource from #{url}")
end
end
end

@spec extract_from_archive({module(), Path.t()}) :: Extractor.result([Stop.t()])
def extract_from_archive({extractor, file}) do
Logger.debug("extract_from_archive #{extractor} #{file}")
extractor.extract_from_archive(file)
end

def dump_to_csv(enumerable, output_file) do
enumerable
|> Stream.concat()
|> Stream.map(&Stop.to_csv/1)
|> NimbleCSV.RFC4180.dump_to_stream()
|> Stream.into(File.stream!(output_file, [:append, :utf8]))
|> Stream.run()
end
end
17 changes: 17 additions & 0 deletions apps/transport/lib/registry/extractor.ex
Original file line number Diff line number Diff line change
@@ -1,7 +1,24 @@
defmodule Transport.Registry.Extractor do
@moduledoc """
Interface and utilities for stops extractors.
"""

require Logger

alias Transport.Registry.Model.Stop

@type result(positive) :: {:ok, positive} | {:error, binary()}

@callback extract_from_archive(path :: Path.t()) :: result([Stop.t()])

def keep_results(enumerable), do: Stream.flat_map(enumerable, &keep_result/1)

defp keep_result({:ok, result}), do: [result]
defp keep_result(_), do: []

def traverse(enumerable, mapper) do
enumerable
|> Stream.map(mapper)
|> keep_results()
end
end
95 changes: 74 additions & 21 deletions apps/transport/lib/registry/gtfs.ex
Original file line number Diff line number Diff line change
@@ -1,28 +1,34 @@
defmodule Transport.Registry.GTFS do
alias Transport.Registry.Model.DataSource
@moduledoc """
Implementation of a stop extractor for GTFS resources.
"""

alias Transport.Registry.Model.Stop
alias Transport.Registry.Model.StopIdentifier

require Logger

@behaviour Transport.Registry.Extractor
@doc """
Extract stops from GTFS ressource.
"""
def extract_from_archive(archive) do
archive
|> file_stream!()
|> to_stream_of_maps()
|> Stream.map(fn r ->
%Stop{
main_id: %StopIdentifier{id: Map.fetch!(r, "stop_id"), type: :main},
display_name: Map.fetch!(r, "stop_name"),
latitude: Map.fetch!(r, "stop_lat") |> convert_text_to_float(),
longitude: Map.fetch!(r, "stop_lon") |> convert_text_to_float(),
projection: :utm_wgs84,
stop_type: r |> csv_get_with_default!("location_type", "0", false) |> to_stop_type()
}
end)
case file_stream(archive) do
{:error, error} ->
Logger.error(error)
{:error, error}

{:ok, content} ->
Logger.debug("Valid Zip archive")

{:ok, []}
stops =
content
|> to_stream_of_maps()
|> Stream.flat_map(&handle_stop/1)
|> Enum.to_list()

{:ok, stops}
end
end

@doc """
Expand All @@ -46,9 +52,35 @@ defmodule Transport.Registry.GTFS do
end)
end

defp handle_stop(record) do
latitude = fetch_position(record, "stop_lat")
longitude = fetch_position(record, "stop_lon")

if latitude != nil && longitude != nil do
[
%Stop{
main_id: %StopIdentifier{id: Map.fetch!(record, "stop_id"), type: :main},
display_name: Map.fetch!(record, "stop_name"),
latitude: latitude,
longitude: longitude,
projection: :utm_wgs84,
stop_type: record |> csv_get_with_default!("location_type", "0") |> to_stop_type()
}
]
else
[]
end
end

defp fetch_position(record, field) do
Map.fetch!(record, field) |> convert_text_to_float()
end

@doc """
Convert textual values to float.
iex> convert_text_to_float("")
nil
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
Expand All @@ -61,23 +93,44 @@ defmodule Transport.Registry.GTFS do
-48.7
"""
def convert_text_to_float(input) do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
if input |> String.trim() != "" do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
else
nil
end
end

defp to_stop_type("0"), do: :quay
defp to_stop_type("1"), do: :stop
defp to_stop_type(_), do: :other

defp file_stream!(archive) do
defp file_stream(archive) do
zip_file = Unzip.LocalFile.open(archive)

{:ok, unzip} = Unzip.new(zip_file)
case Unzip.new(zip_file) do
{:ok, unzip} ->
if has_stops?(unzip) do
{:ok, Unzip.file_stream!(unzip, "stops.txt")}
else
{:error, "Missing stops.txt in #{archive}"}
end

{:error, error} ->
{:error, "Error while unzipping archive #{archive}: #{error}"}
end
end

defp has_stops?(unzip) do
Unzip.list_entries(unzip)
|> Enum.any?(&entry_of_name?("stops.txt", &1))
end

Unzip.file_stream!(unzip, "stops.txt")
defp entry_of_name?(name, %Unzip.Entry{file_name: file_name}) do
file_name == name
end

defp csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)
defp csv_get_with_default!(map, field, default_value) do
value = Map.get(map, field)

case value do
nil -> default_value
Expand Down
36 changes: 36 additions & 0 deletions apps/transport/lib/registry/model/stop.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ defmodule Transport.Registry.Model.StopIdentifier do
}

@type identifier_type :: :main | :private_code | :stop_code | :other

def to_field(%__MODULE__{id: id, type: type}) do
"#{type}:#{id}"
end
end

defmodule Transport.Registry.Model.Stop do
Expand Down Expand Up @@ -54,4 +58,36 @@ defmodule Transport.Registry.Model.Stop do
@type stop_type :: :stop | :quay | :other

@type projection :: :utm_wgs84 | :lambert93_rgf93

def csv_headers do
~w(
main_id
display_name
data_source_id
data_source_format
parent_id
latitude
longitude
projection
stop_type
)
end

def to_csv(%__MODULE__{} = stop) do
[
StopIdentifier.to_field(stop.main_id),
stop.display_name,
stop.data_source_id,
stop.data_source_format,
maybe(stop.parent_id, &StopIdentifier.to_field/1, ""),
stop.latitude,
stop.longitude,
stop.projection,
stop.stop_type
]
end

@spec maybe(value :: any() | nil, mapper :: (any() -> any()), defaultValue :: any()) :: any() | nil
def maybe(nil, _, defaultValue), do: defaultValue
def maybe(value, mapper, _), do: mapper.(value)
end
1 change: 1 addition & 0 deletions scripts/registre-arrets.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Transport.Registry.Engine.execute("./registre-arrets.csv")

0 comments on commit 7ad20c2

Please sign in to comment.