Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: restart from head, refactor & qol #12

Merged
merged 9 commits into from
Oct 29, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Merge remote-tracking branch 'origin/main' into restart-head
  • Loading branch information
fmorency committed Oct 29, 2024
commit 040319c71daa37ebc77e145c13df6d7c561bafba
3 changes: 2 additions & 1 deletion cmd/yaci/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os/signal"
"syscall"

"github.com/liftedinit/yaci/internal/utils"
"github.com/spf13/cobra"
"github.com/spf13/viper"

Expand Down Expand Up @@ -88,7 +89,7 @@ func extract(address string, outputHandler output.OutputHandler) error {
if stop == 0 {
stop, err = utils.GetLatestBlockHeightWithRetry(ctx, grpcConn, resolver, maxRetries)
if err != nil {
return errors.WithMessage(err, "failed to get latest block height")
return fmt.Errorf("failed to get latest block height: %w", err)
}
}

Expand Down
32 changes: 8 additions & 24 deletions cmd/yaci/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,28 +15,6 @@ var PostgresRunE = func(cmd *cobra.Command, args []string) error {
postgresConn := viper.GetString("postgres-conn")
slog.Debug("Command-line argument", "postgres-conn", postgresConn)

_, err := pgxpool.ParseConfig(postgresConn)
if err != nil {
return errors.WithMessage(err, "failed to parse PostgreSQL connection string")
}

outputHandler, err := output.NewPostgresOutputHandler(postgresConn)
if err != nil {
return errors.WithMessage(err, "failed to create PostgreSQL output handler")
}
defer outputHandler.Close()

return extract(args[0], outputHandler)
}

var PostgresCmd = &cobra.Command{
Use: "postgres [address] [flags]",
Short: "Extract chain data to a PostgreSQL database",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
postgresConn := viper.GetString("postgres-conn")
slog.Debug("Command-line argument", "postgres-conn", postgresConn)

_, err := pgxpool.ParseConfig(postgresConn)
if err != nil {
return fmt.Errorf("failed to parse PostgreSQL connection string: %w", err)
Expand All @@ -57,8 +35,14 @@ var PostgresCmd = &cobra.Command{
// start = latestBlock.ID + 1
//}

return extract(args[0], outputHandler)
},
return extract(args[0], outputHandler)
}

var PostgresCmd = &cobra.Command{
Use: "postgres [address] [flags]",
Short: "Extract chain data to a PostgreSQL database",
Args: cobra.ExactArgs(1),
RunE: PostgresRunE,
}

func init() {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/golang/protobuf v1.5.3
github.com/gruntwork-io/terratest v0.47.2
github.com/jackc/pgx/v4 v4.18.3
github.com/pkg/errors v0.9.1
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
Expand Down
1 change: 1 addition & 0 deletions internal/extractor/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/liftedinit/yaci/internal/utils"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/dynamicpb"
Expand Down
63 changes: 3 additions & 60 deletions internal/extractor/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,18 @@ package extractor

import (
"context"
"fmt"
"time"

"github.com/liftedinit/yaci/internal/output"
"github.com/liftedinit/yaci/internal/reflection"

"github.com/liftedinit/yaci/internal/utils"
"google.golang.org/grpc"
)

// ExtractLiveBlocksAndTransactions monitors the chain and processes new blocks as they are produced.
func ExtractLiveBlocksAndTransactions(ctx context.Context, grpcConn *grpc.ClientConn, resolver *reflection.CustomResolver, start uint64, outputHandler output.OutputHandler, blockTime, maxConcurrency, maxRetries uint) error {
// Prepare the Status method descriptors
statusMethodFullName := "cosmos.base.node.v1beta1.Service.Status"
statusServiceName, statusMethodNameOnly, err := parseMethodFullName(statusMethodFullName)
if err != nil {
return err
}

files := resolver.Files()

statusMethodDescriptor, err := reflection.FindMethodDescriptor(files, statusServiceName, statusMethodNameOnly)
if err != nil {
return fmt.Errorf("failed to find status method descriptor: %w", err)
}

statusFullMethodName := buildFullMethodName(statusMethodDescriptor)
currentHeight := start - 1

currentHeight := start
for {
select {
case <-ctx.Done():
Expand All @@ -53,45 +38,3 @@ func ExtractLiveBlocksAndTransactions(ctx context.Context, grpcConn *grpc.Client
}
}
}

func getLatestBlockHeightWithRetry(ctx context.Context, conn *grpc.ClientConn, fullMethodName string, methodDescriptor protoreflect.MethodDescriptor, maxRetries uint) (uint64, error) {
var latestHeight uint64
var err error

for attempt := uint(1); attempt <= maxRetries; attempt++ {
latestHeight, err = getLatestBlockHeight(ctx, conn, fullMethodName, methodDescriptor)
if err == nil {
return latestHeight, nil
}
slog.Warn("Retrying getting latest block height", "attempt", attempt, "error", err)
time.Sleep(time.Duration(2*attempt) * time.Second)
}

return 0, fmt.Errorf("failed to get latest block height after %d retries: %w", maxRetries, err)
}

func getLatestBlockHeight(ctx context.Context, conn *grpc.ClientConn, fullMethodName string, methodDescriptor protoreflect.MethodDescriptor) (uint64, error) {
// Create the request message (empty)
inputMsg := dynamicpb.NewMessage(methodDescriptor.Input())

// Create the response message
outputMsg := dynamicpb.NewMessage(methodDescriptor.Output())

err := conn.Invoke(ctx, fullMethodName, inputMsg, outputMsg)
if err != nil {
return 0, fmt.Errorf("error invoking status method: %w", err)
}

// Extract the latest block height from the response
latestHeightStr := outputMsg.ProtoReflect().Get(outputMsg.Descriptor().Fields().ByName("height"))
if !latestHeightStr.IsValid() {
return 0, fmt.Errorf("height field not found in status response: %w", err)
}

latestHeight, err := strconv.ParseUint(latestHeightStr.String(), 10, 64)
if err != nil {
return 0, fmt.Errorf("failed to parse latest block height: %w", err)
}

return latestHeight, nil
}
4 changes: 3 additions & 1 deletion internal/reflection/resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,9 @@ func (r *CustomResolver) processFileDescriptors(fdProtos []*descriptorpb.FileDes
return fmt.Errorf("failed to create file descriptor for %s: %w", name, err)
}

if err := r.files.RegisterFile(fd); err != nil {
err = r.files.RegisterFile(fd)

if err != nil {
return fmt.Errorf("failed to register file %s: %w", name, err)
}
}
Expand Down
You are viewing a condensed version of this merge commit. You can view the full changes here.