Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ⚡ update task logs #3

Merged
merged 9 commits into from
Jun 21, 2024
12 changes: 7 additions & 5 deletions apps/be/common/src/axios/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ export const httpTimerAdapter = async (
});
};

export const defaultHeaders = {
'Content-Type': 'application/json, text/plain, */*',
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.142.86 Safari/537.36',
};

export const axiosConfig: HttpModuleOptions = {
headers: {
'Content-Type': 'application/text',
'User-Agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/89.0.142.86 Safari/537.36',
},
headers: defaultHeaders,
httpAgent: new http.Agent({ keepAlive: true }),
httpsAgent: new https.Agent({ keepAlive: true }),
adapter: httpTimerAdapter,
Expand Down
1 change: 1 addition & 0 deletions apps/be/common/src/redis/redis.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import { RedisService } from './services';
port: config.getOrThrow<number>('REDIS_PORT'),
password: config.getOrThrow<string>('REDIS_PASSWORD'),
connectTimeout: Number(config.get<number>('REDIS_CONNECT_TIMEOUT')) || 20000,
keepAlive: Number(config.get<number>('REDIS_KEEP_ALIVE')) || 10000, // Send a PING every 10 seconds
lehuygiang28 marked this conversation as resolved.
Show resolved Hide resolved
maxRetriesPerRequest: null,
reconnectOnError: () => {
const reconnectAndResendFailedCmd = 2;
Expand Down
30 changes: 28 additions & 2 deletions apps/be/common/src/utils/abstract/abstract.repository.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import {
} from 'mongoose';
import { NotFoundException } from '@nestjs/common';
import type { NullableType } from '../types';
import { PaginationRequestDto } from '../dtos';

export abstract class AbstractRepository<TDocument extends AbstractDocument> {
protected abstract readonly logger: PinoLogger;
Expand Down Expand Up @@ -143,36 +144,61 @@ export abstract class AbstractRepository<TDocument extends AbstractDocument> {
});
}

/**
* Pagination will automatically calculate, if query is provided
* @returns
*/
async find({
filterQuery,
queryOptions,
projectionType,
query,
}: {
filterQuery: FilterQuery<TDocument>;
queryOptions?: Partial<QueryOptions<TDocument>>;
projectionType?: ProjectionType<TDocument>;
query?: PaginationRequestDto;
}): Promise<NullableType<TDocument[]>> {
const document = await this.model.find(filterQuery, projectionType, {
const filter: FilterQuery<TDocument> = { ...filterQuery };
const options: Partial<QueryOptions<TDocument>> = {
lean: true,
...queryOptions,
});
};

if (query?.sortBy && query?.sortOrder) {
options.sort = { [query.sortBy]: query.sortOrder };
}

if (query?.page && query?.limit) {
options.skip = (query.page - 1) * query.limit;
options.limit = query.limit;
}

const document = await this.model.find(filter, projectionType, options);
lehuygiang28 marked this conversation as resolved.
Show resolved Hide resolved

return document ?? null;
}

/**
* Pagination will automatically calculate, if query is provided
* @returns
*/
async findOrThrow({
filterQuery,
queryOptions,
projectionType,
query,
}: {
filterQuery: FilterQuery<TDocument>;
queryOptions?: Partial<QueryOptions<TDocument>>;
projectionType?: ProjectionType<TDocument>;
query?: PaginationRequestDto;
}): Promise<TDocument[]> {
const document = await this.find({
filterQuery,
queryOptions,
projectionType,
query,
});

if (!document || document.length <= 0) {
Expand Down
23 changes: 23 additions & 0 deletions apps/be/common/src/utils/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,26 @@ export function valuesOfEnum<T extends object>(enumObj: T): (string | number)[]

return values;
}

/**
* Normalizes headers by converting all keys to lowercase and trimming whitespace.
* @param headers
* @returns
*/
export function normalizeHeaders(headers: Record<string, unknown>) {
if (typeof headers !== 'object' || headers === null || headers === undefined) {
throw new Error('Headers must be an object');
}

return Object.entries(headers).reduce((acc, [key, value]) => {
if (typeof key !== 'string') {
throw new Error('Header keys must be strings');
}

if (value === undefined || value === null) {
return acc;
}

return { ...acc, [key.trim().toLowerCase()]: value };
}, {});
}
8 changes: 0 additions & 8 deletions apps/be/common/src/utils/dtos/pagination-response.dto.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,11 @@ export class PaginationResponseDto<TData> {
constructor(data: { data: TData[]; total: number; limit: number; page: number }) {
this.data = data.data;
this.total = data.total;
this.limit = data.limit;
this.page = data.page;
}

@ApiProperty()
data: TData[];

@ApiProperty()
total: number;

@ApiProperty()
limit: number;

@ApiProperty()
page: number;
}
8 changes: 7 additions & 1 deletion apps/be/src/app/task-logs/dtos/task-log.dto.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Types } from 'mongoose';
import { ApiProperty } from '@nestjs/swagger';
import { IsDate, IsNumber, IsOptional, IsString } from 'class-validator';
import { LogTimingPhases, TaskLog } from '../task-log.schema';
import { LogTimingPhases, RequestObject, ResponseObject, TaskLog } from '../task-log.schema';

export class LogTimingPhasesDto implements LogTimingPhases {
@ApiProperty({ type: Number, required: false, description: 'The time spent waiting' })
Expand Down Expand Up @@ -111,6 +111,12 @@ export class TaskLogDto implements TaskLog {
@ApiProperty({ type: LogTimingPhasesDto, required: false })
timings?: LogTimingPhasesDto;

@ApiProperty({ type: RequestObject, required: false })
request?: RequestObject;

@ApiProperty({ type: ResponseObject, required: false })
response?: ResponseObject;

@ApiProperty({ type: Date, required: false })
@IsDate()
@IsOptional()
Expand Down
20 changes: 20 additions & 0 deletions apps/be/src/app/task-logs/task-log.schema.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Prop, Schema, SchemaFactory } from '@nestjs/mongoose';
import { IntersectionType, PickType } from '@nestjs/swagger';
import { type Timings } from '@szmarczak/http-timer';
import { HydratedDocument, Types } from 'mongoose';

Expand Down Expand Up @@ -32,6 +33,19 @@ export class LogTimingPhases implements Phases {
total?: number;
}

export class RequestObject extends Object {
@Prop({ required: false, type: Object })
headers?: Record<string, unknown>;

@Prop({ required: false, type: String, default: '' })
body?: string;
}

export class ResponseObject extends IntersectionType(
Object,
PickType(RequestObject, ['headers', 'body']),
) {}

@Schema({
timestamps: true,
collection: 'taskLogs',
Expand Down Expand Up @@ -67,6 +81,12 @@ export class TaskLog extends AbstractDocument {
@Prop({ required: false, type: LogTimingPhases })
timings?: LogTimingPhases;

@Prop({ required: false, type: RequestObject })
request?: RequestObject;

@Prop({ required: false, type: ResponseObject })
response?: ResponseObject;

@Prop({ required: false, default: new Date() })
createdAt?: Date;

Expand Down
2 changes: 0 additions & 2 deletions apps/be/src/app/task-logs/task-logs.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ export class TaskLogsService {
return {
data: logs,
total,
page: query.page || 1,
limit: query.limit || logs?.length || 0,
};
}

Expand Down
31 changes: 27 additions & 4 deletions apps/be/src/app/tasks/processors/task.processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { BULLMQ_TASK_LOG_QUEUE, BULLMQ_TASK_QUEUE } from '~be/common/bullmq';
import { Task } from '~be/app/tasks/schemas/task.schema';
import { CreateTaskLogDto, TaskLogsJobName } from '~be/app/task-logs';
import { type Timings } from '@szmarczak/http-timer';
import { defaultHeaders } from '~be/common/axios';
import { normalizeHeaders } from '~be/common/utils';

@Injectable()
@Processor(BULLMQ_TASK_QUEUE, {
Expand Down Expand Up @@ -45,10 +47,14 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {
const now = Date.now();
const { name, endpoint, method, body, headers } = job.data;

const normalizedHeaders = headers ? normalizeHeaders(JSON.parse(headers)) : {};

const headersValidated = Object.assign(normalizeHeaders(defaultHeaders), normalizedHeaders);

const config: AxiosRequestConfig = {
url: endpoint,
method,
headers: headers ? JSON.parse(headers) : undefined,
headers: headersValidated,
data: body,
};

Expand All @@ -59,7 +65,7 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {
timings = response.request['timings'] || null;

this.logger.info(
`FETCH ${name} - ${response?.status} - ${timings?.phases?.total} ms - ${JSON.stringify(response?.data)?.length ?? 0} bytes`,
`FETCH ${name} - ${response?.status} - ${timings?.phases?.total} ms - ${String(response?.data ?? '')?.length ?? 0} bytes`,
);
} catch (error: AxiosError | unknown) {
if (error instanceof AxiosError) {
Expand All @@ -71,6 +77,8 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {
timings = null;
}

const stringBody = String(response?.data ?? '');

const taskLog: CreateTaskLogDto = {
taskId: job.data._id,
endpoint,
Expand All @@ -81,13 +89,28 @@ export class TaskProcessor extends WorkerHost implements OnModuleInit {

duration: timings?.phases?.total ?? 0,
statusCode: response?.status ?? 0,
responseSizeBytes: JSON.stringify(response?.data)?.length ?? 0,
responseSizeBytes: stringBody?.length ?? 0,
timings: timings?.phases || {},

request: {
headers: headersValidated,
body: String(config?.data || ''),
},

response: {
headers: response?.headers,
body:
stringBody?.length > Number(process.env['MAX_BODY_LOG_SIZE'] || 1024 * 50) // Default 50KB
? `Body too large (${stringBody?.length} bytes), will not be logged.`
: stringBody,
},
};

await this.taskLogQueue.add(`saveTaskLog`, taskLog, {
removeOnComplete: 1,
attempts: 5,
removeOnFail: 1,
attempts: 10,
// delay: 5000,
backoff: {
type: 'exponential',
delay: 5000,
Expand Down
2 changes: 0 additions & 2 deletions apps/be/src/app/tasks/tasks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,8 +335,6 @@ export class TasksService implements OnModuleInit {
return {
data: tasks,
total,
page: query.page || 1,
limit: query.limit || tasks?.length,
};
}

Expand Down
Loading