Update tap with fastsync #8

Open
wants to merge 57 commits into base: release
Changes from 55 commits

Commits (57)
d91d69f
FastSync to tap-mssql update
brose7230 Mar 17, 2022
6a04e76
extra logger before adding code in
brose7230 Mar 17, 2022
9971f0c
Fixed logger position
brose7230 Mar 17, 2022
bc17b43
extra logging for pk values
brose7230 Mar 17, 2022
15d7ef1
removed unused function
brose7230 Mar 17, 2022
e3be333
fixed ANOTHER error
brose7230 Mar 17, 2022
a575a15
fixed results
brose7230 Mar 17, 2022
4e05478
added utils and testing initial sync duration
brose7230 Mar 17, 2022
4eb1df3
length of rows
brose7230 Mar 17, 2022
03f592c
number of rows
brose7230 Mar 17, 2022
26f4882
fixed extracted/batched vars
brose7230 Mar 17, 2022
e84016c
added copy_table function for chunking results into gzips
brose7230 Mar 17, 2022
866da9c
fixed logging error
brose7230 Mar 17, 2022
d5c0f32
params
brose7230 Mar 17, 2022
8fa8f2c
removed utils and moved 2 functions to common
brose7230 Mar 17, 2022
0b9e0c3
trying pandas;
brose7230 Mar 17, 2022
10b60b3
fixed connection
brose7230 Mar 17, 2022
730c1b7
Fixed pandas
brose7230 Mar 17, 2022
96bd429
removed from common
brose7230 Mar 17, 2022
4d3d559
added pandas to setup
brose7230 Mar 17, 2022
08b81c4
Update iwth singer rows
brose7230 Mar 18, 2022
07fb8b7
fixed imports
brose7230 Mar 18, 2022
25c1020
write_message record
brose7230 Mar 18, 2022
27e5e59
updated record_message
brose7230 Mar 18, 2022
81a3e8b
in common
brose7230 Mar 18, 2022
c02a49b
fixed json to dict
brose7230 Mar 18, 2022
acacfcd
Fixed time and version
brose7230 Mar 18, 2022
8e96274
defined time extracted
brose7230 Mar 18, 2022
dec761e
cleanup
brose7230 Mar 18, 2022
98d0994
cleanup
brose7230 Mar 18, 2022
0b632f9
fixed pandas columns
brose7230 Mar 18, 2022
303a51b
removed some logging
brose7230 Mar 18, 2022
b158d02
increased chunk size
brose7230 Mar 18, 2022
a851137
Changed method of sending
brose7230 Mar 21, 2022
cd169ef
Updated singer message with full sync
brose7230 Mar 21, 2022
1ac0851
fixed name and message to singer
brose7230 Mar 21, 2022
2c12035
added sys stdout write for FASTSYNC
brose7230 Mar 21, 2022
9d5a7f6
fixed logger spelling
brose7230 Mar 21, 2022
9590f3b
converted dict to str
brose7230 Mar 21, 2022
c2355b5
fixed dict
brose7230 Mar 21, 2022
7c087d1
updated json stinrg
brose7230 Mar 21, 2022
3266a4d
added file names to singer record
brose7230 Mar 21, 2022
1721c8c
changed to filename
brose7230 Mar 21, 2022
3d49e18
added _sdc_ fields to query/dataframe
brose7230 Mar 21, 2022
a514b91
fixed fastync typo in generate_select_sql function
brose7230 Mar 21, 2022
8d4665a
changed to_csv to use chunk_dataframe instead of appending a larger d…
brose7230 Mar 21, 2022
0c19653
sort columns when fastsync
brose7230 Mar 21, 2022
8a96bbe
removed some logging and cleaned up unused functions
brose7230 Mar 22, 2022
411749f
creates dir if not existing
brose7230 May 17, 2022
1c1b38d
removed mkdir
brose7230 May 17, 2022
16e0c84
moved max_pk_values and last_pk_fetched alignment
brose7230 May 17, 2022
d0431a1
removed else in sync_table
brose7230 May 17, 2022
5594a17
removed unneeded code
brose7230 May 17, 2022
edc7799
changed export_batch_rows to fastsync_batch_rows
brose7230 May 17, 2022
5c62571
comment
brose7230 May 17, 2022
c718040
review changes
brose7230 Jun 7, 2022
592f78f
removed gzip import
brose7230 Jun 7, 2022
1 change: 1 addition & 0 deletions setup.py
@@ -22,6 +22,7 @@
"backoff==1.8.0",
"MarkupSafe==2.0.1",
"jinja2==2.11.3",
"pandas==1.2.2",
],
entry_points="""
[console_scripts]
26 changes: 22 additions & 4 deletions tap_mssql/sync_strategies/common.py
@@ -2,11 +2,19 @@
# pylint: disable=too-many-arguments,duplicate-code,too-many-locals

import copy
import csv

Review comment: I think a few changes available in master aren't included here. The change to logical.py that updates the state even if no records changed is not here. See line 301 of logical.py in master - this isn't anywhere in this PR.

import datetime
import glob

Review comment: I don't think any of these imports are being used here.

Author reply: removed

import multiprocessing
import os
import secrets
import singer
import string
import time
import uuid

from . import split_gzip

import singer.metrics as metrics
from singer import metadata
from singer import utils
@@ -87,16 +95,26 @@ def get_key_properties(catalog_entry):
return key_properties


def generate_select_sql(catalog_entry, columns):
def generate_select_sql(catalog_entry, columns, fastsync=False):
database_name = get_database_name(catalog_entry)
escaped_db = escape(database_name)
escaped_table = escape(catalog_entry.table)
escaped_columns = [escape(c) for c in columns]

select_sql = "SELECT {} FROM {}.{}".format(
",".join(escaped_columns), escaped_db, escaped_table
)
if fastsync:

Review comment: Can we consider moving this logic to a new folder called fast_sync? Then in full_table.py the conditional formatting can be used to decide whether to use generate_select_sql or the fast_sync_generate_select_sql.

This isn't essential, just an idea to keep fast sync separate from other methods already being used in other replication types (like incremental)

Author reply: oh yeah good call. I planned on having a condition within full_table.sync_table() that would call one or the other, so that was something i meant to do but forgot to make note of. This is updated

Author reply: oops, i think i read 'folder' as 'function' lol
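For illustration, a minimal sketch of the dispatch the reviewer describes, assuming the fastsync query builder were moved into a hypothetical fast_sync module (this PR instead keeps a fastsync flag on generate_select_sql):

    import tap_mssql.sync_strategies.common as common
    # hypothetical module suggested in the review; not part of this diff
    import tap_mssql.sync_strategies.fast_sync as fast_sync

    def build_select_sql(config, catalog_entry, columns):
        # use the fastsync query builder only when a batch size is configured
        if config.get("fastsync_batch_rows"):
            return fast_sync.fast_sync_generate_select_sql(catalog_entry, columns)
        return common.generate_select_sql(catalog_entry, columns)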

time_extracted = utils.now()
_sdc_extracted_at = f"'{time_extracted}' as _SDC_EXTRACTED_AT"
_sdc_deleted_at = "NULL as _SDC_DELETED_AT"
_sdc_batched_at = f"'{time_extracted}' as _SDC_BATCHED_AT"

select_sql = "SELECT {}, {}, {}, {} FROM {}.{}".format(
",".join(escaped_columns), _sdc_extracted_at, _sdc_deleted_at, _sdc_batched_at, escaped_db, escaped_table
)
else:
select_sql = "SELECT {} FROM {}.{}".format(
",".join(escaped_columns), escaped_db, escaped_table
)

# escape percent signs
select_sql = select_sql.replace("%", "%%")
return select_sql
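For a rough sense of the result (escaping omitted; the dbo.Orders table and its columns are made up for illustration), fastsync=True produces a statement of the form

    SELECT id,name, '<time_extracted>' as _SDC_EXTRACTED_AT, NULL as _SDC_DELETED_AT, '<time_extracted>' as _SDC_BATCHED_AT FROM dbo.Orders

while fastsync=False keeps the plain SELECT id,name FROM dbo.Orders.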
119 changes: 103 additions & 16 deletions tap_mssql/sync_strategies/full_table.py
@@ -2,9 +2,18 @@
# pylint: disable=duplicate-code,too-many-locals,simplifiable-if-expression

Review comment: Can we add a Fast Sync section to the readme.md? There are a few required config changes like - "fastsync_batch_rows" that should be called out.

Author reply: updated
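For context, the relevant config entry would presumably look like this (the key name is taken from the diff below; the value is only an illustrative batch size, not a documented default):

    # rows per chunk handed to pd.read_sql(..., chunksize=...)
    config = {
        "fastsync_batch_rows": 50000,
    }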


import copy
import csv
import datetime
import json
import pandas as pd
import os
import secrets
import singer
from singer import metadata

from singer import utils
import singer.metrics as metrics
import string
import sys
import tap_mssql.sync_strategies.common as common

from tap_mssql.connection import (
@@ -16,7 +25,6 @@

LOGGER = singer.get_logger()


def generate_bookmark_keys(catalog_entry):
md_map = metadata.to_map(catalog_entry.metadata)
stream_metadata = md_map.get((), {})
@@ -30,9 +38,20 @@ def generate_bookmark_keys(catalog_entry):
}

bookmark_keys = base_bookmark_keys

return bookmark_keys

def write_dataframe_record(row, catalog_entry, stream_version, columns, table_stream, time_extracted):

rec = row.to_dict()

record_message = singer.RecordMessage(
stream=table_stream,
record=rec,
version=stream_version,
time_extracted=time_extracted,
)

singer.write_message(record_message)

def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version):
mssql_conn = get_azure_sql_engine(config)
@@ -62,31 +81,99 @@ def sync_table(mssql_conn, config, catalog_entry, state, columns, stream_version
):
singer.write_message(activate_version_message)

#TODO: Update code: If fastsync_batch_rows is not set, use old non fastsync method
with mssql_conn.connect() as open_conn:
LOGGER.info("Generating select_sql")
select_sql = common.generate_select_sql(catalog_entry, columns)

params = {}

if catalog_entry.tap_stream_id == "dbo-InputMetadata":
prev_converter = modify_ouput_converter(open_conn)

common.sync_query(
open_conn,
catalog_entry,
state,
select_sql,
columns,
stream_version,
table_stream,
params,
)
columns.sort()
select_sql = common.generate_select_sql(catalog_entry, columns, fastsync=True)

Review comment: Do we want to always force full table syncs to use fast sync?

Review comment: Should we consider leaving the sync_table function as is and just building another fast_sync_table function in a new fast_sync directory?

This way we keep the tap with either the original full sync logic or give the option for fast sync. Also this keeps fast sync logic separate from everything else.


columns.extend(['_SDC_EXTRACTED_AT','_SDC_DELETED_AT','_SDC_BATCHED_AT'])

query_df = df = pd.DataFrame(columns=columns) #TODO: delete?

Review comment: Do we need Pandas here? This can slow down performance and may not be needed if the goal is to just create a csv.

Author reply: Since this works as is right now and will require rewriting/testing this, are you cool with leaving it as pandas and I can create another ticket in the R&R epic for updating this method? If its best to get it switched to the most efficient asap, then i can make time this week to update this last part, but if its not pressing then i can do the ticket and get back to it.
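A minimal sketch of the pandas-free variant the reviewer is hinting at, streaming rows from the cursor straight into a gzipped CSV (the function name, batching, and connection handling here are assumptions, not code from this PR):

    import csv
    import gzip

    def copy_query_to_gzip(conn, select_sql, filepath, batch_size=10000):
        # execute the SELECT and append rows to a gzipped CSV in batches
        result = conn.execute(select_sql)
        with gzip.open(filepath, 'wt', newline='', encoding='utf-8') as out:
            writer = csv.writer(out)
            rows = result.fetchmany(batch_size)
            while rows:
                writer.writerows(rows)
                rows = result.fetchmany(batch_size)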

time_extracted = utils.now() #TODO: delete?

Review comment: I don't think this is needed anymore.


conn = mssql_conn.connect().execution_options(stream_results=True)

Review comment: Just a question, but is this step necessary? It looks like you already have a connection opened in line 85


csv_saved = 0

chunk_size = config.get("fastsync_batch_rows") #TODO: update this so that its not required (if not set, fastsync disabled)
files = []
for chunk_dataframe in pd.read_sql(select_sql, conn, chunksize=chunk_size):
csv_saved += 1

filename = gen_export_filename(table=table_stream)
filepath = os.path.join('fastsync', filename)
chunk_dataframe.to_csv(f'{filepath}', sep=',', encoding='utf-8',index=False,header=False, compression='gzip')

files.append(filename)

# creating singer-like record to signify FASTSYNC for initial sync
singer_message = {'type': 'FASTSYNC','stream':table_stream, 'version': stream_version, 'files':files }
LOGGER.info(singer_message)
json_object = json.dumps(singer_message)
sys.stdout.write(str(json_object) + '\n')
sys.stdout.flush()

if catalog_entry.tap_stream_id == "dbo-InputMetadata":
revert_ouput_converter(open_conn, prev_converter)

Review comment: Delete blank line

Author reply: removed


# clear max pk value and last pk fetched upon successful sync
singer.clear_bookmark(state, catalog_entry.tap_stream_id, "max_pk_values")
singer.clear_bookmark(state, catalog_entry.tap_stream_id, "last_pk_fetched")

singer.write_message(activate_version_message)


brose7230 marked this conversation as resolved.

def generate_random_string(length: int = 8) -> str:

Review comment: This could be another function that makes sense to move to a file in a new fast_sync directory.

"""
Generate cryptographically secure random strings
Uses best practice from Python doc https://docs.python.org/3/library/secrets.html#recipes-and-best-practices
Args:
length: length of the string to generate
Returns: random string
"""

if length < 1:
raise Exception('Length must be at least 1!')

if 0 < length < 8:
LOGGER.info('Length is too small! consider 8 or more characters')

Review comment: Nit, but this could be upgraded to a Warn

Author reply: updated
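That is, presumably swapping the call for something like:

    LOGGER.warning('Length is too small! consider 8 or more characters')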


return ''.join(
secrets.choice(string.ascii_uppercase + string.digits) for _ in range(length)
)

def gen_export_filename(
table: str, suffix: str = None, postfix: str = None, ext: str = None
) -> str:
"""
Generates a unique filename used for exported fastsync data that avoids file name collision
Default pattern:
pipelinewise_<tap_id>_<table>_<timestamp_with_ms>_fastsync_<random_string>.csv.gz
Args:
tap_id: Unique tap id
table: Name of the table to export
suffix: Generated filename suffix. Defaults to current timestamp in milliseconds
postfix: Generated filename postfix. Defaults to a random 8 character length string
ext: Filename extension. Defaults to .csv.gz
Returns:
Unique filename as a string
"""
if not suffix:
suffix = datetime.datetime.now().strftime('%Y%m%d-%H%M%S-%f')

if not postfix:
postfix = generate_random_string()

if not ext:
ext = 'csv.gz'

return 'pipelinewise_{}_{}_batch_{}.{}'.format(
table, suffix, postfix, ext
)
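Putting the pieces above together, the exported artifacts look roughly like this (the stream name, timestamp, version, and random postfix are all illustrative):

    gen_export_filename(table='dbo-Orders')
    # e.g. 'pipelinewise_dbo-Orders_20220607-143210-123456_batch_X7KQ2MZP.csv.gz'

and the FASTSYNC record written to stdout would resemble:

    {"type": "FASTSYNC", "stream": "dbo-Orders", "version": 1654612330000, "files": ["pipelinewise_dbo-Orders_20220607-143210-123456_batch_X7KQ2MZP.csv.gz"]}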
172 changes: 172 additions & 0 deletions tap_mssql/sync_strategies/split_gzip.py
@@ -0,0 +1,172 @@
"""Functions that write chunked gzipped files."""

Review comment: It may make sense to move this file up one level out of sync_strategies to tap_mssql.

Author reply (brose7230, Jun 7, 2022): i used that earlier on during the fastsync work and it is no longer needed so i trashed it in the latest push

import io
import logging
import gzip
import builtins

LOGGER = logging.getLogger(__name__)


DEFAULT_CHUNK_SIZE_MB = 1000
DEFAULT_MAX_CHUNKS = 20

# Detecting compressed file size at write time is not possible with GzipFile.
# The data has to be written into the file first before the actual compression is performed.
# We need to use a good estimate for a text gzip file to split the file at write time.
EST_COMPR_RATE = 0.12


# pylint: disable=W0622,R1732
def open(
base_filename,
mode='wb',
chunk_size_mb=None,
max_chunks=None,
est_compr_rate=None,
compress=True,
):
"""Open a gzip-compressed file in binary or text mode.
Args:
base_filename: Path where to create the zip file(s) with the exported data.
Dynamic chunk numbers are appended to the base_filename
mode: "wb" or "wt". (Default: wb)
chunk_size_mb: File chunk sizes. (Default: 1000)
max_chunks: Max number of chunks. If set to 0 then splitting is disabled and one single
file will be created (Default: 20)
est_compr_rate: Detecting compressed file size at write time is not possible with GzipFile.
The data has to be written into the file first before the actual compression is performed.
We need to use a good estimate for a text gzip file to split the file at write time.
(Default 0.12)
Return:
File like object
"""
if mode not in ['wb', 'wt']:
raise ValueError('Invalid mode: %r' % (mode,))
if chunk_size_mb is not None and chunk_size_mb < 1:
raise ValueError('Invalid chunk_size_mb: %d' % (chunk_size_mb,))
if max_chunks is not None and max_chunks < 0:
raise ValueError('Invalid max_chunks: %d' % (max_chunks,))
return SplitGzipFile(
base_filename, mode, chunk_size_mb, max_chunks, est_compr_rate, compress
)


# pylint: disable=R0902
class SplitGzipFile(io.BufferedIOBase):
"""The SplitGzipFile file like object class that implements only the write method.
This class only supports writing files in binary mode.
"""

def __init__(
self,
base_filename,
mode: str = None,
chunk_size_mb: int = None,
max_chunks: int = None,
est_compr_rate: float = None,
compress=True,
):
super().__init__()

self.base_filename = base_filename
self.mode = mode
self.chunk_size_mb = chunk_size_mb or DEFAULT_CHUNK_SIZE_MB
self.max_chunks = max_chunks if max_chunks is not None else DEFAULT_MAX_CHUNKS
self.compress = compress
if compress:
self.est_compr_rate = (
est_compr_rate if est_compr_rate is not None else EST_COMPR_RATE
)
else:
self.est_compr_rate = 1.0
self.chunk_seq = 1
self.current_chunk_size_mb = 0
self.chunk_filename = None
self.chunk_file = None

def _gen_chunk_filename(self) -> str:
"""
Generates a chunk filename
Pattern if max_chunks is zero: <base_filename>
Pattern if max_chunks is greater than zero: <base_filename>.part<chunk-number-padded-five-digits>
Returns:
string
"""
if self.max_chunks == 0:
chunk_filename = self.base_filename
else:
if (
self.current_chunk_size_mb >= self.chunk_size_mb
and self.chunk_seq < self.max_chunks
):
# Increase the chunk sequence and reset size to zero
self.chunk_seq += 1
self.current_chunk_size_mb = 0

chunk_filename = f'{self.base_filename}.part{self.chunk_seq:05d}'

return chunk_filename

def _activate_chunk_file(self):
"""
Activate a file like object to write data into the active chunk
"""
chunk_filename = self._gen_chunk_filename()
# Close the actual chunk file if exists and open a new one
if self.chunk_filename != chunk_filename:
if self.chunk_file:
self.chunk_file.close()

# Open the actual chunk file with gzip data writer
self.chunk_filename = chunk_filename
if self.compress:
self.chunk_file = gzip.open(self.chunk_filename, self.mode)
else:
if 'b' in self.mode:
self.chunk_file = builtins.open( # pylint: disable=unspecified-encoding
self.chunk_filename, self.mode
)
else:
self.chunk_file = builtins.open(
self.chunk_filename, self.mode, encoding='utf-8'
)

@staticmethod
def _bytes_to_megabytes(size: int) -> float:
"""
Transforms bytes to megabytes
Args:
size: Number of bytes
Returns:
floating point number
"""
return size / float(1 << 20)

def write(self, _bytes):
"""
Writes bytes into the active chunk file and updates the estimated new size of the file after compression.
Compressed file size cannot be calculated at write time because compression is a separate
process that runs only after we have finished writing the file. We need to estimate the compressed size.
Args:
_bytes: Bytes to write
"""
self._activate_chunk_file()

self.chunk_file.write(_bytes)
self.current_chunk_size_mb = SplitGzipFile._bytes_to_megabytes(
self.chunk_file.tell() * self.est_compr_rate
)

def close(self):
"""
Close the active chunk file
"""
if self.chunk_file is None:
return
self.chunk_file.close()

def flush(self):
"""
Flush the active chunk write buffers
"""
self.chunk_file.flush()
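
A minimal usage sketch for the writer above (the path and sizes are illustrative, and nothing in this diff wires split_gzip into full_table.py yet):

    from tap_mssql.sync_strategies import split_gzip

    # write rows into gzipped chunks of roughly 100 MB each, split across up to 20 .partNNNNN files
    with split_gzip.open('example_export.csv.gz', mode='wb', chunk_size_mb=100) as gz:
        for i in range(1000):
            gz.write(f'{i},row-{i}\n'.encode('utf-8'))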