Getting started with Blazing Notebooks: from zero to working with RAPIDS/BlazingSQL in under 10 minutes. This is a crash course on BlazingSQL. If you have used Apache Spark with PySpark, this should feel very familiar. This guide walks through the main features of a BlazingSQL instance.
First, import the required libraries. BlazingSQL uses cuDF to hand off results, so it’s always a good idea to import it as well. For more information on cuDF, please check out the RAPIDS.ai documentation page.
You must establish a BlazingContext to connect to a BlazingSQL instance; the context is how you create tables, run queries, and do essentially everything else with BlazingSQL.
from blazingsql import BlazingContext
import cudf

bc = BlazingContext()
Connect a Storage Plugin
BlazingSQL can connect to multiple storage solutions in order to query files from numerous local and/or distributed filesystems.
Here is how you would connect to an AWS S3 bucket.
bc.s3('dir_name',
      bucket_name='bucket_name',
      access_key_id='access_key',
      secret_key='secret_key')
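Note that the name you register the bucket under ('dir_name' here) becomes the prefix used to reference its files in later calls, e.g. 's3://dir_name/file1.csv' when creating tables below.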
BlazingSQL can query raw files or in-memory DataFrames, but you must create a table before you can run a query. We’re going to create three tables: two from files in AWS S3, and one from an existing GPU DataFrame (GDF).
For more info, check the BlazingSQL API documentation.
# create table 01 from a CSV file
bc.create_table('table_name_01', 's3://dir_name/file1.csv')

# create table 02 from a Parquet file
bc.create_table('table_name_02', 's3://dir_name/file2.parquet')

# create table 03 from an existing cuDF DataFrame
bc.create_table('table_name_03', existing_gdf)
Once you have created your tables, you can use SQL to query them. BlazingSQL processes every file as a GDF, which means you can run federated queries over multiple data sources and file formats.
You can read more about our supported SQL in the General SQL reference section of the BlazingSQL API documentation.
query = '''
    SELECT a.*
    FROM table_name_01 AS a
    UNION ALL
    SELECT b.*
    FROM table_name_02 AS b
    UNION ALL
    SELECT c.*
    FROM table_name_03 AS c
    '''

# type(gdf) == cudf.core.dataframe.DataFrame
gdf = bc.sql(query)
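Because every table resolves to a GDF at query time, a single query can also join tables that come from different sources. A minimal sketch, assuming both tables share a hypothetical id column:

# hypothetical federated join between a CSV-backed table and a GDF-backed table;
# the 'id' join key is an assumption for illustration
join_query = '''
    SELECT a.*
    FROM table_name_01 AS a
    INNER JOIN table_name_03 AS b
        ON a.id = b.id
    '''
joined_gdf = bc.sql(join_query)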
To better understand what’s going on, BlazingContext’s .explain() method can be called to break down a given query’s algebra plan (query execution plan).
# define a query
query = '''
    SELECT a.*
    FROM table_name_01 AS a
    UNION ALL
    SELECT b.*
    FROM table_name_02 AS b
    UNION ALL
    SELECT c.*
    FROM table_name_03 AS c
    '''

# what's going on when this query runs?
bc.explain(query)
To drop a table from BlazingContext, we call drop_table and pass in the name of the table to drop.
# drop table 01
bc.drop_table('table_name_01')

# drop table 03
bc.drop_table('table_name_03')
Handing off a GDF
The resulting GDF resides in GPU memory. It is built on Apache Arrow and interoperates with the rest of the RAPIDS.ai ecosystem.
For example, we can run typical pandas-like commands but using cuDF.
# summary statistics
print(gdf.mean(), gdf.var())

# dense matrix representation of the DataFrame
print(gdf.as_matrix())
Or, we can convert the resulting GDF to a pandas DataFrame:
import pandas

# from cuDF DataFrame to pandas DataFrame
df = gdf.to_pandas()
But the real power comes from passing results to other GPU-accelerated libraries. cuML offers the algorithms found in scikit-learn, but operates on GDFs. Here we pass GDFs to cuML to train a linear regression model.
import cuml
from cuml import LinearRegression

# create the model
lr = LinearRegression(fit_intercept=True, normalize=False, algorithm="eig")

# train the model
reg = lr.fit(X_train_gdf, y_train_gdf)

print("Coefficients:")
print(reg.coef_)
print(" ")
print("Y intercept:")
print(reg.intercept_)
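From here, the trained model can score new data. A minimal sketch, assuming a held-out cuDF DataFrame X_test_gdf with the same columns and dtypes as X_train_gdf:

# X_test_gdf is a hypothetical held-out cuDF DataFrame,
# not part of the original example
predictions = reg.predict(X_test_gdf)
print(predictions)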
Run Distributed SQL
BlazingSQL strives to make it easy to scale up a workload. With minimal code changes, queries can run on an arbitrary number of GPUs. The result is returned as a dask-cuDF DataFrame: a partitioned GPU DataFrame distributed across the GPUs in the cluster.
from blazingsql import BlazingContext
import cudf
import dask_cudf
import dask
from dask.distributed import Client

# set Dask client
client = Client('127.0.0.1:8786')

# start BlazingContext
bc = BlazingContext(dask_client=client)

# register AWS S3 bucket with BlazingContext
bc.s3('dir_name',
      bucket_name='bucket_name',
      access_key_id='access_key',
      secret_key='secret_key')

table_list = ['s3://dir_name/parquet_dir/1_0_0.parquet',
              's3://dir_name/parquet_dir/1_1_0.parquet',
              's3://dir_name/parquet_dir/1_2_0.parquet']

# create a single table from multiple Parquet files
bc.create_table('table_name', table_list)

# query the table (returns a dask-cuDF DataFrame)
gdf = bc.sql('SELECT COUNT(*) FROM table_name')
print(gdf.head())
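Since the distributed result is a dask-cuDF DataFrame, it can be gathered into a single cuDF DataFrame when needed. A minimal sketch, assuming the result fits in one GPU’s memory:

# materialize the distributed partitions into a single cuDF DataFrame
# (assumes the result fits in one GPU's memory)
single_gdf = gdf.compute()
print(type(single_gdf))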