Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
address review
Signed-off-by: Kevin Zhang <[email protected]>
  • Loading branch information
kevjumba committed Jul 29, 2022
commit 44164f8921dc453045a417504bff4a35f5778989
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
import pytz
import requests

from feast import FeatureService, ValueType
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer
from feast.feast_object import FeastObject
from feast.feature_logging import LoggingConfig
from feast.feature_service import FeatureService
from feast.infra.feature_servers.base_config import FeatureLoggingConfig
from feast.protos.feast.serving.ServingService_pb2 import (
FieldStatus,
Expand All @@ -22,6 +22,7 @@
from feast.protos.feast.serving.ServingService_pb2_grpc import ServingServiceStub
from feast.protos.feast.types.Value_pb2 import RepeatedValue
from feast.type_map import python_values_to_proto_values
from feast.value_type import ValueType
from feast.wait import wait_retry_backoff
from tests.integration.feature_repos.repo_configuration import (
construct_universal_feature_views,
Expand Down
44 changes: 44 additions & 0 deletions sdk/python/tests/integration/e2e/test_universal_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import pytest

from feast import BigQuerySource, Entity, FeatureView, Field
from feast.types import Float32, String
from tests.integration.feature_repos.universal.entities import driver
from tests.integration.feature_repos.universal.feature_views import driver_feature_view
from tests.utils.cli_utils import CliRunner, get_example_repo
from tests.utils.e2e_test_utils import validate_offline_online_store_consistency
from tests.utils.online_read_write_test_utils import basic_rw_test


@pytest.mark.integration
Expand All @@ -27,3 +31,43 @@ def test_e2e_consistency(environment, e2e_data_sources, infer_features):
split_dt = df["ts_1"][4].to_pydatetime() - timedelta(seconds=1)

validate_offline_online_store_consistency(fs, fv, split_dt)


@pytest.mark.integration
def test_partial() -> None:
    """
    Apply one extra feature view on top of a repo that was set up through the CLI.

    After the partial apply, both the view that came from the example repo and the
    newly applied view must pass the basic online read/write test.
    """
    example_repo = get_example_repo("example_feature_repo_1.py")

    with CliRunner().local_repo(example_repo, "bigquery") as store:
        # Named `driver_entity` so the module-level `driver` import is not shadowed.
        driver_entity = Entity(name="driver", join_keys=["test"])

        locations_source = BigQuerySource(
            table="feast-oss.public.drivers",
            timestamp_field="event_timestamp",
            created_timestamp_column="created_timestamp",
        )

        location_fields = [
            Field(name="lat", dtype=Float32),
            Field(name="lon", dtype=String),
            Field(name="name", dtype=String),
            Field(name="test", dtype=String),
        ]

        extra_view = FeatureView(
            name="driver_locations_100",
            entities=[driver_entity],
            ttl=timedelta(days=1),
            schema=location_fields,
            online=True,
            batch_source=locations_source,
            tags={},
        )

        # Only the new view is passed to apply().
        store.apply([extra_view])

        for view_name in ("driver_locations", "driver_locations_100"):
            basic_rw_test(store, view_name=view_name)
259 changes: 1 addition & 258 deletions sdk/python/tests/integration/registration/test_feature_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# limitations under the License.
import os
import time
from datetime import datetime, timedelta
from tempfile import mkstemp
from datetime import timedelta

import pytest
from pytest_lazyfixture import lazy_fixture
Expand All @@ -27,7 +26,6 @@
from feast.field import Field
from feast.infra.offline_stores.file import FileOfflineStoreConfig
from feast.infra.online_stores.dynamodb import DynamoDBOnlineStoreConfig
from feast.infra.online_stores.sqlite import SqliteOnlineStoreConfig
from feast.repo_config import RepoConfig
from feast.types import Array, Bytes, Float64, Int64, String
from tests.utils.data_source_utils import (
Expand All @@ -37,35 +35,6 @@
)


@pytest.mark.integration
@pytest.mark.parametrize(
    "test_feature_store",
    [lazy_fixture("feature_store_with_local_registry")],
)
def test_apply_entity_success(test_feature_store):
    """Apply a single entity and verify that listing entities returns it unchanged.

    Args:
        test_feature_store: store supplied by the parametrized lazy fixture.
    """
    entity = Entity(
        name="driver_car_id",
        description="Car driver id",
        tags={"team": "matchmaking"},
    )

    try:
        # Register Entity
        test_feature_store.apply(entity)

        entities = test_feature_store.list_entities()

        # Check the count before indexing so an empty result is reported as an
        # assertion failure rather than an IndexError.
        assert len(entities) == 1

        stored = entities[0]
        # One assert per property so a failure names the mismatching attribute.
        assert stored.name == "driver_car_id"
        assert stored.description == "Car driver id"
        assert "team" in stored.tags
        assert stored.tags["team"] == "matchmaking"
    finally:
        # Tear down even when an assertion above fails.
        test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
"test_feature_store",
Expand Down Expand Up @@ -106,61 +75,6 @@ def test_apply_entity_integration(test_feature_store):
test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
    "test_feature_store",
    [lazy_fixture("feature_store_with_local_registry")],
)
def test_apply_feature_view_success(test_feature_store):
    """Apply an entity plus one feature view and verify the listed view matches.

    Args:
        test_feature_store: store supplied by the parametrized lazy fixture.
    """
    # Create Feature Views
    batch_source = FileSource(
        file_format=ParquetFormat(),
        path="file://feast/*",
        timestamp_field="ts_col",
        created_timestamp_column="timestamp",
        date_partition_column="date_partition_col",
    )

    entity = Entity(name="fs1_my_entity_1", join_keys=["entity_id"])

    fv1 = FeatureView(
        name="my_feature_view_1",
        schema=[
            Field(name="fs1_my_feature_1", dtype=Int64),
            Field(name="fs1_my_feature_2", dtype=String),
            Field(name="fs1_my_feature_3", dtype=Array(String)),
            Field(name="fs1_my_feature_4", dtype=Array(Bytes)),
            Field(name="entity_id", dtype=Int64),
        ],
        entities=[entity],
        tags={"team": "matchmaking"},
        batch_source=batch_source,
        ttl=timedelta(minutes=5),
    )

    # The first four stored features are checked positionally, in this order.
    expected_features = [
        ("fs1_my_feature_1", Int64),
        ("fs1_my_feature_2", String),
        ("fs1_my_feature_3", Array(String)),
        ("fs1_my_feature_4", Array(Bytes)),
    ]

    try:
        # Register Feature View
        test_feature_store.apply([entity, fv1])

        # List Feature Views
        feature_views = test_feature_store.list_feature_views()
        assert len(feature_views) == 1

        stored_view = feature_views[0]
        assert stored_view.name == "my_feature_view_1"
        # One assert per property so a failure pinpoints the offending feature.
        for position, (expected_name, expected_dtype) in enumerate(expected_features):
            assert stored_view.features[position].name == expected_name
            assert stored_view.features[position].dtype == expected_dtype
        assert stored_view.entities[0] == "fs1_my_entity_1"
    finally:
        # Tear down even when an assertion above fails.
        test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
"test_feature_store",
Expand Down Expand Up @@ -304,177 +218,6 @@ def test_apply_feature_view_integration(test_feature_store):
test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
    "test_feature_store",
    [lazy_fixture("feature_store_with_local_registry")],
)
def test_apply_object_and_read(test_feature_store):
    """Apply two entities and two feature views, then read single objects back.

    The entity fetched by name must equal the one applied, and must differ from
    the other entity; the fetched first view must differ from the second view.

    Args:
        test_feature_store: store supplied by the parametrized lazy fixture.
    """
    assert isinstance(test_feature_store, FeatureStore)

    # Create Feature Views
    batch_source = FileSource(
        file_format=ParquetFormat(),
        path="file://feast/*",
        timestamp_field="ts_col",
        created_timestamp_column="timestamp",
    )

    e1 = Entity(name="fs1_my_entity_1", description="something")
    e2 = Entity(name="fs1_my_entity_2", description="something")

    def _build_view(view_name, entity):
        # Both views share the same four features and differ only in their name
        # and entity column; fresh Field objects are created on every call.
        return FeatureView(
            name=view_name,
            schema=[
                Field(name="fs1_my_feature_1", dtype=Int64),
                Field(name="fs1_my_feature_2", dtype=String),
                Field(name="fs1_my_feature_3", dtype=Array(String)),
                Field(name="fs1_my_feature_4", dtype=Array(Bytes)),
                Field(name=entity.name, dtype=Int64),
            ],
            entities=[entity],
            tags={"team": "matchmaking"},
            batch_source=batch_source,
            ttl=timedelta(minutes=5),
        )

    fv1 = _build_view("my_feature_view_1", e1)
    fv2 = _build_view("my_feature_view_2", e2)

    try:
        # Register Feature View
        test_feature_store.apply([fv1, e1, fv2, e2])

        fv1_actual = test_feature_store.get_feature_view("my_feature_view_1")
        e1_actual = test_feature_store.get_entity("fs1_my_entity_1")

        assert e1 == e1_actual
        assert fv2 != fv1_actual
        assert e2 != e1_actual
    finally:
        # Tear down even when an assertion above fails.
        test_feature_store.teardown()


@pytest.mark.integration
@pytest.mark.parametrize(
    "test_feature_store",
    [lazy_fixture("feature_store_with_local_registry")],
)
@pytest.mark.parametrize("dataframe_source", [lazy_fixture("simple_dataset_1")])
def test_reapply_feature_view_success(test_feature_store, dataframe_source):
    """Check how re-applying a feature view affects its materialization intervals.

    The test expects that re-applying an unchanged view keeps the recorded
    interval, while applying a view with a different schema under the same name
    leaves it with no intervals.

    Args:
        test_feature_store: store supplied by the parametrized lazy fixture.
        dataframe_source: dataframe supplied by the `simple_dataset_1` fixture.
    """
    with prep_file_source(df=dataframe_source, timestamp_field="ts_1") as file_source:
        try:
            e = Entity(name="id", join_keys=["id_join_key"])

            # Create Feature View
            fv1 = FeatureView(
                name="my_feature_view_1",
                schema=[Field(name="string_col", dtype=String)],
                entities=[e],
                batch_source=file_source,
                ttl=timedelta(minutes=5),
            )

            # Register Feature View
            test_feature_store.apply([fv1, e])

            # Freshly applied: nothing has been materialized yet.
            fv_stored = test_feature_store.get_feature_view(fv1.name)
            assert len(fv_stored.materialization_intervals) == 0

            # Run materialization
            test_feature_store.materialize(datetime(2020, 1, 1), datetime(2021, 1, 1))

            # One interval is recorded after materializing.
            fv_stored = test_feature_store.get_feature_view(fv1.name)
            assert len(fv_stored.materialization_intervals) == 1

            # Apply the identical view again: the interval must survive.
            test_feature_store.apply([fv1])

            fv_stored = test_feature_store.get_feature_view(fv1.name)
            assert len(fv_stored.materialization_intervals) == 1

            # Change and apply Feature View
            fv1 = FeatureView(
                name="my_feature_view_1",
                schema=[Field(name="int64_col", dtype=Int64)],
                entities=[e],
                batch_source=file_source,
                ttl=timedelta(minutes=5),
            )
            test_feature_store.apply([fv1])

            # After the schema change no intervals are expected.
            fv_stored = test_feature_store.get_feature_view(fv1.name)
            assert len(fv_stored.materialization_intervals) == 0
        finally:
            # Tear down even when a step above fails.
            test_feature_store.teardown()


@pytest.mark.integration
def test_apply_conflicting_featureview_names(feature_store_with_local_registry):
    """Test that applying feature views whose names differ only by case is rejected.

    Args:
        feature_store_with_local_registry: store supplied by the fixture of the
            same name.
    """
    driver = Entity(name="driver", join_keys=["driver_id"])
    customer = Entity(name="customer", join_keys=["customer_id"])

    driver_stats = FeatureView(
        name="driver_hourly_stats",
        entities=[driver],
        ttl=timedelta(seconds=10),
        online=False,
        batch_source=FileSource(path="driver_stats.parquet"),
        tags={},
    )

    # Same name as above except for letter case.
    customer_stats = FeatureView(
        name="DRIVER_HOURLY_STATS",
        entities=[customer],
        ttl=timedelta(seconds=10),
        online=False,
        batch_source=FileSource(path="customer_stats.parquet"),
        tags={},
    )

    try:
        # pytest.raises fails the test if no ValueError is raised and lets any
        # other exception type propagate.
        with pytest.raises(ValueError) as excinfo:
            feature_store_with_local_registry.apply([driver_stats, customer_stats])

        assert (
            "Please ensure that all feature view names are case-insensitively unique"
            in excinfo.value.args[0]
        )
    finally:
        # Tear down even when the expectation above fails.
        feature_store_with_local_registry.teardown()


@pytest.fixture
def feature_store_with_local_registry():
    """Build a FeatureStore on the local provider backed by two temporary files.

    One temporary file is used as the registry path and the other as the SQLite
    online store path.

    Returns:
        A FeatureStore configured with project "default" and provider "local".
    """
    registry_fd, registry_path = mkstemp()
    online_store_fd, online_store_path = mkstemp()
    # mkstemp() hands back an open OS-level descriptor that the caller must
    # close. Only the paths are needed here, so close both to avoid leaking them.
    os.close(registry_fd)
    os.close(online_store_fd)

    return FeatureStore(
        config=RepoConfig(
            registry=registry_path,
            project="default",
            provider="local",
            online_store=SqliteOnlineStoreConfig(path=online_store_path),
        )
    )


@pytest.fixture
def feature_store_with_gcs_registry():
from google.cloud import storage
Expand Down
Loading