Skip to content

Commit

Permalink
feat(release): update 72ae3db (#12)
Browse files Browse the repository at this point in the history
  • Loading branch information
fedellen authored May 16, 2024
2 parents b642484 + a0b3ad6 commit 0b95dff
Show file tree
Hide file tree
Showing 26 changed files with 841 additions and 272 deletions.
1 change: 1 addition & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"INJECTEDAPTOS",
"Irys",
"knexfile",
"localstack",
"MULTIAPTOS",
"multistream",
"NOWAIT",
Expand Down
5 changes: 5 additions & 0 deletions ecs/fulfillment-pipeline/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import { Message } from "@aws-sdk/client-sqs";
import { Consumer } from "sqs-consumer";

import { ArweaveGateway } from "../../../src/arch/arweaveGateway";
import { PostgresDatabase } from "../../../src/arch/db/postgres";
import { TurboPaymentService } from "../../../src/arch/payment";
import { migrateOnStartup } from "../../../src/constants";
Expand Down Expand Up @@ -73,6 +74,7 @@ const uploadDatabase = new PostgresDatabase({
});
const objectStore = getS3ObjectStore();
const paymentService = new TurboPaymentService();
const arweaveGateway = new ArweaveGateway();

// Set up queue handler configurations for jobs based on a planId
export const queues: QueueHandlerConfig[] = [
Expand Down Expand Up @@ -115,6 +117,7 @@ const consumers: ConsumerQueue[] = queues.map((queue) => ({
database: uploadDatabase,
objectStore,
paymentService,
arweaveGateway,
}),
...queue,
}));
Expand Down Expand Up @@ -306,13 +309,15 @@ if (process.env.PLAN_BUNDLE_ENABLED === "true") {
planBundleJobScheduler = new PlanBundleJobScheduler({
intervalMs: +(process.env.PLAN_BUNDLE_INTERVAL_MS ?? 60_000),
logger: globalLogger,
database: uploadDatabase,
});
setUpAndStartJobScheduler(planBundleJobScheduler);
}
if (process.env.VERIFY_BUNDLE_ENABLED === "true") {
verifyBundleJobScheduler = new VerifyBundleJobScheduler({
intervalMs: +(process.env.VERIFY_BUNDLE_INTERVAL_MS ?? 60_000),
logger: globalLogger,
database: uploadDatabase,
});
setUpAndStartJobScheduler(verifyBundleJobScheduler);
}
8 changes: 7 additions & 1 deletion ecs/fulfillment-pipeline/src/jobs/plan.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,28 @@
*/
import winston from "winston";

import { Database } from "../../../../src/arch/db/database";
import { planBundleHandler } from "../../../../src/jobs/plan";
import { JobScheduler } from "../utils/jobScheduler";

export class PlanBundleJobScheduler extends JobScheduler {
private database: Database;

constructor({
intervalMs = 60_000,
logger,
database,
}: {
intervalMs: number;
logger: winston.Logger;
database: Database;
}) {
super({ intervalMs, schedulerName: "plan-bundle", logger });
this.database = database;
}

async processJob(): Promise<void> {
await planBundleHandler(undefined, undefined, this.logger).catch(
await planBundleHandler(this.database, undefined, this.logger).catch(
(error) => {
this.logger.error("Error planning bundle", error);
}
Expand Down
10 changes: 9 additions & 1 deletion ecs/fulfillment-pipeline/src/jobs/verify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,34 @@
*/
import winston from "winston";

import { Database } from "../../../../src/arch/db/database";
import { verifyBundleHandler } from "../../../../src/jobs/verify";
import { JobScheduler } from "../utils/jobScheduler";

export class VerifyBundleJobScheduler extends JobScheduler {
private database: Database;
constructor({
intervalMs = 60_000,
logger,
database,
}: {
intervalMs: number;
logger: winston.Logger;
database: Database;
}) {
super({
intervalMs,
schedulerName: "verify-bundle",
logger,
});
this.database = database;
}

async processJob(): Promise<void> {
await verifyBundleHandler({ logger: this.logger }).catch((error) => {
await verifyBundleHandler({
database: this.database,
logger: this.logger,
}).catch((error) => {
this.logger.error("Error verifying bundle", error);
});
}
Expand Down
7 changes: 7 additions & 0 deletions ecs/fulfillment-pipeline/src/utils/planIdMessageHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { Message, SQSClient, SQSClientConfig } from "@aws-sdk/client-sqs";
import { Consumer } from "sqs-consumer";
import winston from "winston";

import { ArweaveGateway } from "../../../../src/arch/arweaveGateway";
import { Database } from "../../../../src/arch/db/database";
import { ObjectStore } from "../../../../src/arch/objectStore";
import { PaymentService } from "../../../../src/arch/payment";
Expand All @@ -31,13 +32,15 @@ export const planIdMessageHandler = ({
database,
objectStore,
paymentService,
arweaveGateway,
}: {
message: Message;
logger: winston.Logger;
queue: QueueHandlerConfig;
database: Database;
objectStore: ObjectStore;
paymentService: PaymentService;
arweaveGateway: ArweaveGateway;
}) => {
const messageLogger = logger.child({
messageId: message.MessageId,
Expand Down Expand Up @@ -68,6 +71,7 @@ export const planIdMessageHandler = ({
database,
objectStore,
paymentService,
arweaveGateway,
},
// provide our message logger to the handler
messageLogger.child({ planId })
Expand All @@ -80,12 +84,14 @@ export function createPlanIdHandlingSQSConsumer({
database,
objectStore,
paymentService,
arweaveGateway,
}: {
queue: QueueHandlerConfig;
sqsOptions?: Partial<SQSClientConfig>;
database: Database;
objectStore: ObjectStore;
paymentService: PaymentService;
arweaveGateway: ArweaveGateway;
}) {
const { queueUrl, consumerOptions, logger } = queue;
return Consumer.create({
Expand All @@ -98,6 +104,7 @@ export function createPlanIdHandlingSQSConsumer({
database,
objectStore,
paymentService,
arweaveGateway,
}),
sqs: new SQSClient(sqsOptions),
batchSize: 1,
Expand Down
2 changes: 1 addition & 1 deletion src/arch/arweaveGateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ export class ArweaveGateway implements Gateway {
endpoint = gatewayUrl,
retryStrategy = new ExponentialBackoffRetryStrategy({}),
axiosInstance = axios.create(), // defaults to throwing errors for status codes >400
}: GatewayAPIConstParams) {
}: GatewayAPIConstParams = {}) {
this.endpoint = endpoint;
this.retryStrategy = retryStrategy;
this.axiosInstance = axiosInstance;
Expand Down
17 changes: 13 additions & 4 deletions src/arch/db/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import {
DataItemFailedReason,
FinishedMultiPartUpload,
InFlightMultiPartUpload,
InsertNewBundleParams,
Expand All @@ -27,7 +28,12 @@ import {
PostedNewDataItem,
SeededBundle,
} from "../../types/dbTypes";
import { TransactionId, UploadId, Winston } from "../../types/types";
import {
DataItemId,
TransactionId,
UploadId,
Winston,
} from "../../types/types";

// TODO: this could be an interface since no functions have a default implementation
export interface Database {
Expand Down Expand Up @@ -125,11 +131,12 @@ export interface Database {
/** Gets latest status of a data item from the database */
getDataItemInfo(dataItemId: TransactionId): Promise<
| {
status: "new" | "pending" | "permanent";
status: "new" | "pending" | "permanent" | "failed";
assessedWinstonPrice: Winston;
bundleId?: TransactionId;
uploadedTimestamp: number;
deadlineHeight?: number;
failedReason?: DataItemFailedReason;
}
| undefined
>;
Expand Down Expand Up @@ -176,8 +183,10 @@ export interface Database {
uploadId: UploadId
): Promise<void>;

/** TODO: create failed_data_item table instead, remove this */
deletePlannedDataItem(dataItemId: string): Promise<void>;
updatePlannedDataItemAsFailed(params: {
dataItemId: DataItemId;
failedReason: DataItemFailedReason;
}): Promise<void>;
}

export type UpdateDataItemsToPermanentParams = {
Expand Down
1 change: 1 addition & 0 deletions src/arch/db/dbConstants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ export const tableNames = {
permanentBundle: "permanent_bundle",
permanentDataItem: "permanent_data_item",
plannedDataItem: "planned_data_item",
failedDataItem: "failed_data_item",
postedBundle: "posted_bundle",
seededBundle: "seeded_bundle",
/** @deprecated */
Expand Down
41 changes: 41 additions & 0 deletions src/arch/db/dbMaps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
import { defaultPremiumFeatureType } from "../../constants";
import {
DataItemFailedReason,
FailedBundle,
FailedBundleDBResult,
FailedDataItem,
FailedDataItemDBResult,
NewBundle,
NewBundleDBResult,
NewDataItem,
Expand Down Expand Up @@ -173,3 +176,41 @@ export function permanentDataItemDbResultToPermanentDataItemMap({
deadlineHeight: deadline_height ? +deadline_height : undefined,
};
}

export function failedDataItemDbResultToFailedDataItemMap({
assessed_winston_price,
byte_count,
data_item_id,
owner_public_address,
uploaded_date,
data_start,
failed_bundles,
signature_type,
content_type,
premium_feature_type,
plan_id,
planned_date,
deadline_height,
failed_date,
failed_reason,
signature,
}: FailedDataItemDBResult): FailedDataItem {
return {
assessedWinstonPrice: W(assessed_winston_price),
dataItemId: data_item_id,
ownerPublicAddress: owner_public_address,
byteCount: +byte_count,
uploadedDate: uploaded_date,
premiumFeatureType: premium_feature_type ?? defaultPremiumFeatureType,
failedBundles: failed_bundles ? failed_bundles.split(",") : [],
signatureType: signature_type ?? undefined,
payloadDataStart: data_start ?? undefined,
payloadContentType: content_type ?? undefined,
signature: signature ?? undefined,
planId: plan_id,
plannedDate: planned_date,
deadlineHeight: deadline_height ? +deadline_height : undefined,
failedDate: failed_date,
failedReason: failed_reason as DataItemFailedReason,
};
}
Loading

0 comments on commit 0b95dff

Please sign in to comment.