From 5fe4a2c753f4284036e0ca5647e0779dc4052161 Mon Sep 17 00:00:00 2001 From: Oshank Kumar Date: Sat, 8 Feb 2025 22:46:15 +0530 Subject: [PATCH] Add example compose file for dev testing --- Dockerfile-taskqueue-manager | 2 +- docker-compose.yml | 62 +++++++++++++++++++++++++++++++---- examples/basic-worker/main.go | 9 +++-- examples/enqueuer/main.go | 27 ++++++++++----- worker.go | 2 +- 5 files changed, 82 insertions(+), 20 deletions(-) diff --git a/Dockerfile-taskqueue-manager b/Dockerfile-taskqueue-manager index 37078c2..9ed44c1 100644 --- a/Dockerfile-taskqueue-manager +++ b/Dockerfile-taskqueue-manager @@ -46,7 +46,7 @@ COPY --from=backend-builder /app/backend/taskqueue-manager . COPY --from=frontend-builder /app/frontend/dist/spa/ ./frontend/ # Expose the service port -EXPOSE 8080 +EXPOSE 8050 ENV WEB_STATIC_DIR='/app/frontend/' # Command to run the service diff --git a/docker-compose.yml b/docker-compose.yml index ae8eeb3..8198490 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,15 +1,63 @@ services: - redis: + taskqueue-redis: image: 'redis:latest' - ports: - - "6379:6379" + init: true + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 5s + retries: 5 + start_period: 3s taskqueue-manager: - image: 'oshank/taskqueue-manager:latest' + image: 'oshank/taskqueue-manager:main' + init: true ports: - '8050:8050' environment: - REDIS_QUEUE_ADDR: 'redis:6379' - REDIS_HEARTBEAT_ADDR: 'redis:6379' + REDIS_QUEUE_ADDR: 'taskqueue-redis:6379' + REDIS_HEARTBEAT_ADDR: 'taskqueue-redis:6379' + REDIS_METRICS_BACKEND_ADDR: 'taskqueue-redis:6379' + depends_on: + taskqueue-redis: + condition: service_healthy + + example-worker: + image: golang:1.23.0 + init: true + deploy: + replicas: 5 + working_dir: /workspace/taskqueue-go + command: > + bash -c "go build -o /app ./examples/basic-worker && /app --redis-addr taskqueue-redis:6379" + volumes: + - ${PWD}:/workspace/taskqueue-go:ro + - go-build-cache:/root/.cache/go-build + - go-mod-cache:/go/pkg/mod depends_on: - - redis + taskqueue-redis: + condition: service_healthy + + example-enqueuer: + image: golang:1.23.0 + init: true + deploy: + replicas: 5 + working_dir: /workspace/taskqueue-go + command: > + bash -c "go build -o /app ./examples/enqueuer && /app --redis-addr taskqueue-redis:6379" + volumes: + - ${PWD}:/workspace/taskqueue-go:ro + - go-build-cache:/root/.cache/go-build + - go-mod-cache:/go/pkg/mod + depends_on: + taskqueue-redis: + condition: service_healthy + +networks: + default: + name: taskqueue + +volumes: + go-mod-cache: + go-build-cache: \ No newline at end of file diff --git a/examples/basic-worker/main.go b/examples/basic-worker/main.go index d733b18..c336c45 100644 --- a/examples/basic-worker/main.go +++ b/examples/basic-worker/main.go @@ -20,12 +20,15 @@ import ( const ns = "taskqueue" -var id = flag.String("id", "", "worker id") +var ( + id = flag.String("id", "", "worker id") + redisAddr = flag.String("redis-addr", ":6379", "redis address") +) func main() { flag.Parse() - rc := redis.NewClient(&redis.Options{Addr: ":6379"}) + rc := redis.NewClient(&redis.Options{Addr: *redisAddr}) worker := taskqueue.NewWorker(&taskqueue.WorkerOptions{ ID: *id, @@ -95,7 +98,7 @@ func main() { worker.Stop() - fmt.Printf("taskqueue: shutting down. job processed email = %d payment = %d notification = %d\n", + fmt.Printf("taskqueue: shutting down. job processed email=%d payment=%d notification=%d\n", emailProcessed.Load(), paymentProcessed.Load(), notifyProcessed.Load(), ) } diff --git a/examples/enqueuer/main.go b/examples/enqueuer/main.go index 2145068..2484bf5 100644 --- a/examples/enqueuer/main.go +++ b/examples/enqueuer/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "flag" "fmt" "log" "math/rand" @@ -15,16 +16,20 @@ import ( const ns = "taskqueue" +var redisAddr = flag.String("redis-addr", ":6379", "Redis address") + func main() { - rc := redis.NewClient(&redis.Options{Addr: ":6379"}) + flag.Parse() + + rc := redis.NewClient(&redis.Options{Addr: *redisAddr}) enq := redisq.NewQueue(rc, redisq.WithNamespace(ns)) - n1 := queuePaymentJob(enq) - n2 := queueEmailJob(enq) + n1 := queueEmailJob(enq) + n2 := queuePaymentJob(enq) n3 := queueNotificationJob(enq) - fmt.Println("Jobs Enqueued", "payment", n1, "email", n2, "notification", n3) + fmt.Println("Jobs Enqueued", "email", n1, "payment", n2, "notification", n3, "total", n1+n2+n3) } func queueNotificationJob(enq taskqueue.Enqueuer) int { @@ -55,15 +60,21 @@ func queueNotificationJob(enq taskqueue.Enqueuer) int { return count } +type paymentPayload struct { + Gateway string `json:"gateway"` + Amount int `json:"amount"` + WalletID int `json:"wallet_id"` +} + func queuePaymentJob(enq taskqueue.Enqueuer) int { count := rand.Intn(100) + 100 for i := range count { paymentJob := taskqueue.NewJob() - _ = paymentJob.JSONMarshalPayload(map[string]interface{}{ - "gateway": "razorpay", - "amount": 500 + i, - "wallet_id": "1", + _ = paymentJob.JSONMarshalPayload(paymentPayload{ + Gateway: "razorpay", + Amount: rand.Intn(1000) + 10000, + WalletID: i, }) if err := enq.Enqueue(context.Background(), paymentJob, &taskqueue.EnqueueOptions{ QueueName: "payment_queue", diff --git a/worker.go b/worker.go index d9b123a..ac8a2c0 100644 --- a/worker.go +++ b/worker.go @@ -174,7 +174,7 @@ func (w *Worker) Start(ctx context.Context) { }() for _, h := range w.queueHandlers { - go w.handleQueue(ctx, h) + w.handleQueue(ctx, h) } }