Skip to content

Commit

Permalink
Consolidation BNLC : ajout dataset_id et resource_id (#3623)
Browse files Browse the repository at this point in the history
  • Loading branch information
AntoineAugusti authored Nov 24, 2023
1 parent 1639696 commit bafe2c2
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 44 deletions.
43 changes: 33 additions & 10 deletions apps/transport/lib/jobs/consolidate_bnlc_job.ex
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,24 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
|> hd()
end

@doc """
Builds the list of CSV headers used in the final consolidated file.

Starting from the headers of the BNLC file hosted on GitHub, we:
- prepend an `id_lieu` column
- drop any pre-existing `id_lieu` column (it was present in the 0.2.8
  schema version, https://schema.data.gouv.fr/etalab/schema-lieux-covoiturage/0.2.8/documentation.html,
  and should be absent from 0.3.0 onwards since we compute it ourselves
  with `add_columns/2`)
- append `dataset_id` and `resource_id` so each row can be traced back to
  its dataset/resource on data.gouv.fr

    iex> final_csv_headers(["foo", "bar", "id_lieu"])
    ["id_lieu", "foo", "bar", "dataset_id", "resource_id"]
"""
def final_csv_headers(bnlc_headers) do
  kept_headers = Enum.reject(bnlc_headers, &(&1 == "id_lieu"))
  ["id_lieu" | kept_headers] ++ ["dataset_id", "resource_id"]
end

@doc """
Given a list of resources, previously prepared by `download_resources/1`,
creates the BNLC final file and write on the local disk at `@bnlc_path`.
Expand All @@ -296,30 +314,31 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
def consolidate_resources(resources_details) do
file = File.open!(@bnlc_path, [:write, :utf8])
bnlc_headers = bnlc_csv_headers()
# In the 0.2.8 schema version the `id_lieu` column was present.
# https://schema.data.gouv.fr/etalab/schema-lieux-covoiturage/0.2.8/documentation.html
# Starting with 0.3.0 `id_lieu` should not be present in the files
# we consolidate as we add it ourselves with `add_id_lieu_column/1`
final_headers = ["id_lieu"] ++ Enum.reject(bnlc_headers, &(&1 == "id_lieu"))
final_headers = final_csv_headers(bnlc_headers)

%HTTPoison.Response{body: body, status_code: 200} = @bnlc_github_url |> http_client().get!()

# Write first the header + content of the BNLC hosted on GitHub
[body]
|> CSV.decode!(field_transform: &String.trim/1, headers: bnlc_headers)
|> Stream.drop(1)
|> add_id_lieu_column()
# Magic `dataset_id` and `resource_id` values for the BNLC file hosted
# on GitHub as this is the only file not hosted/referenced on data.gouv.fr.
|> add_columns(%{dataset: %{"id" => "bnlc_github"}, resource: %{"id" => "bnlc_github"}})
|> CSV.encode(headers: final_headers)
|> Enum.each(&IO.write(file, &1))

# Append other valid resources to the file
Enum.each(resources_details, fn {_dataset_detail, %{@download_path_key => tmp_path, @separator_key => separator}} ->
Enum.each(resources_details, fn {
dataset_details,
%{@download_path_key => tmp_path, @separator_key => separator} = resource_details
} ->
tmp_path
|> File.stream!()
|> CSV.decode!(headers: true, field_transform: &String.trim/1, separator: separator)
# Keep only columns that are present in the BNLC, ignore extra columns
|> Stream.filter(&Map.take(&1, bnlc_headers))
|> add_id_lieu_column()
|> add_columns(%{dataset: dataset_details, resource: resource_details})
|> CSV.encode(headers: final_headers)
# Don't write the CSV header again each time, it has already been written
# because the BNLC is first in the file
Expand All @@ -336,9 +355,13 @@ defmodule Transport.Jobs.ConsolidateBNLCJob do
Generate it by concatenating values found in each file: insee + id_local
"""
def add_id_lieu_column(%Stream{} = stream) do
def add_columns(%Stream{} = stream, %{dataset: %{"id" => dataset_id}, resource: %{"id" => resource_id}}) do
Stream.map(stream, fn %{"insee" => insee, "id_local" => id_local} = map ->
Map.put(map, "id_lieu", "#{insee}-#{id_local}")
Map.merge(map, %{
"id_lieu" => "#{insee}-#{id_local}",
"dataset_id" => dataset_id,
"resource_id" => resource_id
})
end)
end

Expand Down
86 changes: 52 additions & 34 deletions apps/transport/test/transport/jobs/consolidate_bnlc_job_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -306,23 +306,25 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
dataset_detail = %{
"resources" => [
resource = %{
"id" => Ecto.UUID.generate(),
"id" => resource_id = Ecto.UUID.generate(),
"schema" => %{"name" => @target_schema},
"url" => url = "https://example.com/file.csv"
}
],
"slug" => "foo"
"slug" => "foo",
"id" => dataset_id = Ecto.UUID.generate()
}

other_dataset_detail = %{
"resources" => [
other_resource = %{
"id" => Ecto.UUID.generate(),
"id" => other_resource_id = Ecto.UUID.generate(),
"schema" => %{"name" => @target_schema},
"url" => other_url = "https://example.com/other_file.csv"
}
],
"slug" => "bar"
"slug" => "bar",
"id" => other_dataset_id = Ecto.UUID.generate()
}

Transport.HTTPoison.Mock
Expand Down Expand Up @@ -374,47 +376,59 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
"baz" => "CSV",
"insee" => "21231",
"id_local" => "5",
"id_lieu" => "21231-5"
"id_lieu" => "21231-5",
"dataset_id" => "bnlc_github",
"resource_id" => "bnlc_github"
},
%{
"foo" => "Very",
"bar" => "Much",
"baz" => "So",
"insee" => "21231",
"id_local" => "6",
"id_lieu" => "21231-6"
"id_lieu" => "21231-6",
"dataset_id" => "bnlc_github",
"resource_id" => "bnlc_github"
},
%{
"foo" => "1",
"bar" => "2",
"baz" => "3",
"insee" => "21231",
"id_local" => "1",
"id_lieu" => "21231-1"
"id_lieu" => "21231-1",
"dataset_id" => dataset_id,
"resource_id" => resource_id
},
%{
"foo" => "4",
"bar" => "5",
"baz" => "6",
"insee" => "21231",
"id_local" => "2",
"id_lieu" => "21231-2"
"id_lieu" => "21231-2",
"dataset_id" => dataset_id,
"resource_id" => resource_id
},
%{
"foo" => "a",
"bar" => "b",
"baz" => "c",
"insee" => "21231",
"id_local" => "3",
"id_lieu" => "21231-3"
"id_lieu" => "21231-3",
"dataset_id" => other_dataset_id,
"resource_id" => other_resource_id
},
%{
"foo" => "d",
"bar" => "e",
"baz" => "f",
"insee" => "21231",
"id_local" => "4",
"id_lieu" => "21231-4"
"id_lieu" => "21231-4",
"dataset_id" => other_dataset_id,
"resource_id" => other_resource_id
}
] == @tmp_path |> File.stream!() |> CSV.decode!(headers: true) |> Enum.to_list()

Expand All @@ -423,14 +437,14 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
# We could change to just a newline, using the `delimiter` option:
# https://hexdocs.pm/csv/CSV.html#encode/2
assert """
id_lieu,foo,bar,baz,insee,id_local\r
21231-5,I,Love,CSV,21231,5\r
21231-6,Very,Much,So,21231,6\r
21231-1,1,2,3,21231,1\r
21231-2,4,5,6,21231,2\r
21231-3,a,b,c,21231,3\r
21231-4,d,e,f,21231,4\r
""" = File.read!(@tmp_path)
id_lieu,foo,bar,baz,insee,id_local,dataset_id,resource_id\r
21231-5,I,Love,CSV,21231,5,bnlc_github,bnlc_github\r
21231-6,Very,Much,So,21231,6,bnlc_github,bnlc_github\r
21231-1,1,2,3,21231,1,#{dataset_id},#{resource_id}\r
21231-2,4,5,6,21231,2,#{dataset_id},#{resource_id}\r
21231-3,a,b,c,21231,3,#{other_dataset_id},#{other_resource_id}\r
21231-4,d,e,f,21231,4,#{other_dataset_id},#{other_resource_id}\r
""" == File.read!(@tmp_path)

# Temporary files have been removed
[{_, r1}, {_, r2}] = res
Expand All @@ -455,11 +469,12 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
)

foo_dataset_response = %{
"id" => foo_dataset_id = Ecto.UUID.generate(),
"slug" => "foo",
"resources" => [
%{
"schema" => %{"name" => @target_schema},
"id" => Ecto.UUID.generate(),
"id" => foo_resource_id = Ecto.UUID.generate(),
"url" => foo_url = "https://example.com/foo.csv"
},
# Should be ignored, irrelevant resource
Expand All @@ -472,11 +487,12 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
}

bar_dataset_response = %{
"id" => bar_dataset_id = Ecto.UUID.generate(),
"slug" => "bar",
"resources" => [
%{
"schema" => %{"name" => @target_schema},
"id" => Ecto.UUID.generate(),
"id" => bar_resource_id = Ecto.UUID.generate(),
"url" => bar_url = "https://example.com/bar.csv"
}
]
Expand Down Expand Up @@ -551,13 +567,13 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do

# CSV content is fine
assert """
id_lieu,foo,bar,baz,insee,id_local\r
21231-4,I,Love,CSV,21231,4\r
21231-5,Very,Much,So,21231,5\r
21231-1,a,b,c,21231,1\r
21231-2,d,e,f,21231,2\r
21231-3,1,2,3,21231,3\r
""" = File.read!(@tmp_path)
id_lieu,foo,bar,baz,insee,id_local,dataset_id,resource_id\r
21231-4,I,Love,CSV,21231,4,bnlc_github,bnlc_github\r
21231-5,Very,Much,So,21231,5,bnlc_github,bnlc_github\r
21231-1,a,b,c,21231,1,#{foo_dataset_id},#{foo_resource_id}\r
21231-2,d,e,f,21231,2,#{foo_dataset_id},#{foo_resource_id}\r
21231-3,1,2,3,21231,3,#{bar_dataset_id},#{bar_resource_id}\r
""" == File.read!(@tmp_path)
end

test "stops when the schema validator is down" do
Expand Down Expand Up @@ -625,11 +641,12 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
)

foo_dataset_response = %{
"id" => foo_dataset_id = Ecto.UUID.generate(),
"slug" => "foo",
"resources" => [
%{
"schema" => %{"name" => @target_schema},
"id" => Ecto.UUID.generate(),
"id" => foo_resource_id = Ecto.UUID.generate(),
"url" => foo_url = "https://example.com/foo.csv"
},
# Should be ignored, irrelevant resource
Expand All @@ -642,6 +659,7 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
}

bar_dataset_response = %{
"id" => Ecto.UUID.generate(),
"slug" => "bar",
"title" => "Bar JDD",
"page" => "https://data.gouv.fr/bar",
Expand Down Expand Up @@ -730,12 +748,12 @@ defmodule Transport.Test.Transport.Jobs.ConsolidateBNLCJobTest do
expect_job_scheduled_to_remove_file()

assert """
id_lieu,foo,bar,baz,insee,id_local\r
21231-3,I,Love,CSV,21231,3\r
21231-4,Very,Much,So,21231,4\r
21231-1,a,b,c,21231,1\r
21231-2,d,e,f,21231,2\r
""" = File.read!(@tmp_path)
id_lieu,foo,bar,baz,insee,id_local,dataset_id,resource_id\r
21231-3,I,Love,CSV,21231,3,bnlc_github,bnlc_github\r
21231-4,Very,Much,So,21231,4,bnlc_github,bnlc_github\r
21231-1,a,b,c,21231,1,#{foo_dataset_id},#{foo_resource_id}\r
21231-2,d,e,f,21231,2,#{foo_dataset_id},#{foo_resource_id}\r
""" == File.read!(@tmp_path)
end
end

Expand Down

0 comments on commit bafe2c2

Please sign in to comment.