Merge pull request #185 from centre-for-humanities-computing/data_wrangling

Data Wrangling: Twitter, DaNews, DAGW
KennethEnevoldsen authored Nov 16, 2023
2 parents 29aecb3 + 17319a2 commit bee7e3c
Showing 5 changed files with 580 additions and 0 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
@@ -18,6 +18,7 @@ requires-python = ">=3.10"

dependencies = [
"pydantic>=2.4.2", # dolma does not work with very old versions of pydantic
"datasets>=2.4.0",
"dolma@git+https://github.com/allenai/dolma.git@5a010a2685914b1db7744426abfb4b9ece52da95", # Install from git until a 0.9.2 package is released
"kenlm>=0.2.0", # Used for perplexity tagging
"blingfire>=0.1.8", # Used for perplexity tagging
149 changes: 149 additions & 0 deletions src/dfm/projects/data_processing/convert_dagw_to_jsonlgz.py
@@ -0,0 +1,149 @@
"""
downloads dataset and save it as jsonl.gz file with the format:
{
"id": "...", # MANDATORY: source-specific identifier
"text": "foo", # MANDATORY: textual content of the document
"source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc.
"added": "...", # OPTIONAL: timestamp ai2 acquired this data
"created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available)
"metadata": {...} # OPTIONAL: source-specific metadata
}
"""

from datasets import Dataset, DatasetDict, load_dataset # type: ignore


def reformat_dataset(ds: Dataset, num_proc: int) -> Dataset:
    # current keys: dict_keys(['text', 'source', 'doc_id', 'LICENSE', 'uri', 'date_built'])

    # doc_id --> id
    ds = ds.rename_column("doc_id", "id")
    # date_built --> added
    ds = ds.rename_column("date_built", "added")

    source2domain = {
        "retsinformationdk": "Legal",
        "skat": "Legal",
        "retspraksis": "Legal",
        "hest": "Social Media",
        "cc": "Web",
        "adl": "Wiki & Books",
        "botxt": "Other",
        "danavis": "News",
        "dannet": "dannet",
        "depbank": "Other",
        "ep": "Conversation",
        "ft": "Conversation",
        "gutenberg": "Wiki & Books",
        "jvj": "Wiki & Books",
        "naat": "Conversation",
        "opensub": "Conversation",
        "relig": "Wiki & Books",
        "spont": "Conversation",
        "synne": "Other",
        "tv2r": "News",
        "wiki": "Wiki & Books",
        "wikibooks": "Wiki & Books",
        "wikisource": "Wiki & Books",
        "twfv19": "Social Media",  # not present in this version of the dataset
    }

    # add domain
    ds = ds.map(  # type: ignore
        lambda x: {"domain": source2domain[x["source"]]},  # type: ignore
        num_proc=num_proc,  # type: ignore
    )

    source2time = {
        "retsinformationdk": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "skat": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "retspraksis": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "hest": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "cc": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "adl": "1700-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "botxt": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "danavis": "1999-01-01T00:00:00.000Z, 2004-01-01T00:00:00.000Z",
        "dannet": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "depbank": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "ep": "2004-01-01T00:00:00.000Z, 2009-01-01T00:00:00.000Z",
        "ft": "2009-01-01T00:00:00.000Z, 2019-01-01T00:00:00.000Z",
        "gutenberg": "1700-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "jvj": "1873-01-01T00:00:00.000Z, 1951-01-01T00:00:00.000Z",
        "naat": "1930-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "opensub": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "relig": "NA",
        "spont": "2019-01-01T00:00:00.000Z, 2020-01-01T00:00:00.000Z",
        "synne": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "tv2r": "2015-01-01T00:00:00.000Z, 2020-01-01T00:00:00.000Z",
        "wiki": "2019-01-01T00:00:00.000Z, 2021-01-01T00:00:00.000Z",
        "wikibooks": "2019-01-01T00:00:00.000Z, 2021-01-01T00:00:00.000Z",
        "wikisource": "1700-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",
        "twfv19": "2000-01-01T00:00:00.000Z, 2022-01-01T00:00:00.000Z",  # not present in this version of the dataset
    }
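    # The values above are approximate creation-date ranges per source, stored
    # as a single "start, end" string; they are used below as a best-guess
    # `created` value where exact dates are not available.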

    # add created
    ds = ds.map(lambda x: {"created": source2time[x["source"]]}, num_proc=num_proc)  # type: ignore

    source2longname = {
        "retsinformationdk": "retsinformation.dk (Danish legal information)",
        "skat": "Skat (Danish tax authority)",
        "retspraksis": "retspraksis (Danish legal information)",
        "hest": "Hestenettet (Danish debate forum)",
        "cc": "Common Crawl",
        "adl": "Archive for Danish Literature",
        "botxt": "Bornholmsk (Danish dialect)",
        "danavis": "Danish daily newspapers",
        "dannet": "DanNet (Danish WordNet)",
        "depbank": "Danish Dependency Treebank",
        "ep": "European Parliament",
        "ft": "Folketinget (Danish Parliament)",
        "gutenberg": "Gutenberg",
        "jvj": "Johannes V. Jensen (Danish poet)",
        "naat": "NAAT",
        "opensub": "Open Subtitles",
        "relig": "Religious texts",
        "spont": "Spontaneous speech",
        "synne": "Sønderjysk (Danish dialect)",
        "tv2r": "TV 2 Radio (Danish news)",
        "wiki": "Wikipedia",
        "wikibooks": "Wikibooks",
        "wikisource": "Wikisource",
        "twfv19": "Twitter Folketingsvalget 2019 (Danish election tweets)",  # not present in this version of the dataset
    }

    # update source
    ds = ds.map(lambda x: {"source": source2longname[x["source"]]}, num_proc=num_proc)  # type: ignore

    # move license, domain to metadata
    ds = ds.map(  # type: ignore
        lambda x: {"metadata": {"license": x["LICENSE"], "domain": x["domain"]}},  # type: ignore
        num_proc=num_proc,
    )
    ds = ds.remove_columns(["LICENSE", "domain", "uri"])
    return ds


def main():
    num_proc = 2
    ds: DatasetDict = load_dataset("DDSC/partial-danish-gigaword-no-twitter")  # type: ignore
    ds: Dataset = ds["train"]  # type: ignore

    # reformat
    ds = reformat_dataset(ds, num_proc=num_proc)

    # save to jsonl.gz
    ds.to_json("data.jsonl.gz", orient="records", lines=True, compression="gzip")  # type: ignore


if __name__ == "__main__":
    main()

    # test that it loads back in
    ds = load_dataset("json", data_files="data.jsonl.gz", split="train")
    assert isinstance(ds[0], dict)  # type: ignore

    # test that it can be streamed
    ds = load_dataset("json", data_files="data.jsonl.gz", split="train", streaming=True)
    example = next(iter(ds))  # type: ignore
    assert isinstance(example, dict)
112 changes: 112 additions & 0 deletions src/dfm/projects/data_processing/convert_danews_to_jsonlgz.py
@@ -0,0 +1,112 @@
"""
Converts infomedia to a .jsonl.gz file with the format:
{
"id": "...", # MANDATORY: source-specific identifier
"text": "foo", # MANDATORY: textual content of the document
"source": "...", # MANDATORY: source of the data, such as peS2o, common-crawl, etc.
"added": "...", # OPTIONAL: timestamp ai2 acquired this data
"created": "..." # OPTIONAL: timestamp when orig document was created (best-guess if not available)
"metadata": {...} # OPTIONAL: source-specific metadata
}
"""

from pathlib import Path
from typing import Any

from datasets import Dataset, load_dataset # type: ignore


def create_text(example: dict[str, Any]) -> dict[str, Any]:
    """
    create the full text from the different text fields
    """
    text: str = ""
    text += f"# {example['Heading']}\n\n"
    if example["SubHeading"]:
        text += f"## {example['SubHeading']}\n\n"

    if example["Lead"]:
        text += example["Lead"] + "\n"  # type: ignore
    text += example["BodyText"] + "\n"  # type: ignore
    text = text.strip()  # type: ignore

    example["text"] = text
    return example  # type: ignore
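# For example, a hypothetical article with Heading="Storm over Danmark",
# SubHeading="DMI varsler kraftig blæst", an empty Lead and BodyText="..."
# would yield:
#
#   # Storm over Danmark
#
#   ## DMI varsler kraftig blæst
#
#   ...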


def create_metadata(example: dict[str, Any]) -> dict[str, Any]:
    """
    create the metadata from the different fields
    """

    metadata = {
        "article_url": example["ArticleUrl"],
        "authors": example["Authors"],
        "source": example["Source"],
        "word_count": example["WordCount"],
        "page_ids": example["PageIds"],
        "section": example["Section"],
    }
    example["metadata"] = metadata
    return example


def reformat_dataset(ds: Dataset, num_proc: int = 8) -> Dataset:
    ds = ds.map(create_text, num_proc=num_proc)  # type: ignore
    ds = ds.map(lambda x: {"source": "infomedia"}, num_proc=num_proc)  # type: ignore # noqa: ARG005
    ds = ds.map(lambda x: {"added": "2022-10-24T00:00:00.000Z"}, num_proc=num_proc)  # type: ignore # noqa: ARG005
    ds = ds.map(create_metadata, num_proc=num_proc)  # type: ignore
    ds = ds.map(lambda x: {"id": x["ArticleId"]}, num_proc=num_proc)  # type: ignore
    ds = ds.map(lambda x: {"created": x["PublishDate"]}, num_proc=num_proc)  # type: ignore

    # remove unnecessary columns
    ds = ds.remove_columns(
        [
            "ArticleUrl",
            "Heading",
            "SubHeading",
            "Lead",
            "Paragraph",
            "PublishDate",
            "BodyText",
            "Captions",
            "Authors",
            "Source",
            "WordCount",
            "ArticleId",
            "PageIds",
            "Section",
        ],
    )
    return ds


def main():
    path = Path("/work/845878/raw_datasets/hope-infomedia/yearly")
    files = [str(p) for p in path.glob("*.ndjson")]
    save_path = "dfm-data/v3.0.0/danews/data.jsonl.gz"

    ds: Dataset = load_dataset("json", data_files=files, split="train")  # type: ignore
    # current keys are:
    # dict_keys(['ArticleUrl', 'Heading', 'SubHeading', 'Lead', 'Paragraph', 'PublishDate', 'BodyText', 'Captions', 'Authors', 'Source', 'WordCount', 'ArticleId', 'PageIds', 'Section'])  # noqa

    # ids in the file are not unique -->
    # create a new id:
    ids = list(range(len(ds)))
    ds = ds.add_column("id", ids)  # type: ignore

    # reformat dataset
    ds = reformat_dataset(ds)  # type: ignore

    # save
    ds.to_json(  # type: ignore
        save_path,
        orient="records",
        lines=True,
        compression="gzip",
    )


if __name__ == "__main__":
    main()
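# A minimal load-back check, mirroring the one in convert_dagw_to_jsonlgz.py
# (sketch; assumes the jsonl.gz above has already been written):
#
#   from datasets import load_dataset
#
#   ds = load_dataset(
#       "json",
#       data_files="dfm-data/v3.0.0/danews/data.jsonl.gz",
#       split="train",
#   )
#   assert isinstance(ds[0], dict)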
