Hướng dẫn dùng pyspark join python

Trong một thế giới nơi dữ liệu được tạo ra với tốc độ đáng báo động như vậy, việc phân tích chính xác dữ liệu đó vào đúng thời điểm là rất hữu ích. Một trong những khung tuyệt vời nhất để xử lý dữ liệu lớn trong thời gian thực và phân tích thực hiện là Apache Spark, và nếu chúng ta nói về các ngôn ngữ lập trình đang được sử dụng hiện nay để xử lý các nhiệm vụ phân tích dữ liệu và phân tích dữ liệu phức tạp, tôi chắc chắn Python sẽ đứng đầu đồ thị. Vì vậy, trong hướng dẫn PySpark này , tôi sẽ thảo luận về các chủ đề sau:

  • PySpark là gì?
  • PySpark trong ngành công nghiệp
  • Tại sao lại đi tìm Python?
  • Spark RDD
  • Học máy với PySpark

Hướng dẫn của PySpark: PySpark là gì?

Apache Spark là một khung tính toán cụm nhanh được sử dụng để xử lý, truy vấn và phân tích dữ liệu lớn. Dựa trên tính toán trong bộ nhớ, nó có lợi thế hơn một số khung dữ liệu lớn khác.

Hướng dẫn dùng pyspark join python

Được viết bằng ngôn ngữ lập trình Scala, cộng đồng nguồn mở đã phát triển một công cụ tuyệt vời để hỗ trợ Python cho Apache Spark. PySpark giúp các nhà khoa học dữ liệu giao tiếp với RDD trong Apache Spark và Python thông qua thư viện Py4j. Có nhiều tính năng giúp PySpark trở thành một khung tốt hơn các tính năng khác:

  • Tốc độ: Nó nhanh hơn 100 lần so với các khung xử lý dữ liệu quy mô lớn truyền thống.
  • Bộ nhớ đệm mạnh mẽ: Lớp lập trình đơn giản cung cấp khả năng lưu trữ bộ nhớ cache và ổ đĩa mạnh mẽ.
  • Triển khai: Có thể được triển khai thông qua Mesos, Hadoop thông qua Sợi hoặc trình quản lý cụm riêng của Spark.
  • Thời gian thực: Tính toán thời gian thực và độ trễ thấp vì tính toán trong bộ nhớ.
  • Polyglot:  Hỗ trợ lập trình trong Scala, Java, Python và R.

Hãy tiếp tục với Blog Hướng dẫn PySpark của chúng tôi và xem Spark được sử dụng trong ngành công nghiệp ở đâu.

PySpark trong ngành công nghiệp

Hãy tiếp tục với hướng dẫn PySpark của chúng tôi và xem Spark được sử dụng trong ngành công nghiệp.

Mọi ngành công nghiệp đều xoay quanh dữ liệu lớn và nơi có dữ liệu lớn, có phân tích liên quan. Vì vậy, hãy xem xét các ngành công nghiệp khác nhau nơi Apache Spark được sử dụng.

Hướng dẫn dùng pyspark join python

Truyền thông là một trong những ngành công nghiệp lớn nhất phát triển theo hướng phát trực tuyến. Netflix sử dụng Apache Spark để xử lý luồng thời gian thực để cung cấp các đề xuất trực tuyến được cá nhân hóa cho khách hàng của mình. Nó xử lý 450 tỷ sự kiện mỗi ngày chảy vào các ứng dụng phía máy chủ.

Hướng dẫn dùng pyspark join python

Tài chính là một lĩnh vực khác mà việc xử lý thời gian thực của Apache Spark đóng vai trò quan trọng. Các ngân hàng đang sử dụng Spark để truy cập và phân tích hồ sơ truyền thông xã hội để hiểu rõ hơn có thể giúp họ đưa ra quyết định kinh doanh đúng đắn để đánh giá rủi ro tín dụng , quảng cáo được nhắm mục tiêu và phân khúc khách hàng. Khách hàng cũng giảm thời gian sử dụng Spark. Phát hiện gian lận là một trong những lĩnh vực được sử dụng rộng rãi nhất trong học máy mà Spark có liên quan.

Hướng dẫn dùng pyspark join python

Các nhà cung cấp dịch vụ chăm sóc sức khỏe đang sử dụng Apache Spark để phân tích hồ sơ bệnh nhân cùng với dữ liệu lâm sàng trong quá khứ để xác định bệnh nhân nào có khả năng phải đối mặt với các vấn đề sức khỏe sau khi được xuất viện. Apache Spark được sử dụng trong giải trình tự bộ gen để giảm thời gian cần thiết để xử lý dữ liệu bộ gen.

Hướng dẫn dùng pyspark join python

Bán lẻ và thương mại điện tử là một ngành mà người ta không thể tưởng tượng nó đang chạy mà không sử dụng phân tích và quảng cáo được nhắm mục tiêu. Một trong những nền tảng thương mại điện tử lớn nhất hiện nay, Alibabarun một số công việc Spark lớn nhất trên thế giới để phân tích petabyte dữ liệu. Alibaba thực hiện trích xuất tính năng trong dữ liệu hình ảnh. eBay sử dụng Apache Spark để cung cấp các ưu đãi được nhắm mục tiêu, nâng cao trải nghiệm của khách hàng và tối ưu hóa hiệu suất tổng thể.

Hướng dẫn dùng pyspark join python

Các ngành công nghiệp du lịch cũng sử dụng Apache Spark. TripAdvisor , một trang web du lịch hàng đầu giúp người dùng lên kế hoạch cho một chuyến đi hoàn hảo, đang sử dụng Apache Spark để tăng tốc các đề xuất tùy chỉnh được cá nhân hóa. TripAdvisor sử dụng Apache Spark để cung cấp lời khuyên cho hàng triệu khách du lịch bằng cách so sánh hàng trăm trang web để tìm giá khách sạn tốt nhất cho khách hàng của mình.

Một khía cạnh quan trọng của hướng dẫn PySpark này là để hiểu lý do tại sao chúng ta cần sử dụng Python. Tại sao không phải là Java, Scala hay R?

Dễ học: Đối với lập trình viên, Python tương đối dễ học hơn vì cú pháp và thư viện chuẩn của nó. Hơn nữa, đây là ngôn ngữ được gõ động, có nghĩa là RDD có thể chứa các đối tượng thuộc nhiều loại.

Hướng dẫn dùng pyspark join python

Một bộ thư viện khổng lồ: Scala không có đủ các công cụ và thư viện khoa học dữ liệu như Python để học máy và xử lý ngôn ngữ tự nhiên. Hơn nữa, Scala thiếu hình ảnh tốt và biến đổi dữ liệu cục bộ.

Hướng dẫn dùng pyspark join python

Hỗ trợ cộng đồng khổng lồ: Python có một cộng đồng toàn cầu với hàng triệu nhà phát triển tương tác trực tuyến và ngoại tuyến ở hàng ngàn vị trí ảo và thực.

Hướng dẫn dùng pyspark join python

Một trong những chủ đề quan trọng nhất trong hướng dẫn PySpark này là việc sử dụng RDD. Hãy hiểu RDD là gì.

Spark RDD

Khi nói đến điện toán phân tán lặp, tức là xử lý dữ liệu qua nhiều công việc trong tính toán, chúng ta cần sử dụng lại hoặc chia sẻ dữ liệu giữa nhiều công việc. Các khung trước đó như Hadoop có vấn đề trong khi xử lý nhiều hoạt động / công việc như:

  • Lưu trữ dữ liệu trong bộ lưu trữ trung gian như HDFS.
  • Nhiều công việc I / O làm cho việc tính toán chậm.
  • Sao chép và tuần tự hóa mà lần lượt làm cho quá trình thậm chí chậm hơn.

RDD cố gắng giải quyết tất cả các vấn đề bằng cách cho phép tính toán trong bộ nhớ phân phối chịu lỗi. RDD là viết tắt của Bộ dữ liệu phân tán linh hoạt. RDD là một bản tóm tắt bộ nhớ phân tán cho phép các lập trình viên thực hiện các tính toán trong bộ nhớ trên các cụm lớn theo cách chịu lỗi. Chúng là tập hợp các đối tượng chỉ đọc được phân vùng trên một tập hợp các máy có thể được xây dựng lại nếu phân vùng bị mất. Có một số hoạt động được thực hiện trên RDD:

  • Biến đổi: Biến đổi tạo ra một tập dữ liệu mới từ một dữ liệu hiện có. Đánh giá lười biếng.
  • Hành động: Spark buộc các tính toán chỉ thực hiện khi các hành động được gọi trên RDD.

Chúng ta hãy hiểu một vài biến đổi, hành động và chức năng.

Đọc tệp và hiển thị các phần tử n hàng đầu:

rdd = sc.textFile("file:///home/edureka/Desktop/Sample")
rdd.take(n)
[u'Deforestation is arising as the main environmental and social issue which has now taken the form of more than a powerful demon. ',
 u'We must know about the causes, effects and ways to solve the problems arisen because of the deforestation. ',
 u'We have provided many paragraphs, long and short essay on deforestation in order to help your kids and children to get aware about the problem as well as get participated in the essay writing competition in the school or outside the school. ',
 u'You can select any deforestation essay given below according to the class standard. ',
 u'Deforestation is arising as the major global problem to the society and environment.']

Chuyển đổi thành chữ thường và chia tách: (Hạ và tách)

def Func(lines):
lines = lines.lower()
lines = lines.split()
return lines
rdd1 = rdd.map(Func)

rdd1.take(5)
[[u'deforestation',
  u'is',
  u'arising',
  u'as',
  u'the',
  u'main',
  u'environmental',
  u'and',
  u'social',
  u'issue',
  u'which',
  u'has',
  u'now',
  u'taken',
.....
.
.
.
]

Xóa Stopword: (Bộ lọc)

stop_words = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I'd','why','with']
rdd2 = rdd1.filter(lambda z: z not in stop_words)
rdd2.take(10)
[u'deforestation',
 u'arising',
 u'main',
 u'environmental',
 u'social',
 u'issue',
 u'which',
 u'has',
 u'now',
 u'taken']

Tổng các số từ 1 đến 500: (Giảm)

sum_rdd = sc.parallelize(range(1,500))
sum_rdd.reduce(lambda x,y: x+y)
124750

Học máy với PySpark

Tiếp tục hướng dẫn PySpark của chúng tôi, hãy phân tích một số dữ liệu bóng rổ và đưa ra một số dự đoán. Vì vậy, ở đây chúng tôi sẽ sử dụng dữ liệu của tất cả các cầu thủ tại NBA kể từ năm 1980 [năm giới thiệu 3 Con trỏ].

Hướng dẫn dùng pyspark join python

df = spark.read.option('header','true')\
.option('inferSchema','true')
.csv("file:///home/edureka/Downloads/season_totals.csv")
print(df.columns)
['_c0', 'player', 'pos', 'age', 'team_id', 'g', 'gs', 'mp', 'fg', 'fga', 'fg_pct', 'fg3', 'fg3a', 'fg3_pct', 'fg2', 'fg2a', 'fg2_pct', 'efg_pct', 'ft', 'fta', 'ft_pct', 'orb', 'drb', 'trb', 'ast', 'stl', 'blk', 'tov', 'pf', 'pts', 'yr']

Sắp xếp người chơi ( OrderBy) và  toPandas:

Ở đây chúng tôi đang sắp xếp người chơi dựa trên số điểm ghi được trong một mùa.

df.orderBy('pts',ascending = False).limit(10).toPandas()[['yr','player','age','pts','fg3']]

Hướng dẫn dùng pyspark join python

Sử dụng DSL và matplotlib:

Ở đây chúng tôi đang phân tích số lần thử trung bình 3 điểm cho mỗi mùa trong giới hạn thời gian là 36 phút  [một khoảng thời gian tương ứng với một trận đấu NBA gần đúng với phần còn lại đầy đủ]. Chúng tôi tính toán số liệu này bằng cách sử dụng số lần thử mục tiêu trường 3 điểm (fg3a) và số phút đã chơi (mp) và sau đó vẽ kết quả bằng cách sử dụng matlplotlib.

from pyspark.sql.functions import col
fga_py = df.groupBy('yr')\
.agg({'mp' : 'sum', 'fg3a' : 'sum'})
.select(col('yr'), (36*col('sum(fg3a)')/col('sum(mp)')).alias('fg3a_p36m'))\
.orderBy('yr')

from matplotlib import pyplot as plt
import seaborn as sns
plt.style.use('fivethirtyeight')


_df = fga_py.toPandas()
plt.plot(_df.yr,_df.fg3a_p36m, color = '#CD5C5C')
plt.xlabel('Year')
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.annotate('3 pointer introduced', xy=(1980, .5), xytext=(1981, 1.1), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved in 3-point line', xy=(1996, 2.4), xytext=(1991.5, 2.7), fontsize = 9,
arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))
plt.annotate('NBA moved back\n3-point line', xy=(1998, 2.), xytext=(1998.5, 2.4), fontsize = 9, arrowprops=dict(facecolor='grey', shrink=0, linewidth = 2))

Hướng dẫn dùng pyspark join python

Hồi quy tuyến tính và VectorAssembler:

Chúng ta có thể điều chỉnh mô hình hồi quy tuyến tính cho đường cong này để mô hình số lần thử bắn trong 5 năm tới. Chúng ta phải chuyển đổi dữ liệu của mình bằng cách sử dụng  VectorAssembler hàm thành một cột duy nhất. Đây là một yêu cầu cho API hồi quy tuyến tính trong MLlib.

from pyspark.ml.feature import VectorAssembler
t = VectorAssembler(inputCols=['yr'], outputCol = 'features')
training = t.transform(fga_py)\
.withColumn('yr',fga_py.yr)\
.withColumn('label',fga_py.fg3a_p36m)
training.toPandas().head()

Hướng dẫn dùng pyspark join python

Sau đó chúng tôi xây dựng đối tượng mô hình hồi quy tuyến tính bằng cách sử dụng dữ liệu được chuyển đổi của chúng tôi.

from pyspark.ml.regression import LinearRegression
lr = LinearRegression(maxIter=10)
model = lr.fit(training)

Áp dụng mô hình được đào tạo vào bộ dữ liệu:

Chúng tôi áp dụng mô hình đối tượng mô hình được đào tạo của mình vào tập huấn luyện ban đầu cùng với 5 năm dữ liệu trong tương lai:

from pyspark.sql.types import Row

# apply model for the 1979-80 season thru 2020-21 season
training_yrs = training.select('yr').rdd.map(lambda x: x[0]).collect()
training_y = training.select('fg3a_p36m').rdd.map(lambda x: x[0]).collect()
prediction_yrs = [2017, 2018, 2019, 2020, 2021]
all_yrs = training_yrs + prediction_yrs

# built testing DataFrame
test_rdd = sc.parallelize(all_yrs)
row = Row('yr')&lt
all_years_features = t.transform(test_rdd.map(row).toDF())

# apply linear regression model
df_results = model.transform(all_years_features).toPandas()

Vẽ sơ đồ dự đoán cuối cùng:

Sau đó, chúng tôi có thể vẽ kết quả của chúng tôi và lưu biểu đồ ở một vị trí được chỉ định.

plt.plot(df_results.yr,df_results.prediction, linewidth = 2, linestyle = '--',color = '#224df7', label = 'L2 Fit')
plt.plot(training_yrs, training_y, color = '#f08080', label = None)
plt.xlabel('Year')
plt.ylabel('Number of attempts')
plt.legend(loc = 4)
_=plt.title('Player average 3-point attempts (per 36 minutes)')
plt.tight_layout()
plt.savefig("/home/edureka/Downloads/Images/REGRESSION.png")

Hướng dẫn dùng pyspark join python

Và, với biểu đồ này, chúng ta đi đến phần cuối của hướng dẫn PySpark này.

Vì vậy, đây là nó, các bạn!

Tôi hy vọng các bạn có ý tưởng về PySpark là gì, tại sao Python phù hợp nhất với Spark, RDD và một cái nhìn thoáng qua về học máy với Pyspark. Xin chúc mừng, bạn không còn là người mới chơi PySpark.