Source code for dagster.utils.log

import copy
import logging
import logging.config
import sys
import traceback
from contextlib import contextmanager
from typing import Dict, NamedTuple, Optional

import coloredlogs
import pendulum

from dagster import check, seven
from dagster.config import Enum, EnumValue
from dagster.core.definitions.logger_definition import logger
from dagster.core.utils import PYTHON_LOGGING_LEVELS_MAPPING, coerce_valid_log_level

LogLevelEnum = Enum("log_level", list(map(EnumValue, PYTHON_LOGGING_LEVELS_MAPPING.keys())))
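
# Illustrative sketch (not part of the original module): LogLevelEnum can serve
# as a config type so that a logger definition accepts a validated "log_level"
# field. `Field` is assumed importable from dagster.
#
#     from dagster import Field
#
#     config_schema = {"log_level": Field(LogLevelEnum, is_required=False, default_value="INFO")}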


class JsonFileHandler(logging.Handler):
    def __init__(self, json_path):
        super(JsonFileHandler, self).__init__()
        self.json_path = check.str_param(json_path, "json_path")

    def emit(self, record):
        try:
            log_dict = copy.copy(record.__dict__)

            # This horrific monstrosity is to maintain backwards compatibility
            # with the old behavior of the JsonFileHandler, which the clarify
            # project has a dependency on. It relied on the dagster-defined
            # properties smashing all the properties of the LogRecord object,
            # and uploads all of those properties to a Redshift table in order
            # to do analytics on the logs.

            if "dagster_meta" in log_dict:
                dagster_meta_dict = log_dict["dagster_meta"]
                del log_dict["dagster_meta"]
            else:
                dagster_meta_dict = {}

            log_dict.update(dagster_meta_dict)

            with open(self.json_path, "a") as ff:
                text_line = seven.json.dumps(log_dict)
                ff.write(text_line + "\n")
        # Need to catch Exception here, so disabling lint
        except Exception as e:  # pylint: disable=W0703
            logging.critical("[{}] Error during logging!".format(self.__class__.__name__))
            logging.exception(str(e))
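
# Usage sketch (illustrative, not part of the original module): attach the
# handler to a stock python logger; a "dagster_meta" dict passed via `extra`
# becomes a record attribute and is flattened into the emitted JSON line.
# The file path below is hypothetical.
#
#     import logging
#
#     json_logger = logging.getLogger("json_example")
#     json_logger.addHandler(JsonFileHandler("/tmp/example_logs.jsonl"))
#     json_logger.warning("disk almost full", extra={"dagster_meta": {"run_id": "abc123"}})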


class StructuredLoggerMessage(
    NamedTuple(
        "_StructuredLoggerMessage",
        [
            ("name", str),
            ("message", str),
            ("level", int),
            ("meta", Dict[object, object]),
            ("record", logging.LogRecord),
        ],
    )
):
    def __new__(
        cls,
        name: str,
        message: str,
        level: int,
        meta: Dict[object, object],
        record: logging.LogRecord,
    ):
        return super(StructuredLoggerMessage, cls).__new__(
            cls,
            check.str_param(name, "name"),
            check.str_param(message, "message"),
            coerce_valid_log_level(level),
            check.dict_param(meta, "meta"),
            check.inst_param(record, "record", logging.LogRecord),
        )


class JsonEventLoggerHandler(logging.Handler):
    def __init__(self, json_path, construct_event_record):
        super(JsonEventLoggerHandler, self).__init__()
        self.json_path = check.str_param(json_path, "json_path")
        self.construct_event_record = construct_event_record

    def emit(self, record):
        try:
            event_record = self.construct_event_record(record)
            with open(self.json_path, "a") as ff:
                text_line = seven.json.dumps(event_record.to_dict())
                ff.write(text_line + "\n")

        # Need to catch Exception here, so disabling lint
        except Exception as e:  # pylint: disable=W0703
            logging.critical("[{}] Error during logging!".format(self.__class__.__name__))
            logging.exception(str(e))
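
# Usage sketch (illustrative, not part of the original module):
# `construct_event_record` is any callable mapping a logging.LogRecord to an
# object exposing `to_dict()`. `SimpleEvent` below is a hypothetical stand-in.
#
#     class SimpleEvent:
#         def __init__(self, record):
#             self.record = record
#
#         def to_dict(self):
#             return {"name": self.record.name, "msg": self.record.getMessage()}
#
#     event_handler = JsonEventLoggerHandler("/tmp/example_events.jsonl", SimpleEvent)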


class StructuredLoggerHandler(logging.Handler):
    def __init__(self, callback):
        super(StructuredLoggerHandler, self).__init__()
        self.callback = check.is_callable(callback, "callback")

    def emit(self, record):
        try:
            self.callback(
                StructuredLoggerMessage(
                    name=record.name,
                    message=record.msg,
                    level=record.levelno,
                    meta=record.dagster_meta,
                    record=record,
                )
            )
        # Need to catch Exception here, so disabling lint
        except Exception as e:  # pylint: disable=W0703
            logging.critical("[{}] Error during logging!".format(self.__class__.__name__))
            logging.exception(str(e))
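
# Usage sketch (illustrative, not part of the original module): the callback
# receives one StructuredLoggerMessage per record. Records must carry a
# `dagster_meta` attribute, e.g. supplied through the `extra` kwarg.
#
#     import logging
#
#     structured_logger = logging.getLogger("structured_example")
#     structured_logger.addHandler(
#         StructuredLoggerHandler(lambda msg: print(msg.name, msg.message))
#     )
#     structured_logger.info("step started", extra={"dagster_meta": {"step_key": "my_step"}})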


def construct_single_handler_logger(name, level, handler):
    check.str_param(name, "name")
    check.inst_param(handler, "handler", logging.Handler)

    level = coerce_valid_log_level(level)

    @logger
    def single_handler_logger(_init_context):
        klass = logging.getLoggerClass()
        logger_ = klass(name, level=level)
        logger_.addHandler(handler)
        handler.setLevel(level)
        return logger_

    return single_handler_logger
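
# Usage sketch (illustrative, not part of the original module): the @logger
# decorator makes the return value a LoggerDefinition, so it can be wired into
# a job or pipeline like any other logger definition.
#
#     import logging
#     import sys
#
#     stdout_logger_def = construct_single_handler_logger(
#         "stdout_logger", "INFO", logging.StreamHandler(sys.stdout)
#     )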


# Base python logger whose messages will be captured as structured Dagster log messages.
BASE_DAGSTER_LOGGER = logging.getLogger(name="dagster")


def get_dagster_logger(name: Optional[str] = None) -> logging.Logger:
    """Creates a python logger whose output messages will be captured and converted into Dagster
    log messages. This means they will have structured information such as the step_key, run_id,
    etc. embedded into them, and will show up in the Dagster event log.

    This can be used as a more convenient alternative to `context.log` in most cases. If log
    level is not set explicitly, defaults to DEBUG.

    Args:
        name (Optional[str]): If supplied, will create a logger with the name
            "dagster.builtin.{name}", with properties inherited from the base Dagster logger.
            If omitted, the returned logger will be named "dagster.builtin".

    Returns:
        :class:`logging.Logger`: A logger whose output will be captured by Dagster.

    Example:

        .. code-block:: python

            from dagster import get_dagster_logger, op

            @op
            def hello_op():
                log = get_dagster_logger()
                for i in range(5):
                    # do something
                    log.info(f"Did {i+1} things!")
    """
    # enforce that the parent logger will always have a DEBUG log level
    BASE_DAGSTER_LOGGER.setLevel(logging.DEBUG)
    base_builtin = BASE_DAGSTER_LOGGER.getChild("builtin")
    if name:
        return base_builtin.getChild(name)
    return base_builtin

def define_structured_logger(name, callback, level):
    check.str_param(name, "name")
    check.callable_param(callback, "callback")
    level = coerce_valid_log_level(level)

    return construct_single_handler_logger(name, level, StructuredLoggerHandler(callback))


def define_json_file_logger(name, json_path, level):
    check.str_param(name, "name")
    check.str_param(json_path, "json_path")
    level = coerce_valid_log_level(level)

    stream_handler = JsonFileHandler(json_path)
    stream_handler.setFormatter(define_default_formatter())
    return construct_single_handler_logger(name, level, stream_handler)


def get_stack_trace_array(exception):
    check.inst_param(exception, "exception", Exception)
    if hasattr(exception, "__traceback__"):
        tb = exception.__traceback__
    else:
        _exc_type, _exc_value, tb = sys.exc_info()
    return traceback.format_tb(tb)


def _mockable_formatTime(record, datefmt=None):  # pylint: disable=unused-argument
    """Uses pendulum.now to determine the logging time, causing pendulum mocking to affect the
    logger timestamp in tests."""
    return pendulum.now().strftime(datefmt if datefmt else default_date_format_string())


def default_format_string():
    return "%(asctime)s - %(name)s - %(levelname)s - %(message)s"


def default_date_format_string():
    return "%Y-%m-%d %H:%M:%S %z"


def define_default_formatter():
    return logging.Formatter(default_format_string(), default_date_format_string())


@contextmanager
def quieten(quiet=True, level=logging.WARNING):
    if quiet:
        logging.disable(level)
    try:
        yield
    finally:
        if quiet:
            logging.disable(logging.NOTSET)


def configure_loggers(handler="default", log_level="INFO"):
    LOGGING_CONFIG = {
        "version": 1,
        "disable_existing_loggers": False,
        "formatters": {
            "colored": {
                "()": coloredlogs.ColoredFormatter,
                "fmt": default_format_string(),
                "datefmt": default_date_format_string(),
                "field_styles": {"levelname": {"color": "blue"}, "asctime": {"color": "green"}},
                "level_styles": {"debug": {}, "error": {"color": "red"}},
            },
        },
        "handlers": {
            "default": {
                "formatter": "colored",
                "class": "logging.StreamHandler",
                "stream": sys.stdout,
                "level": log_level,
            },
            "null": {
                "class": "logging.NullHandler",
            },
        },
        "loggers": {
            "dagster": {
                "handlers": [handler],
                "level": "INFO",
            },
            "dagit": {
                "handlers": [handler],
                "level": "INFO",
            },
        },
    }

    logging.config.dictConfig(LOGGING_CONFIG)

    if handler == "default":
        for name in ["dagster", "dagit"]:
            logging.getLogger(name).handlers[0].formatter.formatTime = _mockable_formatTime
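
# Usage sketch (illustrative, not part of the original module): quieten()
# suppresses records at or below the given level for the duration of the block,
# and configure_loggers() installs the colored stdout handler on the "dagster"
# and "dagit" loggers. `noisy_function` is hypothetical.
#
#     configure_loggers(handler="default", log_level="DEBUG")
#
#     with quieten(level=logging.ERROR):
#         noisy_function()  # anything at ERROR severity or below is suppressed here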