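Warning: Jupyter resets the tutorial notebooks on startup. If you want to modify a notebook and keep your changes, make a copy first.
Initialize the connection to Spark/Livy
We encourage testers to run this notebook under their own user account.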
Code:
from os import getenv

from scale.spark import connect

# Create a connection to Livy
session = connect()

# Create a local proxy for the SparkSession
spark = session.proxy('spark')

# Get the current user so that users do not overwrite each other's work
USER = getenv('USER')
Output:
None
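The per-user path relies on the USER environment variable, and getenv returns None when that variable is unset. The following is a minimal sketch of one way to guard against that, assuming a fallback to getpass.getuser() is acceptable in your environment. The helper names resolve_user and results_path are hypothetical and not part of the tutorial or of scale.spark.

```python
import getpass
from os import getenv


def resolve_user():
    """Return the current user name, falling back when USER is unset."""
    # getenv returns None if the variable is missing, which would
    # produce a path such as /user/None/results if used unchecked.
    return getenv('USER') or getpass.getuser()


def results_path(user, job_name):
    """Build a per-user results path (layout copied from this tutorial)."""
    return f'/user/{user}/results/{job_name}'


if __name__ == '__main__':
    print(results_path(resolve_user(), 'vtd-tutorial3'))
```

Keeping the user name in the path is what stops one tester from overwriting results that belong to another.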
Read the results from the previous job
Code:
# Load the per-user results written by the previous tutorial job
results = spark.read.parquet(f'/user/{USER}/results/vtd-tutorial3').repartitionTo('analytics')

# Each simulation has exactly one row with frame == 0, so counting those rows counts simulations
total_simulations = results.where("frame==0").count().wait_for(verbose=False)
total_frames = results.count().wait_for(verbose=False)

print(f"Number of simulations = {total_simulations}")
print(f"Number of frames = {total_frames}")
Output:
Number of simulations = 2
Number of frames = 1202
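The counting logic above can be illustrated locally without a cluster. This is only a sketch on toy data: it assumes the results table holds one row per simulation frame, and the 'simulation' column and the run names are made up for illustration.

```python
# Toy stand-in for the results table: one row per (simulation, frame).
# The 'simulation' column and run names are hypothetical.
rows = [
    {'simulation': sim, 'frame': frame}
    for sim in ('run-a', 'run-b')
    for frame in range(3)
]

# Every row is a frame, so the row count is the frame count.
total_frames = len(rows)

# Each simulation contributes exactly one row with frame == 0.
total_simulations = sum(1 for row in rows if row['frame'] == 0)

print(f"Number of simulations = {total_simulations}")
print(f"Number of frames = {total_frames}")
```

Filtering on frame==0 avoids a distinct-count over a simulation identifier, at the cost of assuming that every simulation recorded its first frame.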
Local in-memory analysis with NumPy
Code:
import numpy as np
from numpy.polynomial.polynomial import polyfit
import matplotlib.pyplot as plt

# Keep one row per simulation, and only the simulations that succeeded
successful_results = results.where('frame==0').where('status=="success"')

plt.close('all')
f, axes = plt.subplots(1, 3, squeeze=False, sharey=True, figsize=(10, 5))
plots = axes[0]

X_labels = ['param_TgtTTC_StartLaneChange', 'param_TgtRate', 'param_TgtSpeed']

# Collect the parameters and the safety factor into local memory
X = np.array(successful_results.select(X_labels).collect().wait_for())
y = [float(value[0]) for value in successful_results.select('kr_safety_factor').collect().wait_for()]

f.suptitle('Parameters vs Safety Factor')
for index, X_label in enumerate(X_labels):
    x = X[:, index]
    # polyfit returns coefficients lowest degree first: intercept b, then slope m
    b, m = polyfit(x, y, 1)
    plots[index].plot(x, y, '.')
    plots[index].plot(x, b + m * x, '-')
    plots[index].set(xlabel=X_label, ylabel='Safety Factor' if index==0 else None)
Output:
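The line fit in the analysis cell depends on the coefficient order returned by numpy.polynomial.polynomial.polyfit, which is lowest degree first. The sketch below checks this on synthetic data (a made-up line y = 2 + 3x, not tutorial results) and contrasts it with the legacy np.polyfit, which returns the highest degree first.

```python
import numpy as np
from numpy.polynomial.polynomial import polyfit

# Synthetic, noise-free data on the line y = 2 + 3x (illustrative only)
x = np.linspace(0.0, 10.0, 21)
y = 2.0 + 3.0 * x

# New-style API: coefficients ordered from lowest to highest degree
b, m = polyfit(x, y, 1)

# Legacy API: coefficients ordered from highest to lowest degree
m_legacy, b_legacy = np.polyfit(x, y, 1)

print(f"intercept = {b:.3f}, slope = {m:.3f}")
```

Unpacking the two results in the same order would silently swap slope and intercept, so it is worth confirming which polyfit is imported before reading anything into the fitted lines.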