Data Analysis with Sparklyr

Sparklyr is an open-source package that provides an interface between R and Apache Spark. It lets you use Spark as the backend for dplyr and gives access to Spark's distributed machine learning algorithms. This article is about using sparklyr within an R session.

Andrea Spano (Quantide), andreaspano.github.io, www.quantide.com
2020-10-10

Pkgs

Load required packages


require(sparklyr)
require(dplyr)
require(readr)
require(ggplot2)
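
If sparklyr or a local Spark distribution is not installed yet, a one-time setup along these lines should be enough (the Spark version here is an assumption, chosen to match the connection used below):

# install sparklyr from CRAN and download a local Spark distribution
install.packages("sparklyr")
sparklyr::spark_install(version = "3.0")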

Set up

Spark environment setup


# global spark memory
Sys.setenv("SPARK_MEM" = "32g")
# Initialize configuration with defaults
config <- spark_config()
# Memory
config["sparklyr.shell.driver-memory"] <- "32g"
# Cores
config["sparklyr.connect.cores.local"] <- 6

Connect

Create local Spark context


sc <- spark_connect(master = "local", version = "3.0", config = config)
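
Once connected, a quick sanity check of the session is possible (optional):

# Spark version actually running behind the connection
spark_version(sc)
# open the Spark web UI to monitor jobs and storage
spark_web(sc)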

csv files

Load csv files


system.time({
  d_all <- spark_read_csv(sc,
                          name = 'd_all',
                          path = "file:///data/airline/csv",
                          header = TRUE)
})

   user  system elapsed 
  0.386   0.021 349.361 
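
Part of the elapsed time goes into inferring column types and caching the table. If that becomes an issue, a possibly faster variant reads everything as character and skips caching; the relevant columns are converted to numeric later anyway:

# alternative read: no schema inference, no in-memory caching
d_all <- spark_read_csv(sc,
                        name = 'd_all',
                        path = "file:///data/airline/csv",
                        header = TRUE,
                        infer_schema = FALSE,
                        memory = FALSE)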

Save as parquet


system('rm -rf /data/airline/prq/')
system.time({
  spark_write_parquet(d_all, path = '/data/airline/prq/')
})

   user  system elapsed 
  0.100   0.009 166.966 
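
When later queries filter mostly by year, the Parquet files could also be written partitioned by that column (shown only as an option, not used in the rest of the analysis):

# optional: one Parquet sub-folder per Year
spark_write_parquet(d_all, path = '/data/airline/prq/', partition_by = 'Year')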

Disconnect from the Spark context and reconnect


spark_disconnect(sc)
sc <- spark_connect(master = "local", version = "3.0", config = config)

load parquet


system.time({
  d_all <- spark_read_parquet(sc,
                              name = 'd_all',
                              path = "file:///data/airline/prq/")
})

   user  system elapsed 
  0.296   0.009 255.523 
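
Parquet stores the column types with the data, so no header or schema options are needed when reading it back. The imported schema can be inspected with:

# list column names and types as stored in the Parquet files
sdf_schema(d_all)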

Check data size


system.time({
n <- d_all %>% 
  summarise(n = n()) %>% 
  collect()

print(n) 
})

# A tibble: 1 x 1
          n
      <dbl>
1 123536460

   user  system elapsed 
  0.106   0.000   0.665 
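
The same count is also available through a sparklyr shortcut:

# row count computed by Spark, returned as a single number
sdf_nrow(d_all)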

prepare model data

Select only the variables of interest and derive the Gain variable as the difference between departure and arrival delay


system.time({
d_model <- d_all %>%
  mutate(ArrDelay = as.numeric(ArrDelay) ,
         DepDelay = as.numeric(DepDelay) ,
         Distance = as.numeric(Distance)) %>% 
  filter(!is.na(ArrDelay) & !is.na(DepDelay) & !is.na(Distance)) %>%
  filter(DepDelay > -10 & DepDelay < 240) %>%
  filter(ArrDelay > -60 & ArrDelay < 360) %>% 
  mutate(Gain = DepDelay - ArrDelay) %>%
  select(Year, Month, ArrDelay, DepDelay, Distance, UniqueCarrier, Gain)
})

   user  system elapsed 
  0.018   0.000   0.018 
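
The step takes a few milliseconds because nothing has been computed yet: dplyr verbs on a Spark DataFrame only build a query, which Spark runs when results are actually requested. The generated SQL can be inspected with:

# show the Spark SQL that the dplyr pipeline translates to
dplyr::show_query(d_model)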

Check the dimensions of the new data


system.time({
n <- d_model %>% 
  summarise(n = n()) %>% 
  collect()
print(n)
})

# A tibble: 1 x 1
          n
      <dbl>
1 118734210

   user  system elapsed 
  0.219   0.007  51.889 

import carriers data


d_carrier <- spark_read_csv(sc,
                            name = 'd_carrier',
                            path = "file:///data/airline/csv/carriers.csv",
                            header = TRUE)

join the airline data with the carriers data


system.time({
d_model <- d_model %>% 
  left_join(d_carrier, by = c("UniqueCarrier" = "Code"))
})

   user  system elapsed 
  0.002   0.000   0.002 
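
The join is also defined lazily, hence the negligible timing. A quick look at a few rows confirms that the carrier Description is now attached:

# peek at the joined carrier description
d_model %>%
  select(UniqueCarrier, Description) %>%
  head(5) %>%
  collect()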

split the data into a training set and an out-of-sample test set


system.time({
  d_model_2008 <- d_model %>% filter(Year == 2008)
  d_model <- d_model %>% filter(Year <= 2007)
})

   user  system elapsed 
  0.001   0.000   0.001 

check dimensions


system.time({
n <- d_model %>%  summarise(n = n())
n2008 <- d_model_2008 %>%  summarise(n = n())
print(n)
print(n2008)
})

# Source: spark<?> [?? x 1]
          n
      <dbl>
1 112181520
# Source: spark<?> [?? x 1]
        n
    <dbl>
1 6552690

   user  system elapsed 
  1.103   0.004  87.384 

partition the data into training and validation sets


system.time({
model_partition <- d_model %>% 
  sdf_random_split(d_trn = 0.8, d_tst = 0.2, seed = 5555)
})

   user  system elapsed 
  0.129   0.000   0.274 
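
Since the training partition is scanned repeatedly while the model is fitted, it may help to register it as a Spark table and cache it in memory first (optional, memory permitting):

# register and cache the training partition
sdf_register(model_partition$d_trn, 'd_trn')
tbl_cache(sc, 'd_trn')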

Modelling

Fit a linear model


system.time({
  fm <-  ml_linear_regression(model_partition$d_trn, 
                            formula = Gain ~ Distance + DepDelay + UniqueCarrier)
})

   user  system elapsed 
  0.831   0.017 452.703 
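
The held-out d_tst partition can be used to check how the model generalises; one possible check computes the RMSE on the test predictions with Spark's regression evaluator:

# RMSE of the fitted model on the held-out 20% partition
ml_predict(fm, model_partition$d_tst) %>%
  ml_regression_evaluator(label_col = 'Gain', metric_name = 'rmse')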

Show summary


system.time({  
  summary(fm) 
})

Deviance Residuals (approximate):
     Min       1Q   Median       3Q      Max 
-265.919   -5.051    1.269    6.894   65.380 

Coefficients:
         (Intercept)             Distance             DepDelay 
         0.531353277          0.001558469         -0.026397469 
    UniqueCarrier_DL     UniqueCarrier_WN     UniqueCarrier_AA 
        -1.743534978          2.303278665         -0.799773763 
    UniqueCarrier_US     UniqueCarrier_UA     UniqueCarrier_NW 
        -0.382524595         -0.665547422         -0.937718509 
    UniqueCarrier_CO     UniqueCarrier_TW     UniqueCarrier_HP 
        -1.120225017         -0.746713781         -0.990743725 
    UniqueCarrier_MQ     UniqueCarrier_AS     UniqueCarrier_OO 
        -0.237436154         -1.821968437          0.520332347 
    UniqueCarrier_XE     UniqueCarrier_EV     UniqueCarrier_OH 
        -2.088189831          2.174881667          0.974134885 
    UniqueCarrier_FL     UniqueCarrier_EA     UniqueCarrier_PI 
        -0.348562712         -0.064621000         -2.407988812 
    UniqueCarrier_DH     UniqueCarrier_B6     UniqueCarrier_YV 
         1.957873863         -0.131815707          1.284944000 
UniqueCarrier_PA (1)     UniqueCarrier_9E     UniqueCarrier_F9 
        -1.421766146         -0.100683233         -1.101344304 
    UniqueCarrier_HA     UniqueCarrier_TZ     UniqueCarrier_AQ 
        -0.866472290         -2.662288749         -0.648046560 
    UniqueCarrier_PS 
        -1.213938612 

R-Squared: 0.01378
Root Mean Squared Error: 12.73

   user  system elapsed 
  0.155   0.017 196.604 

Calculate average gains by predicted decile


system.time({
model_deciles <- lapply(model_partition, 
                        function(x) {
                            ml_predict(fm, x) %>%
                              mutate(Decile = ntile(desc(prediction), 10)) %>%
                              group_by(Decile) %>%
                              summarize(Gain = mean(Gain)) %>%
                              select(Decile, Gain) %>%
                              collect()}
                        )
})

   user  system elapsed 
  0.560   0.021 532.135 

Create a summary dataset for plotting


d_decile <- bind_rows(
  as_tibble(model_deciles$d_trn) %>% mutate(partition = 'trn'),
  as_tibble(model_deciles$d_tst) %>% mutate(partition = 'tst'))

Plot average gains by predicted decile


d_decile %>%
  ggplot(aes(factor(Decile), Gain, fill = partition)) +
  geom_bar(stat = 'identity', position = 'dodge') +
  labs(title = 'Average gain by predicted decile', x = 'Decile', y = 'Minutes')

predict one year ahead on the 2008 data held back earlier


system.time({
pred_2008 <- ml_predict(fm, d_model_2008) %>%
  group_by(Description) %>%
  summarize(Gain = mean(Gain), prediction = mean(prediction), freq = n()) %>%
  filter(freq > 10000) %>%
  collect()
})

   user  system elapsed 
  0.240   0.004  35.131 

Plot actual gains and predicted gains by airline carrier


ggplot(pred_2008, aes(Gain, prediction)) + 
  geom_point(alpha = 0.75, color = 'red', shape = 3) +
  geom_abline(intercept = 0, slope = 1, alpha = 0.15, color = 'blue') +
  geom_text(aes(label = substr(Description, 1, 20)), size = 3, alpha = 0.75, vjust = -1) +
  labs(title='Average Gains Forecast', x = 'Actual', y = 'Predicted')

Closing

Disconnect from the Spark context


spark_disconnect(sc)

Corrections

If you see mistakes or want to suggest changes, please create an issue on the source repository.

Citation

For attribution, please cite this work as

Spano (2020, Oct. 10). andreaspano blog: Data Analysis with Sparklyr. Retrieved from https://andreaspano.github.io/posts/2020-10-06-data-analysis-with-spraklyr/

BibTeX citation

@misc{spano2020data,
  author = {Spano, Andrea},
  title = {andreaspano blog: Data Analysis with Sparklyr},
  url = {https://andreaspano.github.io/posts/2020-10-06-data-analysis-with-spraklyr/},
  year = {2020}
}