Stop registry: first draft #4393

Draft · wants to merge 13 commits into base: master
25 changes: 17 additions & 8 deletions apps/shared/lib/wrapper/wrapper_req.ex
@@ -35,6 +35,18 @@ defmodule Transport.HTTPClient do
"""

def get!(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
end

def get(url, options) do
{req, options} = setup_cache(options)

Transport.Req.impl().get(req, options |> Keyword.merge(url: url))
end

defp setup_cache(options) do
options =
Keyword.validate!(options, [
:custom_cache_dir,
@@ -48,13 +60,10 @@

{enable_cache, options} = options |> Keyword.pop!(:enable_cache)

req =
if enable_cache do
req |> Transport.Shared.ReqCustomCache.attach()
else
req
end

Transport.Req.impl().get!(req, options |> Keyword.merge(url: url))
if enable_cache do
{req |> Transport.Shared.ReqCustomCache.attach(), options}
else
{req, options}
end
end
end
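
For reviewers, a minimal usage sketch of the new non-raising variant. The URL and cache directory are hypothetical, and it assumes the Transport.Req implementation follows Req.get/2's convention of returning {:ok, %Req.Response{}} or {:error, exception}:

# Hypothetical caller; :enable_cache and :custom_cache_dir are the options
# handled by setup_cache/1 above.
case Transport.HTTPClient.get("https://example.com/gtfs.zip",
       enable_cache: true,
       custom_cache_dir: "tmp/http-cache"
     ) do
  {:ok, %Req.Response{status: 200, body: body}} -> {:ok, body}
  {:ok, %Req.Response{status: status}} -> {:error, "unexpected HTTP status #{status}"}
  {:error, exception} -> {:error, Exception.message(exception)}
end

Unlike get!/2, callers can now degrade gracefully (for example, skipping a feed that is temporarily unreachable) instead of letting the exception bubble up.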
87 changes: 87 additions & 0 deletions apps/transport/lib/gtfs/utils.ex
@@ -0,0 +1,87 @@
defmodule Transport.GTFS.Utils do
@moduledoc """
Some helpers for handling GTFS archives.
"""

def fetch_position(record, field) do
Map.fetch!(record, field) |> convert_text_to_float()
end

@doc """
Convert textual values to float.

iex> convert_text_to_float("")
nil
iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
if input |> String.trim() != "" do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
else
nil
end
end

@doc """
Variant of csv_get_with_default/3 that raises if a mandatory column is missing.
"""
def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end

@doc """
iex> csv_get_with_default(%{}, "field", 0)
0
iex> csv_get_with_default(%{"other_field" => 1}, "field", 0)
0
iex> csv_get_with_default(%{"field" => 2, "other_field" => 1}, "field", 0)
2
iex> csv_get_with_default(%{"field" => "", "other_field" => 1}, "field", 0)
0
"""
def csv_get_with_default(map, field, default_value) do
value = Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end

@doc """
Transform the stream output by Unzip into a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end
end
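
A quick sketch of how these helpers compose, using a hand-written stops.txt fragment (the sample rows are hypothetical; to_stream_of_maps/1 accepts any enumerable of iodata chunks, such as the stream Unzip.file_stream!/2 yields):

alias Transport.GTFS.Utils

[
  "stop_id,stop_name,stop_lat,stop_lon,location_type\n",
  "S1,Gare Centrale, 48.8566 ,2.3522,\n"
]
|> Utils.to_stream_of_maps()
|> Enum.map(fn row ->
  # whitespace around stop_lat is trimmed by convert_text_to_float/1,
  # and the empty location_type falls back to the default
  {
    Map.fetch!(row, "stop_id"),
    Utils.fetch_position(row, "stop_lat"),
    Utils.fetch_position(row, "stop_lon"),
    Utils.csv_get_with_default(row, "location_type", "0")
  }
end)
# => [{"S1", 48.8566, 2.3522, "0"}]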
65 changes: 9 additions & 56 deletions apps/transport/lib/jobs/gtfs_to_db.ex
@@ -3,33 +3,7 @@ defmodule Transport.Jobs.GtfsToDB do
Get the content of a GTFS ResourceHistory, store it in the DB
"""

@doc """
Convert textual values to float.

iex> convert_text_to_float("0")
0.0
iex> convert_text_to_float("0.0")
0.0
iex> convert_text_to_float("12.7")
12.7
iex> convert_text_to_float("-12.7")
-12.7
iex> convert_text_to_float(" -48.7 ")
-48.7
"""
def convert_text_to_float(input) do
input |> String.trim() |> Decimal.new() |> Decimal.to_float()
end

def csv_get_with_default!(map, field, default_value, mandatory_column \\ true) do
value = if mandatory_column, do: Map.fetch!(map, field), else: Map.get(map, field)

case value do
nil -> default_value
"" -> default_value
v -> v
end
end
alias Transport.GTFS.Utils

def import_gtfs_from_resource_history(resource_history_id) do
%{id: data_import_id} = %DB.DataImport{resource_history_id: resource_history_id} |> DB.Repo.insert!()
@@ -61,16 +35,16 @@ defmodule Transport.Jobs.GtfsToDB do
def stops_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
# the map is reshaped for Ecto's needs
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
stop_id: r |> Map.fetch!("stop_id"),
stop_name: r |> Map.fetch!("stop_name"),
stop_lat: r |> Map.fetch!("stop_lat") |> convert_text_to_float(),
stop_lon: r |> Map.fetch!("stop_lon") |> convert_text_to_float(),
location_type: r |> csv_get_with_default!("location_type", "0", false) |> String.to_integer()
stop_lat: r |> Utils.fetch_position("stop_lat"),
stop_lon: r |> Utils.fetch_position("stop_lon"),
location_type: r |> Utils.csv_get_with_default!("location_type", "0", false) |> String.to_integer()
}
end)
|> Stream.chunk_every(1000)
@@ -79,27 +53,6 @@
end)
end

@doc """
Transform the stream outputed by Unzip to a stream of maps, each map
corresponding to a row from the CSV.
"""
def to_stream_of_maps(file_stream) do
file_stream
# transform the stream to a stream of binaries
|> Stream.map(fn c -> IO.iodata_to_binary(c) end)
# stream line by line
|> NimbleCSV.RFC4180.to_line_stream()
|> NimbleCSV.RFC4180.parse_stream(skip_headers: false)
# transform the stream to a stream of maps %{column_name1: value1, ...}
|> Stream.transform([], fn r, acc ->
if acc == [] do
{%{}, r |> Enum.map(fn h -> h |> String.replace_prefix("\uFEFF", "") end)}
else
{[acc |> Enum.zip(r) |> Enum.into(%{})], acc}
end
end)
end

def fill_calendar_from_resource_history(resource_history_id, data_import_id) do
file_stream = file_stream(resource_history_id, "calendar.txt")
calendar_stream_insert(file_stream, data_import_id)
@@ -108,7 +61,7 @@
def calendar_stream_insert(file_stream, data_import_id) do
DB.Repo.transaction(fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
res = %{
data_import_id: data_import_id,
@@ -155,7 +108,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
@@ -209,7 +162,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
@@ -235,7 +188,7 @@
DB.Repo.transaction(
fn ->
file_stream
|> to_stream_of_maps()
|> Utils.to_stream_of_maps()
|> Stream.map(fn r ->
%{
data_import_id: data_import_id,
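
To illustrate the mandatory_column flag used for location_type above, a short hedged example (the row map is made up):

alias Transport.GTFS.Utils

row = %{"stop_id" => "S1"}  # a stops.txt row with no location_type column

# With mandatory_column set to false, a missing column yields the default:
Utils.csv_get_with_default!(row, "location_type", "0", false)
# => "0"

# With the default mandatory_column \\ true, Map.fetch!/2 raises instead,
# surfacing genuinely required columns early:
Utils.csv_get_with_default!(row, "location_type", "0")
# ** (KeyError) key "location_type" not found in: %{"stop_id" => "S1"}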
47 changes: 39 additions & 8 deletions apps/transport/lib/netex/netex_archive_parser.ex
@@ -20,16 +20,16 @@ defmodule Transport.NeTEx do
# Entry names ending with a slash `/` are directories. Skip them.
# https://github.com/akash-akya/unzip/blob/689a1ca7a134ab2aeb79c8c4f8492d61fa3e09a0/lib/unzip.ex#L69
String.ends_with?(file_name, "/") ->
[]
{:ok, []}

extension |> String.downcase() == ".zip" ->
raise "Unsupported zip inside zip for file #{file_name}"
{:error, "Unsupported zip inside zip for file #{file_name}"}

extension |> String.downcase() != ".xml" ->
raise "Unsupported file extension (#{extension}) for file #{file_name}"
{:error, "Unsupported file extension (#{extension}) for file #{file_name}"}

true ->
{:ok, state} =
parsing_result =
unzip
|> Unzip.file_stream!(file_name)
|> Stream.map(&IO.iodata_to_binary(&1))
@@ -42,7 +42,21 @@
end
})

state.stop_places
case parsing_result do
{:ok, state} -> {:ok, state.stop_places}
{:error, exception} -> {:error, Exception.message(exception)}
{:halt, _state, _rest} -> {:error, "SAX parsing interrupted unexpectedly."}
end
end
end

@doc """
Like read_stop_places/2 but raises on errors.
"""
def read_stop_places!(%Unzip{} = unzip, file_name) do
case read_stop_places(unzip, file_name) do
{:ok, stop_places} -> stop_places
{:error, message} -> raise message
end
end

@@ -53,8 +67,14 @@
zip_file = Unzip.LocalFile.open(zip_file_name)

try do
{:ok, unzip} = Unzip.new(zip_file)
cb.(unzip)
case Unzip.new(zip_file) do
{:ok, unzip} ->
cb.(unzip)

{:error, message} ->
Logger.error("Error while reading #{zip_file_name}: #{message}")
[]
end
after
Unzip.LocalFile.close(zip_file)
end
@@ -67,6 +87,17 @@
See tests for actual output. Will be refactored soonish.
"""
def read_all_stop_places(zip_file_name) do
read_all(zip_file_name, &read_stop_places/2)
end

@doc """
Like read_all_stop_places/1 but raises on error.
"""
def read_all_stop_places!(zip_file_name) do
read_all(zip_file_name, &read_stop_places!/2)
end

defp read_all(zip_file_name, reader) do
with_zip_file_handle(zip_file_name, fn unzip ->
unzip
|> Unzip.list_entries()
@@ -75,7 +106,7 @@

{
metadata.file_name,
read_stop_places(unzip, metadata.file_name)
reader.(unzip, metadata.file_name)
}
end)
end)
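
Finally, a usage sketch contrasting the tolerant and raising entry points (the archive name is hypothetical, and it assumes read_all/2 materializes one {file_name, result} pair per XML entry, as the code above suggests):

# Tolerant variant: one {:ok, stop_places} | {:error, message} per entry.
"netex-archive.zip"
|> Transport.NeTEx.read_all_stop_places()
|> Enum.each(fn
  {file_name, {:ok, stop_places}} ->
    IO.puts("#{file_name}: #{length(stop_places)} stop place(s)")

  {file_name, {:error, message}} ->
    IO.puts("#{file_name}: skipped (#{message})")
end)

# Raising variant: the first problematic entry (nested zip, non-XML file,
# SAX failure) raises and aborts the whole read.
Transport.NeTEx.read_all_stop_places!("netex-archive.zip")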