Construction practice of a real-time feature engineering platform integrating streaming and batching
This article shares the Alibaba Cloud FeatHub project team's practice and experience in building a platform for feature engineering development.
This sharing is divided into four parts. The first part introduces the scenarios, goals, pain points, and challenges FeatHub faces in feature development, deployment, monitoring, and sharing. The second part introduces FeatHub's architectural ideas and related core concepts. The third part covers basic API usage, basic computing functions, and code for sample scenarios, as well as performance optimizations and future extension goals. The fourth part covers co-construction of the open source community and resources for learning, developing, and using the project, and also touches on FeatHub's historical-data playback capability, its support for offline, near-line, and online processing, and its integration with Alibaba Cloud's upstream and downstream components.
The quality and efficiency of a feature engineering job depend not only on whether the job is free of bugs, but also on whether the numerical distribution of the upstream input data satisfies certain properties, such as staying close to the distribution seen during training. The inference performance of many jobs degrades precisely because the distribution of data produced by upstream jobs has changed. In this case, developers need to trace the entire link, segment by segment, to find where the feature distribution changed, and then decide whether retraining or a bug fix is needed. The heavy manual workload this entails is another pain point.
① Duplication of development work
Although many feature calculation jobs are built by different teams for different scenarios, they often use similar or even identical feature definitions. Many companies have no good channel for different internal teams to query and reuse existing features. As a result, different teams frequently re-develop the same features and even run duplicate jobs to generate them. This wastes both manpower and computing/storage resources, because extra computation, memory, and storage are spent producing identical features.
② Point-in-time correct semantics
To understand what feature crossing is, the figure above gives a simple example. The table on the upper left records a user behavior feature: the number of clicks in the last two minutes by a user with a given ID, at different points in time. This click count may help us infer whether a user will click on an ad. To use these features for training, they usually need to be spliced into a labeled user data set. The table on the lower left shows a data set of positive and negative samples indicating whether the user actually clicked the advertisement at different points in time. To splice the features of these two data sets into a training data set, the join is usually keyed on the user ID. If we simply perform a table join without considering timestamps, feature crossing may occur. For example, for the sample at 6:03, the user's click count in the last 2 minutes should be 10, but the spliced feature value may instead be the value 6 from 7:00. This kind of feature crossing degrades the actual inference performance. A join result with point-in-time correct semantics should be as shown in the figure below:
To avoid feature crossing when splicing samples, for each row in the left table of the figure above, we should find, among the multiple versions of the feature in the dimension table, the feature value whose timestamp is smaller than and closest to the row's timestamp, and splice that value into the generated training data set. Such a join with point-in-time correct semantics produces the training data set shown on the right of the figure: each point in time gets the corresponding value of the feature generated over the preceding two minutes. A training data set generated this way improves both training and inference performance.
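Outside of FeatHub, the same point-in-time join can be illustrated with pandas' merge_asof, which matches each sample to the latest feature row whose timestamp is not later than the sample's. A minimal sketch with made-up data mirroring the click-count example above:

```python
import pandas as pd

# Feature table: clicks in the last 2 minutes, per user, versioned by time.
features = pd.DataFrame({
    "time": pd.to_datetime(["2022-01-01 06:00", "2022-01-01 06:02", "2022-01-01 07:00"]),
    "user_id": ["u1", "u1", "u1"],
    "last_2_minute_clicks": [5, 10, 6],
})

# Sample table: positive/negative labels at various points in time.
samples = pd.DataFrame({
    "time": pd.to_datetime(["2022-01-01 06:03", "2022-01-01 07:05"]),
    "user_id": ["u1", "u1"],
    "label": [1, 0],
})

# merge_asof keeps, for each sample, the feature row with the largest
# timestamp that is <= the sample's timestamp -- i.e. no future leakage.
joined = pd.merge_asof(
    samples.sort_values("time"),
    features.sort_values("time"),
    on="time",
    by="user_id",
    direction="backward",
)
print(joined)  # the 06:03 sample gets the 06:02 value (10), not the 07:00 value (6)
```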
Next, we introduce FeatHub as a Feature Store: the problems it tries to solve, and the tools it provides, at each stage of the feature development cycle.
In the feature development stage, FeatHub provides a highly easy-to-use Python SDK that lets users concisely express feature calculation logic. Feature calculation is essentially feature ETL, so what matters most during development is the ease of use and simplicity of the SDK.
In the feature deployment phase, FeatHub provides an execution engine that deploys feature calculation logic with high performance and low latency, and can connect to different feature stores. What matters most in deployment is the performance of the execution engine and its ability to connect to different feature stores.
In the feature monitoring phase, to help developers promptly detect and respond to changes in feature value distribution, FeatHub will in the future generate common metrics covering common feature quality issues, such as the proportion of illegal feature values or the feature average, and raise alarms based on these metrics, so that the responsible engineer is notified in time to investigate the cause of the distribution change and respond, maintaining the effectiveness of the end-to-end recommendation link.
In the feature sharing stage, FeatHub will in the future provide feature registration and search capabilities, so developers from different teams in the same company can check whether the features they want already exist, and reuse those feature definitions and the feature data already generated.
The figure above illustrates FeatHub's core features. During the development phase, FeatHub provides an easy-to-use SDK that supports feature splicing, feature aggregation, and other logic with point-in-time correct semantics. During the deployment phase, FeatHub supports high-throughput, low-latency feature generation, supports Flink as an execution engine for computing features, and supports multiple feature storage systems, letting users freely choose the storage type they want. During the monitoring phase, FeatHub will provide real-time metrics to monitor changes in feature distribution, both offline and in real time, so developers can detect problems promptly. During the sharing stage, FeatHub will provide an easy-to-use Web UI and SDK for developers to register, search, and reuse features.
There are already some representative Feature Store projects in the Feature Store field, such as Feathr, which was open sourced by LinkedIn at the beginning of this year, and Feast, which has been open sourced for many years. We investigated these projects and found that they were not well suited to achieve the target scenarios we proposed.
Compared with existing solutions, FeatHub brings additional value including:
① A simple and easy-to-use Python SDK. FeatHub's SDK draws on the SDKs of existing Feature Store projects, supports their core functions, and further improves the SDK's abstraction capability and ease of use.
② Support for single-machine development and experimentation. Developers do not need to connect to a distributed Flink or Spark cluster to run experiments; they only need the CPU and memory resources of a single machine, and can use single-machine machine learning libraries such as scikit-learn.
③ The execution engine can be switched without modifying code. After completing development on a single machine, the user can switch from the single-machine execution engine to a distributed engine such as Flink or Spark without modifying the code that expresses the feature calculation logic (see the configuration sketch after this list). Using Flink as the execution engine allows FeatHub to support high-throughput, low-latency real-time feature calculation. FeatHub will further support Spark as an execution engine in the future, letting users obtain potentially better throughput in offline scenarios and freely choose the most appropriate engine for each scenario.
④ Extensible execution engines. Beyond Flink and Spark, FeatHub lets developers plug in custom execution engines, including a company's internally developed engine, for feature ETL.
⑤ Open source code, allowing users to freely choose the cloud vendor on which to deploy FeatHub, or to deploy it in a private cloud.
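As a sketch of what the engine switch in ③ can look like, only the client configuration changes while the feature definitions stay the same. The configuration keys below are assumptions patterned on the open-source repository and may differ from the actual FeatHub API:

```python
from feathub.feathub_client import FeathubClient  # import path as in the FeatHub repo

# Single-machine experimentation: features run on the local processor.
local_client = FeathubClient(props={"processor": {"type": "local"}})

# Distributed production: the same feature definitions, submitted to Flink.
# The nested keys (rest.address / rest.port) are assumptions about how a
# Flink session cluster would be addressed, not confirmed configuration.
flink_client = FeathubClient(
    props={
        "processor": {
            "type": "flink",
            "flink": {"rest.address": "localhost", "rest.port": 8081},
        }
    }
)
```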
The figure above is an architecture diagram of FeatHub's main modules. The top layer provides a Python SDK for defining data sources, data sinks, and feature calculation logic. Features defined with the SDK can be registered in the feature metadata center, allowing other users and jobs to query and reuse them, and even to analyze feature lineage from the feature metadata. A feature definition includes the feature's source and sink as well as common calculation logic, such as UDF calls, feature splicing, and aggregation over over-windows and sliding windows. When user-defined features need to be generated, FeatHub provides built-in Feature Processors, i.e. execution engines, to execute the feature calculation logic. When users need to experiment on a single machine, they can use the Local Processor and the machine's own resources, without connecting to a remote cluster. When real-time features need to be generated, the Flink Processor performs high-throughput, low-latency streaming feature calculation.
In the future, Feature Service similar to Lambda Function can also be supported to implement online feature calculation, and it can be connected to Spark to complete high-throughput offline feature calculation. The execution engine can interface with different offline and online feature storage systems, such as using Redis for online feature storage, HDFS for offline feature storage, and Kafka for near-line feature storage.
The figure above shows how users work with FeatHub and how it connects to downstream machine learning training and inference programs. Users or developers use the SDK to express the features they wish to compute, then submit the job to the execution engine for deployment. Once computed, the features are written to feature stores such as Redis and HDFS. An offline machine learning training program can read the data in HDFS directly for batch training, while an online machine learning inference program can read the data in Redis directly for online inference.
The figure above shows the relationships between FeatHub's core concepts. A TableDescriptor represents a collection of features. Applying a logical transformation to a TableDescriptor produces a new TableDescriptor.
TableDescriptor has two subtypes. A FeatureTable is a table with a concrete physical address, for example a table in Redis or a table in HDFS. A FeatureView is a logical table that does not necessarily have a physical address; it is usually obtained from a FeatureTable through a series of logical transformations.
FeatureView has the following 3 subclasses:
① DerivedFeatureView: the rows of its output feature table correspond essentially one-to-one with those of its input feature table (i.e. the source). It supports single-row transformation logic (e.g. addition, subtraction, multiplication, division), over-window aggregation logic, and feature splicing logic, and can be used to generate training data. For example, in the scenario introduced earlier, where training samples must be spliced with features from different dimension tables to obtain the actual training data, a DerivedFeatureView can do this.
② SlidingFeatureView: expresses features computed by sliding windows. Its output rows are not necessarily one-to-one with its input rows, because a sliding window's feature values can change over time even without new input. A SlidingFeatureView can be used to maintain features generated in real time and push them to an online feature store, such as Redis, for online inference. For example, we can use a SlidingFeatureView to compute the number of times each user clicked a given web page in the last two minutes and update the value in Redis in real time; the ad recommendation link can then query this feature online for inference.
③ OnDemandFeatureView: can be used with a Feature Service to support online feature calculation. For example, in a map application such as Amap, after receiving a user's request, developers may wish to compute the user's speed and direction of movement from the user's current physical location and the location in the previous request, to assist route recommendation decisions. Such features must be computed online when the user request is received; an OnDemandFeatureView supports this scenario.
Transform expresses feature calculation logic. FeatHub currently supports the following 5 types of feature calculation logic:
① Expression: lets users express single-row feature calculation logic in a DSL. Its expressiveness is close to a SQL SELECT clause, supporting addition, subtraction, multiplication, division, and built-in function calls, so developers familiar with SQL can get started quickly.
② Join expresses the feature splicing logic. Developers can specify information such as the name of the dimension table and the name of the features to be spliced.
③ PythonUDF supports user-defined Python functions to calculate features.
④ OverWindow expresses the Over window aggregation logic. For example, when receiving a row of data, the user wants to aggregate the previous 5 rows of data and calculate how many pieces of data match a certain rule.
⑤ SlidingWindow expresses the sliding window aggregation logic.
As the figure above shows, a feature ETL job typically reads features from a feature source table, generates new features through multiple feature calculation steps, and outputs the generated features to a feature result table. The feature source table can connect to different feature stores such as FileSystem, Kafka, and Hive; similarly, the feature result table can connect to stores such as FileSystem, Kafka, and Redis.
Processors include LocalProcessor, FlinkProcessor, and SparkProcessor, which execute user-defined feature calculation logic using single-machine resources, a distributed Flink cluster, or a distributed Spark cluster respectively.
Having introduced FeatHub's architecture and core concepts, we will use some sample programs to demonstrate the expressiveness and ease of use of the FeatHub SDK. The core capability of a feature development SDK is expressing new feature calculation logic. The FeatHub SDK supports feature splicing, window aggregation, built-in function calls, and custom Python functions, and will in the future also support UDF calls based on Java or C.
The figure above shows a code snippet for feature splicing. In this example, assume HDFS holds raw positive/negative sample data recording users' purchase behavior, and we want to attach the product price at the time each purchase was made. A price_updates table maintains product price changes: every time a price changes, a row is generated in price_updates containing the product ID and the latest price. We can use a JoinTransform with table_name=price_updates, feature_name=price, and key=item_id to express this splicing logic. FeatHub then finds the rows in price_updates with the given item_id, picks the most appropriate price value based on the timestamps, and splices it into the sample data table.
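A hedged reconstruction of such a snippet follows; class and parameter names track the description above and the open-source repository, but should be treated as assumptions that may differ from the current FeatHub API:

```python
from feathub.common import types
from feathub.feature_views.feature import Feature
from feathub.feature_views.transforms.join_transform import JoinTransform
from feathub.feature_views.derived_feature_view import DerivedFeatureView

# Splice the latest price (as of each sample's timestamp) from the
# price_updates dimension table into the purchase samples, keyed by item_id.
price_feature = Feature(
    name="price",
    dtype=types.Float32,
    transform=JoinTransform(table_name="price_updates", feature_name="price"),
    keys=["item_id"],
)

samples_with_price = DerivedFeatureView(
    name="samples_with_price",
    source=purchase_events_source,  # hypothetical source, defined as in the later end-to-end sketch
    features=[price_feature],
)
```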
The over-window aggregation snippet shows how to use OverWindowTransform to compute features. Users can use expr="item_counts * price" and agg_func="SUM" to compute each user's total spend in the most recent time window, based on the quantity and price of purchased items, with a window length of 2 minutes. group_by_keys=["user_id"] means the total is computed separately for each user.
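A sketch of that over-window feature, under the same caveat that names are assumptions patterned on the repository:

```python
from datetime import timedelta

from feathub.common import types
from feathub.feature_views.feature import Feature
from feathub.feature_views.transforms.over_window_transform import OverWindowTransform

# Total spend per user over the last 2 minutes, evaluated when a row arrives.
total_payment_last_two_minutes = Feature(
    name="total_payment_last_two_minutes",
    dtype=types.Float32,
    transform=OverWindowTransform(
        expr="item_counts * price",   # per-row expression to aggregate
        agg_func="SUM",               # aggregation function
        window_size=timedelta(minutes=2),
        group_by_keys=["user_id"],    # one aggregate per user
    ),
)
```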
Sliding window aggregation is similar to over-window aggregation. The only API difference is that a step_size can additionally be specified: with step_size=1 minute, the window slides and produces new feature values every minute.
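The corresponding sliding-window transform, again with assumed names, differs only by the step_size parameter:

```python
from datetime import timedelta

from feathub.feature_views.transforms.sliding_window_transform import SlidingWindowTransform

# Same aggregation as the over window, but the window advances on a fixed
# step rather than per incoming row: with step_size of 1 minute, a new
# value is emitted every minute.
transform = SlidingWindowTransform(
    expr="item_counts * price",
    agg_func="SUM",
    window_size=timedelta(minutes=2),
    step_size=timedelta(minutes=1),
    group_by_keys=["user_id"],
)
```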
The built-in function snippet shows how the DSL expresses addition, subtraction, multiplication, division, and UDF calls. Suppose the input data contains the timestamps when a taxi picks up and drops off passengers. We can convert each timestamp to an integer epoch time by calling the UNIX_TIMESTAMP built-in function, then subtract the two epoch times to obtain the duration of each trip, which can serve as a feature for subsequent training and inference.
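A sketch of such an expression feature; the string-valued transform mirrors the SQL-like DSL described above, with column names invented for illustration:

```python
from feathub.common import types
from feathub.feature_views.feature import Feature

# Trip duration in seconds, derived with the UNIX_TIMESTAMP built-in;
# the expression syntax is close to a SQL SELECT clause.
trip_duration = Feature(
    name="trip_duration_seconds",
    dtype=types.Int64,
    transform="UNIX_TIMESTAMP(dropoff_datetime) - UNIX_TIMESTAMP(pickup_datetime)",
)
```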
In the PythonUDF snippet, the user defines a Python function to apply arbitrary processing to the input features, such as producing a lowercase string.
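A minimal sketch of a Python UDF feature; the PythonUdfTransform import path and its row-callback signature are assumptions and may differ from the actual FeatHub API:

```python
from feathub.common import types
from feathub.feature_views.feature import Feature
from feathub.feature_views.transforms.python_udf_transform import PythonUdfTransform

# Lowercase a string column with a user-defined Python function applied per row.
lower_user_name = Feature(
    name="lower_user_name",
    dtype=types.String,
    transform=PythonUdfTransform(lambda row: row["user_name"].lower()),  # assumed callback shape
)
```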
Through the above code snippets, we can see that FeatHub's API is relatively simple and easy to use. Users only need to set the necessary parameters for the calculation logic without knowing the details of the processing engine.
In the sample scenario above, the user has two data sources. Purchase Events contains samples of products purchased by users and can come from Kafka or a FileSystem; Item Price Events contains product price changes: every time an item's price changes, a row is generated in Item Price Events with the item ID and the latest price. For each sample of a user purchasing a product, we want to compute the user's total spend in the two minutes before the purchase, as a feature to help infer whether the user will buy a given product. To generate this feature, we can use the calculation logic shown in the figure: first splice the price feature from Item Price Events onto Purchase Events using item_id as the join key, then aggregate over a time window with user_id as the group_by_keys to compute each user's total spend in the last two minutes.
The code snippet above shows the steps a sample FeatHub application needs to complete; a hedged end-to-end reconstruction follows the list below.
① First, the user needs to create a FeatHubClient and set processor_type. If it is a local experiment, it can be set to Local. If it is a remote distributed production deployment, it can be set to Flink.
② Users need to create a Source to read data. For example, you can use FileSystemSource to read data in an offline storage system, or use KafkaSource to read real-time data in a near-line storage system. In FileSystemSource, users can specify information such as data_format, schema, file location, etc. It is worth noting that users can provide time_stamp_field and time_stamp_format to express the column representing time in the data source table and the corresponding parsing format respectively. FeatHub will use this information to complete point-in-time correct feature calculations to avoid feature crossing problems.
③ Users can create a FeatureView to express feature splicing and aggregation logic. To splice, the user can write item_price_events.price to express the feature to be spliced; FeatHub will find the table named item_price_events and fetch the feature named price from it. Users can also use OverWindowTransform to perform over-window aggregation and define a feature named total_payment_last_two_minutes, where window_size=2 minutes means the specified expression and aggregation function are applied to the data within the last two minutes.
④ For a defined FeatureView, if the user wants to develop and experiment locally and train with the scikit-learn library on a single machine, the to_pandas() API fetches the data into single-machine memory as a pandas DataFrame.
⑤ When users need to complete the production deployment of features, they can use FileSystemSink to specify offline feature storage for storing data. Then call execute_insert() to output the features to the specified Sink.
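Putting steps ① through ⑤ together, a minimal end-to-end sketch might look like the following. Module paths, the schema builder, and parameter names are patterned on the description above and the open-source repository, but should be treated as assumptions that may differ from the current FeatHub API:

```python
from datetime import timedelta

from feathub.common import types
from feathub.feathub_client import FeathubClient
from feathub.table.schema import Schema
from feathub.feature_tables.sources.file_system_source import FileSystemSource
from feathub.feature_tables.sinks.file_system_sink import FileSystemSink
from feathub.feature_views.derived_feature_view import DerivedFeatureView
from feathub.feature_views.feature import Feature
from feathub.feature_views.transforms.over_window_transform import OverWindowTransform

# Step 1: create the client. "local" runs on this machine; switching the
# processor type to "flink" deploys the same definitions to a cluster.
client = FeathubClient(props={"processor": {"type": "local"}})

# Step 2: a source for the purchase samples. timestamp_field/format tell
# FeatHub which column carries event time, enabling point-in-time joins.
purchase_events = FileSystemSource(
    name="purchase_events",
    path="/tmp/purchase_events.json",  # hypothetical path
    data_format="json",
    schema=Schema.new_builder()
    .column("user_id", types.String)
    .column("item_id", types.String)
    .column("item_counts", types.Int64)
    .column("timestamp", types.String)
    .build(),
    timestamp_field="timestamp",
    timestamp_format="%Y-%m-%d %H:%M:%S",
)

# Step 3: a feature view that splices price from item_price_events (a
# source assumed to be defined the same way) and aggregates each user's
# spend over the last two minutes.
purchase_features = DerivedFeatureView(
    name="purchase_features",
    source=purchase_events,
    features=[
        "item_price_events.price",  # feature splicing by join key
        Feature(
            name="total_payment_last_two_minutes",
            dtype=types.Float32,
            transform=OverWindowTransform(
                expr="item_counts * price",
                agg_func="SUM",
                window_size=timedelta(minutes=2),
                group_by_keys=["user_id"],
            ),
        ),
    ],
)

# Step 4: local experimentation -- pull the features into a pandas
# DataFrame for single-machine training with e.g. scikit-learn.
df = client.get_features(purchase_features).to_pandas()

# Step 5: production -- write the features to an offline store instead.
sink = FileSystemSink(path="/tmp/feature_output", data_format="csv")
client.get_features(purchase_features).execute_insert(sink, allow_overwrite=True)
```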
The basic value of FeatHub is an SDK that makes it easy to develop features and an execution engine to compute them. Beyond that, FeatHub also optimizes the execution engine's performance, so users gain more during the feature deployment phase. For example, for features based on sliding window aggregation, the native Flink API outputs the feature value at every step_size tick, regardless of whether the value has changed. For a sliding window with window_size=1 hour and step_size=1 second, Flink would in most cases output the same feature value repeatedly, wasting network traffic, downstream storage, and other resources. FeatHub lets users configure the sliding window's behavior so that it outputs a feature only when the value changes, optimizing the feature calculation job's resource usage.
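As a sketch, this behavior could be toggled per SlidingFeatureView through configuration properties. The props keys below are assumed names, not confirmed FeatHub configuration; consult the project documentation for the real keys:

```python
from datetime import timedelta

from feathub.common import types
from feathub.feature_views.feature import Feature
from feathub.feature_views.sliding_feature_view import SlidingFeatureView
from feathub.feature_views.transforms.sliding_window_transform import SlidingWindowTransform

# Hypothetical: a sliding-window feature that only emits when its value changes.
user_spend = SlidingFeatureView(
    name="user_spend",
    source=purchase_events,  # hypothetical source, as in the earlier sketch
    features=[
        Feature(
            name="total_payment_last_hour",
            dtype=types.Float32,
            transform=SlidingWindowTransform(
                expr="item_counts * price",
                agg_func="SUM",
                window_size=timedelta(hours=1),
                step_size=timedelta(seconds=1),
                group_by_keys=["user_id"],
            ),
        ),
    ],
    props={
        "sdk.sliding-feature-view.skip-same-window-output": "true",      # assumed key
        "sdk.sliding-feature-view.enable-empty-window-output": "false",  # assumed key
    },
)
```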
In addition, FeatHub will further optimize the memory and CPU usage of sliding windows. In some scenarios, users compute many similar sliding window features that differ only in window size. For example, we may want each user's total purchase spend over the last 1 minute, 5 minutes, and 10 minutes. With the native Flink API, the job might use three aggregation operators to compute these three features, each with its own memory space. Since the data and calculation logic of these operators overlap heavily, FeatHub can use a custom operator to compute all of these features together, saving memory and CPU resources.
FeatHub is currently open source on GitHub and supports the basic LocalProcessor and FlinkProcessor functionality. We will further improve FeatHub's core functions to facilitate the development and deployment of users' feature engineering. This includes supporting more commonly used offline and online stores, integrating with Notebooks, providing a Web UI to visualize feature metadata and let users register, search, and reuse features, and supporting Spark as a FeatHub execution engine.
FeatHub code base: https://github.com/alibaba/FeatHub
FeatHub code samples: https://github.com/flink-extended/FeatHub-examples
The FeatHub code base currently lives under the alibaba organization on GitHub. To make FeatHub easier to learn and to help you quickly find code snippets matching your scenario, we provide additional code examples in the flink-extended/feathub-examples repository, which you are free to use and try. Everyone is welcome to give feedback and contribute PRs.
Q&A
Q1: If the source data is not out of order, can feature crossing still occur? How does FeatHub handle out-of-order data?
A1: In principle, even if the data arrives in order, a join that ignores the timestamp field can still produce crossed results. In real scenarios the source data may also arrive out of order; in that case, a watermark strategy similar to Flink's can be used to wait for late-arriving data and reduce the impact of disorder. In addition, regular offline jobs can backfill the online feature data, further reducing the impact of out-of-order data.
Q2: Does FeatHub support replaying historical data?
A2: The FeatHub API can support playback, but this functionality has not yet been verified in production. FeatHub will support Flink and Spark as execution engines, so their computing capabilities can be reused to replay historical data. For example, we can start a Spark job whose source processes all the data on HDFS from the past month, execute the defined feature splicing and aggregation logic, and then output the computed features.
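Under that design, replay would amount to pointing a batch source at the historical files and running the same feature definitions with the Spark processor. A hedged sketch, where the processor configuration, schema, features, and sink are placeholders carried over from the earlier examples:

```python
from feathub.feathub_client import FeathubClient
from feathub.feature_tables.sources.file_system_source import FileSystemSource
from feathub.feature_views.derived_feature_view import DerivedFeatureView

# Hypothetical: choose Spark as the execution engine for the backfill run.
backfill_client = FeathubClient(props={"processor": {"type": "spark"}})

# Point the source at a month of historical data on HDFS; schema is
# assumed to be defined as in the earlier sketches.
history = FileSystemSource(
    name="history",
    path="hdfs:///logs/2022-10",  # hypothetical path
    data_format="json",
    schema=schema,
    timestamp_field="timestamp",
    timestamp_format="%Y-%m-%d %H:%M:%S",
)

# Re-run the same splicing/aggregation definitions over the history and
# write the backfilled features to the offline store.
view = DerivedFeatureView(name="backfill", source=history, features=features)
backfill_client.get_features(view).execute_insert(offline_sink)
```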
Q3: Does FeatHub support offline, near-line, and online feature calculation?
A3: Feature calculation is divided into offline, near-line, and online. Flink is a near-line execution engine that can compute features such as a user's click count in the last 5 minutes in real time, and it can also support offline computation. Therefore FeatHub supports offline and near-line feature calculation. FeatHub plans to support online feature calculation in the future, using a Feature Service-based architecture to compute the features expressed by an OnDemandFeatureView.
Q4: Which upstream and downstream storage systems does FeatHub support?
A4: FeatHub will support all the sources/sinks supported by Flink, including ODPS, Holo, and other services provided by Alibaba Cloud. Currently FeatHub supports only Kafka and FileSystem; we will gradually add support for more storage systems.