• 1
  • 397
Giải quyết vấn đề Concurency với Golang và Redis
| January 18, 2024 | 5 min read

Với hệ thống phân tán, các giải pháp an toàn đồng thời (concurrency safety) của Golang như sync.Mutex chỉ hoạt động trong phạm vi một instance ứng dụng. Để quản lý an toàn đồng thời trên nhiều instance khác nhau, bạn cần sử dụng một hệ thống bên ngoài như Redis

Dưới đây là các kỹ thuật chính để xử lý concurrency an toàn trong hệ thống phân tán sử dụng Golang và Redis.

 

1. Khóa phân tán (Distributed Locks)

Khóa phân tán là phương pháp phổ biến nhất để đảm bảo rằng chỉ có một tiến trình được phép truy cập vào một tài nguyên dùng chung tại một thời điểm. 

Cách hoạt động:

  • Một instance ứng dụng sẽ cố gắng tạo một key duy nhất trong Redis.
  • Lệnh SET key value NX PX <milliseconds> của Redis là công cụ lý tưởng.
    • NX (Set if Not eXists): Đảm bảo rằng key chỉ được tạo nếu nó chưa tồn tại, ngăn chặn các instance khác lấy khóa cùng lúc.
    • PX (Expire in millisecond): Tự động xóa key sau một khoảng thời gian nhất định, giúp tránh tình trạng deadlock nếu một instance gặp sự cố trước khi kịp giải phóng khóa.
  • Khi hoàn thành công việc, instance sẽ xóa key để giải phóng khóa. 

Ví dụ sử dụng thư viện go-redis/v9:

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	ctx := context.Background()

	lockKey := "my:distributed:lock"
	lockValue := "instance-1" // Giá trị ngẫu nhiên để đảm bảo chỉ instance giữ khóa mới được xóa nó
	expiry := 10 * time.Second

	// Cố gắng lấy khóa
	ok, err := client.SetNX(ctx, lockKey, lockValue, expiry).Result()
	if err != nil {
		log.Fatal(err)
	}

	if ok {
		fmt.Println("Khóa đã được lấy thành công. Thực hiện công việc quan trọng...")
		// Giả lập công việc
		time.Sleep(5 * time.Second)

		// Giải phóng khóa (chỉ xóa nếu giá trị khớp để tránh xóa khóa của instance khác)
		// Dùng Lua script để đảm bảo tính nguyên tử
		luaScript := `
			if redis.call("get", KEYS[1]) == ARGV[1] then
				return redis.call("del", KEYS[1])
			else
				return 0
			end
		`
		_, err := client.Eval(ctx, luaScript, []string{lockKey}, lockValue).Result()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("Khóa đã được giải phóng.")
	} else {
		fmt.Println("Không thể lấy khóa. Một instance khác đang làm việc.")
	}
}
  • Sử dụng thư viện chuyên dụng: Để xử lý các trường hợp phức tạp như tự động gia hạn khóa và chống deadlock, bạn nên dùng các thư viện như go-redsync/redsync hoặc bsm/redislock.
  • Tránh xóa khóa của instance khác: Như trong ví dụ trên, khi xóa khóa, cần kiểm tra xem giá trị của key có khớp với giá trị mà instance của bạn đã đặt hay không. Điều này ngăn một instance hoạt động chậm xóa nhầm khóa của một instance khác đã giành được. 

2. Sử dụng Redis Transactions với WATCH

Để thực hiện các thay đổi nguyên tử (atomic) trên nhiều key, bạn có thể sử dụng giao dịch Redis kết hợp với lệnh WATCH. Điều này còn được gọi là "Optimistic Locking". 

Cách hoạt động:

  • WATCH key: Giám sát một hoặc nhiều key.
  • MULTI: Bắt đầu một block giao dịch.
  • Thực hiện các lệnh thay đổi.
  • EXEC: Thực thi các lệnh trong giao dịch. Nếu bất kỳ key nào được WATCH đã bị thay đổi bởi instance khác, giao dịch sẽ bị hủy bỏ. 

Ví dụ về tăng giá trị một cách an toàn:

package main

import (
	"context"
	"fmt"
	"log"
	"strconv"
	"time"

	"github.com/redis/go-redis/v9"
)

func main() {
	client := redis.NewClient(&redis.Options{
		Addr: "localhost:6379",
	})
	ctx := context.Background()

	key := "my:counter"
	// Khởi tạo giá trị ban đầu
	client.Set(ctx, key, 0, 0)

	// Chạy nhiều goroutine để tăng giá trị
	for i := 0; i < 5; i++ {
		go func(id int) {
			for {
				// WATCH key và bắt đầu giao dịch
				err := client.Watch(ctx, func(tx *redis.Tx) error {
					currentValStr, err := tx.Get(ctx, key).Result()
					if err != nil && err != redis.Nil {
						return err
					}
					currentVal, _ := strconv.Atoi(currentValStr)

					// Thực hiện giao dịch nếu key chưa bị thay đổi
					_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
						pipe.Set(ctx, key, currentVal+1, 0)
						return nil
					})
					return err
				}, key)

				if err == nil {
					fmt.Printf("Goroutine %d đã cập nhật thành công.\n", id)
					break // Thoát vòng lặp nếu thành công
				}
				if err == redis.TxFailedErr {
					fmt.Printf("Goroutine %d: giao dịch thất bại, thử lại.\n", id)
					continue // Thử lại nếu giao dịch bị hủy
				}
				log.Printf("Goroutine %d: lỗi giao dịch: %v", id, err)
				return
			}
		}(i)
	}
	time.Sleep(1 * time.Second)
	finalVal, _ := client.Get(ctx, key).Result()
	fmt.Printf("Giá trị cuối cùng: %s\n", finalVal)
}

3. Hàng đợi công việc (Message Queues)

Đối với các tác vụ bất đồng bộ, hàng đợi công việc là một giải pháp hiệu quả. Redis cung cấp các câu lệnh để xây dựng hàng đợi đơn giản và mạnh mẽ. 

Cách hoạt động:

  • Producer: Đẩy các công việc vào một Redis list bằng lệnh LPUSH.
  • Consumer/Workers: Nhiều goroutine trên các instance khác nhau sử dụng lệnh BRPOP để lấy công việc một cách nguyên tử và chờ nếu hàng đợi trống. BRPOP đảm bảo rằng mỗi công việc chỉ được xử lý bởi một worker duy nhất. 

Ví dụ:

  • Producer (gửi công việc):
    client.LPush(ctx, "tasks_queue", "task_id_123")
  • Consumer (nhận công việc):
    client.BRPop(ctx, 0, "tasks_queue")

4. Tốc độ giới hạn (Rate Limiting)

Rate limiting được sử dụng để kiểm soát số lượng yêu cầu mà một người dùng hoặc một instance có thể thực hiện trong một khoảng thời gian, bảo vệ hệ thống khỏi việc quá tải. 

Cách hoạt động:

  • Dùng một key Redis cho mỗi người dùng hoặc địa chỉ IP (rate_limit:ip_address).
  • Lệnh INCR được dùng để tăng bộ đếm số lần yêu cầu.
  • Lệnh EXPIRE được dùng để đặt thời gian hết hạn cho key, giới hạn thời gian tính toán.
  • Sử dụng Redis Pipeline để đảm bảo các thao tác INCREXPIRE được thực hiện nguyên tử.

5. Pub/Sub (Publish/Subscribe)

Redis Pub/Sub là mô hình nhắn tin cho phép các instance giao tiếp mà không cần biết đến nhau, hữu ích cho các thông báo theo thời gian thực hoặc phân phối sự kiện. 

Cách hoạt động:

  • Publisher: Một instance PUBLISH một thông điệp vào một channel.
  • Subscriber: Các goroutine trên các instance khác nhau SUBSCRIBE vào channel đó và nhận thông điệp ngay lập tức. 

Ví dụ sử dụng go-redis/v9:

// Subscriber
pubsub := client.Subscribe(ctx, "my-channel")
ch := pubsub.Channel()
for msg := range ch {
	fmt.Printf("Nhận thông điệp từ channel %s: %s\n", msg.Channel, msg.Payload)
}

// Publisher
client.Publish(ctx, "my-channel", "Hello, from instance 1!")

Tóm tắt và lựa chọn kỹ thuật

Kỹ thuậtTrường hợp sử dụngƯu điểmNhược điểm
Khóa phân tánTruy cập độc quyền vào tài nguyên dùng chung (ví dụ: cập nhật cơ sở dữ liệu, xử lý thanh toán).Đơn giản, đảm bảo an toàn cao.Có thể gây tắc nghẽn nếu tài nguyên bị tranh chấp liên tục.
Optimistic LockingCập nhật giá trị một cách an toàn mà không cần khóa toàn bộ.Hiệu quả khi xung đột ít xảy ra, tối ưu hóa hiệu suất.Yêu cầu logic xử lý khi giao dịch bị hủy.
Hàng đợi công việcXử lý các tác vụ bất đồng bộ, tạo worker pool.Giảm tải cho các yêu cầu đồng bộ, dễ dàng mở rộng.Độ trễ xử lý phụ thuộc vào hàng đợi.
Rate LimitingNgăn chặn lạm dụng API, bảo vệ tài nguyên hệ thống.Dễ cài đặt, hiệu quả cao.Có thể bỏ sót một số trường hợp đặc biệt ở ranh giới thời gian.
Pub/SubTruyền tải thông điệp theo thời gian thực giữa các instance.Phi tập trung, dễ dàng mở rộng, độ trễ thấp.Có thể mất dữ liệu nếu subscriber không hoạt động.

 

Share on: