Manjusaka

Manjusaka

日常辣鸡水文: logging のプロセス安全問題について

日常辣鸡水文:关于 logging 的进程安全问题#

チームの食事会で少しお酒を飲んだので、ゴミ文書エンジニアとして日常の水文を書いてみます。

正文#

現在、チームのログ収集方法は、元々の TCP 直送 logstash から、単一ファイルに書き込む方式に改善され、FileBeat をログ収集のフロントエンドとして使用しています。しかし、これによりログが失われるという問題が頻繁に発生します。

ええ、私たちのオンラインサービスは Gunicorn を使用して複数の Worker を起動して処理しています。これには問題があります。私たちは皆、logging モジュールがスレッドセーフであることを知っています。標準の Log Handler 内部に一連のロックを追加してスレッドの安全性を確保していますが、logging がファイルに直接書き込むことはプロセスセーフなのでしょうか?

分析#

私たちがファイルに書き込む方法は、logging モジュールに内蔵されている FileHandler を使用しています。まず、そのソースコードを見てみましょう。

class FileHandler(StreamHandler):
    """
    フォーマットされたログ記録をディスクファイルに書き込むハンドラークラス。
    """
    def __init__(self, filename, mode='a', encoding=None, delay=False):
        """
        指定されたファイルを開き、ログ用のストリームとして使用します。
        """
        # Issue #27493: Path オブジェクトのサポートを追加
        filename = os.fspath(filename)
        #絶対パスを保持します。そうしないと、このクラスを使用する派生クラスが
        #現在のディレクトリが変更されたときに問題を引き起こす可能性があります。
        self.baseFilename = os.path.abspath(filename)
        self.mode = mode
        self.encoding = encoding
        self.delay = delay
        if delay:
            #ストリームを開きませんが、レベル、フォーマッター、ロックなどを設定するために
            #Handler コンストラクタを呼び出す必要があります。
            Handler.__init__(self)
            self.stream = None
        else:
            StreamHandler.__init__(self, self._open())

    def close(self):
        """
        ストリームを閉じます。
        """
        self.acquire()
        try:
            try:
                if self.stream:
                    try:
                        self.flush()
                    finally:
                        stream = self.stream
                        self.stream = None
                        if hasattr(stream, "close"):
                            stream.close()
            finally:
                # Issue #19523: 遅延が設定されている場合にハンドラーリークを防ぐために
                StreamHandler.close(self)
        finally:
            self.release()

    def _open(self):
        """
        現在のベースファイルを(元の)モードとエンコーディングで開きます。
        結果のストリームを返します。
        """
        return open(self.baseFilename, self.mode, encoding=self.encoding)

    def emit(self, record):
        """
        レコードを発行します。

        コンストラクタで 'delay' が指定されていない場合、スーパーの emit を呼び出す前にストリームを開きます。
        """
        if self.stream is None:
            self.stream = self._open()
        StreamHandler.emit(self, record)

    def __repr__(self):
        level = getLevelName(self.level)
        return '<%s %s (%s)>' % (self.__class__.__name__, self.baseFilename, level)

ええ、その中で注目すべき点は _open メソッドと emit メソッドです。まず、背景知識を説明します。私たちが logging を使用してログを出力する際、logging モジュールは対応する Handlerhandle メソッドを呼び出します。handle メソッドの中で、emit メソッドが呼び出され、最終的なログが出力されます。したがって、FileHandler を使用する場合、最初に handle メソッドの呼び出しをトリガーし、その後 emit メソッドをトリガーし、_open メソッドを呼び出して file point を取得し、その後親クラス(より正確には MRO の上位クラス)である StreamHandleremit メソッドを呼び出します。

次に、StreamHandleremit メソッドを見てみましょう。

class StreamHandler(Handler):
    """
    ログ記録を適切にフォーマットしてストリームに書き込むハンドラークラス。
    このクラスはストリームを閉じません。sys.stdout または sys.stderr が使用される可能性があります。
    """

    terminator = '\n'

    def __init__(self, stream=None):
        """
        ハンドラーを初期化します。

        ストリームが指定されていない場合、sys.stderr が使用されます。
        """
        Handler.__init__(self)
        if stream is None:
            stream = sys.stderr
        self.stream = stream

    def flush(self):
        """
        ストリームをフラッシュします。
        """
        self.acquire()
        try:
            if self.stream and hasattr(self.stream, "flush"):
                self.stream.flush()
        finally:
            self.release()

    def emit(self, record):
        """
        レコードを発行します。

        フォーマッターが指定されている場合、それを使用してレコードをフォーマットします。
        レコードはその後、ストリームに改行を付けて書き込まれます。例外情報が存在する場合、それは
        traceback.print_exception を使用してフォーマットされ、ストリームに追加されます。ストリームに
        'encoding' 属性がある場合、それはストリームへの出力方法を決定するために使用されます。
        """
        try:
            msg = self.format(record)
            stream = self.stream
            stream.write(msg)
            stream.write(self.terminator)
            self.flush()
        except Exception:
            self.handleError(record)

    def __repr__(self):
        level = getLevelName(self.level)
        name = getattr(self.stream, 'name', '')
        if name:
            name += ' '
        return '<%s %s(%s)>' % (self.__class__.__name__, name, level)

ええ、とてもシンプルです。以前取得した file point を使用してファイルにデータを書き込みます。

問題はここにあります。FileHandler_open 関数で open 関数を呼び出すとき、選択される mode'a' です。つまり、通常の O_APPEND モードです。私たちは通常、O_APPEND はプロセスセーフと見なすことができることを知っています。なぜなら、O_APPEND は内容が他の O_APPEND 書き込み操作によって上書きされないことを保証するからです。しかし、なぜここでログが失われる状況が発生するのでしょうか?

理由は、POSIX に特別な設計が存在するためです。《POSIX Programmers Guide》という書籍では、次のように説明されています。

  • PIPE_BUF バイト未満の書き込みは原子性があり、データは同じパイプに書き込む他のプロセスからのデータと干渉しません。PIPE_BUF を超える書き込みは、任意の方法でデータが干渉する可能性があります。

この文の翻訳は、POSIX には PIPE_BUF という変数が存在し、そのサイズは 512 であることを示しています。書き込み操作のサイズが PIPE_BUF の値未満である場合、原子性があり、すなわち中断されることはなく、他のプロセスが書き込む値と混乱することはありません。しかし、書き込む内容が PIPE_BUF を超える場合、オペレーティングシステムはこの点を保証できません。

Linux オペレーティングシステムでは、この値が少し変わります。

  • POSIX.1 は、PIPE_BUF バイト未満の write (2) は原子性があると述べています。出力データはパイプに連続したシーケンスとして書き込まれます。PIPE_BUF バイトを超える書き込みは原子性がない可能性があります。カーネルは他のプロセスによって書き込まれたデータと干渉する可能性があります。POSIX.1 は PIPE_BUF を少なくとも 512 バイトとすることを要求します。(Linux では、PIPE_BUF は 4096 バイトです。)

つまり、4K を超える書き込み操作は原子性を保証できず、データの混乱が発生する可能性があります。

データの混乱が発生すると、ログのフォーマットが不定になり、最終的に解析側が解析できず、結果としてログが失われます。

ここで再現してみましょう。まずテストコードを。

最后#

このような操作は以前は考えたことがありませんでしたが、今日は新しい扉を開いたようです。最後に、@依云先輩の指導に感謝します。先輩の指摘がなければ、O_APPEND モードでもデータが安全であるとは全く考えられませんでした。

Reference#

文中で参考にした資料が二つあります。リンクは以下の通りです。

1.OReilly POSIX Programmers Guide

2.Linux Man: PIPE

読み込み中...
文章は、創作者によって署名され、ブロックチェーンに安全に保存されています。