WIP - AMB-452 Coalesce futures order book events #5

Open · wants to merge 1 commit into main
8 changes: 8 additions & 0 deletions marketdata/futures_order_book_event/coalesce-daily/README.md
@@ -0,0 +1,8 @@
### RUN PARAMS

``./coalesce_pair_day.sh {START_PAIR} {END_PAIR} {IGNORE_SUCCESS} {START_DATE} {END_DATE} {DATA_PATH} {OUTPUT_PATH}``

### EXAMPLE

``./coalesce_pair_day.sh 10000NFTUSDT 10000NFTUSDT true 2022-01-27 2022-01-27 s3://amberdata-marketdata/futures_order_book_event s3://amberdata-efra/futures_order_book_event``
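
### SINGLE PAIR / DAY

`coalesce_pair_day.sh` loops over pairs and dates and calls `coalesce.sh` once per pair/day. To (re)process a single pair/day you can call it directly; a sketch, assuming the same buckets as the example above (argument order taken from the positional parameters at the top of `coalesce.sh`):

``./coalesce.sh 10000NFTUSDT 2022-01-27 true s3://amberdata-marketdata/futures_order_book_event s3://amberdata-efra/futures_order_book_event``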
87 changes: 87 additions & 0 deletions marketdata/futures_order_book_event/coalesce-daily/coalesce.py
@@ -0,0 +1,87 @@
import glob
import json
import gzip
import time
import sys
import re


def is_csv_gzip(_file_path):
    return _file_path.endswith("csv.gz")


def add_exchange(file_path):
    # The exchange is encoded as the file-name prefix before the first "-".
    try:
        # Grab the last path segment (including its leading slash)...
        matches = list(re.finditer(r"((?:.(?!\/))+$)", file_path))
        file_name = matches[0].group()
        # ...then take everything between the slash and the first "-".
        return re.match(r"\/(.*?)-", file_name).group(1)
    except Exception as e:
        print(f"file: {file_path}", file=sys.stderr)
        print(f"error: {e}", file=sys.stderr)


def convert_line_from_csv_gz(_line, file_path):
    # TODO: Verify gz files in futures
    [timestamp, timestampNanoseconds, tradeId, price, size, isBuySide] = _line.decode('UTF-8').strip().split(',')
    return {
        "exchange": add_exchange(file_path),
        "timestamp": int(timestamp),
        "timestampNanoseconds": int(timestampNanoseconds),
        "tradeId": int(tradeId),
        "size": float(size),
        "price": float(price),
        # bool() on any non-empty string is True; parse the flag explicitly instead
        # (assumes the CSV encodes it as "true"/"false" or "1"/"0").
        "isBuySide": isBuySide.strip().lower() in ("true", "1"),
        "filePath": file_path,
    }


def process_pair_day(output_path, file_paths):
    # Coalesce all input files for a single pair/day into one newline-delimited JSON file.
    o_path = f"{output_path}/out"

    with open(o_path, "w") as o:

        # JSON inputs are passed through as-is, with the source path attached.
        json_paths = [file_path for file_path in file_paths if not is_csv_gzip(file_path)]

        for json_path in json_paths:
            with open(json_path, 'r') as f:
                for i, line in enumerate(f):
                    try:
                        result = {**json.loads(line.strip()), "filePath": json_path}
                        o.write(f"{json.dumps(result, separators=(',', ':'))}\n")
                    except Exception as e:
                        print(f"file: {json_path}", file=sys.stderr)
                        print(f"line_index: {i}", file=sys.stderr)
                        print(f"line: {line}", file=sys.stderr)
                        print(f"error: {e}", file=sys.stderr)

        # Gzipped CSV inputs are converted line by line into the same JSON shape.
        csv_gz_paths = [file_path for file_path in file_paths if is_csv_gzip(file_path)]

        for csv_gz_path in csv_gz_paths:
            with gzip.open(csv_gz_path, 'rb') as f:
                for i, line in enumerate(f):
                    try:
                        result = convert_line_from_csv_gz(line, csv_gz_path)
                        o.write(f"{json.dumps(result, separators=(',', ':'))}\n")
                    except Exception as e:
                        print(f"file: {csv_gz_path}", file=sys.stderr)
                        print(f"line_index: {i}", file=sys.stderr)
                        print(f"line: {line}", file=sys.stderr)
                        print(f"error: {e}", file=sys.stderr)

if __name__ == "__main__":

    # PAIR and DATE are passed by coalesce.sh; only the directory arguments are used here.
    PAIR = sys.argv[1]
    DATE = sys.argv[2]
    DATA_DIR = sys.argv[3]
    OUTPUT_DIR = sys.argv[4]

    start = int(time.time())

    output_path = OUTPUT_DIR
    file_paths = glob.glob(f"{DATA_DIR}/*/*")

    process_pair_day(output_path, file_paths)

    end = int(time.time())

    with open(f"{output_path}/_SUCCESS", "w") as f:
        f.write(f"time: {end-start}\n")

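For reference, `coalesce.sh` invokes this script with the pair, the date, the local download directory, and the local output directory. A sketch of a standalone run against already-downloaded files, assuming the temporary layout that `coalesce.sh` creates (`./tmp/<DATE>/<PAIR>/{data,output}`):

```bash
# Hypothetical manual run; coalesce.sh normally performs this step after `aws s3 sync`.
mkdir -p ./tmp/2022-01-27/10000NFTUSDT/output
python3 coalesce.py 10000NFTUSDT 2022-01-27 \
    ./tmp/2022-01-27/10000NFTUSDT/data \
    ./tmp/2022-01-27/10000NFTUSDT/output
```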
144 changes: 144 additions & 0 deletions marketdata/futures_order_book_event/coalesce-daily/coalesce.sh
@@ -0,0 +1,144 @@
#!/bin/bash

####################################################################################

PAIR=$1
DATE=$2
IGNORE_SUCCESS=$3
DATA_PATH=$4
OUTPUT_PATH=$5

####################################################################################

S3_DIR="$OUTPUT_PATH/$DATE/$PAIR"

####################################################################################

echo "$PAIR $DATE $IGNORE_SUCCESS"

####################################################################################

if [[ "${IGNORE_SUCCESS}" == "false" ]];
then
echo " CHECKING SUCCESS FLAG..."
SUCCESS=$(aws s3 ls "${S3_DIR}/_SUCCESS" | wc -l)
if [ "$SUCCESS" -ge 1 ];
then
echo " SUCCESS FILE PRESENT. SKIPPING..."
exit
else
echo " SUCCESS FILE NOT PRESENT..."
fi
fi

####################################################################################

# Local root where all temporary files are stored; adjust this path as needed.
TMP_DIR_SUB="./tmp/$DATE"
TMP_DIR="$TMP_DIR_SUB/$PAIR"
OUT="$TMP_DIR/_OUT"
ERR="$TMP_DIR/_ERR"

# Downloaded input files are stored here.
DATA_DIR="$TMP_DIR/data"
# Coalesced output files are stored here.
OUTPUT_DIR="$TMP_DIR/output"

####################################################################################

mkdir -p "${OUTPUT_DIR}"
mkdir -p "${DATA_DIR}"
rm -f "${S3_OUT}" && rm -f "${OUT}" && rm -f "${ERR}"

####################################################################################

echo " DOWNLOADING FILES..."

start=`date +%s.%N`

SOURCE="$DATA_PATH/$PAIR/$DATE/"
DEST="${DATA_DIR}/"

aws s3 sync "${SOURCE}" "${DEST}" >> "${OUT}" 2>> "${ERR}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo " Runtime: ${runtime}"; echo "";

####################################################################################

# Fix malformed JSON files: some contain concatenated objects ("}{"); split them onto separate lines.

find "$DATA_DIR" ! -name '*.gz' -type f -exec sed -i 's/}{/}\n{/g' {} +

####################################################################################

echo " COALESCING FILES..."

start=`date +%s.%N`

python3 coalesce.py "${PAIR}" "${DATE}" "${DATA_DIR}" "${OUTPUT_DIR}" >> "${OUT}" 2>> "${ERR}"

echo "" >> "${OUT}" && echo "" >> "${ERR}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo " Runtime: ${runtime}"; echo "";

####################################################################################

echo " COPYING LOGS..."

start=`date +%s.%N`

cp "${OUT}" "${OUTPUT_DIR}"

# The blank line echoed after the python step always adds one line; only copy the error log if real errors were written.
ERR_LINES=$(wc -l < "${ERR}")

[ "$ERR_LINES" -ge 2 ] && cp "${ERR}" "${OUTPUT_DIR}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo " Runtime: ${runtime}"; echo "";

####################################################################################

echo " SYNCING COALESCED FILES WITH S3..."

start=`date +%s.%N`

SOURCE="$OUTPUT_DIR"
DEST="${S3_DIR}/"

aws s3 sync "${SOURCE}" "${DEST}" >> "${OUT}" 2>> "${ERR}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo " Runtime: ${runtime}"; echo "";

####################################################################################

echo " REMOVING DOWNLOADED AND COALESCED FILES..."

start=`date +%s.%N`

rm -rf "${TMP_DIR}"

rm -rf "${TMP_DIR_SUB}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo " Runtime: ${runtime}"; echo "";

####################################################################################
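
After a successful run, the destination prefix for the pair/day should contain the coalesced `out` file, the `_OUT` log, a `_SUCCESS` marker, and `_ERR` only when the error log was copied. A quick check, using the bucket names from the README example (illustrative, not part of the scripts):

```bash
# Expected entries: out, _OUT, _SUCCESS (and _ERR when errors were logged)
aws s3 ls "s3://amberdata-efra/futures_order_book_event/2022-01-27/10000NFTUSDT/"
```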
46 changes: 46 additions & 0 deletions marketdata/futures_order_book_event/coalesce-daily/coalesce_pair_day.sh
@@ -0,0 +1,46 @@
#!/bin/bash

####################################################################################

START_PAIR=$1
END_PAIR=$2
IGNORE_SUCCESS=$3
START_DATE=$4
END_DATE=$5
DATA_PATH=$6
OUTPUT_PATH=$7

####################################################################################

aws s3 ls "${DATA_PATH}/" | sed -e 's# *PRE \([^/]*\)/#\1#' | sort | while read -r PAIR
do
# Filter pairs outside of range
if [[ "${PAIR}" < "${START_PAIR}" ]] || [[ "${PAIR}" > "${END_PAIR}" ]];
then
continue
fi

start=`date +%s.%N`

DATES_PATH="${DATA_PATH}/${PAIR}/"

# ISO dates sort correctly as strings; a plain reverse sort gives newest-first order.
DATES=( $(
aws s3 ls "${DATES_PATH}" | awk '{ print $2 }' | sed 's/.$//' | sort -r | while read -r dt
do
# Keep only dates inside [START_DATE, END_DATE]
[[ ! "${dt}" < "${START_DATE}" ]] && [[ ! "${dt}" > "${END_DATE}" ]] && echo "${dt}"
done
) )

if [ -z "${DATES}" ];
then
continue
fi

parallel ./coalesce.sh "\"$PAIR\"" {} "$IGNORE_SUCCESS" "$DATA_PATH" "$OUTPUT_PATH" ::: "${DATES[@]}"

end=`date +%s.%N`

runtime=$( echo "$end - $start" | bc -l )

echo "DONE. Runtime: ${runtime}"; echo "";
done