mackerel-agent 徹底解説
TIME rest time current/total
TopicsPlaceHolder

mackerel-agent 徹底解説

GoCon 2014 autumn

Nov 30th, 2014

Profile

songmu

Mackerel Logo

 

Mackerel (日本語で鯖)

アーキテクチャ

Webアプリケーションの構成

mackerel-agentとは?

Why Go?

プラグインによる拡張

独自プラグインの書き方

mackerel-agentソースコード解説

やっと本題

更に

コア以外の依存

ディレクトリ構成

packageを細かく分けている(アンチパターンかも…)

main.go / main()

設定ファイル読み込んで、ロガー設定して、start()に処理を渡す

func main() {
    conf, printVersion := resolveConfig()
    // (snip)
    logger.Infof("Starting mackerel-agent version:%s, rev:%s", version.VERSION, version.GITCOMMIT)
    // (snip)
    if err := start(conf); err != nil {
        exit(1, conf)
    }
}

main.go / start()

シグナルハンドルとgraceful shutdown

c := make(chan os.Signal, 1)
termChan := make(chan chan int) // メインの処理との情報のやりとり
signal.Notify(c, os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
go func() { // シグナルハンドリング用のgoroutine
    for sig := range c {
        if sig == syscall.SIGHUP { // sighupを受け取ったらhost情報を読み込み直す
            command.UpdateHostSpecs(conf, api, host)
        } else { // 他のシグナルを受け取ったら処理の終了を待ってexit
            exitChan := make(chan int)
            termChan <- exitChan // チャンネルにチャンネルを渡す
            go func() { // 但し処理に時間が掛かるようなら強制的に終了
                time.Sleep(MAX_TERMINATING_INTERVAL * time.Second)
                exitChan <- 1
            }()
            exitCode := <-exitChan // 渡したチャンネルからexitCodeの返却を待つ
            exit(exitCode, conf)
        }
    }
}()
command.Run(conf, api, host, termChan) // メインの処理

chan chan

command.go / Run()

メトリクス収集項目をロードしてAgentオブジェクトの作成した後、loop() (メイン処理)に処理を渡す

func Run(conf *config.Config, api *mackerel.API, host *mackerel.Host, termChan chan chan int) {
    logger.Infof("Start: apibase = %s, hostName = %s, hostId = %s", conf.Apibase, host.Name, host.Id)
    ag := &agent.Agent{
        MetricsGenerators: metricsGenerators(conf),
        PluginGenerators:  pluginGenerators(conf),
    }
    ag.InitPluginGenerators(api)
    loop(ag, conf, api, host, termChan)
}

command.go / loop()

状態管理について

queueStateというtypeを定義

type queueState int
const (
    queueStateFirst queueState = iota // 初期状態
    queueStateDefault  // 通常
    queueStateQueued   // キューが溜まっている場合
    queueStateTerminated  // シグナルを受け取っている場合
)

メトリクス投稿処理(初期化処理)

// 十分なバッファを溜め込めるように
postQueue := make(chan []*mackerel.CreatingMetricsValue, conf.Connection.Post_Metrics_Buffer_Size)
go func() {
    postDelaySeconds := delayByHost(host) // あとで解説
    qState := queueStateFirst             // キューの状態
    exitChan := make(chan int)            // 処理を抜ける際にexitCodeを送るチャンネル

続: メトリクス投稿(終了割り込みに対する処理)

for {
    select {
    case exitChan = <-termChan: // シグナルを受け取ったら割り込まれる
        if len(postQueue) <= 0 {
            exitChan <- 0 // キューにデータが残ってなかったら抜ける
        } else { // 残っている場合はキューの状態を更新して処理続行
            qState = queueStateTerminated
        }
    case values := <-postQueue:

続: メトリクス投稿(bulk送信)

case values := <-postQueue:
    if len(postQueue) > 0 {
        // 送り過ぎないように最大2件
        logger.Debugf("Merging datapoints with next queued ones")
        nextValues := <-postQueue
        values = append(values, nextValues...)
    }

続: メトリクス投稿(送信タイミング調整)

// 状態に応じてdelayさせるタイミングを調整
delaySeconds := 0
switch qState {
case queueStateTerminated:
    delaySeconds = 1
case queueStateFirst:
    // nop
case queueStateQueued:
    delaySeconds = conf.Connection.Post_Metrics_Dequeue_Delay_Seconds
default:
    // 通常状態で00秒にAPIへのサーバーがアクセスすると困るので投稿時間をhashingしている
    elapsedSeconds := time.Now().Second() % int(config.PostMetricsInterval.Seconds())
    if postDelaySeconds > elapsedSeconds {
        delaySeconds = postDelaySeconds - elapsedSeconds
    }
}

続: メトリクス投稿(状態更新)

if qState != queueStateTerminated {
    if len(postQueue) > 0 {
        qState = queueStateQueued
    } else {
        qState = queueStateDefault
    }
}

続: メトリクス投稿(sleep処理)

sleep中にシグナル受け取っても大丈夫なように待ち受け(汚い…)

    sleepCh := make(chan struct{})
    go func() {
        time.Sleep(delaySeconds * time.Second)
        sleepCh <- struct{}{}
    }()
sleepLoop:
    for {
        select {
        case <-sleepCh:
            break sleepLoop
        case exitChan = <-termChan:
            qState = queueStateTerminated
            break sleepLoop
        }
    }

続: メトリクス投稿(投稿処理)

泥臭い。ここは要改善。

tries := conf.Connection.Post_Metrics_Retry_Max
for {
    err := api.PostMetricsValues(values)
    if err == nil {
        logger.Debugf("Posting metrics succeeded.")
        break
    }
    tries -= 1
    if tries <= 0 {
        logger.Errorf("Give up retrying to post metrics.")
        break
    }
    time.Sleep(conf.Connection.Post_Metrics_Retry_Delay_Seconds * time.Second)
}

続: メトリクス投稿(終了処理)

最後です。

// シグナル受信状態でキューにデータが残ってなかったらexit
if qState == queueStateTerminated && len(postQueue) <= 0 {
    exitChan <- 0
}

結論

We are Hiring

hatena