Ask AI

You are viewing an unreleased or outdated version of the documentation

Source code for dagster._core.definitions.asset_spec

from enum import Enum
from functools import cached_property
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Sequence, Set

import dagster._check as check
from dagster._annotations import PublicAttr, experimental_param
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.freshness_policy import FreshnessPolicy
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.partition_mapping import PartitionMapping
from dagster._core.definitions.utils import (
    resolve_automation_condition,
    validate_asset_owner,
    validate_group_name,
    validate_tags_strict,
)
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.storage.tags import KIND_PREFIX
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.internal_init import IHasInternalInit

if TYPE_CHECKING:
    from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep

# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset
# (which currently ends up on the Output associated with the asset key)
# whih encodes the execution type the of asset. "Unexecutable" assets are assets
# that cannot be materialized in Dagster, but can have events in the event
# log keyed off of them, making Dagster usable as a observability and lineage tool
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"


# SYSTEM_METADATA_KEY_IO_MANAGER_KEY lives on the metadata of an asset without a node def and
# determines the io_manager_key that can be used to load it. This is necessary because IO manager
# keys are otherwise encoded inside OutputDefinitions within NodeDefinitions.
SYSTEM_METADATA_KEY_IO_MANAGER_KEY = "dagster/io_manager_key"

# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
# in the auto-materialize daemon. It should eventually be eliminated in favor
# of an implementation of `auto_observe_interval_minutes` in terms of
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"

# SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET lives on the metadata of external assets that are
# created for undefined but referenced assets during asset graph normalization. For example, in the
# below definitions, `foo` is referenced by upstream `bar` but has no corresponding definition:
#
#
#     @asset(deps=["foo"])
#     def bar(context: AssetExecutionContext):
#         ...
#
#     defs=Definitions(assets=[bar])
#
# During normalization we create a "stub" definition for `foo` and attach this metadata to it.
SYSTEM_METADATA_KEY_AUTO_CREATED_STUB_ASSET = "dagster/auto_created_stub_asset"


@whitelist_for_serdes
class AssetExecutionType(Enum):
    OBSERVATION = "OBSERVATION"
    UNEXECUTABLE = "UNEXECUTABLE"
    MATERIALIZATION = "MATERIALIZATION"


[docs] @experimental_param(param="owners") @experimental_param(param="tags") class AssetSpec( NamedTuple( "_AssetSpec", [ ("key", PublicAttr[AssetKey]), ("deps", PublicAttr[Iterable["AssetDep"]]), ("description", PublicAttr[Optional[str]]), ("metadata", PublicAttr[Mapping[str, Any]]), ("group_name", PublicAttr[Optional[str]]), ("skippable", PublicAttr[bool]), ("code_version", PublicAttr[Optional[str]]), ("freshness_policy", PublicAttr[Optional[FreshnessPolicy]]), ("automation_condition", PublicAttr[Optional[AutomationCondition]]), ("owners", PublicAttr[Sequence[str]]), ("tags", PublicAttr[Mapping[str, str]]), ("partitions_def", PublicAttr[Optional[PartitionsDefinition]]), ], ), IHasInternalInit, ): """Specifies the core attributes of an asset, except for the function that materializes or observes it. An asset spec plus any materialization or observation function for the asset constitutes an "asset definition". Attributes: key (AssetKey): The unique identifier for this asset. deps (Optional[AbstractSet[AssetKey]]): The asset keys for the upstream assets that materializing this asset depends on. description (Optional[str]): Human-readable description of this asset. metadata (Optional[Dict[str, Any]]): A dict of static metadata for this asset. For example, users can provide information about the database table this asset corresponds to. skippable (bool): Whether this asset can be omitted during materialization, causing downstream dependencies to skip. group_name (Optional[str]): A string name used to organize multiple assets into groups. If not provided, the name "default" is used. code_version (Optional[str]): The version of the code for this specific asset, overriding the code version of the materialization function freshness_policy (Optional[FreshnessPolicy]): (Deprecated) A policy which indicates how up to date this asset is intended to be. auto_materialize_policy (Optional[AutoMaterializePolicy]): AutoMaterializePolicy to apply to the specified asset. backfill_policy (Optional[BackfillPolicy]): BackfillPolicy to apply to the specified asset. owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each string can be a user's email address, or a team name prefixed with `team:`, e.g. `team:finops`. tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not attached to runs of the asset. partitions_def (Optional[PartitionsDefinition]): Defines the set of partition keys that compose the asset. """ def __new__( cls, key: CoercibleToAssetKey, *, deps: Optional[Iterable["CoercibleToAssetDep"]] = None, description: Optional[str] = None, metadata: Optional[Mapping[str, Any]] = None, skippable: bool = False, group_name: Optional[str] = None, code_version: Optional[str] = None, freshness_policy: Optional[FreshnessPolicy] = None, automation_condition: Optional[AutomationCondition] = None, owners: Optional[Sequence[str]] = None, tags: Optional[Mapping[str, str]] = None, # TODO: FOU-243 auto_materialize_policy: Optional[AutoMaterializePolicy] = None, partitions_def: Optional[PartitionsDefinition] = None, ): from dagster._core.definitions.asset_dep import coerce_to_deps_and_check_duplicates key = AssetKey.from_coercible(key) asset_deps = coerce_to_deps_and_check_duplicates(deps, key) validate_group_name(group_name) owners = check.opt_sequence_param(owners, "owners", of_type=str) for owner in owners: validate_asset_owner(owner, key) kind_tags = {tag_key for tag_key in (tags or {}).keys() if tag_key.startswith(KIND_PREFIX)} if kind_tags is not None and len(kind_tags) > 3: raise DagsterInvalidDefinitionError("Assets can have at most three kinds currently.") return super().__new__( cls, key=key, deps=asset_deps, description=check.opt_str_param(description, "description"), metadata=check.opt_mapping_param(metadata, "metadata", key_type=str), skippable=check.bool_param(skippable, "skippable"), group_name=check.opt_str_param(group_name, "group_name"), code_version=check.opt_str_param(code_version, "code_version"), freshness_policy=check.opt_inst_param( freshness_policy, "freshness_policy", FreshnessPolicy, ), automation_condition=check.opt_inst_param( resolve_automation_condition(automation_condition, auto_materialize_policy), "automation_condition", AutomationCondition, ), owners=owners, tags=validate_tags_strict(tags) or {}, partitions_def=check.opt_inst_param( partitions_def, "partitions_def", PartitionsDefinition ), ) @staticmethod def dagster_internal_init( *, key: CoercibleToAssetKey, deps: Optional[Iterable["CoercibleToAssetDep"]], description: Optional[str], metadata: Optional[Mapping[str, Any]], skippable: bool, group_name: Optional[str], code_version: Optional[str], freshness_policy: Optional[FreshnessPolicy], automation_condition: Optional[AutomationCondition], owners: Optional[Sequence[str]], tags: Optional[Mapping[str, str]], auto_materialize_policy: Optional[AutoMaterializePolicy], partitions_def: Optional[PartitionsDefinition], ) -> "AssetSpec": check.invariant(auto_materialize_policy is None) return AssetSpec( key=key, deps=deps, description=description, metadata=metadata, skippable=skippable, group_name=group_name, code_version=code_version, freshness_policy=freshness_policy, automation_condition=automation_condition, owners=owners, tags=tags, partitions_def=partitions_def, ) @cached_property def partition_mappings(self) -> Mapping[AssetKey, PartitionMapping]: return { dep.asset_key: dep.partition_mapping for dep in self.deps if dep.partition_mapping is not None } @property def auto_materialize_policy(self) -> Optional[AutoMaterializePolicy]: # TODO: FOU-243 return ( self.automation_condition.as_auto_materialize_policy() if self.automation_condition else None ) @cached_property def kinds(self) -> Set[str]: return {tag[len(KIND_PREFIX) :] for tag in self.tags if tag.startswith(KIND_PREFIX)}