Snowflake Tasks: Automate and Schedule Your Data Pipelines
Snowflake tasks are designed to automate and schedule business processes, from simple operations to complex workflows within your data pipeline. You can create single tasks for specific operations or use task graphs to orchestrate multiple tasks into complete data pipelines.
A task can execute a single SQL statement, call a stored procedure, or run procedural logic written with Snowflake Scripting. Tasks are particularly useful for generating periodic reports or performing routine database maintenance.
Key Features of Snowflake Tasks:
- Automation and Scheduling: Schedule tasks to run SQL queries, stored procedures, or scripts automatically at defined intervals.
- Task Graphs: Combine multiple tasks in a sequence or flow to handle intricate data workflows.
- Integration with Table Streams: Use tasks with table streams to process data changes continuously, ideal for ELT workflows.
- Flexibility: Tasks can run SQL statements, call stored procedures, or execute Snowflake Scripting for procedural logic.
Creating and Managing Tasks
Task Creation Workflow
- Role Setup: Create a task administrator role to manage task operations.
- Task Creation: Use the CREATE TASK command to define a new task (see examples below). Tasks are initially suspended when created.
- Testing and Adjustment: Test tasks manually and adjust settings as needed to ensure they perform as intended.
- Task Execution: Resume tasks with ALTER TASK ... RESUME to start automatic execution based on defined schedules. (A sketch of the full workflow follows this list.)
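A minimal sketch of this workflow, assuming a task_admin role and placeholder warehouse, task, and table names:

-- 1. Create a task administrator role and grant it the privilege to run tasks
--    (assumes a sufficiently privileged role such as ACCOUNTADMIN is active).
USE ROLE ACCOUNTADMIN;
CREATE ROLE IF NOT EXISTS task_admin;
GRANT EXECUTE TASK ON ACCOUNT TO ROLE task_admin;

-- 2. Define the task; it is created in a suspended state.
CREATE TASK nightly_cleanup
  WAREHOUSE = my_wh
  SCHEDULE = 'USING CRON 0 3 * * * UTC'
AS
  DELETE FROM staging_events
  WHERE loaded_at < DATEADD('day', -30, CURRENT_TIMESTAMP());

-- 3. Test with a manual run before enabling the schedule.
EXECUTE TASK nightly_cleanup;

-- 4. Resume the task so it starts running on its schedule.
ALTER TASK nightly_cleanup RESUME;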
Compute Resources
- Serverless Compute Model: Leverage Snowflake-managed compute resources without the need for user-managed virtual warehouses. This option automatically scales compute resources based on the workload.
  - Costs are based on actual compute resource usage. Snowflake adjusts resources dynamically, so costs track the workload.
- User-Managed Virtual Warehouse: Specify an existing virtual warehouse to provide compute resources for tasks. This choice requires careful selection of warehouse size based on the complexity and needs of the tasks.
  - Costs are determined by the size of the virtual warehouse and the duration of task execution. Managing the warehouse size and its suspend/resume settings can help control costs (see the sketch below).
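For example, a user-managed warehouse used by tasks can be set to suspend quickly when idle and resume on demand; the warehouse name and threshold below are illustrative:

-- Suspend after 60 seconds of inactivity; resume automatically when a task needs it.
ALTER WAREHOUSE my_task_wh SET
  AUTO_SUSPEND = 60
  AUTO_RESUME = TRUE;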
Viewing Task History
To monitor and analyze the performance of tasks, Snowflake provides the TASK_HISTORY function. This function enables querying the execution history of tasks within a specified date range, offering insights into task performance and issues.
TASK_HISTORY(
  [ SCHEDULED_TIME_RANGE_START => <constant_expr> ],
  [ SCHEDULED_TIME_RANGE_END => <constant_expr> ],
  [ RESULT_LIMIT => <integer> ],
  [ TASK_NAME => '<string>' ],
  [ ERROR_ONLY => { TRUE | FALSE } ],
  [ ROOT_TASK_ID => '<string>' ]
)
Usage:
- SCHEDULED_TIME_RANGE_START and SCHEDULED_TIME_RANGE_END: Specify the time period for which you want to view task executions.
- RESULT_LIMIT: Limits the number of results returned.
- TASK_NAME: Filters the history to only include executions of a specific task.
- ERROR_ONLY: When set to TRUE, only failed or cancelled task runs are returned.
- ROOT_TASK_ID: Provides history for all tasks within a specific task graph.
Example query:
SELECT *
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  TASK_NAME => 'monthly_data_aggregation',
  SCHEDULED_TIME_RANGE_START => DATEADD('day', -7, CURRENT_TIMESTAMP()),
  SCHEDULED_TIME_RANGE_END => CURRENT_TIMESTAMP()
));
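To surface only failed or cancelled runs, for example over the last day, combine a time range with ERROR_ONLY (a sketch; the selected columns come from the function's standard output):

SELECT name, state, error_code, error_message, scheduled_time
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
  SCHEDULED_TIME_RANGE_START => DATEADD('day', -1, CURRENT_TIMESTAMP()),
  ERROR_ONLY => TRUE
));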
Practical Use Cases for Snowflake Tasks
- Data Maintenance: Regularly purge old data, summarize data, or validate data integrity.
- Report Generation: Automatically generate and store reports at regular intervals (a sketch follows this list).
- Data Integration: Use tasks to integrate data from various sources, transforming and loading data as needed.
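As an illustration of the report-generation case, the task below snapshots daily sales totals at 06:00 UTC; the warehouse, tables, and columns are hypothetical:

-- Hypothetical report task: stores one aggregated snapshot per day.
CREATE OR REPLACE TASK daily_sales_report
  WAREHOUSE = reporting_wh
  SCHEDULE = 'USING CRON 0 6 * * * UTC'
AS
  INSERT INTO sales_report_snapshots (report_date, region, total_sales)
  SELECT CURRENT_DATE(), region, SUM(amount)
  FROM sales
  GROUP BY region;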
Limitations
Schema evolution is not supported by tasks; once a task is set up to operate on a schema, changes to the schema’s structure might require task reconfiguration.
Examples
Using a User-Managed Virtual Warehouse
By specifying the WAREHOUSE parameter, you directly assign a user-managed virtual warehouse to the task. This warehouse provides the necessary compute resources whenever the task runs.
Snowflake SQL Query
CREATE OR REPLACE TASK my_task
  WAREHOUSE = my_virtual_warehouse -- Specify your virtual warehouse here
  SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
AS
  CALL my_unload_sp();
When to Use
This configuration is ideal if you need predictable performance and have specific control over the compute resources, such as scaling the warehouse up or down based on expected workloads.
Using Serverless Compute Resources
If you omit the WAREHOUSE parameter, the task defaults to serverless compute resources. This means Snowflake manages the scaling and allocation of compute power automatically based on the task's needs.
This option simplifies resource management and is beneficial for varying workloads that do not predictably consume resources.
To influence the initial size of the compute resources for the first few runs, you can set the USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE parameter. This gives Snowflake a starting point before it has enough run history to make automated adjustments.
Snowflake SQL Query
CREATE OR REPLACE TASK my_serverless_task
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'MEDIUM' -- Optional, sets initial resource size
  SCHEDULE = 'USING CRON 0 9-17 * * SUN America/Los_Angeles'
AS
  INSERT INTO t(a, b, c)
  SELECT a, b, c FROM stg_t;
When to Use
This approach is suitable when you prefer Snowflake to manage scaling, which can be particularly advantageous for tasks with unpredictable execution times or varying intensities of workload.
Integrating Tasks with Streams for Real-Time Data Processing
Tasks can be integrated with streams to process data changes as they arrive. This setup is ideal for continuous ELT workflows, where a task runs only when the stream contains new data.
Snowflake SQL Query
CREATE OR REPLACE STREAM my_stream ON TABLE my_table;

CREATE OR REPLACE TASK my_stream_task
  WAREHOUSE = my_warehouse
  WHEN SYSTEM$STREAM_HAS_DATA('my_stream') -- Task runs only when new data is detected in the stream
AS
  INSERT INTO my_downstream_table
  SELECT * FROM my_stream;
When to Use
This pattern fits continuous ELT pipelines where downstream tables should be updated whenever new data lands in the source table, while avoiding unnecessary runs (and compute costs) when nothing has changed.
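When testing this setup, you can check whether the stream currently holds unconsumed changes by calling SYSTEM$STREAM_HAS_DATA directly; after the task's INSERT consumes the stream, the call returns FALSE until new changes arrive:

SELECT SYSTEM$STREAM_HAS_DATA('my_stream');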