地方エンジニアの学習日記

興味ある技術の雑なメモだったりを書いてくブログ。たまに日記とガジェット紹介。

【LSM-tree】SSTable

SSTable(Sorted String Table)は、LSM-Treeでデータを永続的に保存するためのファイルフォーマットです。SSTableは、ディスクに効率的にデータを格納するためにソートされたキーとバリューのペアを格納します。主に以下の構造を持っています。

  1. SSTableの基本構造

SSTableファイルは、通常以下の要素から構成されています:

  • データブロック(Data Block)
  • メタデータ(Metadata)
  • インデックス(Index)
  • フィルタ(Filter)
  • チェックサム(Checksum)(オプション)

これらの要素が一つのSSTableファイルに格納され、効率的に検索と書き込みが行えるようになっています。

Goで実装

SSTableは、主にKVSのデータ構造です。LSM-Treeや多くのNoSQLデータベースで使用される永続的なデータフォーマットとして、キーとバリューのペアをソートされた形式で格納することに特徴があるらしいです。Goでのサンプル実装。

package main

import (
    "fmt"
    "os"
    "log"
    "time"
    "bytes"
    "encoding/gob"
)

// MemTableはメモリ内に保持するデータ構造
type MemTable struct {
    data map[string]string
}

// 新しいMemTableを作成
func NewMemTable() *MemTable {
    return &MemTable{data: make(map[string]string)}
}

// MemTableにデータを追加
func (mt *MemTable) Put(key, value string) {
    mt.data[key] = value
}

// MemTableからデータを取得
func (mt *MemTable) Get(key string) (string, bool) {
    value, exists := mt.data[key]
    return value, exists
}

// MemTableの内容をディスクにフラッシュ
func (mt *MemTable) FlushToDisk(fileName string) error {
    var buffer bytes.Buffer
    enc := gob.NewEncoder(&buffer)
    if err := enc.Encode(mt.data); err != nil {
        return err
    }

    file, err := os.Create(fileName)
    if err != nil {
        return err
    }
    defer file.Close()

    _, err = file.Write(buffer.Bytes())
    return err
}

// SSTableをディスクから読み込む
func ReadSSTable(fileName string) (map[string]string, error) {
    file, err := os.Open(fileName)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    var data map[string]string
    dec := gob.NewDecoder(file)
    if err := dec.Decode(&data); err != nil {
        return nil, err
    }

    return data, nil
}

// LSM-Tree構造体
type LSMTree struct {
    memTable    *MemTable
    sstableFile string
}

// 新しいLSM-Treeを作成
func NewLSMTree(sstableFile string) *LSMTree {
    return &LSMTree{
        memTable:    NewMemTable(),
        sstableFile: sstableFile,
    }
}

// LSM-Treeにデータを挿入
func (lsm *LSMTree) Put(key, value string) {
    // MemTableに追加
    lsm.memTable.Put(key, value)

    // MemTableが一定サイズに達した場合、SSTableにフラッシュ
    if len(lsm.memTable.data) > 10 { // 任意の閾値
        // フラッシュ処理
        err := lsm.memTable.FlushToDisk(lsm.sstableFile)
        if err != nil {
            log.Fatal(err)
        }
        // 新しいMemTableを作成
        lsm.memTable = NewMemTable()
    }
}

// LSM-Treeからデータを取得
func (lsm *LSMTree) Get(key string) (string, bool) {
    // MemTableから検索
    if value, found := lsm.memTable.Get(key); found {
        return value, true
    }

    // MemTableに無い場合、SSTableを検索
    data, err := ReadSSTable(lsm.sstableFile)
    if err != nil {
        log.Fatal(err)
    }

    // SSTableから検索
    if value, found := data[key]; found {
        return value, true
    }

    return "", false
}

func main() {
    // LSM-Treeの作成
    lsm := NewLSMTree("sstable.data")

    // データ挿入
    lsm.Put("key1", "value1")
    lsm.Put("key2", "value2")
    lsm.Put("key3", "value3")

    // データ取得
    if value, found := lsm.Get("key1"); found {
        fmt.Println("key1:", value)
    } else {
        fmt.Println("key1 not found")
    }

    // データ取得
    if value, found := lsm.Get("key2"); found {
        fmt.Println("key2:", value)
    } else {
        fmt.Println("key2 not found")
    }

    // データ取得
    if value, found := lsm.Get("key3"); found {
        fmt.Println("key3:", value)
    } else {
        fmt.Println("key3 not found")
    }
}

コンパクション

// コンパクション(複数のSSTableをマージ)
func Compaction(sstableFiles []string, newSSTableFile string) error {
    mergedData := make(map[string]string)

    // 各SSTableファイルを読み込んでマージ
    for _, fileName := range sstableFiles {
        data, err := ReadSSTable(fileName)
        if err != nil {
            return err
        }

        // データをマージ(重複を排除)
        for key, value := range data {
            mergedData[key] = value
        }
    }

    // マージされたデータを新しいSSTableとしてディスクに書き込む
    var buffer bytes.Buffer
    enc := gob.NewEncoder(&buffer)
    if err := enc.Encode(mergedData); err != nil {
        return err
    }

    // 新しいSSTableファイルを作成して書き込み
    file, err := os.Create(newSSTableFile)
    if err != nil {
        return err
    }
    defer file.Close()

    _, err = file.Write(buffer.Bytes())
    return err
}

【MySQL】MySQLにおけるオンラインスキーママイグレーションの現状

planetscale.com

MySQLでのオンラインスキーママイグレーションは、変更内容や環境によって適切な手法を選択することが重要です。

INSTANTは、適用可能な変更に対して非常に高速で効率的ですが、リバートが必要な場合や変更内容が複雑な場合には注意が必要です。

INPLACEは、非ブロッキング変更には適していません。

gh-ostやpt-online-schema-changeなどの外部ツールは、MySQLの標準機能では対応できない複雑なスキーマ変更に有効です。

また、PlanetScaleでは、オンラインスキーママイグレーションをサポートするツールやワークフローを提供しており、開発者が安全かつ効率的にスキーマ変更を行えるよう支援しています。

【curl】curlの--aws-sigv4オプション

概要

AWS SigV4 (Signature Version 4) は、AWSAPIリクエストを認証するための署名方法です。この署名プロトコルは、リクエストが正当なものであり、AWSアカウントの認証情報を使って送信されたことを証明するために使用されます。SigV4は、AWSのサービスにアクセスする際のセキュリティを強化するために使われる重要な技術です。

docs.aws.amazon.com

curlで使えた

       --aws-sigv4 <provider1[:prvdr2[:reg[:srv]]]>
              (HTTP) Use AWS V4 signature authentication in the transfer.

              The provider argument is a string that is used by the algorithm when creating outgoing authentication headers.

              The region argument is a string that points to a geographic area of a resources collection (region-code) when the region name is omitted from the endpoint.

              The service argument is a string that points to a function provided by a cloud (service-code) when the service name is omitted from the endpoint.

              If --aws-sigv4 is provided several times, the last set value is used.

              Example:
              curl --aws-sigv4 "aws:amz:us-east-2:es" --user "key:secret" https://example.com

              Added in 7.75.0. See also --basic and -u, --user.

GoでOpenSearchに対して通信する

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "log"

    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/opensearch-project/opensearch-go/v2"
    requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)

func main() {
    ctx := context.Background()
    cfg, _ := config.LoadDefaultConfig(ctx)
    signer, _ := requestsigner.NewSigner(cfg)

    endpoint := "ドメイン"

    client, _ := opensearch.NewClient(opensearch.Config{
        Addresses: []string{endpoint},
        Signer:    signer,
    })

    if info, err := client.Info(); err != nil {
        log.Fatal("info", err)
    } else {
        var r map[string]interface{}
        json.NewDecoder(info.Body).Decode(&r)
        version := r["version"].(map[string]interface{})
        fmt.Printf("%s: %s\n", version["distribution"], version["number"])
    }
}

【Python】ElastiCache for RedisのFOに対応する

import redis

r = redis.Redis(host='localhost', port=6379, db=0)

r.set('mykey', 'Hello, Redis!')
value = r.get('mykey')

print(value.decode('utf-8'))  # 出力: Hello, Redis!

何もしないとこんな感じのコードになる。exponential_backoff_retryを仕込む。

import redis
import time
import random

def exponential_backoff_retry(func, max_retries=5, base_delay=1.0):
    """
    指数バックオフを使用してRedis操作を再試行するヘルパー関数
    :param func: 実行する関数(通常、redisコマンド)
    :param max_retries: 最大リトライ回数
    :param base_delay: 最初の遅延時間(秒)
    """
    retries = 0
    while retries < max_retries:
        try:
            return func()  # Redisの操作を実行
        except redis.exceptions.ConnectionError as e:
            retries += 1
            wait_time = base_delay * (2 ** retries) + random.uniform(0, 1)
            print(f"Connection failed, retrying in {wait_time:.2f} seconds...")
            time.sleep(wait_time)
    raise Exception("Maximum retries reached. Could not connect to Redis.")


r = redis.Redis(host='localhost', port=6379, db=0)

def set_key():
    r.set('mykey', 'Hello with Exponential Backoff!')

exponential_backoff_retry(set_key)

そしてこんなことしなくても公式で実装されていることに気づいた。これで良いですね。

github.com

import os
from redis import Redis
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
from redis.exceptions import ConnectionError, ReadOnlyError

retry = Retry(ExponentialBackoff(), 3)

redis_client = Redis(
    host=os.environ["REDIS_HOST"],
    port=int(os.environ["REDIS_PORT"]),
    retry=retry,
    retry_on_error=[ConnectionError, ReadOnlyError],
)