Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added the ability to separate out reads and writes into their own connection pools. #344

Merged
merged 15 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 51 additions & 21 deletions services/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,13 @@
import math
import re
import time
from services.utils import logging
from services.utils import logging, DBType
from typing import List, Tuple

from .db_utils import DBResponse, DBPagination, aiopg_exception_handling, \
get_db_ts_epoch_str, translate_run_key, translate_task_key, new_heartbeat_ts
from .models import FlowRow, RunRow, StepRow, TaskRow, MetadataRow, ArtifactRow
from services.utils import DBConfiguration
from services.utils import DBConfiguration, USE_SEPARATE_READER_POOL

from services.data.service_configs import max_connection_retires, \
connection_retry_wait_time_seconds
Expand Down Expand Up @@ -49,6 +49,7 @@ class _AsyncPostgresDB(object):
metadata_table_postgres = None

pool = None
reader_pool = None
db_conf: DBConfiguration = None

def __init__(self, name='global'):
Expand Down Expand Up @@ -77,21 +78,42 @@ async def _init(self, db_conf: DBConfiguration, create_triggers=DB_TRIGGER_CREAT
for i in range(retries):
try:
self.pool = await aiopg.create_pool(
db_conf.dsn,
db_conf.get_dsn(),
minsize=db_conf.pool_min,
maxsize=db_conf.pool_max,
timeout=db_conf.timeout,
pool_recycle=10 * db_conf.timeout,
echo=AIOPG_ECHO)

self.reader_pool = await aiopg.create_pool(
db_conf.get_dsn(type=DBType.READER),
minsize=db_conf.pool_min,
maxsize=db_conf.pool_max,
timeout=db_conf.timeout,
pool_recycle=10 * db_conf.timeout,
echo=AIOPG_ECHO) if USE_SEPARATE_READER_POOL else self.pool

for table in self.tables:
await table._init(create_triggers=create_triggers)

self.logger.info(
"Connection established.\n"
" Pool min: {pool_min} max: {pool_max}\n".format(
pool_min=self.pool.minsize,
pool_max=self.pool.maxsize))
if USE_SEPARATE_READER_POOL:
self.logger.info(
"Writer Connection established.\n"
" Pool min: {pool_min} max: {pool_max}\n".format(
pool_min=self.pool.minsize,
pool_max=self.pool.maxsize))

self.logger.info(
"Reader Connection established.\n"
" Pool min: {pool_min} max: {pool_max}\n".format(
pool_min=self.reader_pool.minsize,
pool_max=self.reader_pool.maxsize))
else:
self.logger.info(
"Connection established.\n"
" Pool min: {pool_min} max: {pool_max}\n".format(
pool_min=self.pool.minsize,
pool_max=self.pool.maxsize))

break # Break the retry loop
except Exception as e:
Expand Down Expand Up @@ -211,15 +233,20 @@ async def find_records(self, conditions: List[str] = None, values=[], fetch_sing

async def execute_sql(self, select_sql: str, values=[], fetch_single=False,
expanded=False, limit: int = 0, offset: int = 0,
cur: aiopg.Cursor = None) -> Tuple[DBResponse, DBPagination]:
cur: aiopg.Cursor = None, serialize: bool = True) -> Tuple[DBResponse, DBPagination]:
async def _execute_on_cursor(_cur):
await _cur.execute(select_sql, values)

rows = []
records = await _cur.fetchall()
for record in records:
row = self._row_type(**record) # pylint: disable=not-callable
rows.append(row.serialize(expanded))
if serialize:
for record in records:
# pylint-initial-ignore: Lack of __init__ makes this too hard for pylint
# pylint: disable=not-callable
row = self._row_type(**record)
rows.append(row.serialize(expanded))
else:
rows = records

count = len(rows)

Expand All @@ -232,17 +259,20 @@ async def _execute_on_cursor(_cur):
page=math.floor(int(offset) / max(int(limit), 1)) + 1,
)
return body, pagination
if cur:
# if we are using the passed in cursor, we allow any errors to be managed by cursor owner
body, pagination = await _execute_on_cursor(cur)
return DBResponse(response_code=200, body=body), pagination

try:
with (await self.db.pool.cursor(
cursor_factory=psycopg2.extras.DictCursor
)) as cur:
if cur:
# if we are using the passed in cursor, we allow any errors to be managed by cursor owner
body, pagination = await _execute_on_cursor(cur)
cur.close() # unsure if needed, leaving in there for safety
return DBResponse(response_code=200, body=body), pagination
return DBResponse(response_code=200, body=body), pagination
else:
db_pool = self.db.reader_pool if USE_SEPARATE_READER_POOL else self.db.pool
with (await db_pool.cursor(
cursor_factory=psycopg2.extras.DictCursor
)) as cur:
body, pagination = await _execute_on_cursor(cur)
cur.close()
return DBResponse(response_code=200, body=body), pagination
except IndexError as error:
return aiopg_exception_handling(error), None
except (Exception, psycopg2.DatabaseError) as error:
Expand Down
9 changes: 7 additions & 2 deletions services/metadata_service/tests/integration_tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
from typing import Callable

import pytest
import psycopg2
import psycopg2.extras
from aiohttp import web
from services.data.postgres_async_db import AsyncPostgresDB
from services.utils.tests import get_test_dbconf
Expand Down Expand Up @@ -67,8 +69,11 @@ async def clean_db(db: AsyncPostgresDB):
db.run_table_postgres,
db.flow_table_postgres
]
for table in tables:
await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name))
with (await db.pool.cursor(
cursor_factory=psycopg2.extras.DictCursor
)) as cur:
for table in tables:
await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name), cur=cur)


@pytest.fixture
Expand Down
2 changes: 1 addition & 1 deletion services/migration_service/api/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def upgrade(self, request):
description: could not upgrade
"""
goose_version_cmd = make_goose_migration_template(
db_conf.connection_string_url,
db_conf.connection_string_url(),
"up"
)
p = Popen(goose_version_cmd, shell=True,
Expand Down
6 changes: 3 additions & 3 deletions services/migration_service/api/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_unapplied_migrations(current_version):
@staticmethod
async def get_goose_version():
# if tables exist but goose doesn't find version table then
goose_version_cmd = make_goose_template(db_conf.connection_string_url, 'version')
goose_version_cmd = make_goose_template(db_conf.connection_string_url(), 'version')

p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,
close_fds=True)
Expand Down Expand Up @@ -55,7 +55,7 @@ async def get_latest_compatible_version():
return version_dict[version]
else:
print("Running initial migration..", file=sys.stderr)
goose_version_cmd = make_goose_migration_template(db_conf.connection_string_url, 'up')
goose_version_cmd = make_goose_migration_template(db_conf.connection_string_url(), 'up')
p = Popen(goose_version_cmd, shell=True,
close_fds=True)
if p.wait() != 0:
Expand All @@ -65,7 +65,7 @@ async def get_latest_compatible_version():
@staticmethod
async def is_migration_in_progress():
goose_version_cmd = make_goose_template(
db_conf.connection_string_url, "status"
db_conf.connection_string_url(), "status"
)

p = Popen(goose_version_cmd, stdout=PIPE, stderr=PIPE, shell=True,
Expand Down
2 changes: 1 addition & 1 deletion services/migration_service/data/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async def _init(self, db_conf: DBConfiguration):
retries = 3
for i in range(retries):
try:
self.pool = await aiopg.create_pool(db_conf.dsn, timeout=db_conf.timeout)
self.pool = await aiopg.create_pool(db_conf.get_dsn(), timeout=db_conf.timeout)
except Exception as e:
print("printing connection exception: " + str(e))
if retries - i < 1:
Expand Down
4 changes: 2 additions & 2 deletions services/ui_backend_service/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ The UI service module is `services.ui_backend_service.ui_server`:
> $ /opt/latest/bin/python3 -m services.ui_backend_service.ui_server
> ```

Below is an Docker run command for running UI Service exposed at port 8083:
Below is a Docker run command for running UI Service exposed at port 8083:

> ```sh
> $ docker run \
Expand Down Expand Up @@ -60,7 +60,7 @@ The service depends on the following Environment Variables to be set:
- `MF_METADATA_DB_PSWD` [defaults to postgres]
- `MF_METADATA_DB_NAME` [defaults to postgres]

Optionally you can also overrider the host and port the service runs on:
Optionally you can also override the host and port the service runs on:

- `MF_UI_METADATA_PORT` [defaults to 8083]
- `MF_UI_METADATA_HOST` [defaults to 0.0.0.0]
Expand Down
1 change: 1 addition & 0 deletions services/ui_backend_service/data/db/postgres_async_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class AsyncPostgresDB(BaseAsyncPostgresDB):
metadata_table_postgres = None

pool = None
reader_pool = None
db_conf: DBConfiguration = None

def __init__(self, name='global'):
Expand Down
51 changes: 1 addition & 50 deletions services/ui_backend_service/data/db/tables/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,56 +269,7 @@ async def benchmark_sql(
self.db.logger.exception("Query Benchmarking failed")
return None

async def execute_sql(
self,
select_sql: str,
values=[],
fetch_single=False,
expanded=False,
limit: int = 0,
offset: int = 0,
serialize: bool = True,
) -> Tuple[DBResponse, DBPagination]:
try:
with (
await self.db.pool.cursor(cursor_factory=psycopg2.extras.DictCursor)
) as cur:
await cur.execute(select_sql, values)

rows = []
records = await cur.fetchall()
if serialize:
for record in records:
# pylint-initial-ignore: Lack of __init__ makes this too hard for pylint
# pylint: disable=not-callable
row = self._row_type(**record)
rows.append(row.serialize(expanded))
else:
rows = records

count = len(rows)

# Will raise IndexError in case fetch_single=True and there's no results
body = rows[0] if fetch_single else rows

pagination = DBPagination(
limit=limit,
offset=offset,
count=count,
page=math.floor(int(offset) / max(int(limit), 1)) + 1,
)

cur.close()
return DBResponse(response_code=200, body=body), pagination
except IndexError as error:
return aiopg_exception_handling(error), None
except (Exception, psycopg2.DatabaseError) as error:
self.db.logger.exception("Exception occured")
return aiopg_exception_handling(error), None

async def get_tags(
self, conditions: List[str] = None, values=[], limit: int = 0, offset: int = 0
):
async def get_tags(self, conditions: List[str] = None, values=[], limit: int = 0, offset: int = 0):
sql_template = """
SELECT DISTINCT tag
FROM (
Expand Down
25 changes: 25 additions & 0 deletions services/ui_backend_service/docs/environment.md
Original file line number Diff line number Diff line change
Expand Up @@ -186,3 +186,28 @@ The `MF_LOG_LOAD_POLICY` environment variable restricts the amount of log conten
## Card content restriction

The `MF_CARD_LOAD_POLICY` (default `full`) environment variable can be set to `blurb_only` to return a Python code snippet to access card using Metaflow client, instead of loading actual HTML card payload.


## Scaling reads using read replicas

Databases such as [Amazon Aurora](https://aws.amazon.com/rds/aurora/) provide
[read replicas](https://aws.amazon.com/rds/features/read-replicas/) that make it easy to elastically scale beyond
the capacity constraints of single database instance for heavy read workloads. You are able to separate out the reads
and the writes of this application by setting the following two environment variables:

>```
> USE_SEPARATE_READER_POOL = 1

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am looking for something exactly like this PR. This variable seems a bit redundant. It could simply be:

USE_SEPARATE_READER_POOL = bool(MF_METADATA_DB_READ_REPLICA_HOST)

thus disabling the feature if no host is given and implicitly turning it on if one is given. This would also prevent the case where this is set to true and no host is given.

> MF_METADATA_DB_READ_REPLICA_HOST = <READ_REPLICA_ENDPOINT>
>```

As the name suggests, the `USE_SEPARATE_READER_POOL` variable creates a separate read pool with the same
min/max pool size as the writer pool. It is also required to set this variable `MF_METADATA_DB_READ_REPLICA_HOST` to
point to the read replica endpoint that is typically a load balancer in front of all the database's read replicas.

### Accounting for eventual consistency

When a read replica is created, there is a lag between the time a transaction is committed to the writer instance and
the time when the newly written data is available in the read replica. In Amazon Aurora, this [lag is usually much less
than 100ms](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/Aurora.Replication.html) because the replicas
share the same underlying storage layer as the writer instance thereby avoiding the need to copy data into the replica
nodes. This Metaflow UI service application is read heavy and hence is a great candidate for scaling reads using this model.
9 changes: 7 additions & 2 deletions services/ui_backend_service/tests/integration_tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from aiohttp import web
from pyee import AsyncIOEventEmitter
import pytest
import psycopg2
import psycopg2.extras
import os
import json
import datetime
Expand Down Expand Up @@ -95,8 +97,11 @@ async def clean_db(db: AsyncPostgresDB):
db.run_table_postgres,
db.flow_table_postgres
]
for table in tables:
await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name))
with (await db.pool.cursor(
cursor_factory=psycopg2.extras.DictCursor
)) as cur:
for table in tables:
await table.execute_sql(select_sql="DELETE FROM {}".format(table.table_name), cur=cur)


@pytest.fixture
Expand Down
Loading
Loading