Skip to content

Burmuley/priority-pubsub

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

8 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Priority Pub/Sub

Service to abstract interaction of your application with message queue while consuming messages from multiple queues considering queue priority.

The queue residing at the top of the list in the configuration has the highest priority and polled first. Only when no messages on the current level left Poller starts checking other queues further down in the list.

You can define your own Poller implementation in the code and implement your own polling logic.

Each Poller should consume only one message and block until Processor return the result (or error). With this you can control number of concurrent messages your application can handle in parallel.

You can also add a TransformationFunc for message data to adjust contents of the message data before sending it to the application.

Currently supported queues:

  • AWS SQS
  • GCP Pub/Sub

Building

go build -o priority_pubsub .

Usage

priority_pubsub -config <path to configuration file>

The default configuration file name is config.json.

Configuration

Configuration structure

{
  "poller": {
    "type": "POLLER TYPE",
    "concurrency": "NUMBER OF CONCURRENT POLLERS"
  },
  "queues": {
    "type": "QUEUE TYPE",
    "config": [
      {
        "QUEUE SPECIFIC CONFIG #1"
      },
      {
        "QUEUE SPECIFIC CONFIG #2"
      }
    ]
  },
  "processor": {
    "type": "PROCESSOR TYPE",
    "config": {
      "PROCESSOR SPECIFIC CONFIG"
    }
  },
  "transformer": {
    "type": "TRANSFORMER FUNCTION NAME"
  }
}

Configuration fields for poller:

  • type: type of the Poller to use; currently only simple option is supported
  • concurrency: number of concurrent Poller instances to run

Configuration fields for processor:

  • type - type of the Processor to use for message processing; available values - http
  • config - processor specific configuration; currently the only available Processor implementation support the following options:
    • subscriber_url - hte HTTP URL to forward messages for processing
    • method - HTTP method to use when submitting message to subscriber_url; default - POST
    • timeout - HTTP timeout to use, i.e. time to wait for message to be processed before failing the operation
    • fatal_codes - list of HTTP codes assumed as Fatal, i.e. when message should not be returned back to the queue for retry
    • content_type - the content-type HTTP header value to use when ppotings messages to the target application; default value - text/plain

Configuration fields for transformer:

  • type - name of the transformer function; currently only one value is available - dapr_aws

Configuration fields for queues:

  • type - queue type; currently supported awssqs (AWS SQS) and gcppubsub (GCP Pub/Sub)
  • config - list of queue specific configurations; priority counts from the top, i.e. the top first queue definition has the highest priority

Configuration fields for awssqs queue type:

  • name - name of the AWS SQS queue
  • visibility_timeout - message visibility timeout to set when consuming message from the queue
  • endpoint - custom endpoint to use for interactions with AWS SQS; useful if you're testing with Local Stack
  • region - AWS Region

Configuration fields for gcppubsub queue type:

  • subscription_id - the ID of the GCP Pub/Sub subscription
  • ack_deadline - timeout for ACK for the consumed message (similar to AWS SQS Visibility Timeout)

Configuration example for AWS SQS with LocalStack:

{
  "poll_concurrency": 4,
  "queues": {
    "type": "aws_sqs",
    "config": [
      {
        "name": "high-priority",
        "visibility_timeout": 600,
        "endpoint": "http://localhost:4566",
        "region": "us-west-2"
      },
      {
        "name": "low-priority",
        "visibility_timeout": 600,
        "endpoint": "http://localhost:4566",
        "region": "us-west-2"
      }
    ]
  },
  "processor": {
    "type": "http",
    "config": {
      "subscriber_url": "http://localhost:5000/",
      "method": "POST",
      "timeout": 570,
      "fatal_codes": [412, 450]
    }
  }
}

Configuration example for GCP Pub/Sub:

{
  "poll_concurrency": 4,
  "queues": {
    "type": "gcp_pubsub",
    "config": [
      {
        "subscription_id": "projects/test-project-156022/subscriptions/high-priority-sub",
        "ack_deadline": 600
      },
      {
        "subscription_id": "projects/test-project-156022/subscriptions/low-priority-sub",
        "ack_deadline": 600
      }
    ]
  },
  "processor": {
    "type": "http",
    "config": {
      "subscriber_url": "http://localhost:5000/",
      "method": "POST",
      "timeout": 570,
      "fatal_codes": [412, 450]
    }
  }
}

Configuration example for AWS SQS with LocalStack and Dapr as publisher:

{
  "poll_concurrency": 4,
  "queues": {
    "type": "aws_sqs",
    "config": [
      {
        "name": "high-priority",
        "visibility_timeout": 600,
        "endpoint": "http://localhost:4566",
        "region": "us-west-2"
      },
      {
        "name": "low-priority",
        "visibility_timeout": 600,
        "endpoint": "http://localhost:4566",
        "region": "us-west-2"
      }
    ]
  },
  "processor": {
    "type": "http",
    "config": {
      "subscriber_url": "http://localhost:5000/",
      "method": "POST",
      "timeout": 570,
      "fatal_codes": [412, 450]
    }
  },
  "transformer": {
    "type": "dapr_aws"
  }
}

Note: I wasn't able to make work lightweight SubscriptionClient with Pub/Sub emulator, to test it out you need to create real resources in GCP.

About

No description, website, or topics provided.

Resources

License

Stars

Watchers

Forks

Packages

No packages published

Languages