From 0903a52c5e3b6ca3e84333ea5824f049e72776d1 Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 13 Feb 2024 15:57:29 +0000 Subject: [PATCH 1/2] Better logging for transaction writer Signed-off-by: Peter Broadhurst --- go.mod | 2 +- go.sum | 2 ++ internal/persistence/postgres/transaction_writer.go | 12 ++++++++++-- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 776e43e..3ec5065 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.1 github.com/hashicorp/golang-lru/v2 v2.0.7 - github.com/hyperledger/firefly-common v1.4.2 + github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 github.com/lib/pq v1.10.9 github.com/oklog/ulid/v2 v2.1.0 github.com/prometheus/client_golang v1.18.0 diff --git a/go.sum b/go.sum index bf26b38..08e886f 100644 --- a/go.sum +++ b/go.sum @@ -92,6 +92,8 @@ github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE= github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= +github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 h1:bcIg8zUalHyjxPmhIggwg/VK/IVDvpH2XJwCkfAYrSU= +github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4= github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY= diff --git a/internal/persistence/postgres/transaction_writer.go b/internal/persistence/postgres/transaction_writer.go index 27f7d95..1eee076 100644 --- a/internal/persistence/postgres/transaction_writer.go +++ b/internal/persistence/postgres/transaction_writer.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -40,6 +40,7 @@ type transactionOperation struct { sentConflict bool done chan error + opID string isShutdown bool txInsert *apitypes.ManagedTX noncePreAssigned bool @@ -122,6 +123,7 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config. func newTransactionOperation(txID string) *transactionOperation { return &transactionOperation{ + opID: fftypes.ShortID(), txID: txID, done: make(chan error, 1), // 1 slot to ensure we don't block the writer } @@ -130,6 +132,7 @@ func newTransactionOperation(txID string) *transactionOperation { func (op *transactionOperation) flush(ctx context.Context) error { select { case err := <-op.done: + log.L(ctx).Debugf("Flushed write operation %s (err=%v)", op.opID, err) return err case <-ctx.Done(): return i18n.NewError(ctx, i18n.MsgContextCanceled) @@ -165,6 +168,7 @@ func (tw *transactionWriter) queue(ctx context.Context, op *transactionOperation h := fnv.New32a() // simple non-cryptographic hash algo _, _ = h.Write([]byte(hashKey)) routine := h.Sum32() % tw.workerCount + log.L(ctx).Debugf("Queuing write operation %s to worker tx_writer_%.4d", op.opID, routine) select { case tw.workQueues[routine] <- op: // it's queued case <-ctx.Done(): // timeout of caller context @@ -180,6 +184,7 @@ func (tw *transactionWriter) worker(i int) { defer close(tw.workersDone[i]) workerID := fmt.Sprintf("tx_writer_%.4d", i) ctx := log.WithLogField(tw.bgCtx, "job", workerID) + l := log.L(ctx) var batch *transactionWriterBatch batchCount := 0 workQueue := tw.workQueues[i] @@ -208,11 +213,12 @@ func (tw *transactionWriter) worker(i int) { batchCount++ } batch.ops = append(batch.ops, op) + l.Debugf("Added write operation %s to batch %s (len=%d)", op.opID, batch.id, len(batch.ops)) case <-timeoutContext.Done(): timedOut = true select { case <-ctx.Done(): - log.L(ctx).Debugf("Transaction writer ending") + l.Debugf("Transaction writer ending") return default: } @@ -220,6 +226,7 @@ func (tw *transactionWriter) worker(i int) { if batch != nil && (timedOut || (len(batch.ops) >= tw.batchMaxSize)) { batch.timeoutCancel() + l.Debugf("Running batch %s (len=%d)", batch.id, len(batch.ops)) tw.runBatch(ctx, batch) batch = nil } @@ -383,6 +390,7 @@ func (tw *transactionWriter) preInsertIdempotencyCheck(ctx context.Context, b *t txOp.sentConflict = true txOp.done <- i18n.NewError(ctx, tmmsgs.MsgDuplicateID, txOp.txID) } else { + log.L(ctx).Debugf("Adding TX %s from write operation %s to insert idx=%d", txOp.txID, txOp.opID, len(validInserts)) validInserts = append(validInserts, txOp.txInsert) } } From 58daa9ddd592f625f61aa189c9bbf721ca8b165e Mon Sep 17 00:00:00 2001 From: Peter Broadhurst Date: Tue, 13 Feb 2024 17:50:51 +0000 Subject: [PATCH 2/2] Merge TX updates in same batch Signed-off-by: Peter Broadhurst --- go.sum | 2 - .../persistence/postgres/eventstreams_test.go | 7 +- .../postgres/transaction_writer.go | 14 +++- .../postgres/transaction_writer_test.go | 73 +++++++++++++++++++ pkg/apitypes/managed_tx.go | 47 +++++++++++- pkg/apitypes/managed_tx_test.go | 28 +++++++ 6 files changed, 164 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index 08e886f..e53bffb 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,6 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/huandu/xstrings v1.3.3/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= github.com/huandu/xstrings v1.4.0 h1:D17IlohoQq4UcpqD7fDk80P7l+lwAmlFaBHgOipl2FU= github.com/huandu/xstrings v1.4.0/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= -github.com/hyperledger/firefly-common v1.4.2 h1:sBbiTFWDu1qCnXFA6JobasJl4AXphCAUZU/R4nyWPdE= -github.com/hyperledger/firefly-common v1.4.2/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401 h1:bcIg8zUalHyjxPmhIggwg/VK/IVDvpH2XJwCkfAYrSU= github.com/hyperledger/firefly-common v1.4.6-0.20240131185020-80d20a173401/go.mod h1:jkErZdQmC9fsAJZQO427tURdwB9iiW+NMUZSqS3eBIE= github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= diff --git a/internal/persistence/postgres/eventstreams_test.go b/internal/persistence/postgres/eventstreams_test.go index a95b19a..c5ac39a 100644 --- a/internal/persistence/postgres/eventstreams_test.go +++ b/internal/persistence/postgres/eventstreams_test.go @@ -108,8 +108,11 @@ func TestEventStreamAfterPaginatePSQL(t *testing.T) { var eventStreams []*apitypes.EventStream for i := 0; i < 20; i++ { es := &apitypes.EventStream{ - ID: fftypes.NewUUID(), - Name: strPtr(fmt.Sprintf("es_%.3d", i)), + ID: fftypes.NewUUID(), + Name: strPtr(fmt.Sprintf("es_%.3d", i)), + BatchTimeout: ffDurationPtr(22222 * time.Second), + RetryTimeout: ffDurationPtr(33333 * time.Second), + BlockedRetryDelay: ffDurationPtr(44444 * time.Second), } err := p.WriteStream(ctx, es) assert.NoError(t, err) diff --git a/internal/persistence/postgres/transaction_writer.go b/internal/persistence/postgres/transaction_writer.go index 1eee076..72f1151 100644 --- a/internal/persistence/postgres/transaction_writer.go +++ b/internal/persistence/postgres/transaction_writer.go @@ -421,9 +421,19 @@ func (tw *transactionWriter) executeBatchOps(ctx context.Context, b *transaction } } // Do all the transaction updates + mergedUpdates := make(map[string]*apitypes.TXUpdates) for _, op := range b.txUpdates { - if err := tw.p.updateTransaction(ctx, op.txID, op.txUpdate); err != nil { - log.L(ctx).Errorf("Update transaction %s failed: %s", op.txID, err) + update, merge := mergedUpdates[op.txID] + if merge { + update.Merge(op.txUpdate) + } else { + mergedUpdates[op.txID] = op.txUpdate + } + log.L(ctx).Debugf("Updating transaction %s in write operation %s (merged=%t)", op.txID, op.opID, merge) + } + for txID, update := range mergedUpdates { + if err := tw.p.updateTransaction(ctx, txID, update); err != nil { + log.L(ctx).Errorf("Update transaction %s failed: %s", txID, err) return err } } diff --git a/internal/persistence/postgres/transaction_writer_test.go b/internal/persistence/postgres/transaction_writer_test.go index 1e42cc1..61856be 100644 --- a/internal/persistence/postgres/transaction_writer_test.go +++ b/internal/persistence/postgres/transaction_writer_test.go @@ -27,6 +27,7 @@ import ( "github.com/hyperledger/firefly-common/pkg/fftypes" "github.com/hyperledger/firefly-transaction-manager/pkg/apitypes" "github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" ) @@ -275,6 +276,48 @@ func TestExecuteBatchOpsUpdateTXFail(t *testing.T) { assert.NoError(t, mdb.ExpectationsWereMet()) } +func TestExecuteBatchOpsUpdateTXMerge(t *testing.T) { + logrus.SetLevel(logrus.TraceLevel) + + ctx, p, mdb, done := newMockSQLPersistence(t) + defer done() + + mdb.ExpectBegin() + mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1)) + mdb.ExpectExec("UPDATE.*").WillReturnResult(sqlmock.NewResult(-1, 1)) + mdb.ExpectCommit() + + err := p.db.RunAsGroup(ctx, func(ctx context.Context) error { + return p.writer.executeBatchOps(ctx, &transactionWriterBatch{ + txUpdates: []*transactionOperation{ + { + txID: "11111", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusPending), + From: strPtr("0xaaaaa"), + }, + }, + { + txID: "22222", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusPending), + }, + }, + { + txID: "11111", + txUpdate: &apitypes.TXUpdates{ + Status: ptrTo(apitypes.TxStatusSucceeded), + TransactionHash: strPtr("0xaabbcc"), + }, + }, + }, + }) + }) + assert.NoError(t, err) + + assert.NoError(t, mdb.ExpectationsWereMet()) +} + func TestExecuteBatchOpsUpsertReceiptFail(t *testing.T) { ctx, p, mdb, done := newMockSQLPersistence(t) defer done() @@ -455,3 +498,33 @@ func TestQueueClosedContext(t *testing.T) { p.writer.queue(closedCtx, newTransactionOperation("tx1")) } + +func TestStopDoneWorker(t *testing.T) { + tw := &transactionWriter{ + workersDone: []chan struct{}{ + make(chan struct{}), + }, + } + tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background()) + close(tw.workersDone[0]) + tw.stop() +} + +func TestStopDoneCtx(t *testing.T) { + tw := &transactionWriter{ + workersDone: []chan struct{}{ + make(chan struct{}, 1), + }, + } + tw.bgCtx, tw.cancelCtx = context.WithCancel(context.Background()) + tw.cancelCtx() + go func() { + time.Sleep(10 * time.Millisecond) + tw.workersDone[0] <- struct{}{} + }() + tw.stop() +} + +func ptrTo[T any](v T) *T { + return &v +} diff --git a/pkg/apitypes/managed_tx.go b/pkg/apitypes/managed_tx.go index cdc1308..f4c7a40 100644 --- a/pkg/apitypes/managed_tx.go +++ b/pkg/apitypes/managed_tx.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -208,6 +208,51 @@ type TXUpdates struct { ErrorMessage *string `json:"errorMessage,omitempty"` } +func (txu *TXUpdates) Merge(txu2 *TXUpdates) { + if txu2.Status != nil { + txu.Status = txu2.Status + } + if txu2.DeleteRequested != nil { + txu.DeleteRequested = txu2.DeleteRequested + } + if txu2.From != nil { + txu.From = txu2.From + } + if txu2.To != nil { + txu.To = txu2.To + } + if txu2.Nonce != nil { + txu.Nonce = txu2.Nonce + } + if txu2.Gas != nil { + txu.Gas = txu2.Gas + } + if txu2.Value != nil { + txu.Value = txu2.Value + } + if txu2.GasPrice != nil { + txu.GasPrice = txu2.GasPrice + } + if txu2.TransactionData != nil { + txu.TransactionData = txu2.TransactionData + } + if txu2.TransactionHash != nil { + txu.TransactionHash = txu2.TransactionHash + } + if txu2.PolicyInfo != nil { + txu.PolicyInfo = txu2.PolicyInfo + } + if txu2.FirstSubmit != nil { + txu.FirstSubmit = txu2.FirstSubmit + } + if txu2.LastSubmit != nil { + txu.LastSubmit = txu2.LastSubmit + } + if txu2.ErrorMessage != nil { + txu.ErrorMessage = txu2.ErrorMessage + } +} + // TXWithStatus is a convenience object that fetches all data about a transaction into one // large JSON payload (with limits on certain parts, such as the history entries). // Note that in LevelDB persistence this is the stored form of the single document object. diff --git a/pkg/apitypes/managed_tx_test.go b/pkg/apitypes/managed_tx_test.go index 06d6393..6eb84f2 100644 --- a/pkg/apitypes/managed_tx_test.go +++ b/pkg/apitypes/managed_tx_test.go @@ -85,3 +85,31 @@ func TestReceiptRecord(t *testing.T) { r.SetUpdated(t2) assert.Equal(t, t2, r.Updated) } + +func TestTXUpdatesMerge(t *testing.T) { + txu := &TXUpdates{} + txu2 := &TXUpdates{ + Status: ptrTo(TxStatusPending), + DeleteRequested: fftypes.Now(), + From: ptrTo("1111"), + To: ptrTo("2222"), + Nonce: fftypes.NewFFBigInt(3333), + Gas: fftypes.NewFFBigInt(4444), + Value: fftypes.NewFFBigInt(5555), + GasPrice: fftypes.JSONAnyPtr(`{"some": "stuff"}`), + TransactionData: ptrTo("xxxx"), + TransactionHash: ptrTo("yyyy"), + PolicyInfo: fftypes.JSONAnyPtr(`{"more": "stuff"}`), + FirstSubmit: fftypes.Now(), + LastSubmit: fftypes.Now(), + ErrorMessage: ptrTo("pop"), + } + txu.Merge(txu2) + assert.Equal(t, *txu2, *txu) + txu.Merge(&TXUpdates{}) + assert.Equal(t, *txu2, *txu) +} + +func ptrTo[T any](v T) *T { + return &v +}