10 changes: 2 additions & 8 deletions ingestion/src/main/java/feast/ingestion/ImportJob.java
@@ -18,8 +18,6 @@
package feast.ingestion;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.dataflow.DataflowScopes;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
@@ -109,15 +107,11 @@ public static void main(String[] args) {

public static PipelineResult mainWithResult(String[] args) {
log.info("Arguments: " + Arrays.toString(args));
ImportJobOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
ImportJobOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);
if (options.getJobName().isEmpty()) {
options.setJobName(generateName());
}
try {
options.setGcpCredential(GoogleCredentials.getApplicationDefault().createScoped(DataflowScopes.all()));
} catch (IOException e) {
log.error("Exception while setting gcp credential manually : ", e.getMessage());
}
log.info("options: " + options.toString());
ImportSpec importSpec = new ImportSpecSupplier(options).get();
Injector injector =
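With the manual GoogleCredentials/DataflowScopes wiring removed above, credential resolution is left to the runner's standard Application Default Credentials lookup. A minimal launch sketch, assuming the `--importSpecYamlFile` flag name that Beam derives from `getImportSpecYamlFile()`; the spec path is a placeholder:

```java
import feast.ingestion.ImportJob;
import org.apache.beam.sdk.PipelineResult;

public class RunImportJobExample {
  public static void main(String[] args) throws Exception {
    // No explicit credential setup: the runner picks up Application Default
    // Credentials on its own. The spec path below is a placeholder.
    PipelineResult result =
        ImportJob.mainWithResult(
            new String[] {"--importSpecYamlFile=/path/to/importspec.yaml"});
    result.waitUntilFinish();
  }
}
```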
ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowDeserializer.java
@@ -2,32 +2,29 @@

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.FeatureRow;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Deserializer for Kafka to deserialize Protocol Buffers messages
*
* @param <FeatureRow> Protobuf message type
*/
public class FeatureRowDeserializer implements Deserializer<FeatureRow> {

@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public void configure(Map configs, boolean isKey) {}

@Override
public FeatureRow deserialize(String topic, byte[] data) {
try {
return FeatureRow.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
}
@Override
public FeatureRow deserialize(String topic, byte[] data) {
try {
return FeatureRow.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRow from Protobuf message", e);
}
}

@Override
public void close() {
}
}
@Override
public void close() {}
}
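Since `deserialize` is a thin wrapper around `FeatureRow.parseFrom`, a quick round-trip shows the behaviour. A sketch that uses only the default instance, so no proto field names are assumed:

```java
import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.types.FeatureRowProto.FeatureRow;

public class FeatureRowRoundTripExample {
  public static void main(String[] args) {
    // Serialize a FeatureRow with protobuf's own toByteArray, then decode it
    // through the Kafka deserializer; the topic argument is ignored by the code above.
    FeatureRow original = FeatureRow.getDefaultInstance();
    byte[] payload = original.toByteArray();

    FeatureRow decoded = new FeatureRowDeserializer().deserialize("any-topic", payload);
    System.out.println(original.equals(decoded)); // prints: true
  }
}
```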
ingestion/src/main/java/feast/ingestion/deserializer/FeatureRowKeyDeserializer.java
@@ -1,33 +1,31 @@
package feast.ingestion.deserializer;

import com.google.protobuf.InvalidProtocolBufferException;
import feast.types.FeatureRowProto.*;
import feast.types.FeatureRowProto.FeatureRowKey;
import java.util.Map;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.util.Map;

/**
* Deserializer for Kafka to deserialize Protocol Buffers messages
*
* @param <FeatureRowKey> Protobuf message type
*/
public class FeatureRowKeyDeserializer implements Deserializer<FeatureRowKey> {

@Override
public void configure(Map configs, boolean isKey) {
}
@Override
public void configure(Map configs, boolean isKey) {}

@Override
public FeatureRowKey deserialize(String topic, byte[] data) {
try {
return FeatureRowKey.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException("Error deserializing FeatureRowKey from Protobuf message", e);
}
@Override
public FeatureRowKey deserialize(String topic, byte[] data) {
try {
return FeatureRowKey.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new SerializationException(
"Error deserializing FeatureRowKey from Protobuf message", e);
}
}

@Override
public void close() {
}
}
@Override
public void close() {}
}
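Outside of Beam, the same pair of classes can be plugged straight into a plain Kafka consumer. A sketch, assuming a kafka-clients version with the `Duration`-based `poll`; the broker address, group id, and topic name are placeholders:

```java
import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class FeatureRowConsumerExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "feature-row-example");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, FeatureRowKeyDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, FeatureRowDeserializer.class.getName());

    // Keys and values arrive as parsed protobuf messages; no manual parseFrom needed.
    try (KafkaConsumer<FeatureRowKey, FeatureRow> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("feature-rows"));
      ConsumerRecords<FeatureRowKey, FeatureRow> records = consumer.poll(Duration.ofSeconds(5));
      records.forEach(record -> System.out.println(record.value()));
    }
  }
}
```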
ingestion/src/main/java/feast/ingestion/options/ImportJobOptions.java
@@ -19,16 +19,14 @@

import com.google.auto.service.AutoService;
import java.util.Collections;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.metrics.MetricsSink;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.sdk.options.Validation.Required;

public interface ImportJobOptions extends PipelineOptions, FlinkPipelineOptions, GcpOptions {
public interface ImportJobOptions extends PipelineOptions {
@Description("Import spec yaml file path")
@Required(groups = {"importSpec"})
String getImportSpecYamlFile();
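With FlinkPipelineOptions and GcpOptions dropped from the interface, runner- and GCP-specific settings are no longer part of ImportJobOptions itself, but Beam's `PipelineOptions.as()` still exposes them on the same underlying options object when a particular runner needs them. A sketch (the ImportJobOptions package is assumed, as it is not visible in this diff):

```java
import feast.ingestion.options.ImportJobOptions; // package assumed; not shown in this diff
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RunnerOptionsExample {
  public static void main(String[] args) {
    ImportJobOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportJobOptions.class);

    // Cross-cast the same options object into runner/GCP views on demand;
    // unset fields simply fall back to their defaults.
    FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);
    GcpOptions gcpOptions = options.as(GcpOptions.class);
    System.out.println(flinkOptions.getParallelism() + " / " + gcpOptions.getProject());
  }
}
```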
ingestion/src/main/java/feast/ingestion/transform/FeatureRowKafkaIO.java
@@ -17,84 +17,81 @@

package feast.ingestion.transform;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import static com.google.common.base.Preconditions.checkArgument;

import feast.ingestion.deserializer.FeatureRowDeserializer;
import feast.ingestion.deserializer.FeatureRowKeyDeserializer;
import feast.options.Options;
import feast.options.OptionsParser;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import feast.types.FeatureRowProto.FeatureRowKey;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.validation.constraints.NotEmpty;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.google.common.base.Preconditions.checkArgument;

public class FeatureRowKafkaIO {
static final String KAFKA_TYPE = "kafka";

static final String KAFKA_TYPE = "kafka";
/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow} proto messages
* from kafka one or more kafka topics.
*/
public static Read read(ImportSpec importSpec) {
return new Read(importSpec);
}

public static class KafkaReadOptions implements Options {
@NotEmpty public String server;
@NotEmpty public String topics;
}

/**
* Transform for reading {@link feast.types.FeatureRowProto.FeatureRow FeatureRow}
* proto messages from kafka one or more kafka topics.
*
*/
public static Read read(ImportSpec importSpec) {
return new Read(importSpec);
}
public static class Read extends FeatureIO.Read {

public static class Read extends FeatureIO.Read {
private ImportSpec importSpec;

private ImportSpec importSpec;

private Read(ImportSpec importSpec) {
this.importSpec = importSpec;
}

@Override
public PCollection<FeatureRow> expand(PInput input) {

checkArgument(importSpec.getType().equals(KAFKA_TYPE));

String bootstrapServer = importSpec.getOptionsMap().get("server");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(bootstrapServer), "kafka bootstrap server must be set");

String topics = importSpec.getOptionsMap().get("topics");

Preconditions.checkArgument(
!Strings.isNullOrEmpty(topics), "kafka topic(s) must be set");

List<String> topicsList = new ArrayList<>(Arrays.asList(topics.split(",")));

KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader = KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(bootstrapServer)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);

PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord = input.getPipeline().apply(kafkaIOReader);
private Read(ImportSpec importSpec) {
this.importSpec = importSpec;
}

PCollection<FeatureRow> featureRow = featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
return featureRow;
}
@Override
public PCollection<FeatureRow> expand(PInput input) {

checkArgument(importSpec.getType().equals(KAFKA_TYPE));

KafkaReadOptions options =
OptionsParser.parse(importSpec.getOptionsMap(), KafkaReadOptions.class);

List<String> topicsList = new ArrayList<>(Arrays.asList(options.topics.split(",")));

KafkaIO.Read<FeatureRowKey, FeatureRow> kafkaIOReader =
KafkaIO.<FeatureRowKey, FeatureRow>read()
.withBootstrapServers(options.server)
.withTopics(topicsList)
.withKeyDeserializer(FeatureRowKeyDeserializer.class)
.withValueDeserializer(FeatureRowDeserializer.class);

PCollection<KafkaRecord<FeatureRowKey, FeatureRow>> featureRowRecord =
input.getPipeline().apply(kafkaIOReader);

PCollection<FeatureRow> featureRow =
featureRowRecord.apply(
ParDo.of(
new DoFn<KafkaRecord<FeatureRowKey, FeatureRow>, FeatureRow>() {
@ProcessElement
public void processElement(ProcessContext processContext) {
KafkaRecord<FeatureRowKey, FeatureRow> record = processContext.element();
processContext.output(record.getKV().getValue());
}
}));
return featureRow;
}
}
}
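A usage sketch for the refactored Read transform: the ImportSpec options map only needs the `server` and `topics` keys that KafkaReadOptions validates. The broker address and topic names below are placeholders, and `putOptions` assumes the spec's options field is a plain string map, which the use of `getOptionsMap` implies:

```java
import feast.ingestion.transform.FeatureRowKafkaIO;
import feast.specs.ImportSpecProto.ImportSpec;
import feast.types.FeatureRowProto.FeatureRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class KafkaReadExample {
  public static void main(String[] args) {
    // The spec type must be "kafka" to pass the checkArgument in expand();
    // "topics" may be a comma-separated list, mirroring the split(",") above.
    ImportSpec importSpec =
        ImportSpec.newBuilder()
            .setType("kafka")
            .putOptions("server", "localhost:9092")
            .putOptions("topics", "feature-rows-a,feature-rows-b")
            .build();

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    PCollection<FeatureRow> featureRows = pipeline.apply(FeatureRowKafkaIO.read(importSpec));
    // ... downstream transforms on featureRows would go here ...
    pipeline.run();
  }
}
```

Validation of the two options now happens through the @NotEmpty constraints on KafkaReadOptions rather than the manual Preconditions checks the old code used.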