Skip to content

Data Sources

Tecton Data Sources are used to declare Tecton's connection to your data. Data Sources can be read to create features with a Feature View.

The Data Source abstraction simplifies your feature definitions by:

  • Providing a consistent data source interface that isolates modeling logic from the implementation details of any specific data source.
  • Merge stream and batch data sources to consistently supply online and historical data to Streaming Feature Views.

There are two types of Data Sources:

  1. BatchDataSource: Configures the connection to exactly one batch data source, such as a Hive table, data warehouse table, or a file.
  2. StreamDataSource: Configures the connection to one stream data source (such as a Kafka Topic or a Kinesis Stream). A stream source must be "backed" by a historical event log in a batch source.

Batch Data Sources

A BatchDataSource is required to create features from your data with a BatchFeatureView or BatchWindowAggregateFeatureView. When calculating feature values, Tecton will read data from the data source defined by BatchDataSource.

Defining a BatchDataSource

  1. Create a configuration that is specific to your data source. Tecton currently supports:

    • FileDSConfig: Single-file data source (such as a file on S3)
    • HiveDSConfig: Hive (or Glue) Table
    • RedshiftDSConfig: Redshift Table or Query
    • SnowflakeDSConfig: Snowflake Table or Query

    The complete list of configurations can be found in API Reference.

  2. Define a BatchDataSource that references the configuration defined in the previous step:

    • name: A unique identifier of the DS. For example, "click_event_log".
    • batch_ds_config: The configuration created in the step above.

See the Data Source API reference for detailed descriptions of Data Source attributes.

Example

The following example defines a Data Source based on a Snowflake table.

click_stream_snowflake_ds = SnowflakeDSConfig(
  url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
  database="YOUR_DB",
  schema="CLICK_STREAM_SCHEMA",
  warehouse="COMPUTE_WH",
  table="CLICK_STREAM",
)

clickstream_snowflake_ds = BatchDataSource(
    name="click_stream_snowflake_ds",
    batch_ds_config=click_stream_snowflake_ds,
)

Stream Data Sources

Tecton makes it easy to provide up-to-date streaming data for your models. Once you've defined a StreamDataSource, you can create features from a StreamFeatureView or StreamWindowAggregateFeatureView.

Stream + Batch Requirement

Tecton backfills and process streaming events from a single feature pipeline definition. In order to enable backfills, Tecton requires StreamDataSource to define both a streaming input, as well as a log of historical events from a batch source.

Tecton will read from the batch source for backfills or when materializing to the offline store, or interactively historical data using the SDK. Materialization to the online store in steady-state is based on the stream source.

Defining a StreamDataSource

  1. Create a configuration that is specific to your streaming data source. Tecton currently supports:

    • KinesisDSConfig: an AWS Kinesis Stream
    • KafkaDSConfig: a Kafka topic

    The complete list of configurations can be found in API Reference.

  2. Create a configuration that for your batch data source. E.g. HiveDSConfig or SnowflakeDSConfig.

  3. Define a StreamDataSource that references the configuration defined in the previous step:

    • name: A unique identifier of the DS. For example, "click_event_log".
    • batch_ds_config: The configuration created in the step above.
    • stream_ds_config: The configuration created in the step above.

Streaming Message Deserialization

A StreamDataSource requires instructions for deserializing each message. This is provided using the raw_stream_translator parameter in StreamDataSource. The stream translator enables you to turn a stream's raw data into a schema that matches the batch data source's schema. Tecton validates that the schemas match.

The translator is a Python function containing PySpark code that deserializes a PySpark DataFrame. Building on the example above, here's a stream translator that unpacks a payload nested in a raw JSON event envelope:

def raw_data_deserialization(df):
    from pyspark.sql.functions import from_json, col

    PAYLOAD_SCHEMA = (
      StructType()
            .add("accountId", StringType(), False)
            .add("description", StringType(), False)
            .add("transaction_id", StringType(), False)
    )

    EVENT_ENVELOPE_SCHEMA = (
      StructType()
            .add("timestamp", TimestampType(), False)
            .add("payload", PAYLOAD_SCHEMA, False)
  )

    value = col("value").cast("string")
    df = df.withColumn("event", from_json(value, EVENT_ENVELOPE_SCHEMA))
    df = df.withColumn("accountId", col("event.payload.accountId"))
    df = df.withColumn("description", col("event.payload.description"))
    df = df.withColumn("transaction_id", col("event.payload.transaction_id"))
    df = df.withColumn("timestamp", col("event.timestamp"))

    return df

click_stream_kafka_ds = KafkaDSConfig(
      default_watermark_delay_threshold="7 days",
      kafka_bootstrap_servers="127.0.0.1:12345",
            topics="click-events-json",
        timestamp_key="click_event_timestamp",
            **raw_stream_translator**=raw_data_deserialization
 )

Example

With our stream translator defined above, this sample below shows code that defines a Data Source backed by a Kafka topic and a Snowflake table.

click_stream_snowflake_ds = SnowflakeDSConfig(
    url="https://[your-cluster].eu-west-1.snowflakecomputing.com/",
    database="YOUR_DB",
    schema="CLICK_STREAM_SCHEMA",
    warehouse="COMPUTE_WH",
    table="CLICK_STREAM",
)

click_stream_kafka_ds = KafkaDSConfig(
    default_watermark_delay_threshold="7 days",
    kafka_bootstrap_servers="127.0.0.1:12345",
    topics="click-events-json",
    timestamp_key="click_event_timestamp",
    raw_stream_translator=raw_data_deserialization
 )

click_stream_ds = StreamDataSource(
    name="click_stream_ds",
    batch_ds_config=click_stream_snowflake_ds,
    stream_ds_config=click_stream_kafka_ds
)