Sparklyr is an open-source package that provides an interface between R and Apache Spark. It lets you use Spark as a backend for dplyr and gives access to Spark’s distributed machine learning algorithms. This article shows how to use sparklyr within an R session.
Load required packages
# Attach the packages used throughout this analysis.
# library() is used instead of require(): require() only returns FALSE (with a
# warning) when a package is missing, so the script would keep running and
# fail later with a confusing "could not find function" error.
library(sparklyr)
library(dplyr)
library(readr)
library(ggplot2)
Spark environment setup
# Global Spark memory, requested through the SPARK_MEM environment variable.
# NOTE(review): SPARK_MEM is a legacy variable that newer Spark releases may
# ignore -- confirm; driver memory is also set explicitly in the config below.
Sys.setenv(SPARK_MEM = "32g")

# Start from sparklyr's default configuration, then override the driver
# memory (32g) and the number of cores used for the local connection (6).
config <- spark_config()
config[["sparklyr.shell.driver-memory"]] <- "32g"
config[["sparklyr.connect.cores.local"]] <- 6
Create local Spark context
# Open a local Spark 3.0 connection using the configuration defined above.
sc <- spark_connect(
  config = config,
  master = "local",
  version = "3.0"
)
Load csv files
system.time({
  # Read the CSV data under the /data/airline/csv directory into a single
  # Spark table registered as "d_all". Column names come from the header
  # line. Use TRUE rather than T, since T is an ordinary variable that can
  # be reassigned.
  # NOTE(review): carriers.csv, which is read separately further down, lives
  # in this same directory, so it is presumably ingested here as well. Its
  # rows should be discarded by the numeric NA filters applied later, but
  # they inflate the raw row count. Confirm this, or point `path` at the
  # flight files only.
  d_all <- spark_read_csv(
    sc,
    name = "d_all",
    path = "file:///data/airline/csv",
    header = TRUE
  )
})
user system elapsed
0.386 0.021 349.361
Save as parquet
# Save d_all as Parquet.
# mode = "overwrite" lets Spark replace any previous output itself, instead of
# shelling out to `rm -rf`. The shell approach is not portable and only ever
# removes a directory on the local filesystem. The explicit file:// scheme
# matches the path used when the Parquet data is read back.
system.time({
  spark_write_parquet(
    d_all,
    path = "file:///data/airline/prq/",
    mode = "overwrite"
  )
})
user system elapsed
0.100 0.009 166.966
Disconnect from the Spark context and reconnect
# Close the current connection and open a fresh one with the same settings,
# so the Parquet data is loaded into a clean session.
spark_disconnect(sc = sc)
sc <- spark_connect(config = config, master = "local", version = "3.0")
Load Parquet files
system.time({
  # Register the Parquet output as the Spark table "d_all".
  # Parquet files store their own schema and column names, and
  # spark_read_parquet() has no `header` argument: a `header = ...` value
  # would only be absorbed by `...` (documented as unused) and have no
  # effect, so it is not passed.
  d_all <- spark_read_parquet(
    sc,
    name = "d_all",
    path = "file:///data/airline/prq/"
  )
})
user system elapsed
0.296 0.009 255.523
system.time({
  # Count the rows of d_all in Spark and bring the one-row result (column
  # `n`) back as a local tibble.
  n <- collect(count(d_all))
  print(n)
})
# A tibble: 1 x 1
n
<dbl>
1 123536460
user system elapsed
0.106 0.000 0.665
Select only the variables of interest
system.time({
  # Define the modelling table as a lazy Spark query.
  # - Cast the delay and distance columns to numeric.
  # - Keep rows where all three are present and the delays fall in the
  #   chosen ranges.
  # - Derive Gain = DepDelay - ArrDelay, which is positive when a flight
  #   arrives less late than it departed.
  d_model <- d_all %>%
    mutate(
      ArrDelay = as.numeric(ArrDelay),
      DepDelay = as.numeric(DepDelay),
      Distance = as.numeric(Distance)
    ) %>%
    filter(
      !is.na(ArrDelay), !is.na(DepDelay), !is.na(Distance),
      DepDelay > -10, DepDelay < 240,
      ArrDelay > -60, ArrDelay < 360
    ) %>%
    mutate(Gain = DepDelay - ArrDelay) %>%
    select(Year, Month, ArrDelay, DepDelay, Distance, UniqueCarrier, Gain)
})
user system elapsed
0.018 0.000 0.018
Check the dimensions of the new data
system.time({
  # Row count of the filtered modelling table. This forces Spark to run the
  # lazy query built above; the result is a local one-row tibble.
  n <- collect(count(d_model))
  print(n)
})
# A tibble: 1 x 1
n
<dbl>
1 118734210
user system elapsed
0.219 0.007 51.889
Import the carriers data
# Read the carrier lookup table into the Spark table "d_carrier".
# Column names come from the header line. Use TRUE rather than the
# reassignable T.
d_carrier <- spark_read_csv(
  sc,
  name = "d_carrier",
  path = "file:///data/airline/csv/carriers.csv",
  header = TRUE
)
Join the airline data with the carriers data
system.time({
  # Add the carrier table's columns (e.g. Description, used for labels later)
  # by matching UniqueCarrier against Code. Being a left join, flights with
  # no matching carrier are kept, with NA in the added columns.
  d_model <- left_join(d_model, d_carrier, by = c(UniqueCarrier = "Code"))
})
user system elapsed
0.002 0.000 0.002
Split the data into a training set (years up to 2007) and an out-of-sample test set (2008)
system.time({
  # Hold out 2008 for out-of-sample testing and keep 2007 and earlier for
  # modelling. The 2008 subset must be taken before d_model is overwritten.
  d_model_2008 <- filter(d_model, Year == 2008)
  d_model <- filter(d_model, Year <= 2007)
})
user system elapsed
0.001 0.000 0.001
Check the dimensions
system.time({
  # Row counts of the training years (<= 2007) and of the 2008 hold-out.
  # collect() returns each one-row result as a local tibble, consistent with
  # the other row-count checks, instead of printing a lazy Spark query with
  # an unknown ("??") row count.
  n <- d_model %>%
    summarise(n = n()) %>%
    collect()
  n2008 <- d_model_2008 %>%
    summarise(n = n()) %>%
    collect()
  print(n)
  print(n2008)
})
# Source: spark<?> [?? x 1]
n
<dbl>
1 112181520
# Source: spark<?> [?? x 1]
n
<dbl>
1 6552690
user system elapsed
1.103 0.004 87.384
Partition the data into training and validation sets
system.time({
  # Randomly split the pre-2008 data 80/20 into training (d_trn) and
  # validation (d_tst) tables. The fixed seed is there to make the split
  # reproducible.
  model_partition <- sdf_random_split(
    d_model,
    d_trn = 0.8,
    d_tst = 0.2,
    seed = 5555
  )
})
user system elapsed
0.129 0.000 0.274
Fit a linear model
system.time({
  # Fit Gain ~ Distance + DepDelay + UniqueCarrier on the training split with
  # Spark ML. The categorical UniqueCarrier shows up in the fitted model as
  # UniqueCarrier_* indicator coefficients.
  fm <- ml_linear_regression(
    x = model_partition[["d_trn"]],
    formula = Gain ~ Distance + DepDelay + UniqueCarrier
  )
})
user system elapsed
0.831 0.017 452.703
Show summary
# Show the model summary (residual quantiles, coefficients, R-squared, RMSE)
# and time how long producing it takes.
system.time(summary(fm))
Deviance Residuals (approximate):
Min 1Q Median 3Q Max
-265.919 -5.051 1.269 6.894 65.380
Coefficients:
(Intercept) Distance DepDelay
0.531353277 0.001558469 -0.026397469
UniqueCarrier_DL UniqueCarrier_WN UniqueCarrier_AA
-1.743534978 2.303278665 -0.799773763
UniqueCarrier_US UniqueCarrier_UA UniqueCarrier_NW
-0.382524595 -0.665547422 -0.937718509
UniqueCarrier_CO UniqueCarrier_TW UniqueCarrier_HP
-1.120225017 -0.746713781 -0.990743725
UniqueCarrier_MQ UniqueCarrier_AS UniqueCarrier_OO
-0.237436154 -1.821968437 0.520332347
UniqueCarrier_XE UniqueCarrier_EV UniqueCarrier_OH
-2.088189831 2.174881667 0.974134885
UniqueCarrier_FL UniqueCarrier_EA UniqueCarrier_PI
-0.348562712 -0.064621000 -2.407988812
UniqueCarrier_DH UniqueCarrier_B6 UniqueCarrier_YV
1.957873863 -0.131815707 1.284944000
UniqueCarrier_PA (1) UniqueCarrier_9E UniqueCarrier_F9
-1.421766146 -0.100683233 -1.101344304
UniqueCarrier_HA UniqueCarrier_TZ UniqueCarrier_AQ
-0.866472290 -2.662288749 -0.648046560
UniqueCarrier_PS
-1.213938612
R-Squared: 0.01378
Root Mean Squared Error: 12.73
user system elapsed
0.155 0.017 196.604
Calculate average gains by predicted decile
system.time({
  # For each partition (d_trn and d_tst):
  # - score it with the fitted model;
  # - bucket rows into deciles of predicted gain (decile 1 = highest
  #   prediction);
  # - collect the mean actual Gain per decile as a local tibble.
  # The result is a list named like model_partition.
  # NOTE(review): ntile() over the whole table is an unpartitioned window,
  # which Spark typically evaluates on a single partition. This may explain
  # the long runtime.
  model_deciles <- lapply(model_partition, function(part) {
    scored <- ml_predict(fm, part)
    scored %>%
      mutate(Decile = ntile(desc(prediction), 10)) %>%
      group_by(Decile) %>%
      summarize(Gain = mean(Gain)) %>%
      collect()
  })
})
user system elapsed
0.560 0.021 532.135
Create a summary dataset for plotting
# Stack the per-decile results of both partitions into one local tibble,
# tagging each row with its partition ("trn" or "tst") for plotting.
d_decile <- bind_rows(
  mutate(as_tibble(model_deciles[["d_trn"]]), partition = "trn"),
  mutate(as_tibble(model_deciles[["d_tst"]]), partition = "tst")
)
Plot average gains by predicted decile
# Side-by-side bars of mean actual gain per predicted decile, one bar per
# partition. geom_col() plots the y values as given, like
# geom_bar(stat = "identity").
ggplot(d_decile, aes(x = factor(Decile), y = Gain, fill = partition)) +
  geom_col(position = "dodge") +
  labs(
    title = "Average gain by predicted decile",
    x = "Decile",
    y = "Minutes"
  )
Predict ahead in time: score the 2008 out-of-sample data
system.time({
  # Score the 2008 hold-out year with the fitted model. Then, per carrier
  # Description, compute:
  # - mean actual Gain;
  # - mean predicted gain;
  # - number of flights (freq).
  # Keep only carriers with more than 10,000 flights and collect the result
  # locally. Flights without a matched carrier fall into an NA Description
  # group.
  pred_2008 <- fm %>%
    ml_predict(d_model_2008) %>%
    group_by(Description) %>%
    summarize(
      Gain = mean(Gain),
      prediction = mean(prediction),
      freq = n()
    ) %>%
    filter(freq > 10000) %>%
    collect()
})
user system elapsed
0.240 0.004 35.131
Plot actual gains and predicted gains by airline carrier
# Actual (x) vs predicted (y) mean gain per carrier for 2008. Points on the
# faint y = x reference line are predicted exactly. Each point is labelled
# with the first 20 characters of the carrier description.
ggplot(data = pred_2008, mapping = aes(x = Gain, y = prediction)) +
  geom_point(color = "red", shape = 3, alpha = 0.75) +
  geom_abline(slope = 1, intercept = 0, color = "blue", alpha = 0.15) +
  geom_text(
    mapping = aes(label = substr(Description, 1, 20)),
    size = 3,
    alpha = 0.75,
    vjust = -1
  ) +
  labs(title = "Average Gains Forecast", x = "Actual", y = "Predicted")
Disconnect from the Spark context
# Close the Spark connection now that the analysis is finished.
spark_disconnect(sc = sc)
If you see mistakes or want to suggest changes, please create an issue on the source repository.
For attribution, please cite this work as
Spano (2020, Oct. 10). andreaspano blog: Data Analysis with Sparklyr. Retrieved from https://andreaspano.github.io/posts/2020-10-06-data-analysis-with-spraklyr/
BibTeX citation
@misc{spano2020data, author = {Spano, Andrea}, title = {andreaspano blog: Data Analysis with Sparklyr}, url = {https://andreaspano.github.io/posts/2020-10-06-data-analysis-with-spraklyr/}, year = {2020} }