prefect_fugue.blocks

FugueEngine

FugueEngine holds a Fugue ExecutionEngine name, its configs, and its secret configs. The available names depend on which Fugue plugins have been installed.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| engine | | Fugue ExecutionEngine name | required |
| engine_conf | | Fugue ExecutionEngine config | required |
| secret_conf | | Fugue ExecutionEngine config that should be encoded. For example the token for accessing Databricks. | required |

Note

It is not recommended to use this Block directly. Instead, use the full block expression fugue/<block_name> as the engine name in prefect_fugue.context.fugue_engine.

Source code in prefect_fugue/blocks.py
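from typing import Any, Dict, Optional

from fugue import ExecutionEngine, make_execution_engine
from prefect.blocks.core import Block
from pydantic import Field, SecretStr
from triad import ParamDict


class FugueEngine(Block):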
    """
    FugueEngine holds a Fugue ExecutionEngine name, its configs, and its
    secret configs. The available names depend on which Fugue plugins have
    been installed.

    Args:
        engine: Fugue ExecutionEngine name
        engine_conf: Fugue ExecutionEngine config
        secret_conf: Fugue ExecutionEngine config that should be encoded.
            For example the token for accessing Databricks.

    Note:
        It is not recommended to use this Block directly. Instead, use the
        full block expression `fugue/<block_name>` as the engine name in
        :func:`prefect_fugue.context.fugue_engine`
    """

    _block_type_name = "Fugue Engine"
    _block_type_slug = "fugue"
    _logo_url = "https://avatars.githubusercontent.com/u/65140352?s=200&v=4"  # noqa
    _description = "Configs that can construct a Fugue Execution Engine"

    engine: str = Field(..., alias="engine_name")
    conf: Optional[Dict[str, Any]] = Field(
        default=None,
        alias="engine_config",
        description="A JSON-dict-compatible value",
    )
    secret_conf: Optional[Dict[str, SecretStr]] = Field(
        default=None,
        alias="secret_config",
        description="A JSON-dict-compatible value",
    )

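    def make_engine(self, custom_conf: Any = None) -> ExecutionEngine:
        # Start from the plain (non-secret) config stored on the block.
        conf = ParamDict(self.conf)
        # Overlay the secrets, unwrapped to plain strings for the engine.
        if self.secret_conf is not None:
            for k, v in self.secret_conf.items():
                conf[k] = v.get_secret_value()
        # Call-site overrides are applied last, so they take precedence.
        conf.update(ParamDict(custom_conf))
        return make_execution_engine(self.engine, conf)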
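
The merge order in make_engine decides which value wins when the same key appears in more than one place: the block's plain config is applied first, then the unwrapped secrets, then any custom config passed at call time. The following is a minimal sketch of that precedence using only the standard library. The `Secret` class, the `merge_conf` helper, and the keys `host`, `token`, and `timeout` are hypothetical stand-ins for illustration; they are not part of prefect_fugue, Fugue, or pydantic.

```python
from typing import Any, Dict, Optional


class Secret:
    """Hypothetical stand-in for a secret string type: hides its value in repr."""

    def __init__(self, value: str) -> None:
        self._value = value

    def get_secret_value(self) -> str:
        return self._value

    def __repr__(self) -> str:
        return "Secret('**********')"


def merge_conf(
    conf: Optional[Dict[str, Any]],
    secret_conf: Optional[Dict[str, Secret]],
    custom_conf: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper that mirrors the merge order of make_engine."""
    # 1. Start from the plain config (an absent config behaves like an empty one).
    merged: Dict[str, Any] = dict(conf or {})
    # 2. Overlay secrets, unwrapped to plain strings.
    if secret_conf is not None:
        for key, secret in secret_conf.items():
            merged[key] = secret.get_secret_value()
    # 3. Call-site overrides are applied last and win on conflicts.
    merged.update(custom_conf or {})
    return merged


# Illustrative keys only; real keys depend on the chosen engine.
merged = merge_conf(
    conf={"host": "example.invalid", "token": "placeholder", "timeout": 30},
    secret_conf={"token": Secret("s3cr3t")},
    custom_conf={"timeout": 60},
)
```

In this sketch the secret `token` replaces the placeholder from the plain config, and the call-time `timeout` replaces the stored one, which is the same layering the method applies before handing the result to make_execution_engine.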