prefect_fugue.tasks

fsql

Function for running Fugue SQL.

This function generates the Prefect task that runs Fugue SQL.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `query` | `str` | the Fugue SQL query | required |
| `yields` | `Any` | the yielded dataframes from the previous tasks, defaults to `None`; it can be a single yielded result or an array of yielded results (see example) | `None` |
| `engine` | `Any` | execution engine expression that can be recognized by Fugue, defaults to `None` (the default ExecutionEngine of Fugue) | `None` |
| `engine_conf` | `Any` | extra execution engine configs, defaults to `None` | `None` |
| `checkpoint` | `bool` | whether to checkpoint this task in Prefect, defaults to `None` (determined by the `fugue_engine` context) | `None` |
| `fsql_ignore_case` | `bool` | whether Fugue SQL should be case insensitive; passed to Fugue as the `FUGUE_CONF_SQL_IGNORE_CASE` config | `False` |
| `**kwargs` | `Any` | additional kwargs to pass to Fugue's `fsql` function | `{}` |
References

See: [Fugue SQL Tutorial](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes_sql.html)

Example
```python
from prefect import flow, task
from prefect.tasks.fugue import fsql, fugue_engine
import pandas as pd

# Basic usage
@flow
def flow1():
    res1 = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x")
    res2 = fsql("CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS y")
    fsql('''
    SELECT * FROM x UNION SELECT * FROM y
    SELECT * WHERE a<2
    PRINT
    ''', [res1, res2]) # SQL union using pandas
    fsql('''
    SELECT * FROM x UNION SELECT * FROM y
    SELECT * WHERE a<2
    PRINT
    ''', [res1, res2], engine="duckdb") # SQL union using duckdb (if installed)

# Pass in other parameters and dataframes
@task
def gen_df():
    return pd.DataFrame(dict(a=[1]))

@task
def gen_path():
    return "/tmp/t.parquet"

@flow
def flow2():
    df = gen_df()
    path = gen_path()
    fsql('''
    SELECT a+1 AS a FROM df
    SAVE OVERWRITE {{path}}
    ''', df=df, path=path)

# Disable checkpoint for distributed dataframes
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

@flow
def flow3():
    with fugue_engine(spark, checkpoint=False):
        # res1 needs to turn off checkpoint because it yields
        # a Spark DataFrame
        res1 = fsql('''
            CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS x
        ''')

        # res2 doesn't need to turn off checkpoint because it yields
        # a local DataFrame (most likely Pandas DataFrame)
        res2 = fsql('''
            CREATE [[1],[2]] SCHEMA a:int YIELD LOCAL DATAFRAME AS y
        ''', checkpoint=True)

        # res3 doesn't need to turn off checkpoint because it yields
        # a file (the dataframe is cached in the file)
        res3 = fsql('''
            CREATE [[-1],[3]] SCHEMA a:int YIELD FILE AS z
        ''', checkpoint=True)

        # this step doesn't need to turn off checkpoint because it
        # doesn't have any output
        fsql('''
        SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z
        SELECT * WHERE a<2
        PRINT
        ''', [res1, res2, res3])
```

Note: The best practice is always yielding files or local dataframes. If you want to yield a distributed dataframe such as Spark or Dask, think twice. `YIELD FILE` is always preferred when Fugue SQL is running as a Prefect task. If you feel `YIELD FILE` is too heavy, that means your SQL logic may not be heavy enough to be broken into multiple tasks.
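
For instance, a minimal sketch of the preferred pattern (the flow name `flow4` is hypothetical; it reuses the `spark` session from the example above). Because `YIELD FILE` caches the dataframe to storage, the task can stay checkpointed and only a file reference is passed downstream:

```python
@flow
def flow4():
    with fugue_engine(spark):
        # YIELD FILE persists the result to a file, so checkpointing can
        # stay on: only the file reference travels between Prefect tasks
        res = fsql('''
            CREATE [[0],[1]] SCHEMA a:int YIELD FILE AS x
        ''')
        fsql('''
        SELECT * FROM x WHERE a>0
        PRINT
        ''', res)
```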

Source code in prefect_fugue/tasks.py
```python
def fsql(
    query: str,
    yields: Any = None,
    engine: Any = None,
    engine_conf: Any = None,
    checkpoint: Optional[bool] = None,
    fsql_ignore_case: bool = False,
    **kwargs: Any
) -> dict:
    """
    Function for running Fugue SQL.

    This function generates the Prefect task that runs Fugue SQL.

    Args:
        query (str): the Fugue SQL query
        yields (Any): the yielded dataframes from the previous tasks,
            defaults to None. It can be a single yielded result or an array of
            yielded results (see example)
        engine (Any): execution engine expression that can be recognized by Fugue,
            defaults to None (the default ExecutionEngine of Fugue)
        engine_conf (Any): extra execution engine configs, defaults to None
        checkpoint (bool, optional): whether to checkpoint this task in Prefect,
            defaults to None (determined by the ``fugue_engine`` context).
        fsql_ignore_case (bool): whether Fugue SQL should be case insensitive;
            passed to Fugue as the ``FUGUE_CONF_SQL_IGNORE_CASE`` config,
            defaults to False
        **kwargs (Any, optional): additional kwargs to pass to Fugue's `fsql` function

    References:
        See: [Fugue SQL
            Tutorial](https://fugue-tutorials.readthedocs.io/tutorials/quick_look/ten_minutes_sql.html)


    Example:
        ```python
        from prefect import flow, task
        from prefect.tasks.fugue import fsql, fugue_engine
        import pandas as pd

        # Basic usage
        @flow
        def flow1():
            res1 = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x")
            res2 = fsql("CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS y")
            fsql('''
            SELECT * FROM x UNION SELECT * FROM y
            SELECT * WHERE a<2
            PRINT
            ''', [res1, res2]) # SQL union using pandas
            fsql('''
            SELECT * FROM x UNION SELECT * FROM y
            SELECT * WHERE a<2
            PRINT
            ''', [res1, res2], engine="duckdb") # SQL union using duckdb (if installed)

        # Pass in other parameters and dataframes
        @task
        def gen_df():
            return pd.DataFrame(dict(a=[1]))

        @task
        def gen_path():
            return "/tmp/t.parquet"

        @flow
        def flow2():
            df = gen_df()
            path = gen_path()
            fsql('''
            SELECT a+1 AS a FROM df
            SAVE OVERWRITE {{path}}
            ''', df=df, path=path)

        # Disable checkpoint for distributed dataframes
        from pyspark.sql import SparkSession

        spark = SparkSession.builder.getOrCreate()

        @flow
        def flow3():
            with fugue_engine(spark, checkpoint=False):
                # res1 needs to turn off checkpoint because it yields
                # a Spark DataFrame
                res1 = fsql('''
                    CREATE [[1],[2]] SCHEMA a:int YIELD DATAFRAME AS x
                ''')

                # res2 doesn't need to turn off checkpoint because it yields
                # a local DataFrame (most likely Pandas DataFrame)
                res2 = fsql('''
                    CREATE [[1],[2]] SCHEMA a:int YIELD LOCAL DATAFRAME AS y
                ''', checkpoint=True)

                # res3 doesn't need to turn off checkpoint because it yields
                # a file (the dataframe is cached in the file)
                res3 = fsql('''
                    CREATE [[-1],[3]] SCHEMA a:int YIELD FILE AS z
                ''', checkpoint=True)

                # this step doesn't need to turn off checkpoint because it
                # doesn't have any output
                fsql('''
                SELECT * FROM x UNION SELECT * FROM y UNION SELECT * FROM z
                SELECT * WHERE a<2
                PRINT
                ''', [res1, res2, res3])
        ```

    Note: The best practice is always yielding files or local dataframes. If
    you want to yield a distributed dataframe such as Spark or Dask, think twice.
    `YIELD FILE` is always preferred when Fugue SQL is running as a Prefect task.
    If you feel `YIELD FILE` is too heavy, that means your
    SQL logic may not be heavy enough to be broken into multiple tasks.
    """
    tn = _truncate_name(query)
    if engine is None and engine_conf is None:
        engine = current_fugue_engine()
    elif checkpoint is None:
        checkpoint = False

    global_vars, local_vars = get_caller_global_local_vars()

    @task(
        name=tn + suffix(),
        description=query,
        cache_key_fn=_get_cache_key_fn(checkpoint),
    )
    def run_fsql(
        query: str,
        yields: Any,
        engine: Any = None,
        engine_conf: Any = None,
    ) -> dict:
        logger = get_run_logger()
        logger.debug(query)
        dag = fugue_sql.FugueSQLWorkflow(
            None, {FUGUE_CONF_SQL_IGNORE_CASE: fsql_ignore_case}
        )
        try:
            dag._sql(
                query, global_vars, local_vars, *_normalize_yields(yields), **kwargs
            )
        except SyntaxError as ex:
            raise SyntaxError(str(ex)).with_traceback(None) from None
        dag.run(engine, engine_conf)
        result: Dict[str, Any] = {}
        for k, v in dag.yields.items():
            if isinstance(v, fugue.dataframe.YieldedDataFrame):
                result[k] = v.result  # type: ignore
            else:
                result[k] = v  # type: ignore
        return result

    return run_fsql(query=query, yields=yields, engine=engine, engine_conf=engine_conf)
```
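
Note the default resolution at the top of the body: when neither `engine` nor `engine_conf` is given, the engine comes from the surrounding `fugue_engine` context; when an engine is passed explicitly, `checkpoint` falls back to `False` unless set. A hedged illustration (following the import style of the examples above; `"duckdb"` assumes that engine is installed, and `defaults_demo` is hypothetical):

```python
from prefect import flow
from prefect.tasks.fugue import fsql

query = "CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x"

@flow
def defaults_demo():
    fsql(query)                   # engine from fugue_engine context, else Fugue's default
    fsql(query, engine="duckdb")  # explicit engine: checkpoint defaults to False
    fsql(query, engine="duckdb", checkpoint=True)  # re-enable checkpointing explicitly
```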

transform

Function for running Fugue's `transform` function.

This function generates the Prefect task that runs Fugue transform.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `Any` | a dataframe or a file path generated from the previous steps | required |
| `transformer` | `Any` | a function or class that can be recognized by Fugue as a transformer | required |
| `engine` | `Any` | execution engine expression that can be recognized by Fugue, defaults to `None` (the default ExecutionEngine of Fugue) | `None` |
| `engine_conf` | `Any` | extra execution engine configs, defaults to `None` | `None` |
| `checkpoint` | `bool` | whether to checkpoint this task in Prefect, defaults to `None` (determined by the `fugue_engine` context) | `None` |
| `**kwargs` | `Any` | additional kwargs to pass to Fugue's `transform` function | `{}` |
References

See: [Fugue Transform](https://fugue-tutorials.readthedocs.io/tutorials/extensions/transformer.html)

Example
```python
from prefect import flow, task
from prefect.tasks.fugue import transform, fsql, fugue_engine
from dask.distributed import Client
import pandas as pd

client = Client(processes=True)

# Basic usage
@task
def gen_df() -> pd.DataFrame:
    return pd.DataFrame(dict(a=[1]))

@task
def show_df(dask_df):
    print(dask_df.compute())

def add_col(df:pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=2)

@flow
def flow1():
    df = gen_df()
    dask_df = transform(df, add_col, schema="*,b:int", engine=client)
    show_df(dask_df)

# Turning on checkpoint when returning a local dataframe
@flow
def flow2():
    df = gen_df()
    local_df = transform(df, add_col, schema="*,b:int",
        engine=client, as_local=True, checkpoint=True)

# fsql + transform
@flow
def flow3():
    with fugue_engine(client, checkpoint=False):
        dfs = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x")
        dask_df = transform(dfs["x"], add_col, schema="*,b:int")
        fsql('''
            SELECT * FROM df WHERE b<3
            PRINT
        ''', df=dask_df)
```
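
As with `fsql`, when no `engine` is passed, `transform` resolves one from the surrounding `fugue_engine` context, falling back to Fugue's default (local) ExecutionEngine. A minimal sketch reusing `gen_df` and `add_col` from above (`flow4` is hypothetical, and the `"duckdb"` engine expression assumes that backend is installed):

```python
@flow
def flow4():
    df = gen_df()
    # No engine and no fugue_engine context: runs on Fugue's default
    # (local) ExecutionEngine and returns a local dataframe
    local_out = transform(df, add_col, schema="*,b:int")

    with fugue_engine("duckdb"):
        # The engine is resolved from the surrounding fugue_engine context
        ctx_out = transform(df, add_col, schema="*,b:int")
```
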
Source code in prefect_fugue/tasks.py
```python
def transform(
    df: Any,
    transformer: Any,
    engine: Any = None,
    engine_conf: Any = None,
    checkpoint: Optional[bool] = None,
    **kwargs
) -> Any:
    """
    Function for running Fugue's `transform` function.

    This function generates the Prefect task that runs Fugue transform.

    Args:
        df (Any): a dataframe or a file path generated from the previous steps
        transformer (Any): a function or class that can be recognized by
            Fugue as a transformer
        engine (Any): execution engine expression that can be recognized by Fugue,
            defaults to None (the default ExecutionEngine of Fugue)
        engine_conf (Any): extra execution engine configs, defaults to None
        checkpoint (bool, optional): whether to checkpoint this task in Prefect,
            defaults to None (determined by the ``fugue_engine`` context).
        **kwargs (Any, optional): additional kwargs to pass to
            Fugue's `transform` function

    References:
        See: [Fugue
            Transform](https://fugue-tutorials.readthedocs.io/tutorials/extensions/transformer.html)

    Example:
        ```python
        from prefect import flow, task
        from prefect.tasks.fugue import transform, fsql, fugue_engine
        from dask.distributed import Client
        import pandas as pd

        client = Client(processes=True)

        # Basic usage
        @task
        def gen_df() -> pd.DataFrame:
            return pd.DataFrame(dict(a=[1]))

        @task
        def show_df(dask_df):
            print(dask_df.compute())

        def add_col(df:pd.DataFrame) -> pd.DataFrame:
            return df.assign(b=2)

        @flow
        def flow1():
            df = gen_df()
            dask_df = transform(df, add_col, schema="*,b:int", engine=client)
            show_df(dask_df)

        # Turning on checkpoint when returning a local dataframe
        @flow
        def flow2():
            df = gen_df()
            local_df = transform(df, add_col, schema="*,b:int",
                engine=client, as_local=True, checkpoint=True)

        # fsql + transform
        @flow
        def flow3():
            with fugue_engine(client, checkpoint=False):
                dfs = fsql("CREATE [[0]] SCHEMA a:int YIELD DATAFRAME AS x")
                dask_df = transform(dfs["x"], add_col, schema="*,b:int")
                fsql('''
                    SELECT * FROM df WHERE b<3
                    PRINT
                ''', df=dask_df)
        ```
    """
    tn = transformer.__name__ + " (transformer)"
    if engine is None and engine_conf is None:
        engine = current_fugue_engine()
    elif checkpoint is None:
        checkpoint = False

    _t = _to_transformer(transformer, kwargs.get("schema", None))

    @task(name=tn + suffix(), cache_key_fn=_get_cache_key_fn(checkpoint))
    def _run_with_func(df: Any, **kwargs):
        kw = dict(kwargs)
        kw.pop("schema", None)
        return fugue.transform(df, _t, **kw)

    return _run_with_func(df, engine=engine, engine_conf=engine_conf, **kwargs)
```
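
One detail worth noting in the body above: `schema` is bound into the transformer via `_to_transformer` and then dropped from the kwargs forwarded to `fugue.transform`, while everything else (e.g. `as_local`) passes straight through. Fugue can also pick the schema up from a schema-hint comment on the transformer itself, in which case the `schema` kwarg can be omitted. A sketch under that assumption (`add_col_hinted` and `flow5` are hypothetical):

```python
from prefect import flow, task
from prefect.tasks.fugue import transform
import pandas as pd

@task
def gen_df() -> pd.DataFrame:
    return pd.DataFrame(dict(a=[1]))

# schema: *,b:int
def add_col_hinted(df: pd.DataFrame) -> pd.DataFrame:
    return df.assign(b=2)

@flow
def flow5():
    df = gen_df()
    # The output schema comes from the "# schema:" hint above,
    # so no schema kwarg is needed
    out = transform(df, add_col_hinted, as_local=True, checkpoint=True)
```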