ロギングクックブック—Pythonドキュメント

提供:Dev Guides
< PythonPython/docs/3.9/howto/logging-cookbook
移動先:案内検索

ロギングクックブック

著者
ビナイサジップ

このページには、過去に役立つことがわかっているロギングに関連するレシピがいくつか含まれています。

複数のモジュールへのログインの使用

logging.getLogger('someLogger')を複数回呼び出すと、同じロガーオブジェクトへの参照が返されます。 これは、同じモジュール内だけでなく、同じPythonインタープリタープロセス内にある限り、モジュール間でも当てはまります。 同じオブジェクトへの参照にも当てはまります。 さらに、アプリケーションコードは、1つのモジュールで親ロガーを定義および構成し、別のモジュールで子ロガーを作成(構成はしない)することができ、子へのすべてのロガー呼び出しは親に渡されます。 メインモジュールは次のとおりです。

import logging
import auxiliary_module

# create logger with 'spam_application'
logger = logging.getLogger('spam_application')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
ch.setFormatter(formatter)
# add the handlers to the logger
logger.addHandler(fh)
logger.addHandler(ch)

logger.info('creating an instance of auxiliary_module.Auxiliary')
a = auxiliary_module.Auxiliary()
logger.info('created an instance of auxiliary_module.Auxiliary')
logger.info('calling auxiliary_module.Auxiliary.do_something')
a.do_something()
logger.info('finished auxiliary_module.Auxiliary.do_something')
logger.info('calling auxiliary_module.some_function()')
auxiliary_module.some_function()
logger.info('done with auxiliary_module.some_function()')

補助モジュールは次のとおりです。

import logging

# create logger
module_logger = logging.getLogger('spam_application.auxiliary')

class Auxiliary:
    def __init__(self):
        self.logger = logging.getLogger('spam_application.auxiliary.Auxiliary')
        self.logger.info('creating an instance of Auxiliary')

    def do_something(self):
        self.logger.info('doing something')
        a = 1 + 1
        self.logger.info('done doing something')

def some_function():
    module_logger.info('received a call to "some_function"')

出力は次のようになります。

2005-03-23 23:47:11,663 - spam_application - INFO -
   creating an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,665 - spam_application.auxiliary.Auxiliary - INFO -
   creating an instance of Auxiliary
2005-03-23 23:47:11,665 - spam_application - INFO -
   created an instance of auxiliary_module.Auxiliary
2005-03-23 23:47:11,668 - spam_application - INFO -
   calling auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,668 - spam_application.auxiliary.Auxiliary - INFO -
   doing something
2005-03-23 23:47:11,669 - spam_application.auxiliary.Auxiliary - INFO -
   done doing something
2005-03-23 23:47:11,670 - spam_application - INFO -
   finished auxiliary_module.Auxiliary.do_something
2005-03-23 23:47:11,671 - spam_application - INFO -
   calling auxiliary_module.some_function()
2005-03-23 23:47:11,672 - spam_application.auxiliary - INFO -
   received a call to 'some_function'
2005-03-23 23:47:11,673 - spam_application - INFO -
   done with auxiliary_module.some_function()

複数のスレッドからのロギング

複数のスレッドからのロギングには、特別な労力は必要ありません。 次の例は、メイン(初期)スレッドと別のスレッドからのロギングを示しています。

import logging
import threading
import time

def worker(arg):
    while not arg['stop']:
        logging.debug('Hi from myfunc')
        time.sleep(0.5)

def main():
    logging.basicConfig(level=logging.DEBUG, format='%(relativeCreated)6d %(threadName)s %(message)s')
    info = {'stop': False}
    thread = threading.Thread(target=worker, args=(info,))
    thread.start()
    while True:
        try:
            logging.debug('Hello from main')
            time.sleep(0.75)
        except KeyboardInterrupt:
            info['stop'] = True
            break
    thread.join()

if __name__ == '__main__':
    main()

実行すると、スクリプトは次のように出力されます。

   0 Thread-1 Hi from myfunc
   3 MainThread Hello from main
 505 Thread-1 Hi from myfunc
 755 MainThread Hello from main
1007 Thread-1 Hi from myfunc
1507 MainThread Hello from main
1508 Thread-1 Hi from myfunc
2010 Thread-1 Hi from myfunc
2258 MainThread Hello from main
2512 Thread-1 Hi from myfunc
3009 MainThread Hello from main
3013 Thread-1 Hi from myfunc
3515 Thread-1 Hi from myfunc
3761 MainThread Hello from main
4017 Thread-1 Hi from myfunc
4513 MainThread Hello from main
4518 Thread-1 Hi from myfunc

これは、予想どおりにログ出力が散在していることを示しています。 もちろん、このアプローチは、ここに示されているよりも多くのスレッドで機能します。


複数のハンドラーとフォーマッター

ロガーはプレーンなPythonオブジェクトです。 addHandler()メソッドには、追加できるハンドラーの数に対する最小または最大のクォータがありません。 アプリケーションがすべての重大度のすべてのメッセージをテキストファイルに記録すると同時に、エラー以上をコンソールに記録すると便利な場合があります。 これを設定するには、適切なハンドラーを構成するだけです。 アプリケーションコードのロギング呼び出しは変更されません。 これは、前の単純なモジュールベースの構成例へのわずかな変更です。

import logging

logger = logging.getLogger('simple_example')
logger.setLevel(logging.DEBUG)
# create file handler which logs even debug messages
fh = logging.FileHandler('spam.log')
fh.setLevel(logging.DEBUG)
# create console handler with a higher log level
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(ch)
logger.addHandler(fh)

# 'application' code
logger.debug('debug message')
logger.info('info message')
logger.warning('warn message')
logger.error('error message')
logger.critical('critical message')

'application'コードは複数のハンドラーを気にしないことに注意してください。 変更されたのは、 fh という名前の新しいハンドラーの追加と構成だけでした。

重大度の高いまたは低いフィルターを使用して新しいハンドラーを作成する機能は、アプリケーションを作成およびテストするときに非常に役立ちます。 デバッグに多くのprintステートメントを使用する代わりに、logger.debugを使用します。後で削除またはコメントアウトする必要があるprintステートメントとは異なり、logger.debugステートメントはソースコードにそのまま残すことができます。再び必要になるまで休眠状態を保ちます。 その時点で発生する必要がある唯一の変更は、デバッグするロガーやハンドラーの重大度レベルを変更することです。


複数の宛先へのロギング

さまざまなメッセージ形式でさまざまな状況でコンソールとファイルにログを記録するとします。 レベルがDEBUG以上のメッセージをファイルに記録し、レベルがINFO以上のメッセージをコンソールに記録するとします。 また、ファイルにはタイムスタンプが含まれている必要がありますが、コンソールメッセージには含まれていないと仮定しましょう。 これを実現する方法は次のとおりです。

import logging

# set up logging to file - see previous section for more details
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(name)-12s %(levelname)-8s %(message)s',
                    datefmt='%m-%d %H:%M',
                    filename='/temp/myapp.log',
                    filemode='w')
# define a Handler which writes INFO messages or higher to the sys.stderr
console = logging.StreamHandler()
console.setLevel(logging.INFO)
# set a format which is simpler for console use
formatter = logging.Formatter('%(name)-12s: %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

これを実行すると、コンソールに表示されます

root        : INFO     Jackdaws love my big sphinx of quartz.
myapp.area1 : INFO     How quickly daft jumping zebras vex.
myapp.area2 : WARNING  Jail zesty vixen who grabbed pay from quack.
myapp.area2 : ERROR    The five boxing wizards jump quickly.

ファイルには次のようなものが表示されます

10-22 22:19 root         INFO     Jackdaws love my big sphinx of quartz.
10-22 22:19 myapp.area1  DEBUG    Quick zephyrs blow, vexing daft Jim.
10-22 22:19 myapp.area1  INFO     How quickly daft jumping zebras vex.
10-22 22:19 myapp.area2  WARNING  Jail zesty vixen who grabbed pay from quack.
10-22 22:19 myapp.area2  ERROR    The five boxing wizards jump quickly.

ご覧のとおり、DEBUGメッセージはファイルにのみ表示されます。 他のメッセージは両方の宛先に送信されます。

この例では、コンソールハンドラーとファイルハンドラーを使用していますが、選択したハンドラーの数と組み合わせを任意に使用できます。


構成サーバーの例

ロギング構成サーバーを使用するモジュールの例を次に示します。

import logging
import logging.config
import time
import os

# read initial config file
logging.config.fileConfig('logging.conf')

# create and start listener on port 9999
t = logging.config.listen(9999)
t.start()

logger = logging.getLogger('simpleExample')

try:
    # loop through logging calls to see the difference
    # new configurations make, until Ctrl+C is pressed
    while True:
        logger.debug('debug message')
        logger.info('info message')
        logger.warning('warn message')
        logger.error('error message')
        logger.critical('critical message')
        time.sleep(5)
except KeyboardInterrupt:
    # cleanup
    logging.config.stopListening()
    t.join()

そして、これがファイル名を受け取り、そのファイルをサーバーに送信するスクリプトです。新しいロギング構成として、適切にバイナリエンコードされた長さが前に付けられます。

#!/usr/bin/env python
import socket, sys, struct

with open(sys.argv[1], 'rb') as f:
    data_to_send = f.read()

HOST = 'localhost'
PORT = 9999
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print('connecting...')
s.connect((HOST, PORT))
print('sending config...')
s.send(struct.pack('>L', len(data_to_send)))
s.send(data_to_send)
s.close()
print('complete')

ブロックするハンドラーの処理

場合によっては、ロギング元のスレッドをブロックせずにロギングハンドラーに作業を行わせる必要があります。 これはWebアプリケーションでは一般的ですが、もちろん他のシナリオでも発生します。

動作が遅いことを示す一般的な原因は、 SMTPHandler です。開発者の制御が及ばないさまざまな理由(たとえば、メールやネットワークインフラストラクチャのパフォーマンスの低下)により、電子メールの送信に時間がかかる場合があります。 ただし、ほとんどすべてのネットワークベースのハンドラーがブロックできます。 SocketHandler 操作でさえ、速度が遅すぎる内部でDNSクエリを実行する可能性があります(このクエリは、Pythonレイヤーの下のソケットライブラリコードの奥深くにある可能性があります。そしてあなたのコントロールの外)。

1つの解決策は、2つの部分からなるアプローチを使用することです。 最初の部分では、パフォーマンスが重要なスレッドからアクセスされるロガーに QueueHandler のみをアタッチします。 キューに書き込むだけです。キューは、十分な大きさの容量にサイズ設定することも、サイズに上限を設けずに初期化することもできます。 キューへの書き込みは通常すぐに受け入れられますが、コードの予防措置として queue.Full 例外をキャッチする必要があります。 コードにパフォーマンスが重要なスレッドがあるライブラリ開発者の場合は、コードを使用する他の開発者のために、必ずこれを文書化してください(QueueHandlersのみをロガーに添付することをお勧めします)。 。

ソリューションの2番目の部分は、 QueueListener です。これは、 QueueHandler に対応するものとして設計されています。 QueueListener は非常に単純です。キューといくつかのハンドラーが渡され、QueueHandlers(または[X201Xの他のソース)から送信されたLogRecordsのキューをリッスンする内部スレッドを起動します。 ] 、そのことについて)。 LogRecordsはキューから削除され、処理のためにハンドラーに渡されます。

個別の QueueListener クラスを持つことの利点は、同じインスタンスを使用して複数のQueueHandlersにサービスを提供できることです。 これは、たとえば、既存のハンドラークラスのスレッドバージョンを使用するよりもリソースに適しています。既存のハンドラークラスは、ハンドラーごとに1つのスレッドを消費し、特別なメリットはありません。

これら2つのクラスの使用例は次のとおりです(インポートは省略)。

que = queue.Queue(-1)  # no limit on size
queue_handler = QueueHandler(que)
handler = logging.StreamHandler()
listener = QueueListener(que, handler)
root = logging.getLogger()
root.addHandler(queue_handler)
formatter = logging.Formatter('%(threadName)s: %(message)s')
handler.setFormatter(formatter)
listener.start()
# The log output will display the thread which generated
# the event (the main thread) rather than the internal
# thread which monitors the internal queue. This is what
# you want to happen.
root.warning('Look out!')
listener.stop()

これを実行すると、次のようになります。

MainThread: Look out!

バージョン3.5で変更: Python 3.5より前は、 QueueListener は常に、キューから受信したすべてのメッセージを、初期化されたすべてのハンドラーに渡していました。 (これは、レベルフィルタリングがすべて、キューがいっぱいになる反対側で行われると想定されていたためです。)3.5以降、この動作は、キーワード引数respect_handler_level=Trueをリスナーのコンストラクターに渡すことで変更できます。 これが行われると、リスナーは各メッセージのレベルをハンドラーのレベルと比較し、適切な場合にのみメッセージをハンドラーに渡します。


ネットワークを介したロギングイベントの送受信

ネットワークを介してログイベントを送信し、受信側でそれらを処理するとします。 これを行う簡単な方法は、 SocketHandler インスタンスを送信側のルートロガーにアタッチすることです。

import logging, logging.handlers

rootLogger = logging.getLogger('')
rootLogger.setLevel(logging.DEBUG)
socketHandler = logging.handlers.SocketHandler('localhost',
                    logging.handlers.DEFAULT_TCP_LOGGING_PORT)
# don't bother with a formatter, since a socket handler sends the event as
# an unformatted pickle
rootLogger.addHandler(socketHandler)

# Now, we can log to the root logger, or any other logger. First the root...
logging.info('Jackdaws love my big sphinx of quartz.')

# Now, define a couple of other loggers which might represent areas in your
# application:

logger1 = logging.getLogger('myapp.area1')
logger2 = logging.getLogger('myapp.area2')

logger1.debug('Quick zephyrs blow, vexing daft Jim.')
logger1.info('How quickly daft jumping zebras vex.')
logger2.warning('Jail zesty vixen who grabbed pay from quack.')
logger2.error('The five boxing wizards jump quickly.')

受信側では、 socketserver モジュールを使用して受信機をセットアップできます。 基本的な作業例を次に示します。

import pickle
import logging
import logging.handlers
import socketserver
import struct


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handler for a streaming logging request.

    This basically logs the record using whatever logging policy is
    configured locally.
    """

    def handle(self):
        """
        Handle multiple requests - each expected to be a 4-byte length,
        followed by the LogRecord in pickle format. Logs the record
        according to whatever policy is configured locally.
        """
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack('>L', chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data):
        return pickle.loads(data)

    def handleLogRecord(self, record):
        # if a name is specified, we use the named logger rather than the one
        # implied by the record.
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logger = logging.getLogger(name)
        # N.B. EVERY record gets logged. This is because Logger.handle
        # is normally called AFTER logger-level filtering. If you want
        # to do filtering, do it at the client end to save wasting
        # cycles and network bandwidth!
        logger.handle(record)

class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """
    Simple TCP socket-based logging receiver suitable for testing.
    """

    allow_reuse_address = True

    def __init__(self, host='localhost',
                 port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        import select
        abort = 0
        while not abort:
            rd, wr, ex = select.select([self.socket.fileno()],
                                       [], [],
                                       self.timeout)
            if rd:
                self.handle_request()
            abort = self.abort

def main():
    logging.basicConfig(
        format='%(relativeCreated)5d %(name)-15s %(levelname)-8s %(message)s')
    tcpserver = LogRecordSocketReceiver()
    print('About to start TCP server...')
    tcpserver.serve_until_stopped()

if __name__ == '__main__':
    main()

最初にサーバーを実行し、次にクライアントを実行します。 クライアント側では、コンソールには何も出力されません。 サーバー側では、次のように表示されます。

About to start TCP server...
   59 root            INFO     Jackdaws love my big sphinx of quartz.
   59 myapp.area1     DEBUG    Quick zephyrs blow, vexing daft Jim.
   69 myapp.area1     INFO     How quickly daft jumping zebras vex.
   69 myapp.area2     WARNING  Jail zesty vixen who grabbed pay from quack.
   69 myapp.area2     ERROR    The five boxing wizards jump quickly.

一部のシナリオでは、pickleにセキュリティ上の問題があることに注意してください。 これらが影響する場合は、makePickle()メソッドをオーバーライドしてそこに代替を実装し、上記のスクリプトを代替のシリアル化を使用するように適合させることで、代替のシリアル化スキームを使用できます。


ログ出力にコンテキスト情報を追加する

ロギング呼び出しに渡されるパラメーターに加えて、ロギング出力にコンテキスト情報を含めたい場合があります。 たとえば、ネットワークアプリケーションでは、クライアント固有の情報をログに記録することが望ましい場合があります(たとえば、 リモートクライアントのユーザー名、またはIPアドレス)。 extra パラメーターを使用してこれを実現することもできますが、この方法で情報を渡すことが常に便利であるとは限りません。 接続ごとにLoggerインスタンスを作成したくなるかもしれませんが、これらのインスタンスはガベージコレクションされていないため、これはお勧めできません。 これは実際には問題ではありませんが、Loggerインスタンスの数がアプリケーションのロギングで使用する粒度のレベルに依存している場合、 [の数が管理しにくい場合があります。 X212X]インスタンスは事実上無制限になります。

LoggerAdaptersを使用してコンテキスト情報を伝達する

ロギングイベント情報とともに出力されるコンテキスト情報を渡すことができる簡単な方法は、LoggerAdapterクラスを使用することです。 このクラスはLoggerのように設計されているため、debug()info()warning()error()、[X114X ] 、critical()log()。 これらのメソッドは、Loggerの対応するメソッドと同じシグネチャを持っているため、2つのタイプのインスタンスを同じ意味で使用できます。

LoggerAdapterのインスタンスを作成するときは、Loggerインスタンスと、コンテキスト情報を含むdictのようなオブジェクトを渡します。 LoggerAdapterのインスタンスでロギングメソッドの1つを呼び出すと、コンストラクターに渡されたLoggerの基になるインスタンスに呼び出しが委任され、委任された呼び出しでコンテキスト情報が渡されるように調整されます。 。 LoggerAdapterのコードの抜粋は次のとおりです。

def debug(self, msg, /, *args, **kwargs):
    """
    Delegate a debug call to the underlying logger, after adding
    contextual information from this adapter instance.
    """
    msg, kwargs = self.process(msg, kwargs)
    self.logger.debug(msg, *args, **kwargs)

LoggerAdapterprocess()メソッドでは、コンテキスト情報がロギング出力に追加されます。 ロギング呼び出しのメッセージとキーワードの引数が渡され、基になるロガーへの呼び出しで使用するために、これらの(潜在的に)変更されたバージョンが返されます。 このメソッドのデフォルトの実装では、メッセージはそのままですが、コンストラクターに渡されるdictのようなオブジェクトを値とするキーワード引数に「extra」キーが挿入されます。 もちろん、アダプターの呼び出しで「extra」キーワード引数を渡した場合は、サイレントに上書きされます。

'extra'を使用する利点は、dictのようなオブジェクトの値がLogRecordインスタンスの__dict__にマージされ、キーを認識しているFormatterインスタンスでカスタマイズされた文字列を使用できることです。 dictのようなオブジェクトの。 別の方法が必要な場合、例えば メッセージ文字列にコンテキスト情報を追加または追加する場合は、LoggerAdapterをサブクラス化し、process()をオーバーライドするだけで必要な処理を実行できます。 簡単な例を次に示します。

class CustomAdapter(logging.LoggerAdapter):
    """
    This example adapter expects the passed in dict-like object to have a
    'connid' key, whose value in brackets is prepended to the log message.
    """
    def process(self, msg, kwargs):
        return '[%s] %s' % (self.extra['connid'], msg), kwargs

あなたはこのように使うことができます:

logger = logging.getLogger(__name__)
adapter = CustomAdapter(logger, {'connid': some_conn_id})

次に、アダプタにログを記録するイベントには、ログメッセージの前にsome_conn_idの値が付加されます。

dict以外のオブジェクトを使用してコンテキスト情報を渡す

実際のdictをLoggerAdapterに渡す必要はありません。__getitem____iter__を実装するクラスのインスタンスを渡して、次のdictのように見せることができます。ロギング。 これは、値を動的に生成する場合に役立ちます(dictの値は一定です)。


フィルタを使用してコンテキスト情報を伝達する

ユーザー定義のFilterを使用して、コンテキスト情報をログ出力に追加することもできます。 Filterインスタンスは、渡されたLogRecordsを変更できます。これには、適切なフォーマット文字列を使用して出力できる属性を追加したり、必要に応じてカスタムFormatterを追加したりできます。

たとえば、Webアプリケーションでは、処理中のリクエスト(または少なくともその興味深い部分)をthreadlocal( threading.local )変数に格納し、 [からアクセスできます。 X202X]たとえば、 [のように属性名「ip」と「user」を使用して、リクエストからの情報(たとえば、リモートIPアドレスとリモートユーザーのユーザー名)をLogRecordに追加します。 X382X]上記の例。 その場合、同じフォーマット文字列を使用して、上記と同様の出力を取得できます。 スクリプトの例を次に示します。

import logging
from random import choice

class ContextFilter(logging.Filter):
    """
    This is a filter which injects contextual information into the log.

    Rather than use actual contextual information, we just use random
    data in this demo.
    """

    USERS = ['jim', 'fred', 'sheila']
    IPS = ['123.231.231.123', '127.0.0.1', '192.168.0.1']

    def filter(self, record):

        record.ip = choice(ContextFilter.IPS)
        record.user = choice(ContextFilter.USERS)
        return True

if __name__ == '__main__':
    levels = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL)
    logging.basicConfig(level=logging.DEBUG,
                        format='%(asctime)-15s %(name)-5s %(levelname)-8s IP: %(ip)-15s User: %(user)-8s %(message)s')
    a1 = logging.getLogger('a.b.c')
    a2 = logging.getLogger('d.e.f')

    f = ContextFilter()
    a1.addFilter(f)
    a2.addFilter(f)
    a1.debug('A debug message')
    a1.info('An info message with %s', 'some parameters')
    for x in range(10):
        lvl = choice(levels)
        lvlname = logging.getLevelName(lvl)
        a2.log(lvl, 'A message at %s level with %d %s', lvlname, 2, 'parameters')

これを実行すると、次のようなものが生成されます。

2010-09-06 22:38:15,292 a.b.c DEBUG    IP: 123.231.231.123 User: fred     A debug message
2010-09-06 22:38:15,300 a.b.c INFO     IP: 192.168.0.1     User: sheila   An info message with some parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 127.0.0.1       User: jim      A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 127.0.0.1       User: sheila   A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,300 d.e.f ERROR    IP: 123.231.231.123 User: fred     A message at ERROR level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 192.168.0.1     User: jim      A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f CRITICAL IP: 127.0.0.1       User: sheila   A message at CRITICAL level with 2 parameters
2010-09-06 22:38:15,300 d.e.f DEBUG    IP: 192.168.0.1     User: jim      A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f ERROR    IP: 127.0.0.1       User: sheila   A message at ERROR level with 2 parameters
2010-09-06 22:38:15,301 d.e.f DEBUG    IP: 123.231.231.123 User: fred     A message at DEBUG level with 2 parameters
2010-09-06 22:38:15,301 d.e.f INFO     IP: 123.231.231.123 User: fred     A message at INFO level with 2 parameters

複数のプロセスから単一のファイルにログを記録する

ロギングはスレッドセーフであり、単一プロセスの複数のスレッドから単一ファイルへのロギングはサポートされていますが、複数プロセスから単一ファイルへのロギングはではありません ]サポートされています。これは、Pythonの複数のプロセス間で単一のファイルへのアクセスをシリアル化する標準的な方法がないためです。 複数のプロセスから単一のファイルにログを記録する必要がある場合、これを行う1つの方法は、すべてのプロセスをSocketHandlerにログに記録し、ソケットから読み取るソケットサーバーを実装する別のプロセスを用意することです。ファイルにログを記録します。 (必要に応じて、既存のプロセスの1つに1つのスレッドを割り当てて、この機能を実行できます。)このセクションでは、このアプローチについて詳しく説明し、開始点として使用できる動作中のソケットレシーバーが含まれています。あなたがあなた自身のアプリケーションに適応するために。

multiprocessing モジュールの Lock クラスを使用して、プロセスからファイルへのアクセスをシリアル化する独自のハンドラーを作成することもできます。 既存のFileHandlerおよびサブクラスは、将来的には使用する可能性がありますが、現在マルチプロセッシングを使用していません。 現在、マルチプロセッシングモジュールは、すべてのプラットフォームで機能するロック機能を提供しているわけではないことに注意してください( https://bugs.python.org/issue3770 を参照)。

または、QueueQueueHandler を使用して、すべてのログイベントをマルチプロセスアプリケーションのプロセスの1つに送信することもできます。 次のサンプルスクリプトは、これを行う方法を示しています。 この例では、別のリスナープロセスが他のプロセスから送信されたイベントをリッスンし、独自のログ構成に従ってそれらをログに記録します。 この例はそれを行う1つの方法のみを示していますが(たとえば、個別のリスナープロセスではなく、リスナースレッドを使用したい場合があります。実装は類似しています)、リスナーと他のプロセスに対して完全に異なるロギング構成を許可します。アプリケーションで使用でき、独自の特定の要件を満たすコードの基礎として使用できます。

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing

# Next two import lines for this demo only
from random import choice, random
import time

#
# Because you'll want to define the logging configurations for listener and workers, the
# listener and worker process functions take a configurer parameter which is a callable
# for configuring logging for that process. These functions are also passed the queue,
# which they use for communication.
#
# In practice, you can configure the listener however you want, but note that in this
# simple example, the listener does not apply level or filter logic to received records.
# In practice, you would probably want to do this logic in the worker processes, to avoid
# sending events which would be filtered out between processes.
#
# The size of the rotated files is made small so you can see the results easily.
def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

# Arrays used for random selections in this demo

LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING,
          logging.ERROR, logging.CRITICAL]

LOGGERS = ['a.b.c', 'd.e.f']

MESSAGES = [
    'Random message #1',
    'Random message #2',
    'Random message #3',
]

# The worker configuration is done at the start of the worker process run.
# Note that on Windows you can't rely on fork semantics, so each process
# will run the logging configuration code when it starts.
def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

# This is the worker process top-level loop, which just logs ten events with
# random intervening delays before terminating.
# The print messages are just so you know it's doing something!
def worker_process(queue, configurer):
    configurer(queue)
    name = multiprocessing.current_process().name
    print('Worker started: %s' % name)
    for i in range(10):
        time.sleep(random())
        logger = logging.getLogger(choice(LOGGERS))
        level = choice(LEVELS)
        message = choice(MESSAGES)
        logger.log(level, message)
    print('Worker finished: %s' % name)

# Here's where the demo gets orchestrated. Create the queue, create and start
# the listener, create ten workers and start them, wait for them to finish,
# then send a None to the queue to tell the listener to finish.
def main():
    queue = multiprocessing.Queue(-1)
    listener = multiprocessing.Process(target=listener_process,
                                       args=(queue, listener_configurer))
    listener.start()
    workers = []
    for i in range(10):
        worker = multiprocessing.Process(target=worker_process,
                                         args=(queue, worker_configurer))
        workers.append(worker)
        worker.start()
    for w in workers:
        w.join()
    queue.put_nowait(None)
    listener.join()

if __name__ == '__main__':
    main()

上記のスクリプトの変形は、メインプロセスのロギングを別のスレッドに保持します。

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue
import random
import threading
import time

def logger_thread(q):
    while True:
        record = q.get()
        if record is None:
            break
        logger = logging.getLogger(record.name)
        logger.handle(record)


def worker_process(q):
    qh = logging.handlers.QueueHandler(q)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(qh)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)

if __name__ == '__main__':
    q = Queue()
    d = {
        'version': 1,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO',
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed',
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'level': 'ERROR',
                'formatter': 'detailed',
            },
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'level': 'DEBUG',
            'handlers': ['console', 'file', 'errors']
        },
    }
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1), args=(q,))
        workers.append(wp)
        wp.start()
    logging.config.dictConfig(d)
    lp = threading.Thread(target=logger_thread, args=(q,))
    lp.start()
    # At this point, the main process could do some useful work of its own
    # Once it's done that, it can wait for the workers to terminate...
    for wp in workers:
        wp.join()
    # And now tell the logging thread to finish up, too
    q.put(None)
    lp.join()

このバリアントは、次の方法を示しています。 特定のロガーに構成を適用します-例: fooロガーには、fooサブシステム内のすべてのイベントをファイルmplog-foo.logに格納する特別なハンドラーがあります。 これは、メッセージを適切な宛先に送信するために、メインプロセスのロギング機構によって使用されます(ロギングイベントはワーカープロセスで生成されますが)。

コンカレント.futures.ProcessPoolExecutorの使用

concurrent.futures.ProcessPoolExecutor を使用してワーカープロセスを開始する場合は、キューを少し異なる方法で作成する必要があります。 それ以外の

queue = multiprocessing.Queue(-1)

あなたが使用する必要があります

queue = multiprocessing.Manager().Queue(-1)  # also works with the examples above

次に、これからワーカーの作成を置き換えることができます。

workers = []
for i in range(10):
    worker = multiprocessing.Process(target=worker_process,
                                     args=(queue, worker_configurer))
    workers.append(worker)
    worker.start()
for w in workers:
    w.join()

これに(最初に current.futures をインポートすることを忘れないでください):

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for i in range(10):
        executor.submit(worker_process, queue, worker_configurer)

ファイルローテーションの使用

ログファイルを特定のサイズに拡大してから、新しいファイルを開いてそれにログを記録したい場合があります。 これらのファイルを一定数保持することをお勧めします。その数のファイルが作成されたら、ファイルの数とサイズの両方が制限されるようにファイルをローテーションします。 この使用パターンの場合、ロギングパッケージはRotatingFileHandlerを提供します。

import glob
import logging
import logging.handlers

LOG_FILENAME = 'logging_rotatingfile_example.out'

# Set up a specific logger with our desired output level
my_logger = logging.getLogger('MyLogger')
my_logger.setLevel(logging.DEBUG)

# Add the log message handler to the logger
handler = logging.handlers.RotatingFileHandler(
              LOG_FILENAME, maxBytes=20, backupCount=5)

my_logger.addHandler(handler)

# Log some messages
for i in range(20):
    my_logger.debug('i = %d' % i)

# See what files are created
logfiles = glob.glob('%s*' % LOG_FILENAME)

for filename in logfiles:
    print(filename)

結果は6つの個別のファイルになり、それぞれにアプリケーションのログ履歴の一部が含まれます。

logging_rotatingfile_example.out
logging_rotatingfile_example.out.1
logging_rotatingfile_example.out.2
logging_rotatingfile_example.out.3
logging_rotatingfile_example.out.4
logging_rotatingfile_example.out.5

最新のファイルは常にlogging_rotatingfile_example.outであり、サイズ制限に達するたびに、サフィックス.1で名前が変更されます。 既存の各バックアップファイルの名前が変更されてサフィックスがインクリメントされ(.1.2などになります)、.6ファイルは消去されます。

明らかに、この例では、極端な例としてログの長さを非常に小さく設定しています。 maxBytes を適切な値に設定することをお勧めします。


代替フォーマットスタイルの使用

ロギングがPython標準ライブラリに追加されたとき、可変コンテンツでメッセージをフォーマットする唯一の方法は、%-f ormattingメソッドを使用することでした。 それ以来、Pythonは2つの新しいフォーマットアプローチを獲得しました: string.Template (Python 2.4で追加)と str.format()(Python 2.6で追加)。

ロギング(3.2以降)は、これら2つの追加のフォーマットスタイルのサポートを改善します。 Formatterクラスが拡張され、styleという名前の追加のオプションのキーワードパラメーターを受け取るようになりました。 これはデフォルトで'%'になりますが、他の可能な値は'{''$'で、他の2つのフォーマットスタイルに対応します。 下位互換性はデフォルトで維持されますが(予想どおり)、スタイルパラメータを明示的に指定することで、 str.format()または string.Templateで機能するフォーマット文字列を指定できます。 。 可能性を示すためのコンソールセッションの例を次に示します。

>>> import logging
>>> root = logging.getLogger()
>>> root.setLevel(logging.DEBUG)
>>> handler = logging.StreamHandler()
>>> bf = logging.Formatter('{asctime} {name} {levelname:8s} {message}',
...                        style='{')
>>> handler.setFormatter(bf)
>>> root.addHandler(handler)
>>> logger = logging.getLogger('foo.bar')
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:11:55,341 foo.bar DEBUG    This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:12:11,526 foo.bar CRITICAL This is a CRITICAL message
>>> df = logging.Formatter('$asctime $name ${levelname} $message',
...                        style='$')
>>> handler.setFormatter(df)
>>> logger.debug('This is a DEBUG message')
2010-10-28 15:13:06,924 foo.bar DEBUG This is a DEBUG message
>>> logger.critical('This is a CRITICAL message')
2010-10-28 15:13:11,494 foo.bar CRITICAL This is a CRITICAL message
>>>

ログへの最終出力用のログメッセージのフォーマットは、個々のログメッセージの作成方法とは完全に独立していることに注意してください。 ここに示すように、それでも%-f ormattingを使用できます。

>>> logger.error('This is an%s %s %s', 'other,', 'ERROR,', 'message')
2010-10-28 15:19:29,833 foo.bar ERROR This is another, ERROR, message
>>>

ロギングコール(logger.debug()logger.info()など)は、実際のロギングメッセージ自体の位置パラメータのみを取り、キーワードパラメータは、実際のロギングコールの処理方法のオプションを決定するためにのみ使用されます(例: トレースバック情報をログに記録する必要があることを示すexc_infoキーワードパラメーター、またはログに追加する追加のコンテキスト情報を示すextraキーワードパラメーター)。 したがって、 str.format()または string.Template 構文を使用してロギング呼び出しを直接行うことはできません。これは、ロギングパッケージが内部的に%-f ormattingを使用してフォーマット文字列と変数引数をマージするためです。 。 既存のコードにあるすべてのロギング呼び出しは%-f ormat文字列を使用するため、下位互換性を維持しながらこれを変更することはありません。

ただし、{}-および$-フォーマットを使用して個々のログメッセージを作成する方法があります。 メッセージの場合、任意のオブジェクトをメッセージ形式の文字列として使用でき、ロギングパッケージはそのオブジェクトに対してstr()を呼び出して、実際の形式の文字列を取得することを思い出してください。 次の2つのクラスを検討してください。

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

これらのいずれかをフォーマット文字列の代わりに使用して、{}-または$-フォーマットを使用して、「%(message)s」または「」の代わりにフォーマットされたログ出力に表示される実際の「メッセージ」部分を作成できます。 {メッセージ}」または「$メッセージ」。 何かをログに記録するときはいつでもクラス名を使用するのは少し扱いにくいですが、__(二重アンダースコア-_と混同しないでください。単一のアンダースコアは[の同義語/エイリアスとして使用されます)などのエイリアスを使用すると非常に快適です。 X236X] gettext.gettext()またはその兄弟)。

上記のクラスはPythonには含まれていませんが、コピーして独自のコードに貼り付けるのは簡単です。 これらは次のように使用できます(whereverというモジュールで宣言されていると仮定します)。

>>> from wherever import BraceMessage as __
>>> print(__('Message with {0} {name}', 2, name='placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})',
...       point=p))
Message with coordinates: (0.50, 0.50)
>>> from wherever import DollarMessage as __
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

上記の例ではprint()を使用してフォーマットがどのように機能するかを示していますが、もちろんlogger.debug()などを使用して、このアプローチを使用して実際にログを記録します。

注意すべき点の1つは、このアプローチではパフォーマンスが大幅に低下することはないということです。実際のフォーマットは、ロギング呼び出しを行ったときではなく、ロギングされたメッセージがハンドラーによって実際にログに出力されようとしているとき(およびその場合)に発生します。 したがって、つまずく可能性のあるわずかに珍しいことは、かっこがフォーマット文字列だけでなく、フォーマット文字列と引数を囲んでいることです。 これは、__表記が、XXXMessageクラスの1つへのコンストラクター呼び出しの単なるシンタックスシュガーであるためです。

必要に応じて、次の例のように、LoggerAdapterを使用して、上記と同様の効果を実現できます。

import logging

class Message:
    def __init__(self, fmt, args):
        self.fmt = fmt
        self.args = args

    def __str__(self):
        return self.fmt.format(*self.args)

class StyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None):
        super().__init__(logger, extra or {})

    def log(self, level, msg, /, *args, **kwargs):
        if self.isEnabledFor(level):
            msg, kwargs = self.process(msg, kwargs)
            self.logger._log(level, Message(msg, args), (), **kwargs)

logger = StyleAdapter(logging.getLogger(__name__))

def main():
    logger.debug('Hello, {}', 'world!')

if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    main()

上記のスクリプトは、Python 3.2以降で実行すると、メッセージHello, world!をログに記録する必要があります。


LogRecordのカスタマイズ

すべてのロギングイベントは、 LogRecord インスタンスで表されます。 イベントがログに記録され、ロガーのレベルでフィルターで除外されない場合、 LogRecord が作成され、イベントに関する情報が入力されてから、そのロガー(およびその祖先、までは階層のそれ以上の伝播が無効になっているロガー)。 Python 3.2より前は、この作成が行われた場所は2つだけでした。

  • Logger.makeRecord()。これは、イベントをログに記録する通常のプロセスで呼び出されます。 これにより、 LogRecord が直接呼び出され、インスタンスが作成されました。
  • makeLogRecord()。これは、LogRecordに追加される属性を含むディクショナリで呼び出されます。 これは通常、適切な辞書がネットワーク経由で受信されたときに呼び出されます(例: SocketHandler を介したピクルス形式、または HTTPHandler を介したJSON形式)。

これは通常、 LogRecord で何か特別なことをする必要がある場合、次のいずれかを行う必要があることを意味します。

  • Logger.makeRecord()をオーバーライドする独自の Logger サブクラスを作成し、 setLoggerClass()を使用して設定してから、気になるロガーをインスタンス化します。
  • Filter をロガーまたはハンドラーに追加します。これにより、 filter()メソッドが呼び出されたときに必要な特別な操作が行われます。

最初のアプローチは、(たとえば)いくつかの異なるライブラリが異なることをしたいというシナリオでは少し扱いにくいでしょう。 それぞれが独自の Logger サブクラスを設定しようとし、これを最後に実行したものが勝ちます。

2番目のアプローチは、多くの場合に適切に機能しますが、たとえば、 LogRecord の特殊なサブクラスを使用します。 ライブラリ開発者はロガーに適切なフィルターを設定できますが、新しいロガーを導入するたびにこれを行うことを忘れないでください(新しいパッケージまたはモジュールを追加して実行するだけで実行できます)。

logger = logging.getLogger(__name__)

モジュールレベルで)。 それはおそらく考えるには多すぎることの1つです。 開発者は、トップレベルのロガーに接続された NullHandler にフィルターを追加することもできますが、アプリケーション開発者がハンドラーを下位レベルのライブラリロガーにアタッチした場合、これは呼び出されません。したがって、そのハンドラーからの出力はライブラリ開発者の意図を反映していません。

Python 3.2以降では、 LogRecord の作成は、指定可能なファクトリを介して行われます。 ファクトリは、 setLogRecordFactory()で設定し、 getLogRecordFactory()で問い合わせることができる呼び出し可能ファイルです。 LogRecord がファクトリのデフォルト設定であるため、ファクトリは LogRecord コンストラクタと同じ署名で呼び出されます。

このアプローチにより、カスタムファクトリはLogRecord作成のすべての側面を制御できます。 たとえば、次のようなパターンを使用して、サブクラスを返すか、作成したレコードにいくつかの属性を追加することができます。

old_factory = logging.getLogRecordFactory()

def record_factory(*args, **kwargs):
    record = old_factory(*args, **kwargs)
    record.custom_attribute = 0xdecafbad
    return record

logging.setLogRecordFactory(record_factory)

このパターンにより、さまざまなライブラリがファクトリをチェーン化できます。それらが互いの属性を上書きしたり、標準として提供されている属性を意図せずに上書きしたりしない限り、驚くことはありません。 ただし、チェーン内の各リンクはすべてのロギング操作に実行時のオーバーヘッドを追加することに注意してください。この手法は、フィルターを使用しても目的の結果が得られない場合にのみ使用してください。


QueueHandlerのサブクラス化-ZeroMQの例

QueueHandlerサブクラスを使用して、ZeroMQの「公開」ソケットなどの他の種類のキューにメッセージを送信できます。 以下の例では、ソケットは個別に作成され、ハンドラーに(「キュー」として)渡されます。

import zmq   # using pyzmq, the Python binding for ZeroMQ
import json  # for serializing records portably

ctx = zmq.Context()
sock = zmq.Socket(ctx, zmq.PUB)  # or zmq.PUSH, or other suitable value
sock.bind('tcp://*:5556')        # or wherever

class ZeroMQSocketHandler(QueueHandler):
    def enqueue(self, record):
        self.queue.send_json(record.__dict__)


handler = ZeroMQSocketHandler(sock)

もちろん、これを整理する方法は他にもあります。たとえば、ハンドラーがソケットを作成するために必要なデータを渡すなどです。

class ZeroMQSocketHandler(QueueHandler):
    def __init__(self, uri, socktype=zmq.PUB, ctx=None):
        self.ctx = ctx or zmq.Context()
        socket = zmq.Socket(self.ctx, socktype)
        socket.bind(uri)
        super().__init__(socket)

    def enqueue(self, record):
        self.queue.send_json(record.__dict__)

    def close(self):
        self.queue.close()

QueueListenerのサブクラス化-ZeroMQの例

QueueListenerをサブクラス化して、ZeroMQの「サブスクライブ」ソケットなどの他の種類のキューからメッセージを取得することもできます。 次に例を示します。

class ZeroMQSocketListener(QueueListener):
    def __init__(self, uri, /, *handlers, **kwargs):
        self.ctx = kwargs.get('ctx') or zmq.Context()
        socket = zmq.Socket(self.ctx, zmq.SUB)
        socket.setsockopt_string(zmq.SUBSCRIBE, '')  # subscribe to everything
        socket.connect(uri)
        super().__init__(socket, *handlers, **kwargs)

    def dequeue(self):
        msg = self.queue.recv_json()
        return logging.makeLogRecord(msg)

も参照してください

モジュールロギング
ロギングモジュールのAPIリファレンス。
モジュール logging.config
ロギングモジュールの構成API。
モジュール logging.handlers
ロギングモジュールに含まれている便利なハンドラー。

基本的なロギングチュートリアル

より高度なロギングチュートリアル


辞書ベースの構成の例

以下は、ロギング構成ディクショナリの例です。これは、Djangoプロジェクトドキュメントから抜粋したものです。 このディクショナリは dictConfig()に渡され、構成が有効になります。

LOGGING = {
    'version': 1,
    'disable_existing_loggers': True,
    'formatters': {
        'verbose': {
            'format': '%(levelname)s %(asctime)s %(module)s %(process)d %(thread)d %(message)s'
        },
        'simple': {
            'format': '%(levelname)s %(message)s'
        },
    },
    'filters': {
        'special': {
            '()': 'project.logging.SpecialFilter',
            'foo': 'bar',
        }
    },
    'handlers': {
        'null': {
            'level':'DEBUG',
            'class':'django.utils.log.NullHandler',
        },
        'console':{
            'level':'DEBUG',
            'class':'logging.StreamHandler',
            'formatter': 'simple'
        },
        'mail_admins': {
            'level': 'ERROR',
            'class': 'django.utils.log.AdminEmailHandler',
            'filters': ['special']
        }
    },
    'loggers': {
        'django': {
            'handlers':['null'],
            'propagate': True,
            'level':'INFO',
        },
        'django.request': {
            'handlers': ['mail_admins'],
            'level': 'ERROR',
            'propagate': False,
        },
        'myproject.custom': {
            'handlers': ['console', 'mail_admins'],
            'level': 'INFO',
            'filters': ['special']
        }
    }
}

この構成の詳細については、Djangoドキュメントの関連セクションを参照してください。


ローテーターとネーマーを使用してログローテーション処理をカスタマイズする

ネーマーとローテーターを定義する方法の例を次のスニペットに示します。これは、ログファイルのzlibベースの圧縮を示しています。

def namer(name):
    return name + ".gz"

def rotator(source, dest):
    with open(source, "rb") as sf:
        data = sf.read()
        compressed = zlib.compress(data, 9)
        with open(dest, "wb") as df:
            df.write(compressed)
    os.remove(source)

rh = logging.handlers.RotatingFileHandler(...)
rh.rotator = rotator
rh.namer = namer

これらは、実際のgzipファイルにあるような「コンテナ」がない裸の圧縮データであるため、「真の」.gzファイルではありません。 このスニペットは、説明のみを目的としています。


より複雑なマルチプロセッシングの例

次の作業例は、構成ファイルを使用したマルチプロセッシングでログを使用する方法を示しています。 構成はかなり単純ですが、実際のマルチプロセッシングシナリオでより複雑な構成を実装する方法を説明するのに役立ちます。

この例では、メインプロセスがリスナープロセスと一部のワーカープロセスを生成します。 メインプロセス、リスナー、およびワーカーのそれぞれには、3つの別個の構成があります(ワーカーはすべて同じ構成を共有します)。 メインプロセスでのロギング、ワーカーがQueueHandlerにログを記録する方法、リスナーがQueueListenerとより複雑なロギング構成を実装する方法、およびキューを介して受信したイベントを構成で指定されたハンドラーにディスパッチするように調整する方法を確認できます。 これらの構成は単なる例示であることに注意してください。ただし、この例を独自のシナリオに適合させることができるはずです。

スクリプトは次のとおりです。docstringとコメントは、スクリプトがどのように機能するかを説明していることを願っています。

import logging
import logging.config
import logging.handlers
from multiprocessing import Process, Queue, Event, current_process
import os
import random
import time

class MyHandler:
    """
    A simple handler for logging events. It runs in the listener process and
    dispatches events to loggers based on the name in the received record,
    which then get dispatched, by the logging system, to the handlers
    configured for those loggers.
    """

    def handle(self, record):
        if record.name == "root":
            logger = logging.getLogger()
        else:
            logger = logging.getLogger(record.name)

        if logger.isEnabledFor(record.levelno):
            # The process name is transformed just to show that it's the listener
            # doing the logging to files and console
            record.processName = '%s (for %s)' % (current_process().name, record.processName)
            logger.handle(record)

def listener_process(q, stop_event, config):
    """
    This could be done in the main process, but is just done in a separate
    process for illustrative purposes.

    This initialises logging according to the specified configuration,
    starts the listener and waits for the main process to signal completion
    via the event. The listener is then stopped, and the process exits.
    """
    logging.config.dictConfig(config)
    listener = logging.handlers.QueueListener(q, MyHandler())
    listener.start()
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    stop_event.wait()
    listener.stop()

def worker_process(config):
    """
    A number of these are spawned for the purpose of illustration. In
    practice, they could be a heterogeneous bunch of processes rather than
    ones which are identical to each other.

    This initialises logging according to the specified configuration,
    and logs a hundred messages with random levels to randomly selected
    loggers.

    A small sleep is added to allow other processes a chance to run. This
    is not strictly needed, but it mixes the output from the different
    processes a bit more than if it's left out.
    """
    logging.config.dictConfig(config)
    levels = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
              logging.CRITICAL]
    loggers = ['foo', 'foo.bar', 'foo.bar.baz',
               'spam', 'spam.ham', 'spam.ham.eggs']
    if os.name == 'posix':
        # On POSIX, the setup logger will have been configured in the
        # parent process, but should have been disabled following the
        # dictConfig call.
        # On Windows, since fork isn't used, the setup logger won't
        # exist in the child, so it would be created and the message
        # would appear - hence the "if posix" clause.
        logger = logging.getLogger('setup')
        logger.critical('Should not appear, because of disabled logger ...')
    for i in range(100):
        lvl = random.choice(levels)
        logger = logging.getLogger(random.choice(loggers))
        logger.log(lvl, 'Message no. %d', i)
        time.sleep(0.01)

def main():
    q = Queue()
    # The main process gets a simple configuration which prints to the console.
    config_initial = {
        'version': 1,
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'level': 'INFO'
            }
        },
        'root': {
            'handlers': ['console'],
            'level': 'DEBUG'
        }
    }
    # The worker process configuration is just a QueueHandler attached to the
    # root logger, which allows all messages to be sent to the queue.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_worker = {
        'version': 1,
        'disable_existing_loggers': True,
        'handlers': {
            'queue': {
                'class': 'logging.handlers.QueueHandler',
                'queue': q
            }
        },
        'root': {
            'handlers': ['queue'],
            'level': 'DEBUG'
        }
    }
    # The listener process configuration shows that the full flexibility of
    # logging configuration is available to dispatch events to handlers however
    # you want.
    # We disable existing loggers to disable the "setup" logger used in the
    # parent process. This is needed on POSIX because the logger will
    # be there in the child following a fork().
    config_listener = {
        'version': 1,
        'disable_existing_loggers': True,
        'formatters': {
            'detailed': {
                'class': 'logging.Formatter',
                'format': '%(asctime)s %(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            },
            'simple': {
                'class': 'logging.Formatter',
                'format': '%(name)-15s %(levelname)-8s %(processName)-10s %(message)s'
            }
        },
        'handlers': {
            'console': {
                'class': 'logging.StreamHandler',
                'formatter': 'simple',
                'level': 'INFO'
            },
            'file': {
                'class': 'logging.FileHandler',
                'filename': 'mplog.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'foofile': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-foo.log',
                'mode': 'w',
                'formatter': 'detailed'
            },
            'errors': {
                'class': 'logging.FileHandler',
                'filename': 'mplog-errors.log',
                'mode': 'w',
                'formatter': 'detailed',
                'level': 'ERROR'
            }
        },
        'loggers': {
            'foo': {
                'handlers': ['foofile']
            }
        },
        'root': {
            'handlers': ['console', 'file', 'errors'],
            'level': 'DEBUG'
        }
    }
    # Log some initial events, just to show that logging in the parent works
    # normally.
    logging.config.dictConfig(config_initial)
    logger = logging.getLogger('setup')
    logger.info('About to create workers ...')
    workers = []
    for i in range(5):
        wp = Process(target=worker_process, name='worker %d' % (i + 1),
                     args=(config_worker,))
        workers.append(wp)
        wp.start()
        logger.info('Started worker: %s', wp.name)
    logger.info('About to create listener ...')
    stop_event = Event()
    lp = Process(target=listener_process, name='listener',
                 args=(q, stop_event, config_listener))
    lp.start()
    logger.info('Started listener')
    # We now hang around for the workers to finish their work.
    for wp in workers:
        wp.join()
    # Workers all done, listening can now stop.
    # Logging in the parent still works normally.
    logger.info('Telling listener to stop ...')
    stop_event.set()
    lp.join()
    logger.info('All done.')

if __name__ == '__main__':
    main()

SysLogHandlerに送信されるメッセージへのBOMの挿入

RFC 5424 では、Unicodeメッセージを次の構造を持つバイトのセットとしてsyslogデーモンに送信する必要があります:オプションの純粋なASCIIコンポーネントとそれに続くUTF-8バイトOrder Mark(BOM)の後に、UTF-8を使用してエンコードされたUnicodeが続きます。 (仕様 関連セクションを参照してください。)

Python 3.1では、メッセージにBOMを挿入するコードが SysLogHandler に追加されましたが、残念ながら、BOMがメッセージの先頭に表示されるため、純粋なASCIIコンポーネントが許可されないという誤った実装が行われました。その前に表示されます。

この動作が壊れているため、Python3.2.4以降から誤ったBOM挿入コードが削除されています。 ただし、これは置き換えられていません。 RFC 5424 準拠のメッセージを生成する場合は、BOM、その前のオプションの純粋なASCIIシーケンス、およびその後の任意のUnicodeが含まれます。 UTF-8を使用してエンコードされている場合は、次の手順を実行する必要があります。

  1. Formatter インスタンスを SysLogHandler インスタンスに、次のようなフォーマット文字列でアタッチします。

    'ASCII section\ufeffUnicode section'

    UnicodeコードポイントU + FEFFは、UTF-8を使用してエンコードされると、UTF-8 BOM(バイト文字列b'\xef\xbb\xbf')としてエンコードされます。

  2. ASCIIセクションを任意のプレースホルダーに置き換えますが、置換後にそこに表示されるデータが常にASCIIであることを確認してください(そうすると、UTF-8エンコード後も変更されません)。

  3. Unicodeセクションを任意のプレースホルダーに置き換えます。 置換後に表示されるデータにASCII範囲外の文字が含まれている場合は、問題ありません。UTF-8を使用してエンコードされます。

フォーマットされたメッセージは、SysLogHandlerによるUTF-8エンコーディングを使用してエンコードされます。 上記のルールに従うと、 RFC 5424 準拠のメッセージを生成できるはずです。 そうしないと、ロギングは文句を言わないかもしれませんが、メッセージはRFC 5424に準拠せず、syslogデーモンが文句を言うかもしれません。


構造化ロギングの実装

ほとんどのロギングメッセージは人間が読むことを目的としているため、機械で簡単に解析することはできませんが、プログラムで解析できる構造化された形式でメッセージを出力したい場合があります(ログメッセージを解析するために複雑な正規表現が必要です)。 これは、ロギングパッケージを使用して簡単に実現できます。 これを実現する方法はいくつかありますが、以下はJSONを使用してマシンで解析可能な方法でイベントをシリアル化する簡単なアプローチです。

import json
import logging

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        return '%s >>> %s' % (self.message, json.dumps(self.kwargs))

_ = StructuredMessage   # optional, to improve readability

logging.basicConfig(level=logging.INFO, format='%(message)s')
logging.info(_('message 1', foo='bar', bar='baz', num=123, fnum=123.456))

上記のスクリプトを実行すると、次のように出力されます。

message 1 >>> {"fnum": 123.456, "num": 123, "bar": "baz", "foo": "bar"}

使用するPythonのバージョンによって、アイテムの順序が異なる場合があることに注意してください。

より特殊な処理が必要な場合は、次の完全な例のように、カスタムJSONエンコーダーを使用できます。

from __future__ import unicode_literals

import json
import logging

# This next bit is to ensure the script runs unchanged on 2.x and 3.x
try:
    unicode
except NameError:
    unicode = str

class Encoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, set):
            return tuple(o)
        elif isinstance(o, unicode):
            return o.encode('unicode_escape').decode('ascii')
        return super().default(o)

class StructuredMessage:
    def __init__(self, message, /, **kwargs):
        self.message = message
        self.kwargs = kwargs

    def __str__(self):
        s = Encoder().encode(self.kwargs)
        return '%s >>> %s' % (self.message, s)

_ = StructuredMessage   # optional, to improve readability

def main():
    logging.basicConfig(level=logging.INFO, format='%(message)s')
    logging.info(_('message 1', set_value={1, 2, 3}, snowman='\u2603'))

if __name__ == '__main__':
    main()

上記のスクリプトを実行すると、次のように出力されます。

message 1 >>> {"snowman": "\u2603", "set_value": [1, 2, 3]}

使用するPythonのバージョンによって、アイテムの順序が異なる場合があることに注意してください。


dictConfig()を使用したハンドラーのカスタマイズ

ロギングハンドラーを特定の方法でカスタマイズしたい場合があります。 dictConfig()を使用すると、サブクラス化せずにこれを実行できる場合があります。 例として、ログファイルの所有権を設定したい場合があると考えてください。 POSIXでは、これは shutil.chown()を使用して簡単に実行できますが、stdlibのファイルハンドラーは組み込みのサポートを提供していません。 次のような単純な関数を使用して、ハンドラーの作成をカスタマイズできます。

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

次に、 dictConfig()に渡されるログ構成で、次の関数を呼び出してログハンドラーを作成するように指定できます。

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

この例では、説明のために、pulseユーザーとグループを使用して所有権を設定しています。 それをまとめて動作するスクリプトchowntest.py

import logging, logging.config, os, shutil

def owned_file_handler(filename, mode='a', encoding=None, owner=None):
    if owner:
        if not os.path.exists(filename):
            open(filename, 'a').close()
        shutil.chown(filename, *owner)
    return logging.FileHandler(filename, mode, encoding)

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(levelname)s %(name)s %(message)s'
        },
    },
    'handlers': {
        'file':{
            # The values below are popped from this dictionary and
            # used to create the handler, set the handler's level and
            # its formatter.
            '()': owned_file_handler,
            'level':'DEBUG',
            'formatter': 'default',
            # The values below are passed to the handler creator callable
            # as keyword arguments.
            'owner': ['pulse', 'pulse'],
            'filename': 'chowntest.log',
            'mode': 'w',
            'encoding': 'utf-8',
        },
    },
    'root': {
        'handlers': ['file'],
        'level': 'DEBUG',
    },
}

logging.config.dictConfig(LOGGING)
logger = logging.getLogger('mylogger')
logger.debug('A debug message')

これを実行するには、おそらくrootとして実行する必要があります。

$ sudo python3.3 chowntest.py
$ cat chowntest.log
2013-11-05 09:34:51,128 DEBUG mylogger A debug message
$ ls -l chowntest.log
-rw-r--r-- 1 pulse pulse 55 2013-11-05 09:34 chowntest.log

この例ではPython3.3を使用していることに注意してください。これは、 shutil.chown()が表示される場所だからです。 このアプローチは、 dictConfig()をサポートするすべてのPythonバージョン、つまりPython 2.7、3.2以降で機能するはずです。 3.3より前のバージョンでは、たとえばを使用して実際の所有権の変更を実装する必要があります。 os.chown()

実際には、ハンドラー作成関数は、プロジェクトのどこかにあるユーティリティモジュールにある場合があります。 構成の行の代わりに:

'()': owned_file_handler,

たとえば、次のように使用できます。

'()': 'ext://project.util.owned_file_handler',

ここで、project.utilは、関数が存在するパッケージの実際の名前に置き換えることができます。 上記の動作スクリプトでは、'ext://__main__.owned_file_handler'を使用すると動作するはずです。 ここで、実際の呼び出し可能オブジェクトは、ext://仕様の dictConfig()によって解決されます。

この例は、他の種類のファイル変更を実装する方法も示していることを願っています。 特定のPOSIX許可ビットを設定する-同じ方法で、 os.chmod()を使用します。

もちろん、このアプローチは、 FileHandler 以外のタイプのハンドラー(たとえば、回転するファイルハンドラーの1つ、またはまったく別のタイプのハンドラー)に拡張することもできます。


アプリケーション全体で特定のフォーマットスタイルを使用する

Python 3.2では、 Formatterstyleキーワードパラメーターを取得しました。これは、下位互換性のためにデフォルトで%に設定されていますが、{または$は、 str.format()および string.Template でサポートされているフォーマットアプローチをサポートします。 これは、ログへの最終出力用のログメッセージのフォーマットを管理し、個々のログメッセージの作成方法と完全に直交していることに注意してください。

ロギング呼び出し( debug()info()など)は、実際のロギングメッセージ自体の位置パラメーターのみを取り、キーワードパラメーターはロギングの処理方法のオプションを決定するためにのみ使用されます。呼び出し(例: トレースバック情報をログに記録する必要があることを示すexc_infoキーワードパラメーター、またはログに追加する追加のコンテキスト情報を示すextraキーワードパラメーター)。 したがって、 str.format()または string.Template 構文を使用してロギング呼び出しを直接行うことはできません。これは、ロギングパッケージが内部的に%-f ormattingを使用してフォーマット文字列と変数引数をマージするためです。 。 既存のコードにあるすべてのロギング呼び出しは%-f ormat文字列を使用するため、下位互換性を維持しながらこれを変更することはありません。

There have been suggestions to associate format styles with specific loggers, but that approach also runs into backward compatibility problems because any existing code could be using a given logger name and using %-formatting.

サードパーティのライブラリとコードの間でロギングが相互運用可能に機能するためには、フォーマットに関する決定を個々のロギング呼び出しのレベルで行う必要があります。 これにより、代替のフォーマットスタイルに対応できるいくつかの方法が開かれます。

LogRecordファクトリの使用

Python 3.2では、上記の Formatter の変更に加えて、ロギングパッケージで、ユーザーが setLogRecordFactory()を使用して独自の LogRecord サブクラスを設定できるようになりました。 ] 関数。 これを使用して、 LogRecord の独自のサブクラスを設定できます。これは、 getMessage()メソッドをオーバーライドすることで正しいことを行います。 このメソッドの基本クラスの実装は、msg % argsフォーマットが発生する場所であり、代替フォーマットに置き換えることができます。 ただし、他のコードとの相互運用性を確保するために、すべてのフォーマットスタイルをサポートし、デフォルトとして%-f ormattingを許可するように注意する必要があります。 基本実装と同様に、str(self.msg)の呼び出しにも注意する必要があります。

詳細については、 setLogRecordFactory()および LogRecord のリファレンスドキュメントを参照してください。


カスタムメッセージオブジェクトの使用

{}-および$-フォーマットを使用して個々のログメッセージを作成する、もう1つのおそらくより簡単な方法があります。 (任意のオブジェクトをメッセージとして使用するから)ロギング時に任意のオブジェクトをメッセージ形式の文字列として使用でき、ロギングパッケージがその上で str()を呼び出すことを思い出してください。実際のフォーマット文字列を取得するオブジェクト。 次の2つのクラスを検討してください。

class BraceMessage:
    def __init__(self, fmt, /, *args, **kwargs):
        self.fmt = fmt
        self.args = args
        self.kwargs = kwargs

    def __str__(self):
        return self.fmt.format(*self.args, **self.kwargs)

class DollarMessage:
    def __init__(self, fmt, /, **kwargs):
        self.fmt = fmt
        self.kwargs = kwargs

    def __str__(self):
        from string import Template
        return Template(self.fmt).substitute(**self.kwargs)

これらのいずれかをフォーマット文字列の代わりに使用して、{}-または$-フォーマットを使用して、「%(message)s」または「」の代わりにフォーマットされたログ出力に表示される実際の「メッセージ」部分を作成できます。 {メッセージ}」または「$メッセージ」。 何かをログに記録するときにクラス名を使用するのが少し扱いにくい場合は、メッセージにM_などのエイリアスを使用すると(またはローカリゼーションに_を使用している場合は、おそらく__)。

このアプローチの例を以下に示します。 まず、 str.format()でフォーマットします。

>>> __ = BraceMessage
>>> print(__('Message with {0} {1}', 2, 'placeholders'))
Message with 2 placeholders
>>> class Point: pass
...
>>> p = Point()
>>> p.x = 0.5
>>> p.y = 0.5
>>> print(__('Message with coordinates: ({point.x:.2f}, {point.y:.2f})', point=p))
Message with coordinates: (0.50, 0.50)

次に、 string.Template を使用したフォーマット:

>>> __ = DollarMessage
>>> print(__('Message with $num $what', num=2, what='placeholders'))
Message with 2 placeholders
>>>

注意すべき点の1つは、このアプローチではパフォーマンスが大幅に低下することはないということです。実際のフォーマットは、ロギング呼び出しを行ったときではなく、ロギングされたメッセージがハンドラーによって実際にログに出力されようとしているとき(およびその場合)に発生します。 したがって、つまずく可能性のあるわずかに珍しいことは、かっこがフォーマット文字列だけでなく、フォーマット文字列と引数を囲んでいることです。 これは、__表記が、上記のXXXMessageクラスの1つへのコンストラクター呼び出しの単なるシンタックスシュガーであるためです。


dictConfig()を使用したフィルターの構成

dictConfig()を使用してフィルターを構成できますが、その方法は一見して明らかではないかもしれません(したがってこのレシピ)。 Filter は標準ライブラリに含まれる唯一のフィルタークラスであり、多くの要件に対応する可能性は低いため(基本クラスとしてのみ存在します)、通常は独自の Filterを定義する必要があります。 サブクラスとオーバーライドされた filter()メソッド。 これを行うには、フィルターの構成ディクショナリで()キーを指定し、フィルターの作成に使用される呼び出し可能オブジェクトを指定します(クラスが最も明白ですが、[ X227X] Filter インスタンス)。 完全な例を次に示します。

import logging
import logging.config
import sys

class MyFilter(logging.Filter):
    def __init__(self, param=None):
        self.param = param

    def filter(self, record):
        if self.param is None:
            allow = True
        else:
            allow = self.param not in record.msg
        if allow:
            record.msg = 'changed: ' + record.msg
        return allow

LOGGING = {
    'version': 1,
    'filters': {
        'myfilter': {
            '()': MyFilter,
            'param': 'noshow',
        }
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'filters': ['myfilter']
        }
    },
    'root': {
        'level': 'DEBUG',
        'handlers': ['console']
    },
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.debug('hello')
    logging.debug('hello - noshow')

この例は、インスタンスを構成するcallableに構成データをキーワードパラメーターの形式で渡す方法を示しています。 実行すると、上記のスクリプトが出力されます。

changed: hello

これは、フィルターが構成どおりに機能していることを示しています。

注意すべきいくつかの追加ポイント:

  • 構成で呼び出し可能オブジェクトを直接参照できない場合(例: 別のモジュールにあり、構成ディクショナリがある場所に直接インポートできない場合は、外部オブジェクトへのアクセスの説明に従ってext://...の形式を使用できます。 たとえば、上記の例では、MyFilterの代わりにテキスト'ext://__main__.MyFilter'を使用できます。
  • フィルタの場合と同様に、この手法を使用してカスタムハンドラとフォーマッタを構成することもできます。 ロギングが構成でのユーザー定義オブジェクトの使用をサポートする方法の詳細については、ユーザー定義オブジェクトを参照してください。また、上記の他のクックブックレシピ dictConfig()を使用したハンドラーのカスタマイズを参照してください。


カスタマイズされた例外フォーマット

カスタマイズされた例外フォーマットを実行したい場合があります-引数のために、例外情報が存在する場合でも、ログに記録されたイベントごとに正確に1行が必要だとします。 次の例に示すように、カスタムフォーマッタクラスを使用してこれを行うことができます。

import logging

class OneLineExceptionFormatter(logging.Formatter):
    def formatException(self, exc_info):
        """
        Format an exception so that it prints on a single line.
        """
        result = super().formatException(exc_info)
        return repr(result)  # or format into one line however you want to

    def format(self, record):
        s = super().format(record)
        if record.exc_text:
            s = s.replace('\n', '') + '|'
        return s

def configure_logging():
    fh = logging.FileHandler('output.txt', 'w')
    f = OneLineExceptionFormatter('%(asctime)s|%(levelname)s|%(message)s|',
                                  '%d/%m/%Y %H:%M:%S')
    fh.setFormatter(f)
    root = logging.getLogger()
    root.setLevel(logging.DEBUG)
    root.addHandler(fh)

def main():
    configure_logging()
    logging.info('Sample message')
    try:
        x = 1 / 0
    except ZeroDivisionError as e:
        logging.exception('ZeroDivisionError: %s', e)

if __name__ == '__main__':
    main()

実行すると、正確に2行のファイルが生成されます。

28/01/2015 07:21:23|INFO|Sample message|
28/01/2015 07:21:23|ERROR|ZeroDivisionError: integer division or modulo by zero|'Traceback (most recent call last):\n  File "logtest7.py", line 30, in main\n    x = 1 / 0\nZeroDivisionError: integer division or modulo by zero'|

上記の処理は単純ですが、例外情報を好みに合わせてフォーマットする方法を示しています。 traceback モジュールは、より専門的なニーズに役立つ場合があります。


ロギングメッセージを話す

ロギングメッセージを可視形式ではなく可聴形式でレンダリングすることが望ましい場合があります。 これは、Pythonバインディングがなくても、システムでテキスト読み上げ(TTS)機能を使用できる場合は簡単に実行できます。 ほとんどのTTSシステムには、実行できるコマンドラインプログラムがあり、これはサブプロセスを使用してハンドラーから呼び出すことができます。 ここでは、TTSコマンドラインプログラムがユーザーとの対話や完了までに長い時間がかかることを想定しておらず、ログに記録されるメッセージの頻度がユーザーをメッセージでいっぱいにするほど高くはなく、メッセージは同時にではなく一度に1つずつ話されます。以下の実装例では、次のメッセージが処理される前に1つのメッセージが話されるのを待ちます。これにより、他のハンドラーが待機し続ける可能性があります。 これは、espeak TTSパッケージが利用可能であることを前提としたアプローチを示す短い例です。

import logging
import subprocess
import sys

class TTSHandler(logging.Handler):
    def emit(self, record):
        msg = self.format(record)
        # Speak slowly in a female English voice
        cmd = ['espeak', '-s150', '-ven+f3', msg]
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                             stderr=subprocess.STDOUT)
        # wait for the program to finish
        p.communicate()

def configure_logging():
    h = TTSHandler()
    root = logging.getLogger()
    root.addHandler(h)
    # the default formatter just returns the message
    root.setLevel(logging.DEBUG)

def main():
    logging.info('Hello')
    logging.debug('Goodbye')

if __name__ == '__main__':
    configure_logging()
    sys.exit(main())

実行すると、このスクリプトは女性の声で「こんにちは」、次に「さようなら」と言うはずです。

もちろん、上記のアプローチは、他のTTSシステム、さらにはコマンドラインから実行される外部プログラムを介してメッセージを処理できる他のシステムにも適用できます。


ロギングメッセージをバッファリングし、条件付きで出力する

一時的な領域にメッセージを記録し、特定の条件が発生した場合にのみメッセージを出力したい場合があります。 たとえば、関数でデバッグイベントのログ記録を開始し、関数がエラーなしで完了した場合、収集したデバッグ情報でログを乱雑にしたくないが、エラーがある場合は、すべてのデバッグが必要な場合があります。出力する情報とエラー。

これは、ロギングをこのように動作させたい関数のデコレータを使用してこれを行う方法を示す例です。 logging.handlers.MemoryHandler を利用します。これにより、何らかの条件が発生するまでログに記録されたイベントをバッファリングできます。条件が発生すると、バッファリングされたイベントはflushed-別のハンドラー([X212X ] ハンドラー)処理用。 デフォルトでは、MemoryHandlerは、バッファーがいっぱいになるか、レベルが指定されたしきい値以上のイベントが表示されるとフラッシュされます。 カスタムフラッシュ動作が必要な場合は、MemoryHandlerのより特殊なサブクラスでこのレシピを使用できます。

サンプルスクリプトには、fooという単純な関数があります。この関数は、すべてのログレベルを循環し、sys.stderrに書き込み、ログを記録しようとしているレベルを示し、実際にそのレベルでメッセージをログに記録します。レベル。 パラメータをfooに渡すことができます。これは、trueの場合、ERRORおよびCRITICALレベルでログに記録されます。それ以外の場合は、DEBUG、INFO、およびWARNINGレベルでのみログに記録されます。

スクリプトは、必要な条件付きロギングを実行するデコレータでfooをデコレートするように調整するだけです。 デコレータはロガーをパラメータとして受け取り、デコレートされた関数の呼び出し中にメモリハンドラをアタッチします。 デコレータは、ターゲットハンドラ、フラッシュが発生するレベル、およびバッファの容量(バッファされたレコードの数)を使用して、さらにパラメータ化できます。 これらはデフォルトで StreamHandler になり、それぞれsys.stderrlogging.ERROR、および100に書き込みます。

スクリプトは次のとおりです。

import logging
from logging.handlers import MemoryHandler
import sys

logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

def log_if_errors(logger, target_handler=None, flush_level=None, capacity=None):
    if target_handler is None:
        target_handler = logging.StreamHandler()
    if flush_level is None:
        flush_level = logging.ERROR
    if capacity is None:
        capacity = 100
    handler = MemoryHandler(capacity, flushLevel=flush_level, target=target_handler)

    def decorator(fn):
        def wrapper(*args, **kwargs):
            logger.addHandler(handler)
            try:
                return fn(*args, **kwargs)
            except Exception:
                logger.exception('call failed')
                raise
            finally:
                super(MemoryHandler, handler).flush()
                logger.removeHandler(handler)
        return wrapper

    return decorator

def write_line(s):
    sys.stderr.write('%s\n' % s)

def foo(fail=False):
    write_line('about to log at DEBUG ...')
    logger.debug('Actually logged at DEBUG')
    write_line('about to log at INFO ...')
    logger.info('Actually logged at INFO')
    write_line('about to log at WARNING ...')
    logger.warning('Actually logged at WARNING')
    if fail:
        write_line('about to log at ERROR ...')
        logger.error('Actually logged at ERROR')
        write_line('about to log at CRITICAL ...')
        logger.critical('Actually logged at CRITICAL')
    return fail

decorated_foo = log_if_errors(logger)(foo)

if __name__ == '__main__':
    logger.setLevel(logging.DEBUG)
    write_line('Calling undecorated foo with False')
    assert not foo(False)
    write_line('Calling undecorated foo with True')
    assert foo(True)
    write_line('Calling decorated foo with False')
    assert not decorated_foo(False)
    write_line('Calling decorated foo with True')
    assert decorated_foo(True)

このスクリプトを実行すると、次の出力が表示されます。

Calling undecorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling undecorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
about to log at CRITICAL ...
Calling decorated foo with False
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
Calling decorated foo with True
about to log at DEBUG ...
about to log at INFO ...
about to log at WARNING ...
about to log at ERROR ...
Actually logged at DEBUG
Actually logged at INFO
Actually logged at WARNING
Actually logged at ERROR
about to log at CRITICAL ...
Actually logged at CRITICAL

ご覧のとおり、実際のログ出力は、重大度がERROR以上のイベントがログに記録された場合にのみ発生しますが、その場合、重大度の低い以前のイベントもログに記録されます。

もちろん、従来の装飾手段を使用することもできます。

@log_if_errors(logger)
def foo(fail=False):
    ...

構成によるUTC(GMT)を使用した時間のフォーマット

UTCを使用して時刻をフォーマットしたい場合があります。これは、以下に示す UTCFormatter などのクラスを使用して実行できます。

import logging
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

その後、 Formatter の代わりにUTCFormatterをコードで使用できます。 構成を介してこれを行う場合は、 dictConfig() APIを使用して、次の完全な例で示すアプローチを使用できます。

import logging
import logging.config
import time

class UTCFormatter(logging.Formatter):
    converter = time.gmtime

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'utc': {
            '()': UTCFormatter,
            'format': '%(asctime)s %(message)s',
        },
        'local': {
            'format': '%(asctime)s %(message)s',
        }
    },
    'handlers': {
        'console1': {
            'class': 'logging.StreamHandler',
            'formatter': 'utc',
        },
        'console2': {
            'class': 'logging.StreamHandler',
            'formatter': 'local',
        },
    },
    'root': {
        'handlers': ['console1', 'console2'],
   }
}

if __name__ == '__main__':
    logging.config.dictConfig(LOGGING)
    logging.warning('The local time is %s', time.asctime())

このスクリプトを実行すると、次のように出力されます。

2015-10-17 12:53:29,501 The local time is Sat Oct 17 13:53:29 2015
2015-10-17 13:53:29,501 The local time is Sat Oct 17 13:53:29 2015

ハンドラーごとに1つずつ、現地時間とUTCの両方として時刻がどのようにフォーマットされるかを示します。


選択的ロギングのためのコンテキストマネージャーの使用

ロギング構成を一時的に変更し、何かを行った後に元に戻すと便利な場合があります。 このため、コンテキストマネージャは、ロギングコンテキストを保存および復元する最も明白な方法です。 このようなコンテキストマネージャーの簡単な例を次に示します。これにより、オプションでログレベルを変更し、純粋にコンテキストマネージャーのスコープ内にログハンドラーを追加できます。

import logging
import sys

class LoggingContext:
    def __init__(self, logger, level=None, handler=None, close=True):
        self.logger = logger
        self.level = level
        self.handler = handler
        self.close = close

    def __enter__(self):
        if self.level is not None:
            self.old_level = self.logger.level
            self.logger.setLevel(self.level)
        if self.handler:
            self.logger.addHandler(self.handler)

    def __exit__(self, et, ev, tb):
        if self.level is not None:
            self.logger.setLevel(self.old_level)
        if self.handler:
            self.logger.removeHandler(self.handler)
        if self.handler and self.close:
            self.handler.close()
        # implicit return of None => don't swallow exceptions

レベル値を指定すると、ロガーのレベルは、コンテキストマネージャーがカバーするwithブロックのスコープ内でその値に設定されます。 ハンドラーを指定すると、ブロックに入るときにロガーに追加され、ブロックから出るときに削除されます。 ブロック終了時にハンドラーを閉じるようにマネージャーに依頼することもできます。ハンドラーが不要になった場合は、これを行うことができます。

それがどのように機能するかを説明するために、上記に次のコードブロックを追加できます。

if __name__ == '__main__':
    logger = logging.getLogger('foo')
    logger.addHandler(logging.StreamHandler())
    logger.setLevel(logging.INFO)
    logger.info('1. This should appear just once on stderr.')
    logger.debug('2. This should not appear.')
    with LoggingContext(logger, level=logging.DEBUG):
        logger.debug('3. This should appear once on stderr.')
    logger.debug('4. This should not appear.')
    h = logging.StreamHandler(sys.stdout)
    with LoggingContext(logger, level=logging.DEBUG, handler=h, close=True):
        logger.debug('5. This should appear twice - once on stderr and once on stdout.')
    logger.info('6. This should appear just once on stderr.')
    logger.debug('7. This should not appear.')

最初にロガーのレベルをINFOに設定したため、メッセージ#1は表示され、メッセージ#2は表示されません。 次に、次のwithブロックで一時的にレベルをDEBUGに変更すると、メッセージ#3が表示されます。 ブロックが終了すると、ロガーのレベルがINFOに復元されるため、メッセージ#4は表示されません。 次のwithブロックでは、レベルをDEBUGに再度設定しますが、sys.stdoutに書き込むハンドラーも追加します。 したがって、メッセージ#5はコンソールに2回表示されます(1回はstderr経由、もう1回はstdout経由)。 withステートメントの完了後、ステータスは以前と同じであるため、メッセージ#6は表示されますが(メッセージ#1のように)、メッセージ#7は表示されません(メッセージ#2のように)。

結果のスクリプトを実行すると、結果は次のようになります。

$ python logctx.py
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

もう一度実行しても、stderr/dev/nullにパイプすると、stdoutに書き込まれる唯一のメッセージである次のメッセージが表示されます。

$ python logctx.py 2>/dev/null
5. This should appear twice - once on stderr and once on stdout.

繰り返しますが、stdout/dev/nullにパイプすると、次のようになります。

$ python logctx.py >/dev/null
1. This should appear just once on stderr.
3. This should appear once on stderr.
5. This should appear twice - once on stderr and once on stdout.
6. This should appear just once on stderr.

この場合、stdoutに出力されたメッセージ#5は期待通りに表示されません。

もちろん、ここで説明するアプローチは、たとえばロギングフィルターを一時的にアタッチするなど、一般化することができます。 上記のコードはPython2とPython3で機能することに注意してください。


CLIアプリケーションスターターテンプレート

これがあなたができる方法を示す例です:

  • コマンドライン引数に基づいてログレベルを使用する
  • 別々のファイル内の複数のサブコマンドにディスパッチし、すべてが一貫した方法で同じレベルでログを記録します
  • シンプルで最小限の構成を利用する

一部のサービスを停止、開始、または再起動するコマンドラインアプリケーションがあるとします。 これは、説明の目的で、アプリケーションのメインスクリプトであるファイルapp.pyとして編成でき、個々のコマンドはstart.pystop.py、および [に実装されています。 X191X]。 さらに、コマンドライン引数を介してアプリケーションの詳細度を制御したいとします。デフォルトはlogging.INFOです。 app.pyを作成する1つの方法は次のとおりです。

import argparse
import importlib
import logging
import os
import sys

def main(args=None):
    scriptname = os.path.basename(__file__)
    parser = argparse.ArgumentParser(scriptname)
    levels = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')
    parser.add_argument('--log-level', default='INFO', choices=levels)
    subparsers = parser.add_subparsers(dest='command',
                                       help='Available commands:')
    start_cmd = subparsers.add_parser('start', help='Start a service')
    start_cmd.add_argument('name', metavar='NAME',
                           help='Name of service to start')
    stop_cmd = subparsers.add_parser('stop',
                                     help='Stop one or more services')
    stop_cmd.add_argument('names', metavar='NAME', nargs='+',
                          help='Name of service to stop')
    restart_cmd = subparsers.add_parser('restart',
                                        help='Restart one or more services')
    restart_cmd.add_argument('names', metavar='NAME', nargs='+',
                             help='Name of service to restart')
    options = parser.parse_args()
    # the code to dispatch commands could all be in this file. For the purposes
    # of illustration only, we implement each command in a separate module.
    try:
        mod = importlib.import_module(options.command)
        cmd = getattr(mod, 'command')
    except (ImportError, AttributeError):
        print('Unable to find the code for command \'%s\'' % options.command)
        return 1
    # Could get fancy here and load configuration from file or dictionary
    logging.basicConfig(level=options.log_level,
                        format='%(levelname)s %(name)s %(message)s')
    cmd(options)

if __name__ == '__main__':
    sys.exit(main())

また、startstop、およびrestartコマンドは、次のように別々のモジュールに実装できます。

# start.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    logger.debug('About to start %s', options.name)
    # actually do the command processing here ...
    logger.info('Started the \'%s\' service.', options.name)

したがって、停止するために:

# stop.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to stop %s', services)
    # actually do the command processing here ...
    logger.info('Stopped the %s service%s.', services, plural)

同様に再起動する場合:

# restart.py
import logging

logger = logging.getLogger(__name__)

def command(options):
    n = len(options.names)
    if n == 1:
        plural = ''
        services = '\'%s\'' % options.names[0]
    else:
        plural = 's'
        services = ', '.join('\'%s\'' % name for name in options.names)
        i = services.rfind(', ')
        services = services[:i] + ' and ' + services[i + 2:]
    logger.debug('About to restart %s', services)
    # actually do the command processing here ...
    logger.info('Restarted the %s service%s.', services, plural)

このアプリケーションをデフォルトのログレベルで実行すると、次のような出力が得られます。

$ python app.py start foo
INFO start Started the 'foo' service.

$ python app.py stop foo bar
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py restart foo bar baz
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

最初の単語はログレベルであり、2番目の単語はイベントがログに記録された場所のモジュールまたはパッケージ名です。

ログレベルを変更すると、ログに送信される情報を変更できます。 たとえば、より多くの情報が必要な場合:

$ python app.py --log-level DEBUG start foo
DEBUG start About to start foo
INFO start Started the 'foo' service.

$ python app.py --log-level DEBUG stop foo bar
DEBUG stop About to stop 'foo' and 'bar'
INFO stop Stopped the 'foo' and 'bar' services.

$ python app.py --log-level DEBUG restart foo bar baz
DEBUG restart About to restart 'foo', 'bar' and 'baz'
INFO restart Restarted the 'foo', 'bar' and 'baz' services.

そして、私たちがもっと少なくしたいのなら:

$ python app.py --log-level WARNING start foo
$ python app.py --log-level WARNING stop foo bar
$ python app.py --log-level WARNING restart foo bar baz

この場合、WARNINGレベル以上のログは記録されないため、コマンドはコンソールに何も出力しません。


ロギング用のQtGUI

時々出てくる質問は、GUIアプリケーションにログインする方法についてです。 Qt フレームワークは、 PySide2 または PyQt5 ライブラリを使用したPythonバインディングを備えた人気のあるクロスプラットフォームUIフレームワークです。

次の例は、QtGUIにログインする方法を示しています。 これにより、呼び出し可能オブジェクトを受け取る単純なQtHandlerクラスが導入されます。これは、GUI更新を行うメインスレッドのスロットである必要があります。 ワーカースレッドも作成され、UI自体(手動ログのボタンを使用)とバックグラウンドで作業を行うワーカースレッド(ここでは、ランダムなレベルでメッセージをランダムにログに記録するだけ)の両方からGUIにログを記録する方法を示します。間の短い遅延)。

ワーカースレッドは、 threading モジュールではなくQtのQThreadクラスを使用して実装されます。これは、他のQtコンポーネント。

このコードは、PySide2またはPyQt5の最近のリリースで機能するはずです。 このアプローチを以前のバージョンのQtに適応させることができるはずです。 詳細については、コードスニペットのコメントを参照してください。

import datetime
import logging
import random
import sys
import time

# Deal with minor differences between PySide2 and PyQt5
try:
    from PySide2 import QtCore, QtGui, QtWidgets
    Signal = QtCore.Signal
    Slot = QtCore.Slot
except ImportError:
    from PyQt5 import QtCore, QtGui, QtWidgets
    Signal = QtCore.pyqtSignal
    Slot = QtCore.pyqtSlot


logger = logging.getLogger(__name__)


#
# Signals need to be contained in a QObject or subclass in order to be correctly
# initialized.
#
class Signaller(QtCore.QObject):
    signal = Signal(str, logging.LogRecord)

#
# Output to a Qt GUI is only supposed to happen on the main thread. So, this
# handler is designed to take a slot function which is set up to run in the main
# thread. In this example, the function takes a string argument which is a
# formatted log message, and the log record which generated it. The formatted
# string is just a convenience - you could format a string for output any way
# you like in the slot function itself.
#
# You specify the slot function to do whatever GUI updates you want. The handler
# doesn't know or care about specific UI elements.
#
class QtHandler(logging.Handler):
    def __init__(self, slotfunc, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.signaller = Signaller()
        self.signaller.signal.connect(slotfunc)

    def emit(self, record):
        s = self.format(record)
        self.signaller.signal.emit(s, record)

#
# This example uses QThreads, which means that the threads at the Python level
# are named something like "Dummy-1". The function below gets the Qt name of the
# current thread.
#
def ctname():
    return QtCore.QThread.currentThread().objectName()


#
# Used to generate random levels for logging.
#
LEVELS = (logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
          logging.CRITICAL)

#
# This worker class represents work that is done in a thread separate to the
# main thread. The way the thread is kicked off to do work is via a button press
# that connects to a slot in the worker.
#
# Because the default threadName value in the LogRecord isn't much use, we add
# a qThreadName which contains the QThread name as computed above, and pass that
# value in an "extra" dictionary which is used to update the LogRecord with the
# QThread name.
#
# This example worker just outputs messages sequentially, interspersed with
# random delays of the order of a few seconds.
#
class Worker(QtCore.QObject):
    @Slot()
    def start(self):
        extra = {'qThreadName': ctname() }
        logger.debug('Started work', extra=extra)
        i = 1
        # Let the thread run until interrupted. This allows reasonably clean
        # thread termination.
        while not QtCore.QThread.currentThread().isInterruptionRequested():
            delay = 0.5 + random.random() * 2
            time.sleep(delay)
            level = random.choice(LEVELS)
            logger.log(level, 'Message after delay of %3.1f: %d', delay, i, extra=extra)
            i += 1

#
# Implement a simple UI for this cookbook example. This contains:
#
# * A read-only text edit window which holds formatted log messages
# * A button to start work and log stuff in a separate thread
# * A button to log something from the main thread
# * A button to clear the log window
#
class Window(QtWidgets.QWidget):

    COLORS = {
        logging.DEBUG: 'black',
        logging.INFO: 'blue',
        logging.WARNING: 'orange',
        logging.ERROR: 'red',
        logging.CRITICAL: 'purple',
    }

    def __init__(self, app):
        super().__init__()
        self.app = app
        self.textedit = te = QtWidgets.QPlainTextEdit(self)
        # Set whatever the default monospace font is for the platform
        f = QtGui.QFont('nosuchfont')
        f.setStyleHint(f.Monospace)
        te.setFont(f)
        te.setReadOnly(True)
        PB = QtWidgets.QPushButton
        self.work_button = PB('Start background work', self)
        self.log_button = PB('Log a message at a random level', self)
        self.clear_button = PB('Clear log window', self)
        self.handler = h = QtHandler(self.update_status)
        # Remember to use qThreadName rather than threadName in the format string.
        fs = '%(asctime)s %(qThreadName)-12s %(levelname)-8s %(message)s'
        formatter = logging.Formatter(fs)
        h.setFormatter(formatter)
        logger.addHandler(h)
        # Set up to terminate the QThread when we exit
        app.aboutToQuit.connect(self.force_quit)

        # Lay out all the widgets
        layout = QtWidgets.QVBoxLayout(self)
        layout.addWidget(te)
        layout.addWidget(self.work_button)
        layout.addWidget(self.log_button)
        layout.addWidget(self.clear_button)
        self.setFixedSize(900, 400)

        # Connect the non-worker slots and signals
        self.log_button.clicked.connect(self.manual_update)
        self.clear_button.clicked.connect(self.clear_display)

        # Start a new worker thread and connect the slots for the worker
        self.start_thread()
        self.work_button.clicked.connect(self.worker.start)
        # Once started, the button should be disabled
        self.work_button.clicked.connect(lambda : self.work_button.setEnabled(False))

    def start_thread(self):
        self.worker = Worker()
        self.worker_thread = QtCore.QThread()
        self.worker.setObjectName('Worker')
        self.worker_thread.setObjectName('WorkerThread')  # for qThreadName
        self.worker.moveToThread(self.worker_thread)
        # This will start an event loop in the worker thread
        self.worker_thread.start()

    def kill_thread(self):
        # Just tell the worker to stop, then tell it to quit and wait for that
        # to happen
        self.worker_thread.requestInterruption()
        if self.worker_thread.isRunning():
            self.worker_thread.quit()
            self.worker_thread.wait()
        else:
            print('worker has already exited.')

    def force_quit(self):
        # For use when the window is closed
        if self.worker_thread.isRunning():
            self.kill_thread()

    # The functions below update the UI and run in the main thread because
    # that's where the slots are set up

    @Slot(str, logging.LogRecord)
    def update_status(self, status, record):
        color = self.COLORS.get(record.levelno, 'black')
        s = '<pre><font color="%s">%s</font></pre>' % (color, status)
        self.textedit.appendHtml(s)

    @Slot()
    def manual_update(self):
        # This function uses the formatted message passed in, but also uses
        # information from the record to format the message in an appropriate
        # color according to its severity (level).
        level = random.choice(LEVELS)
        extra = {'qThreadName': ctname() }
        logger.log(level, 'Manually logged!', extra=extra)

    @Slot()
    def clear_display(self):
        self.textedit.clear()


def main():
    QtCore.QThread.currentThread().setObjectName('MainThread')
    logging.getLogger().setLevel(logging.DEBUG)
    app = QtWidgets.QApplication(sys.argv)
    example = Window(app)
    example.show()
    sys.exit(app.exec_())

if __name__=='__main__':
    main()

避けるべきパターン

前のセクションでは、実行または対処する必要がある可能性のあることを行う方法について説明しましたが、役に立たないであるため、ほとんどの場合回避する必要があるいくつかの使用パターンについて言及する価値があります。 以下のセクションは順不同です。

同じログファイルを複数回開く

Windowsでは、「ファイルは別のプロセスで使用されています」というエラーが発生するため、通常、同じファイルを複数回開くことはできません。 ただし、POSIXプラットフォームでは、同じファイルを複数回開いてもエラーは発生しません。 これは、たとえば次のようにして誤って行われる可能性があります。

  • 同じファイルを参照するファイルハンドラーを複数回追加する(例: コピー/貼り付け/変更を忘れるエラーによる)。
  • 名前が異なるために外観が異なるが、一方が他方へのシンボリックリンクであるため同じである2つのファイルを開く。
  • プロセスをフォークし、その後、親と子の両方が同じファイルへの参照を持ちます。 これは、たとえば、マルチプロセッシングモジュールの使用による可能性があります。

ファイルを複数回開くと、ほとんどの場合動作するように見える場合がありますが、実際にはいくつかの問題が発生する可能性があります。

  • 複数のスレッドまたはプロセスが同じファイルに書き込もうとするため、ログ出力が文字化けする可能性があります。 ロギングは、複数のスレッドによる同じハンドラーインスタンスの同時使用を防ぎますが、同じファイルを指す2つの異なるハンドラーインスタンスを使用して2つの異なるスレッドによって同時書き込みが試行された場合、そのような保護はありません。
  • ファイルを削除しようとしました(例: ファイルのローテーション中)は、それを指す別の参照があるため、サイレントに失敗します。 これにより、混乱が生じ、デバッグ時間が無駄になる可能性があります。ログエントリが予期しない場所に配置されたり、完全に失われたりします。

複数のプロセスから単一のファイルにログを記録するで概説されている手法を使用して、このような問題を回避します。


ロガーをクラスの属性として使用するか、パラメーターとして渡す

これを行う必要があるという珍しいケースがあるかもしれませんが、ロガーはシングルトンであるため、一般的には意味がありません。 コードは常にlogging.getLogger(name)を使用して名前で特定のロガーインスタンスにアクセスできるため、インスタンスを渡し、インスタンス属性として保持することは無意味です。 JavaやC#などの他の言語では、ロガーは静的なクラス属性であることが多いことに注意してください。 ただし、このパターンは、モジュール(クラスではなく)がソフトウェア分解の単位であるPythonでは意味がありません。


NullHandler以外のハンドラーをライブラリのロガーに追加する

ハンドラー、フォーマッター、フィルターを追加してロギングを構成するのは、ライブラリ開発者ではなく、アプリケーション開発者の責任です。 ライブラリを保守している場合は、 NullHandler インスタンス以外のロガーにハンドラーを追加しないようにしてください。


たくさんのロガーを作成する

ロガーは、スクリプトの実行中に解放されることのないシングルトンであるため、多数のロガーを作成すると、メモリを使い果たして解放できなくなります。 たとえばごとにロガーを作成するのではなく ファイル処理またはネットワーク接続が確立されたら、既存のメカニズムを使用してコンテキスト情報をログに渡し、作成されたロガーをアプリケーション内の領域を記述するものに制限します(通常はモジュールですが、それよりも少し細かい場合もあります) 。