More and more enterprises are leveraging machine learning analytics solutions to get value from operational data, improve business decision-making and create new revenue streams.
AI and machine learning technologies are becoming an integral part of real-time data analysis.
They enable enterprises to understand and act on their data, whether to power personalized consumer applications or to optimize business operations, helping them improve business performance and retain a competitive edge.
But in reality, only a few organizations are successful in deploying machine learning models in production. A McKinsey survey on AI adoption indicates that barely 3% of large firms have integrated AI across their full enterprise workflows.
Why is it so difficult to successfully deploy machine learning models in production?
With larger volumes of data that need to be ingested and analyzed combined with complex big data architectures, enterprises are facing challenges in achieving the speed, scale, and accuracy necessary for their mission-critical applications and services:
- Speed: Handling high data ingestion rates with low latency is required for building feature vectors in real time. This involves many aggregations and contextualization with historical data from many external data sources, all fed to a prediction model to obtain a score and act on it accordingly.
- Scalability: Addressing peak events (such as Black Friday, sports events like the Super Bowl, holidays, and unexpected political events) requires extreme performance, so that a machine learning pipeline can instantly scale to handle 20x or more users and data without compromising response time or customer experience.
- Accuracy: Measuring model accuracy must be a continuous process. This requires adjusting the model by feeding observations back to it, applying incremental or online learning, and closing the loop by redeploying the revised model with the adjusted weights.
- Enterprise-Grade: Meeting enterprise-grade performance demands for no downtime, automatic recovery, and self-healing.
“The size, complexity, distributed nature of data, speed of action and the continuous intelligence required by digital business means that rigid and centralized architectures and tools break down. The continued survival of any business will depend upon an agile, data-centric architecture that responds to the constant rate of change.”
– Donald Feinberg, VP and Distinguished Research Analyst at Gartner
GigaSpaces Cloud: Helping Enterprises Jumpstart their Machine Learning Journey
Gartner receives hundreds of machine learning questions from its clients every week, and recently published Machine Learning: FAQs from Clients to help data and analytics leaders understand issues before they arise, frame next steps, and guide their planning processes. It covers a range of subjects and illustrates the challenges involved in operationalizing machine learning for real business results.
At GigaSpaces, we’ve launched InsightEdge as a managed service that provides easy access and full automation to simplify deployment and management while ensuring no downtime. Sample applications are available to help you understand, experience, and test the performance and the powerful ML and predictive capabilities of the InsightEdge in-memory platform.
Start your free trial with GigaSpaces Cloud now
Experience Prediction Capabilities with GigaSpaces in 5 Easy Steps
Running real-time machine learning in production to reach fast, accurate predictions on behavior is often considered a complex and cumbersome task that requires a high level of expertise. But with the GigaSpaces InsightEdge in-memory platform, creating and running a predictive model is a fast and straightforward process involving the following 5 steps:
- Data Loading: Fetching data and loading it to the InsightEdge Platform.
- Data Discovery: Exploring the data using Spark and SQL queries.
- Data Transformation and Enrichment: Transforming and enriching the data to obtain the relevant feature vector.
- Training and Validation: Building and training a prediction machine learning model based on the data.
- Prediction: Running the model with real, streaming data.
The sample interactive application provided on GigaSpaces Cloud demonstrates how InsightEdge operationalizes machine learning and transactional processing at scale, enriching data with historical context from external data stores.
It’s an opportunity to get started and play with the InsightEdge Platform, which comes with bundled, pre-loaded data, in order to understand and observe:
- All the frameworks necessary for scalable data-driven solutions and high-level engagement, including Zeppelin, Tableau, Kafka, SQL, Spark, streaming, machine learning, and deep learning.
- How to gain faster and smarter insights from machine learning models running on any data source.
- The seamless accessing of historical data from data lakes such as Hadoop, Amazon S3 and Azure Data Lake Storage.
- The ultra-low latency, high-throughput transaction and stream processing offered by InsightEdge’s in-memory performance.
About the Predictive Flight Delay Application
The predictive flight delay sample application shows how to achieve a binary prediction that determines the likelihood of flight delays. It’s based on:
- Historical data of flight delays
- Streaming of real-time, hot data about flight delays
- The inclusion of various conditions that can influence flight-delay prediction
Machine Learning Model
Many machine learning models exist. The sample application implements the random decision forests machine learning technique, which typically performs well on challenges such as the prediction of flight delays.
Data Used
The following data is used in the sample application:
- 2017 flight delay data for training the predictive model
- 2018 flight delay data for evaluating the accuracy of the model
- Streaming 2019 data to achieve a prediction of whether any specific flight will be delayed
The data includes the month, day and departure time of flights from various airlines, as well as the departure and destination airports. Flight delays are measured in minutes. A positive number represents the actual delay in minutes, while a negative number indicates a flight that was early. The feature vector is supplemented with additional, weather-related data such as rain, wind and temperature.
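Since the application ultimately makes a binary delayed/on-time call, the delay-in-minutes value must at some point be reduced to a label. The following is only a minimal sketch of that reduction (not taken from the sample code), mirroring the dataset's DepDel15 convention of flagging departures delayed by 15 minutes or more:

// Hypothetical helper mirroring the dataset's DepDel15 column:
// returns 1.0 if the departure was delayed by at least the threshold, else 0.0.
def delayLabel(depDelayMinutes: Double, thresholdMinutes: Double = 15.0): Double =
  if (depDelayMinutes >= thresholdMinutes) 1.0 else 0.0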
Getting Started with the Application
To get started and access the application, just register for the GigaSpaces Cloud Service here.
The following Getting Started/Flight Delays screen will be displayed, with an introduction to the application and the steps involved in creating and running the predictive model.
Getting started with Flight Delays
In each step of the application, you can interact and run the code yourself by clicking the Run icon on the right-hand side of the screen. In this way, you can see how quickly InsightEdge performs and how easy it is to create a predictive model.
Step 1: Data Loading
This step involves fetching data from a CSV file or an S3 bucket and loading it into the InsightEdge Platform. In the application, this consists of loading relevant flight data in CSV format gathered from a cross-section of US airports in 2017 and 2018. Note that if the data is not available, it can be downloaded from https://insightedge-gettingstarted.s3.amazonaws.com/flightdelays20172018.csv.zip.
The following is an example of the code and the screenshot obtained after importing the relevant flight delay data, which covers various airlines (WN – Southwest, OO – SkyWest Airlines Inc., EV – ExpressJet Airlines LLC, B6 – JetBlue, AS – Alaska Airlines Inc., NK – Spirit Airlines, F9 – Frontier Airlines Inc.) and different US airports (ORD – Chicago, DFW – Dallas, DEN – Denver, SFO – San Francisco):
%spark
// Explore the data before we continue.
// WN - Southwest, OO - SkyWest Airlines Inc., EV - ExpressJet Airlines LLC, B6 - JetBlue, AS - Alaska Airlines Inc., NK - Spirit Airlines, F9 - Frontier Airlines Inc.
val airlines = List("UA", "OO", "B6", "NK", "F9")
// ORD - Chicago, DFW - Dallas, DEN - Denver, SFO - San Francisco
val airports = List("ORD", "DFW", "DEN", "SFO")
val flightDelaysDataFrame = sqlContext.read
  .option("header", "true")
  .option("inferschema", "true")
  .csv("/opt/gigaspaces/flightdelays20172018.csv")
  .filter($"Reporting_Airline".isin(airlines:_*))
  .filter($"Origin".isin(airports:_*))
  .filter($"cancelled" === 0.0)
  .filter($"DayofMonth" < 20.0)
flightDelaysDataFrame.printSchema()
// Create a temp view that we can query by SQL
flightDelaysDataFrame.createOrReplaceTempView("FlightDelays")

root
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- DayofMonth: integer (nullable = true)
 |-- DayOfWeek: integer (nullable = true)
 |-- Reporting_Airline: string (nullable = true)
 |-- Tail_Number: string (nullable = true)
 |-- Flight_Number: integer (nullable = true)
 |-- Origin: string (nullable = true)
 |-- Dest: string (nullable = true)
 |-- CRSDepTime: integer (nullable = true)
 |-- DepDelay: double (nullable = true)
 |-- DepDel15: double (nullable = true)
 |-- cancelled: double (nullable = true)

airlines: List[String] = List(UA, OO, B6, NK, F9)
airports: List[String] = List(ORD, DFW, DEN, SFO)
flightDelaysDataFrame: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [Year: int, Month: int ... 11 more fields]
Step 2: Data Discovery
Data discovery is about exploring the loaded data using Spark and SQL queries in order to understand the schema and how the data behaves. It involves visualizing the data with the Zeppelin Notebook by running various Spark and SQL queries to view different cross-sections of information.
For example, the following SQL query can be run to explore flight departure delays in 2017 and 2018 of various airlines from different airports:
%sql select * from FlightDelays order by DepDelay Desc limit 5
The following review of the sample data is displayed:
Review a Sample of the Data
The data visualization process also enables viewing the ratio of delayed flights to total flights for each departure airport. Run the following SQL query:
%sql select Origin, count(*) as TotalFlights, SUM(DepDel15) as DelayedFlightsCount from FlightDelays group by Origin order by DelayedFlightsCount/TotalFlights desc limit 15
The following graph is displayed:
Total Number of Flights vs. Delayed Flights by Origin Airport
And it’s possible to view the airports with the highest flight volumes using the following SQL query:
%sql select origin, count(*) as vol from FlightDelays group by origin order by vol desc
This intuitive display shows, for example, that 32% of flights originating from Denver airport were delayed.
Airport popularity
Step 3: Data Transformation and Enrichment
Data transformation and enrichment involve the processing and enrichment of the input data to obtain the relevant feature vector, i.e., applying the capabilities of the InsightEdge Platform to transform the data into the required format and enriching it with relevant data, defining advanced indexing, and providing SQL queries with predicate pushdown.
In the sample application, this involves using InsightEdge Spark queries to transform the flight data information to obtain what is required for training the machine learning model so that it can make accurate flight delay predictions. The initial data includes a range of information per flight, such as date, the airport of origin, destination, airline, and departure delay. This data is then enriched with basic weather data at the time of departure from the airport of origin.
The data is processed by configuring an InsightEdge context, providing InsightEdge with Type information about the data that was loaded and loading the data to the InsightEdge in-memory store, in order to speed-up data access. The data is stored in a distributed manner based on a partition key, which is a unique ID based on the flight number and the date and time of departure.
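The exact model class is bundled with the sample application; the following is only a rough sketch of what such a grid model might look like, assuming InsightEdge's Scala annotations for the space ID and routing key (the class and field names here are illustrative):

import org.insightedge.scala.annotation._
import scala.beans.BeanProperty

// Illustrative sketch only; the real FlightDelaysWithWeather class ships with the
// sample application. The id doubles as the routing (partition) key, so all records
// for the same flight/departure land in the same partition.
case class FlightDelayRecord(
  @BeanProperty @SpaceId @SpaceRouting var id: String, // flight number + departure date/time
  @BeanProperty var flightNumber: Int,
  @BeanProperty var origin: String,
  @BeanProperty var depDelay: Double
) {
  def this() = this(null, -1, null, -1.0) // no-arg constructor required by the grid
}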
The flight delay data is then written to the in-memory store using the following code:
flightDelaysRDD.rdd.saveToGrid()
The data can then be queried directly from the InsightEdge in-memory data grid by running the following:
%insightedge_jdbc select id, "year", "month", dayofMonth, dayOfWeek, crsDepTime, flight_number, origin, dest, depDelay from FlightDelaysWithWeather limit 5
This direct query of the data displays the following:
Query the data directly from the in-memory data grid
In the data enrichment step of the application, a co-located task enriches the flight delay data with weather data, matched on the timestamps of the flights and the coordinates of the airports involved, adding prevalent weather conditions such as rain, wind, and temperature. This requires merging the weather and flight delay data as follows:
- Writing the additional weather data to the memory grid, for the same time period as the flight delay data that already exists in the grid.
- Defining a distributed task to be executed on the memory grid which will be sent to all the grid partitions.
- Executing the code to merge the weather data to the flight delay objects, i.e., enriching the flight delay records with relevant weather metrics.
- Executing the distributed task to obtain the enriched data as a Spark data frame.
To define a task that will run in a distributed manner, collocated in each partition, an execute() function enriches the flight delay records with the relevant weather metrics, as shown in the following code:
def execute(): Unit = {
  // rec, dateField, weatherTemplate, clusteredProxy and writeCollection are
  // members of the enclosing distributed task.
  val origin: String = rec.origin
  weatherTemplate.setProperty("Date", dateField)
  weatherTemplate.setProperty("Station", origin)
  val weather = clusteredProxy.read(weatherTemplate)
  if (weather != null) {
    rec.awnd = "" + weather.getProperty("AWND")
    rec.prcp = "" + weather.getProperty("PRCP")
    rec.snow = "" + weather.getProperty("SNOW")
    rec.tmin = "" + weather.getProperty("TMIN")
    rec.tmax = "" + weather.getProperty("TMAX")
    rec.date = dateField
    writeCollection += rec
  }
}
On completion, it is possible to review the enriched data using the following code:
%insightedge_jdbc select carrier , origin, dest, depDelay, awnd, prcp, snow, tmin, tmax from FlightDelaysWithWeather where awnd is not null limit 20
This code produces the following display of the data, now enriched with the weather variables: AWND (average daily wind speed, in tenths of meters per second), PRCP (precipitation, in tenths of mm), SNOW (snowfall, in mm), TMIN (minimum temperature, in tenths of degrees C) and TMAX (maximum temperature, in tenths of degrees C).
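These tenths-based units appear to follow NOAA's GHCN-Daily conventions; if conventional units are wanted for display, a trivial conversion (not part of the application) suffices:

// Illustrative conversions from the dataset's tenths-based units:
def toDegreesC(tenths: Double): Double = tenths / 10.0        // TMIN/TMAX -> degrees C
def toMillimeters(tenths: Double): Double = tenths / 10.0     // PRCP -> mm
def toMetersPerSecond(tenths: Double): Double = tenths / 10.0 // AWND -> m/s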
Review the enriched data
Step 4: Training and Validation
Training and validation involve the building and training of the machine learning model. Since the application seeks to create a model for the prediction of the likelihood of flight delays, this involves the following:
- Training a machine learning model using the 2017 flight delay data and delivering a fitted or trained model.
- Validating the model against the flight delays recorded in the 2018 data. This evaluation is performed using the eval_metrics helper and the Metrics class to rate each prediction and thereby determine the accuracy of the model.
In the application, data – such as the month, day, departure time, departure airports and destination airports – is fed into the random forests machine learning model in order to train it for optimal accuracy, together with the feature vector supplemented with weather-related data such as rain, wind, and temperature.
To start the training and validation step, the merged, enriched data prepared in Step 3 is used to prepare feature vectors for all the fields to be included in the model, with each vector labeled with the delay outcome. The following code splits the data into its 2017 and 2018 subsets to prepare the feature vectors for training and validating the machine learning model:
val FlightDelaysWithWeatherDataframe = spark.read.grid[FlightDelaysWithWeather].as[FlightDelaysWithWeather]
val trainingData = prepareFeaturesLabeledPoint(FlightDelaysWithWeatherDataframe.filter("Year = 2017"))
val validationData = prepareFeaturesLabeledPoint(FlightDelaysWithWeatherDataframe.filter("Year = 2018"))
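The prepareFeaturesLabeledPoint helper itself is not shown in the notebook. A minimal sketch of what it might look like, assuming the same feature ordering used later in the prediction step and a hypothetical depDel15 field as the binary label:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset

// Hypothetical sketch; field names assume the FlightDelaysWithWeather model used above.
def prepareFeaturesLabeledPoint(data: Dataset[FlightDelaysWithWeather]): RDD[LabeledPoint] =
  data.rdd.map { fd =>
    val features = Array(
      fd.month.toDouble,
      fd.dayOfWeek.toDouble,
      "%04d".format(fd.crsDepTime.toInt).take(2).toDouble, // scheduled departure hour
      fd.awnd.toDouble,
      fd.prcp.toDouble,
      fd.tmax.toDouble,
      fd.tmin.toDouble)
    LabeledPoint(fd.depDel15.toDouble, Vectors.dense(features)) // 1.0 = delayed 15+ minutes
  }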
It is then possible to train the machine learning model and save it to the grid as follows:
%spark
import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.configuration.Strategy
import org.insightedge.spark.implicits.all._

val treeStrategy = Strategy.defaultStrategy("Classification")
val numTrees = 10
val featureSubsetStrategy = "auto" // Let the algorithm choose
val predictFlightDelaysRFModel = RandomForest.trainClassifier(trainingData, treeStrategy, numTrees, featureSubsetStrategy, seed = 123)
The performance metrics are then defined according to true vs. false and positive vs. negative classification (tp, tn, fp and fn, respectively) in order to determine the accuracy of the model, as shown in the following code. In the application, tp = delays that were predicted and occurred; tn = delays that were not predicted and did not occur; fp = delays that were predicted but did not occur; and fn = delays that occurred but were not predicted.
class Metrics(labelsAndPreds: RDD[(Double, Double)]) extends java.io.Serializable {
  private def filterCount(lftBnd: Int, rtBnd: Int): Double =
    labelsAndPreds
      .map(x => (x._1.toInt, x._2.toInt))
      .filter(_ == (lftBnd, rtBnd)).count()

  lazy val tp = filterCount(1, 1) // true positives
  lazy val tn = filterCount(0, 0) // true negatives
  lazy val fp = filterCount(0, 1) // false positives
  lazy val fn = filterCount(1, 0) // false negatives

  lazy val precision = tp / (tp + fp)
  lazy val recall = tp / (tp + fn)
  lazy val F1 = 2 * precision * recall / (precision + recall)
  lazy val accuracy = (tp + tn) / (tp + tn + fp + fn)
}
Once the metrics have been defined, it is possible to validate and evaluate the model on the test data, i.e., measure the accuracy of the model based on the validation data from 2018, as shown in the following Scala example executed from the Zeppelin Notebook:
%spark
println(validationData)
val predictionsResultsComparedToActual = validationData.map { point =>
  val prediction = predictFlightDelaysRFModel.predict(point.features)
  (point.label, prediction)
}
predictionsResultsComparedToActual.map(println)
val modelMetrics = new Metrics(predictionsResultsComparedToActual)
println("accuracy = %.2f".format(modelMetrics.accuracy))

MapPartitionsRDD[30] at map at <console>:88
accuracy = 0.79
predictionsResultsComparedToActual: org.apache.spark.rdd.RDD[(Double, Double)] = MapPartitionsRDD[70] at map at <console>:98
modelMetrics: Metrics = Metrics@...
Step 5: Prediction
To run the prediction model with real, streaming data, Spark streaming is used to stream 2019 flight data from Kafka and store it in the memory grid. This enables interactivity with Zeppelin Notebook through an automatic process that constantly streams relevant flight data through Spark. This is performed as follows:
%spark
import kafka.serializer.StringDecoder
import model.v1._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import com.google.gson.Gson

val ssc = new StreamingContext(sc, Seconds(2))
val topics = "flights"
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> System.getenv("KAFKA_URL"))
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)

messages.map { o =>
  val gson = new Gson()
  val fd = gson.fromJson(o._2, classOf[FlightDelaysWithWeather])
  fd.generate_id()
  fd.prediction = "for_calc"
  fd
}.saveToGrid()

ssc.start
The following screenshot from Grafana shows the progress of all write operations on the in-memory grid and the spike generated by the write operation:
Progress of all write operations in Grafana
To run the Spark machine learning prediction model on the actual Kafka stream of 2019 flight data and predict the likelihood of flight delays, the flight data has to be read from the memory grid. This is performed using the following code:
%spark
import org.apache.spark.mllib.linalg.Vectors

// Read back the records that were marked for prediction when streamed into the grid.
val forCalc = spark.read.grid[FlightDelaysWithWeather].as[FlightDelaysWithWeather].where("prediction == 'for_calc'")
val flightDelayPredictions = forCalc.map { fd =>
  val featuresVector = Array(
    fd.month.toDouble,
    fd.dayOfWeek.toDouble,
    "%04d".format(fd.crsDepTime.toInt).take(2).toDouble, // scheduled departure hour
    fd.awnd.toDouble,
    fd.prcp.toDouble,
    fd.tmax.toDouble,
    fd.tmin.toDouble)
  val prediction = predictFlightDelaysRFModel.predict(Vectors.dense(featuresVector))
  if (prediction > 0) fd.prediction = "Delayed" else fd.prediction = "OK"
  fd
}.rdd.saveToGrid()
The prediction results received concerning the likelihood of flights being delayed can be reviewed using various queries. For example:
%insightedge_jdbc select * from FlightDelaysWithWeather where prediction is not null limit 50
This query displays the following predictions data:
Prediction data display
Here we can see that, taking into account the actual flight data, historical data, and weather conditions at the departure airports (San Francisco and Denver), the model predicts that four flights will be delayed and that only one flight, from San Francisco, will depart on time.
Summary
GigaSpaces Cloud and the available sample applications offer you an opportunity to understand and experience the power, speed, and easy implementation of the GigaSpaces InsightEdge Platform and its machine learning capabilities. The flight delay application illustrates the platform’s added value and unique functionality for predictive purposes, including:
- Loading data from a CSV file or an S3 bucket.
- Visualizing data with a notebook.
- Saving Spark data frames to the space.
- Defining advanced indexing.
- Using SQL queries with predicate pushdown.
- Enriching data, such as adding weather information using a co-located task.
- Running the Spark machine learning model in the grid.
- Feeding data with Kafka.
- Analyzing streaming data on large timeframes.
Additional Links
For more information about GigaSpaces Cloud:
- Visit the FAQ page
- Learn more about getting started with GigaSpaces Cloud
GigaSpaces Cloud as a managed service offers many benefits to your business and is a standout among the various SaaS cloud offerings available today. With GigaSpaces Cloud you can run your applications at speed and at scale, and accelerate your big data stack for zero time to data. You will also be able to get a real-time, 360-degree view of all your data and modernize your legacy stacks with a microservices architecture.
With GigaSpaces Cloud, you are assured an always-on service. Our offering also allows you to easily operationalize AI and ML models into production, thereby enriching streaming and transactional data with historical data.
Try it for yourself for free on GigaSpaces Cloud or sign up for our webinar on March 4th.