Update tap with fastsync #8
base: release
Changes from 55 commits
d91d69f
6a04e76
9971f0c
bc17b43
15d7ef1
e3be333
a575a15
4e05478
4eb1df3
03f592c
26f4882
e84016c
866da9c
d5c0f32
8fa8f2c
0b9e0c3
10b60b3
730c1b7
96bd429
4d3d559
08b81c4
07fb8b7
25c1020
27e5e59
81a3e8b
c02a49b
acacfcd
8e96274
dec761e
98d0994
0b632f9
303a51b
b158d02
a851137
cd169ef
1ac0851
2c12035
9d5a7f6
9590f3b
c2355b5
7c087d1
3266a4d
1721c8c
3d49e18
a514b91
8d4665a
0c19653
8a96bbe
411749f
1c1b38d
16e0c84
d0431a1
5594a17
edc7799
5c62571
c718040
592f78f
@@ -2,11 +2,19 @@
# pylint: disable=too-many-arguments,duplicate-code,too-many-locals

import copy
import csv
import datetime
import glob
Review comment: I don't think any of these imports are being used here.
Reply: removed
import multiprocessing
import os
import secrets
import singer
import string
import time
import uuid

from . import split_gzip

import singer.metrics as metrics
from singer import metadata
from singer import utils
@@ -87,16 +95,26 @@ def get_key_properties(catalog_entry):
    return key_properties


def generate_select_sql(catalog_entry, columns):
def generate_select_sql(catalog_entry, columns, fastsync=False):
    database_name = get_database_name(catalog_entry)
    escaped_db = escape(database_name)
    escaped_table = escape(catalog_entry.table)
    escaped_columns = [escape(c) for c in columns]

    select_sql = "SELECT {} FROM {}.{}".format(
        ",".join(escaped_columns), escaped_db, escaped_table
    )
    if fastsync:
Review comment: Can we consider moving this logic to a new folder? This isn't essential, just an idea to keep fast sync separate from other methods already being used in other replication types (like incremental).
Reply: Oh yeah, good call. I planned on having a condition within full_table.sync_table() that would call one or the other, so that was something I meant to do but forgot to make note of. This is updated.
Reply: Oops, I think I read 'folder' as 'function', lol.
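A rough sketch of the dispatch described in the reply above: a condition inside full_table.sync_table() that picks either the existing path or a fast sync path. The function names, stub bodies, and the use of fastsync_batch_rows as the switch are illustrative assumptions, not code from this PR.

def full_table_sync(mssql_conn, config, catalog_entry, state, columns, stream_version):
    ...  # placeholder for the existing row-by-row RECORD-message path

def fast_sync(mssql_conn, config, catalog_entry, state, columns, stream_version):
    ...  # placeholder for the chunked gzipped-CSV export path added in this PR

def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
    # Assumed switch: fall back to the original behaviour when fastsync_batch_rows
    # is not configured, matching the TODO noted later in this diff.
    if config.get("fastsync_batch_rows"):
        return fast_sync(mssql_conn, config, catalog_entry, state, columns, stream_version)
    return full_table_sync(mssql_conn, config, catalog_entry, state, columns, stream_version)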
        time_extracted = utils.now()
        _sdc_extracted_at = f"'{time_extracted}' as _SDC_EXTRACTED_AT"
        _sdc_deleted_at = "NULL as _SDC_DELETED_AT"
        _sdc_batched_at = f"'{time_extracted}' as _SDC_BATCHED_AT"

        select_sql = "SELECT {}, {}, {}, {} FROM {}.{}".format(
            ",".join(escaped_columns), _sdc_extracted_at, _sdc_deleted_at, _sdc_batched_at, escaped_db, escaped_table
        )
    else:
        select_sql = "SELECT {} FROM {}.{}".format(
            ",".join(escaped_columns), escaped_db, escaped_table
        )

    # escape percent signs
    select_sql = select_sql.replace("%", "%%")
    return select_sql
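For reference, a hedged illustration of the statement the fastsync=True branch builds. The database, table, columns, and timestamp below are made up, and the bracket/quote escaping applied by escape() is omitted:

# Assuming the catalog entry resolves to dbo.orders, columns == ["id", "name"],
# and utils.now() returned the timestamp shown, select_sql ends up roughly as:
select_sql = (
    "SELECT id,name, "
    "'2024-01-01 00:00:00+00:00' as _SDC_EXTRACTED_AT, "
    "NULL as _SDC_DELETED_AT, "
    "'2024-01-01 00:00:00+00:00' as _SDC_BATCHED_AT "
    "FROM dbo.orders"
)

Because time_extracted is computed once when the SQL is generated, every row in the extract shares the same _SDC_EXTRACTED_AT and _SDC_BATCHED_AT values.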
@@ -2,9 +2,18 @@
# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression
Review comment: Can we add a Fast Sync section to the readme.md? There are a few required config changes, like "fastsync_batch_rows", that should be called out.
Reply: updated
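Since the thread above calls out fastsync_batch_rows, here is a minimal sketch of what a tap config with fast sync enabled might look like. Every key and value other than fastsync_batch_rows (which the diff reads via config.get) is a generic placeholder, not something this PR defines:

config = {
    "host": "sqlserver.example.com",  # placeholder connection settings
    "database": "mydb",
    "user": "tap_user",
    "password": "********",
    # Rows per chunk read from SQL and written to each gzipped CSV; a TODO later
    # in this diff proposes treating an unset value as "fast sync disabled".
    "fastsync_batch_rows": 50000,
}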

import copy
import csv
import datetime
import json
import pandas as pd
import os
import secrets
import singer
from singer import metadata

from singer import utils
import singer.metrics as metrics
import string
import sys
import tap_mssql.sync_strategies.common as common

from tap_mssql.connection import (
@@ -16,7 +25,6 @@

LOGGER = singer.get_logger()


def generate_bookmark_keys(catalog_entry):
    md_map = metadata.to_map(catalog_entry.metadata)
    stream_metadata = md_map.get((), {})
@@ -30,9 +38,20 @@ def generate_bookmark_keys(catalog_entry):
    }

    bookmark_keys = base_bookmark_keys

    return bookmark_keys

def write_dataframe_record(row, catalog_entry, stream_version, columns, table_stream, time_extracted):

    rec = row.to_dict()

    record_message = singer.RecordMessage(
        stream=table_stream,
        record=rec,
        version=stream_version,
        time_extracted=time_extracted,
    )

    singer.write_message(record_message)

def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
    mssql_conn = get_azure_sql_engine(config)
@@ -62,31 +81,99 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version
    ):
        singer.write_message(activate_version_message)

    #TODO: Update code: If fastsync_batch_rows is not set, use old non fastsync method
    with mssql_conn.connect() as open_conn:
        LOGGER.info("Generating select_sql")
        select_sql = common.generate_select_sql(catalog_entry, columns)

        params = {}

        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            prev_converter = modify_ouput_converter(open_conn)

        common.sync_query(
            open_conn,
            catalog_entry,
            state,
            select_sql,
            columns,
            stream_version,
            table_stream,
            params,
        )
        columns.sort()
        select_sql = common.generate_select_sql(catalog_entry, columns, fastsync=True)
Review comment: Do we want to always force full table syncs to use fast sync?
Review comment: Should we consider leaving the sync_table function as is and just building another fast_sync_table function in a new folder? This way we keep the tap with either the original full sync logic or give the option for fast sync. Also this keeps fast sync logic separate from everything else.

        columns.extend(['_SDC_EXTRACTED_AT','_SDC_DELETED_AT','_SDC_BATCHED_AT'])

        query_df = df = pd.DataFrame(columns=columns) #TODO: delete?
Review comment: Do we need Pandas here? This can slow down performance and may not be needed if the goal is to just create a csv.
Reply: Since this works as is right now and will require rewriting/testing this, are you cool with leaving it as pandas and I can create another ticket in the R&R epic for updating this method? If it's best to get it switched to the most efficient asap, then I can make time this week to update this last part, but if it's not pressing then I can do the ticket and get back to it.
        time_extracted = utils.now() #TODO: delete?
Review comment: I don't think this is needed anymore.

        conn = mssql_conn.connect().execution_options(stream_results=True)
Review comment: Just a question, but is this step necessary? It looks like you already have a connection opened in line 85.

        csv_saved = 0

        chunk_size = config.get("fastsync_batch_rows") #TODO: update this so that its not required (if not set, fastsync disabled)
        files = []
        for chunk_dataframe in pd.read_sql(select_sql, conn, chunksize=chunk_size):
            csv_saved += 1

            filename = gen_export_filename(table=table_stream)
            filepath = os.path.join('fastsync', filename)
            chunk_dataframe.to_csv(f'{filepath}', sep=',', encoding='utf-8',index=False,header=False, compression='gzip')

            files.append(filename)

        # creating singer-like record to signify FASTSYNC for initial sync
        singer_message = {'type': 'FASTSYNC','stream':table_stream, 'version': stream_version, 'files':files }
        LOGGER.info(singer_message)
        json_object = json.dumps(singer_message)
        sys.stdout.write(str(json_object) + '\n')
        sys.stdout.flush()

        if catalog_entry.tap_stream_id == "dbo-InputMetadata":
            revert_ouput_converter(open_conn, prev_converter)

Review comment: Delete blank line.
Reply: removed

    # clear max pk value and last pk fetched upon successful sync
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values")
    singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched")

    singer.write_message(activate_version_message)


brose7230 marked this conversation as resolved.

def generate_random_string(length: int = 8) -> str:
Review comment: This could be another function that makes sense to move to a file in a new folder.
    """
    Generate cryptographically secure random strings
    Uses best practice from Python doc https://docs.python.org/3/library/secrets.html#recipes-and-best-practices
    Args:
        length: length of the string to generate
    Returns: random string
    """

    if length < 1:
        raise Exception('Length must be at least 1!')

    if 0 < length < 8:
        LOGGER.info('Length is too small! consider 8 or more characters')
Review comment: Nit, but this could be upgraded to a warning.
Reply: updated

    return ''.join(
        secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length)
    )

def gen_export_filename(
    table: str, suffix: str = None, postfix: str = None, ext: str = None
) -> str:
    """
    Generates a unique filename used for exported fastsync data that avoids file name collision
    Default pattern:
        pipelinewise_<tap_id>_<table>_<timestamp_with_ms>_fastsync_<random_string>.csv.gz
    Args:
        tap_id: Unique tap id
        table: Name of the table to export
        suffix: Generated filename suffix. Defaults to current timestamp in milliseconds
        postfix: Generated filename postfix. Defaults to a random 8 character length string
        ext: Filename extension. Defaults to .csv.gz
    Returns:
        Unique filename as a string
    """
    if not suffix:
        suffix = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')

    if not postfix:
        postfix = generate_random_string()

    if not ext:
        ext = 'csv.gz'

    return 'pipelinewise_{}_{}_batch_{}.{}'.format(
        table, suffix, postfix, ext
    )
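Picking up the Pandas question from the review thread above, here is a hedged sketch of how the chunked export could work with a plain cursor, the csv module, and gzip instead of pd.read_sql/to_csv. The function name, the 50,000-row default, and the assumption that conn.execute() returns a result object exposing fetchmany() are illustrative; the FASTSYNC message mirrors the one emitted in this diff.

import csv
import gzip
import json
import os
import sys

def export_table_without_pandas(conn, select_sql, table_stream, stream_version,
                                gen_export_filename, chunk_size=50000):
    """Sketch: stream query results into gzipped CSV chunks without a DataFrame."""
    files = []
    result = conn.execute(select_sql)  # assumed to expose fetchmany(), as SQLAlchemy results do
    while True:
        rows = result.fetchmany(chunk_size)
        if not rows:
            break
        filename = gen_export_filename(table=table_stream)
        filepath = os.path.join('fastsync', filename)
        # Text-mode gzip; newline='' lets the csv module manage line endings
        with gzip.open(filepath, 'wt', encoding='utf-8', newline='') as out:
            csv.writer(out).writerows(rows)
        files.append(filename)

    # Same FASTSYNC signal the diff writes to stdout after its loop
    message = {'type': 'FASTSYNC', 'stream': table_stream,
               'version': stream_version, 'files': files}
    sys.stdout.write(json.dumps(message) + '\n')
    sys.stdout.flush()
    return files

fetchmany() keeps memory bounded much like chunksize does for pd.read_sql, but avoids building a DataFrame for every chunk.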
@@ -0,0 +1,172 @@
"""Functions that write chunked gzipped files."""
Review comment: It may make sense to move this file up one level.
Reply: I used that earlier on during the fastsync work and it is no longer needed, so I trashed it in the latest push.

import io
import logging
import gzip
import builtins

LOGGER = logging.getLogger(__name__)


DEFAULT_CHUNK_SIZE_MB = 1000
DEFAULT_MAX_CHUNKS = 20

# Detecting compressed file size at write time is not possible by GzipFile.
# The data hase to be written into the file first before the actual compression performed.
# We need to use a good estimate for a text gzip file to split the file at write time.
EST_COMPR_RATE = 0.12


# pylint: disable=W0622,R1732
def open(
    base_filename,
    mode='wb',
    chunk_size_mb=None,
    max_chunks=None,
    est_compr_rate=None,
    compress=True,
):
    """Open a gzip-compressed file in binary or text mode.
    Args:
        base_filename: Path where to create the zip file(s) with the exported data.
                       Dynamic chunk numbers are appended to the base_filename
        mode: "wb" or "wt". (Default: wb)
        chunk_size_mb: File chunk sizes. (Default: 1000)
        max_chunks: Max number of chunks. If set to 0 then splitting is disabled and one single
                    file will be created (Default: 20)
        est_compr_rate: Detecting compressed file size at write time is not possible by GzipFile.
                        The data hase to be written into the file first before the actual compression performed.
                        We need to use a good estimate for a text gzip file to split the file at write time.
                        (Default 0.12)
    Return:
        File like object
    """
    if mode not in ['wb', 'wt']:
        raise ValueError('Invalid mode: %r' % (mode,))
    if chunk_size_mb is not None and chunk_size_mb < 1:
        raise ValueError('Invalid chunk_size_mb: %d' % (chunk_size_mb,))
    if max_chunks is not None and max_chunks < 0:
        raise ValueError('Invalid max_chunks: %d' % (max_chunks,))
    return SplitGzipFile(
        base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate, compress
    )


# pylint: disable=R0902
class SplitGzipFile(io.BufferedIOBase):
    """The SplitGzipFile file like object class that implements only the write method.
    This class only supports writing files in binary mode.
    """

    def __init__(
        self,
        base_filename,
        mode: str = None,
        chunk_size_mb: int = None,
        max_chunks: int = None,
        est_compr_rate: float = None,
        compress=True,
    ):
        super().__init__()

        self.base_filename = base_filename
        self.mode = mode
        self.chunk_size_mb = chunk_size_mb or DEFAULT_CHUNK_SIZE_MB
        self.max_chunks = max_chunks if max_chunks is not None else DEFAULT_MAX_CHUNKS
        self.compress = compress
        if compress:
            self.est_compr_rate = (
                est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE
            )
        else:
            self.est_compr_rate = 1.0
        self.chunk_seq = 1
        self.current_chunk_size_mb = 0
        self.chunk_filename = None
        self.chunk_file = None

    def _gen_chunk_filename(self) -> str:
        """
        Generates a chunk filename
        Pattern if max_chunks is zero: <base_filename>
        Pattern if max_chunks is greater than zero: <base_filename>.part<chunk-number-padded-five-digits>
        Returns:
            string
        """
        if self.max_chunks == 0:
            chunk_filename = self.base_filename
        else:
            if (
                self.current_chunk_size_mb >= self.chunk_size_mb
                and self.chunk_seq < self.max_chunks
            ):
                # Increase the chunk sequence and reset size to zero
                self.chunk_seq += 1
                self.current_chunk_size_mb = 0

            chunk_filename = f'{self.base_filename}.part{self.chunk_seq:05d}'

        return chunk_filename

    def _activate_chunk_file(self):
        """
        Activate a file like object to write data into the active chunk
        """
        chunk_filename = self._gen_chunk_filename()
        # Close the actual chunk file if exists and open a new one
        if self.chunk_filename != chunk_filename:
            if self.chunk_file:
                self.chunk_file.close()

            # Open the actual chunk file with gzip data writer
            self.chunk_filename = chunk_filename
            if self.compress:
                self.chunk_file = gzip.open(self.chunk_filename, self.mode)
            else:
                if 'b' in self.mode:
                    self.chunk_file = builtins.open(  # pylint: disable=unspecified-encoding
                        self.chunk_filename, self.mode
                    )
                else:
                    self.chunk_file = builtins.open(
                        self.chunk_filename, self.mode, encoding='utf-8'
                    )

    @staticmethod
    def _bytes_to_megabytes(size: int) -> float:
        """
        Transforms bytes ot megabytes
        Args:
            size: Number of bytes
        Returns:
            floating point number number
        """
        return size / float(1 << 20)

    def write(self, _bytes):
        """
        Writes bytes into the active chunk file and updates the estimated new size of the file after compression.
        Compressed file size cannot be calculated at write time because compression is a separated
        process that performs only when we finished writing the file. We need to estimate the compressed size.
        Args:
            _bytes: Bytes to write
        """
        self._activate_chunk_file()

        self.chunk_file.write(_bytes)
        self.current_chunk_size_mb = SplitGzipFile._bytes_to_megabytes(
            self.chunk_file.tell() * self.est_compr_rate
        )

    def close(self):
        """
        Close the active chunk file
        """
        if self.chunk_file is None:
            return
        self.chunk_file.close()

    def flush(self):
        """
        Flush the active chunk write buffers
        """
        self.chunk_file.flush()
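A short usage sketch for the module above, writing text into estimated 100 MB gzip chunks; the import path, output path, and chunk settings are assumed values for illustration (and note the reply above says this module was dropped in a later push):

from tap_mssql.sync_strategies import split_gzip  # assumed location of this new file

# Produces fastsync/orders.csv.gz.part00001, .part00002, ... as the estimated
# compressed size crosses chunk_size_mb, up to max_chunks files.
with split_gzip.open('fastsync/orders.csv.gz', mode='wt',
                     chunk_size_mb=100, max_chunks=20) as gz:
    gz.write('id,name\n')
    gz.write('1,example\n')

Because the compressed size is only known after writing, SplitGzipFile multiplies the uncompressed byte count by est_compr_rate (0.12 by default) to decide when to roll over to the next chunk.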
Review comment: I think a few changes available in master aren't included here. The change to logical.py that updates the state even if no records changed is not here. See line 301 of logical.py in master - this isn't anywhere in this PR.