
Commit 87e8826

hkuepers committed: Update lambda materialization engine
Signed-off-by: hkuepers <[email protected]>
1 parent ef3dc05

File tree: 2 files changed, +70 -58 lines changed

Lines changed: 65 additions & 56 deletions

@@ -1,84 +1,93 @@
+"""Based on feast.infra.materialization.aws_lambda."""
+
 import base64
-import json
-import sys
+import logging
 import tempfile
-import traceback
 from pathlib import Path

 import pyarrow.parquet as pq
-
 from feast import FeatureStore
 from feast.constants import FEATURE_STORE_YAML_ENV_NAME
-from feast.infra.materialization.local_engine import DEFAULT_BATCH_SIZE
+from feast.infra.compute_engines.aws_lambda.lambda_engine import DEFAULT_BATCH_SIZE
 from feast.utils import _convert_arrow_to_proto, _run_pyarrow_field_mapping

+logger = logging.getLogger()
+logger.setLevel("INFO")

 def handler(event, context):
-    """Provide an event that contains the following keys:
-
-    - operation: one of the operations in the operations dict below
-    - tableName: required for operations that interact with DynamoDB
-    - payload: a parameter to pass to the operation being performed
+    """Load a parquet file and write the feature values to the online store.
+
+    Args:
+        event (dict): payload containing the following keys:
+            FEATURE_STORE_YAML_ENV_NAME: Base64-encoded feature store config
+            view_name: Name of the FeatureView to be materialized
+            view_type: Type of the FeatureView
+            path: Path to the parquet batch file on the S3 bucket
+        context (dict): Lambda runtime context, not used.
     """
-    print("Received event: " + json.dumps(event, indent=2), flush=True)
+    logger.info(f"Received event: {event}")

     try:
         config_base64 = event[FEATURE_STORE_YAML_ENV_NAME]

         config_bytes = base64.b64decode(config_base64)

         # Create a new unique directory for writing feature_store.yaml
-        repo_path = Path(tempfile.mkdtemp())
-
-        with open(repo_path / "feature_store.yaml", "wb") as f:
-            f.write(config_bytes)
+        with tempfile.TemporaryDirectory() as repo_posix_path:
+            repo_path = Path(repo_posix_path)

-        # Initialize the feature store
-        store = FeatureStore(repo_path=str(repo_path.resolve()))
+            with open(repo_path / "feature_store.yaml", "wb") as f:
+                f.write(config_bytes)

-        view_name = event["view_name"]
-        view_type = event["view_type"]
-        path = event["path"]
+            # Initialize the feature store
+            store = FeatureStore(repo_path=str(repo_path.resolve()))

-        bucket = path[len("s3://") :].split("/", 1)[0]
-        key = path[len("s3://") :].split("/", 1)[1]
-        print(f"Inferred Bucket: `{bucket}` Key: `{key}`", flush=True)
+            view_name = event["view_name"]
+            view_type = event["view_type"]
+            path = event["path"]

-        if view_type == "batch":
-            # TODO: This probably needs to become `store.get_batch_feature_view` at some point.
-            feature_view = store.get_feature_view(view_name)
-        else:
-            feature_view = store.get_stream_feature_view(view_name)
+            bucket, key = path[len("s3://") :].split("/", 1)
+            logger.info(f"Inferred Bucket: `{bucket}` Key: `{key}`")

-        print(f"Got Feature View: `{feature_view}`", flush=True)
+            if view_type == "batch":
+                # TODO: This probably needs to become `store.get_batch_feature_view` at some point.  # noqa: E501,W505
+                feature_view = store.get_feature_view(view_name)
+            else:
+                feature_view = store.get_stream_feature_view(view_name)

-        table = pq.read_table(path)
-        if feature_view.batch_source.field_mapping is not None:
-            table = _run_pyarrow_field_mapping(
-                table, feature_view.batch_source.field_mapping
+            logger.info(
+                f"Got Feature View: `{feature_view.name}`, \
+                last updated: {feature_view.last_updated_timestamp}"
             )

-        join_key_to_value_type = {
-            entity.name: entity.dtype.to_value_type()
-            for entity in feature_view.entity_columns
-        }
-
-        written_rows = 0
-
-        for batch in table.to_batches(DEFAULT_BATCH_SIZE):
-            rows_to_write = _convert_arrow_to_proto(
-                batch, feature_view, join_key_to_value_type
-            )
-            store._provider.online_write_batch(
-                store.config,
-                feature_view,
-                rows_to_write,
-                lambda x: None,
+            table = pq.read_table(path)
+            if feature_view.batch_source.field_mapping is not None:
+                table = _run_pyarrow_field_mapping(
+                    table, feature_view.batch_source.field_mapping
+                )
+
+            join_key_to_value_type = {
+                entity.name: entity.dtype.to_value_type()
+                for entity in feature_view.entity_columns
+            }
+
+            written_rows = 0
+
+            for batch in table.to_batches(DEFAULT_BATCH_SIZE):
+                rows_to_write = _convert_arrow_to_proto(
+                    batch, feature_view, join_key_to_value_type
+                )
+                store._provider.online_write_batch(
+                    store.config,
+                    feature_view,
+                    rows_to_write,
+                    lambda x: None,
+                )
+                written_rows += len(rows_to_write)
+            logger.info(
+                f"Successfully updated {written_rows} rows.",
+                extra={"num_updated_rows": written_rows, "feature_view": view_name},
             )
-            written_rows += len(rows_to_write)
-        return {"written_rows": written_rows}
-    except Exception as e:
-        print(f"Exception: {e}", flush=True)
-        print("Traceback:", flush=True)
-        print(traceback.format_exc(), flush=True)
-        sys.exit(1)
+    except Exception:
+        logger.exception("Error in processing materialization.")
+        raise
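A likely motivation for swapping tempfile.mkdtemp() for tempfile.TemporaryDirectory() is that Lambda reuses warm execution environments, so a directory created with mkdtemp() lingers in /tmp across invocations, while the context manager removes it on exit, even on error. A minimal stdlib sketch of the difference (the config bytes are illustrative):

import tempfile
from pathlib import Path

# mkdtemp: the caller owns cleanup; in a warm Lambda container, each
# invocation would leave one of these behind in /tmp.
leaked = Path(tempfile.mkdtemp())
assert leaked.exists()  # still there until removed explicitly

# TemporaryDirectory: removed when the block exits, even if the body raises.
with tempfile.TemporaryDirectory() as repo_posix_path:
    repo_path = Path(repo_posix_path)
    (repo_path / "feature_store.yaml").write_bytes(b"project: demo\n")  # illustrative config
assert not repo_path.exists()  # cleaned up on exit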

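For reference, a minimal sketch of an invocation payload matching the handler's new docstring, assuming the materialization image is already deployed; the function name, view name, and S3 path below are illustrative, not taken from the commit:

import base64
import json

import boto3
from feast.constants import FEATURE_STORE_YAML_ENV_NAME

# Encode the repo config the same way the handler decodes it.
with open("feature_store.yaml", "rb") as f:
    config_base64 = base64.b64encode(f.read()).decode("utf-8")

event = {
    FEATURE_STORE_YAML_ENV_NAME: config_base64,
    "view_name": "driver_hourly_stats",  # illustrative FeatureView name
    "view_type": "batch",  # anything else falls through to get_stream_feature_view
    "path": "s3://example-bucket/materialization/batch-0.parquet",  # illustrative
}

response = boto3.client("lambda").invoke(
    FunctionName="feast-materialization",  # illustrative function name
    InvocationType="RequestResponse",
    Payload=json.dumps(event),
)
print(response["StatusCode"])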
sdk/python/feast/infra/compute_engines/aws_lambda/lambda_engine.py

Lines changed: 5 additions & 2 deletions

@@ -108,9 +108,12 @@ def update(
         r = self.lambda_client.create_function(
             FunctionName=self.lambda_name,
             PackageType="Image",
-            Role=self.repo_config.batch_engine.lambda_role,
-            Code={"ImageUri": self.repo_config.batch_engine.materialization_image},
+            Role=self.repo_config.batch_engine_config.lambda_role,
+            Code={"ImageUri": self.repo_config.batch_engine_config.materialization_image},
             Timeout=DEFAULT_TIMEOUT,
+            LoggingConfig={
+                "LogFormat": "JSON",
+            },
             Tags={
                 "feast-owned": "True",
                 "project": project,

0 commit comments