Nội dung
- 1 Spark DataFrame Spark dataframe
- 2 Viết chương trình pyspark để gửi Spark DataFrame dưới dạng bảng HTML trong email Write Pyspark program to send Spark
dataframe as HTML table in Email
- 2.1 Bước 1: Tạo DataFrame từ Bảng Hive Step 1 : Create dataframe from Hive table
- 2.2 Bước 2: Vòng lặp qua DataFrame bằng cách sử dụng cho vòng lặp Step 2 : Loop through dataframe using for loop
- 2.3 Bước 3: Chuẩn bị bảng HTML Step 3 : Prepare HTML table
- 2.4 Bước 4: Gửi email bằng pyspark Step 4 : Send email using Pyspark
- 2.5 Chương trình Pyspark hoàn chỉnh để gửi DataFrame qua email Complete Pyspark program to send dataframe via email
- 2.6 đầu ra Output
Spark DataFrame là một bộ sưu tập các hàng phân tán có cùng lược đồ. Lược đồ xác định cấu trúc của khung dữ liệu như tên cột và một loại liên quan. Spark DataFrame tương đương với một bảng trong cơ sở dữ liệu quan hệ hoặc DataFrame trong R/Python. DataFrame có sẵn cho các ngôn ngữ lập trình đa năng như Java, Python và Scala.
API DataFrame được phát hành dưới dạng trừu tượng trên đầu RDD, tiếp theo là API dữ liệu. DataFrame có thể được xây dựng từ các nguồn khác nhau như bảng Hive, cơ sở dữ liệu bên ngoài (ví dụ. Oracle, MySQL), các tệp dữ liệu có cấu trúc hoặc từ RDD hiện tại.
Viết chương trình pyspark để gửi Spark DataFrame dưới dạng bảng HTML trong email
Bước 1: Tạo DataFrame từ Bảng Hive
Trong ví dụ này, chúng tôi sẽ tạo ra một khung dữ liệu tia lửa bằng cách đọc bảng Hive. Hãy cùng xem xét rằng chúng tôi có một bảng bệnh nhân trong Hive có chi tiết đường trong máu của từng bệnh nhân.patient_report table in Hive which has the blood sugar details of each patients.
Từ bảng này, chúng tôi sẽ chiết xuất các bệnh nhân có lượng đường trong máu hơn 200mg/dL trong tháng 3 năm 2022.blood sugar level more than 200 mg/dL in the month of march,2022.
Hãy viết một chương trình pyspark để đọc những chi tiết bệnh nhân đó từ bảng Hive và tạo một khung dữ liệu Spark.
fetch_sql = "Chọn * từ bệnh viện_db.patient_report"="select * from hospital_db.patient_report" diagnosis_res_df=spark.sql(fetch_sql)=spark.sql(fetch_sql) diagnosis_res_df.show().show() |
Như chúng tôi đã trình bày trong chương trình trên, trước tiên, chúng tôi đang đọc các chi tiết bệnh nhân từ Hive Bệnh viện_db.patient_report và tạo ra một dữ liệu tia lửa dưới dạng chẩn đoán_res_df.hospital_db.patient_report and creating a Spark dataframe as diagnosis_res_df.
Bước 2: Vòng lặp qua DataFrame bằng cách sử dụng cho vòng lặp
Tiếp theo, chúng tôi đang đăng ký DataFrame vào bảng tạm thời với tên Chẩn đoán_temp_table. Để chúng ta có thể chạy Spark.sql () về điều này để chiết xuất các bệnh nhân có lượng đường trong máu hơn 200 trong tháng 3 năm 2022.diagnosis_temp_table. So that we can run a spark.sql() on this to extract the patients who has the blood sugar level more than 200 in the month of march,2022.
Ngoài ra, chúng tôi đã sử dụng hàm thu thập () để truy xuất dữ liệu từ DataFrame. Sau đó, chúng tôi đang lặp lại từng phần tử từ hàng và gán từng giá trị cột cho một biến riêng biệt.
diagnosis_res_df.createOrReplaceTempTable("diagnosis_temp_table").createOrReplaceTempTable("diagnosis_temp_table") Hàng tháng_rpt = Spark.sql ("Chọn * từ Chẩn đoán_temp_table trong đó Chẩn đoán_DT giữa '2022-03-01' và '2022-03-31' và Blood_sugar_level_in_mg_dl> = 200").=spark.sql("select * from diagnosis_temp_table where Diagnosis_dt between '2022-03-01' and '2022-03-31' and Blood_Sugar_level_in_mg_dL >=200").collect() print(monthly_rpt)(monthly_rpt) Forrow Inmonthly_rpt:row inmonthly_rpt: patient_id=str(row["patient_id"])patient_id=str(row["patient_id"]) patient_name=str(row["patient_name"])patient_name =str(row["patient_name"]) blood_sugar=str(row["blood_sugar_level_in_mg_dl"])blood_sugar=str(row["blood_sugar_level_in_mg_dl"]) diagnosis_dt=str(row["diagnosis_dt"])diagnosis_dt= str(row["diagnosis_dt"]) |
Bước 3: Chuẩn bị bảng HTML
Trong bước trước, chúng tôi đã nhận được các giá trị cần thiết trong một biến riêng biệt. Bây giờ chúng ta cần tạo một bảng HTML với các biến đó. Trong cú pháp bảng HTML, chúng ta phải xác định tiêu đề bảng, hàng bảng và dữ liệu bảng.
Hãy để xác định các mã đó trong pyspark như dưới đây. Cơ thể biến giữ phần tiêu đề của bảng HTML với các thuộc tính kiểu. Sau đó, biến có thể có các giá trị tiêu đề bảng. Tiếp theo trong vòng lặp For, chúng tôi đang chuẩn bị dữ liệu bảng sẽ được sử dụng trong hàng bảng.body holds the header section of the HTML table with Style properties. Then the variable strTable have the table header values. Next in the for loop, we are preparing the table data that will be used in the table row.
body = "Bảng {font-family: Arial, sans-serif; biên giới sụp đổ: sụp đổ; chiều rộng: 100%;} td, th {biên giới: 1pxsolid#dddddd; nth-child (chẵn) {màu nền: #dddddd;} "="table {font-family:arial,sans-serif;border-collapse:collapse;width:100%;}td, th {border:1pxsolid#dddddd;text-align:left;padding:8px;}tr:nth-child(even) {background-color:#dddddd;}" strTable="Patient_idPatient_NameBlood_Sugar_level_in_mg_dLDiagnosis_dt"=" strTable=body+strTable=body+strTable Forrow Inmonthly_rpt:row inmonthly_rpt:
patient_id=str(row["patient_id"])patient_id=str(row["patient_id"]) patient_name=str(row["patient_name"])patient_name=
str(row["patient_name"]) blood_sugar=str(row["blood_sugar_level_in_mg_dl"])blood_sugar=str(row["blood_sugar_level_in_mg_dl"]) diagnosis_dt=str(row["diagnosis_dt"])diagnosis_dt
=str(row["diagnosis_dt"]) strRW=""+patient_id+""+patient_name+""+blood_sugar+""+diagnosis_dt+""+""strRW=" strTable=strTable+strRWstrTable
=strTable+strRW strTable=strTable+""=strTable+" |
Bước 3: Chuẩn bị bảng HTML
Trong bước trước, chúng tôi đã nhận được các giá trị cần thiết trong một biến riêng biệt. Bây giờ chúng ta cần tạo một bảng HTML với các biến đó. Trong cú pháp bảng HTML, chúng ta phải xác định tiêu đề bảng, hàng bảng và dữ liệu bảng.
- Từ địa chỉ - đề cập đến người dùng đang gửi email– To mention the user who is sending the email
- Người nhận - Đề cập đến địa chỉ email của người nhận – To mention the email address of the recipients
- CC - Đề cập đến địa chỉ CC của người nhận – To mention the CC address of the recipients
- Chủ đề - để xác định chủ đề của email – To define the subject of the email
- Mimetext - Văn bản cơ thể của tin nhắn email. Ở đây cơ thể của tin nhắn là một bảng HTML. The body text of the email message. Here the body of the message is a HTML table.
Khi chúng tôi đặt tất cả các giá trị này, chúng tôi cần tạo một đối tượng SMTP. Trong đối tượng SMTP, chúng ta cần đề cập đến máy chủ thư sẽ được sử dụng để gửi email.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | Chủ đề = "Báo cáo bệnh tiểu đường vào tháng 3 năm 2022"="Diabetes Report in March,2022" # Tạo thông báo gốc và điền vào các tiêu đề từ, đến, CC và chủ đề msgRoot=MIMEMultipart('related')=MIMEMultipart('related') msgRoot['Subject']=subject['Subject']= subject msgRoot['From']=fromAddr['From']=fromAddr msgRoot['To']=recipients['To']=recipients msgRoot['Cc']=cc['Cc']=cc #Loại Mimetext như HTML msgText=MIMEText(strTable,'html') =MIMEText(strTable,'html') #Attach cơ thể của tin nhắn dưới dạng bảng HTML msgRoot.attach(msgText).attach(msgText) smtp=SMTP()=SMTP() smtp.connect('mx.bstn.mxc.com',25).connect('mx.bstn.mxc.com', 25) smtp.sendmail(fromAddr,recipients.split(','),msgRoot.as_string()).sendmail(fromAddr,recipients.split(','),msgRoot.as_string()) smtp.quit().quit() |
Hoàn thành Chương trình PysPark để gửi DataFrame qua email
Hãy cùng viết chương trình pyspark hoàn chỉnh bằng cách kết hợp tất cả các chức năng. Chương trình pyspark được lưu với tên của send_blood_sugar_report.py.send_blood_sugar_report.py.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 | từ pyspark.sql Nhập Sparksessionpyspark.sql import SparkSession từ email.mime.multipart Nhập Mimemultipartemail.mime.multipart import MIMEMultipart từ email.mime.text Nhập Mimetextemail.mime.text import MIMEText từ Smtplib nhập smtpsmtplib import SMTP def notifyinemail (Chẩn đoán_res_df):NotifyInEmail(diagnosis_res_df): diagnosis_res_df.createOrReplaceTempView("diagnosis_temp_table")diagnosis_res_df.createOrReplaceTempView("diagnosis_temp_table") & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; ")monthly_rpt= spark.sql("select * from diagnosis_temp_table where Diagnosis_dt between '2022-03-01' and '2022-03-31' and Blood_Sugar_level_in_mg_dL >=200") .collect().collect() print(monthly_rpt)print(monthly_rpt) & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;subject="Diabetes Report in March,2022" & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Create body of the email message & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; #dddddd; text-align: trái; padding: 8px;} tr: nth-child (chẵn) {nền màu: #ddddddbody="table {font-family:arial,sans-serif;border-collapse:collapse;width:100%;}td, th {border:1pxsolid#dddddd;text-align: left;padding:8px;}tr:nth-child(even) {background-color:#dddddd;}" & nbsp; & nbsp; & nbsp; & nbsp;footer=" strTable="Patient_idPatient_NameBlood_Sugar_level_in_mg_dLDiagnosis_dt"strTable=" strTable=body+strTablestrTable=
body+strTable & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Create dynamic HTML table with data & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;forrow inmonthly_rpt: patient_id=str(row["patient_id"])patient_id
=str(row["patient_id"]) patient_name=str(row["patient_name"])patient_name=str(row["patient_name"]) blood_sugar=str(row["blood_sugar_level_in_mg_dl"])blood_sugar
=str(row["blood_sugar_level_in_mg_dl"]) diagnosis_dt=str(row["diagnosis_dt"])diagnosis_dt=str(row["diagnosis_dt"]) strRW=""+patient_id+""+patient_name+""+blood_sugar+""+diagnosis_dt+""+""strRW
=" strTable=strTable+strRWstrTable=strTable+strRW strTable=strTable+""strTable=
strTable+" & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#print HTML table print(strTable)print(strTable) & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;# Create the root message and fill in the from,to,cc and subject msgRoot=MIMEMultipart('related')msgRoot =MIMEMultipart('related') msgRoot['Subject']=subjectmsgRoot['Subject']=subject msgRoot['From']=fromAddrmsgRoot['From']= fromAddr msgRoot['To']=recipientsmsgRoot['To']=recipients msgRoot['Cc']=ccmsgRoot['Cc']=cc & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Set type of MIMEText as HTML msgText=MIMEText(strTable,'html')msgText=MIMEText(strTable,'html') & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Attach body of the message as HTML table msgRoot.attach(msgText)msgRoot.attach(msgText) & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Create SMTP object to send email smtp=SMTP()smtp=SMTP() & nbsp;#Connect specific mail server with the input of server address and its port smtp.connect('mx.bstn.mxc.com',25)smtp.connect('mx.bstn.mxc.com', 25) & nbsp; & nbsp; & nbsp; & nbsp;#Send email smtp.sendmail(fromAddr,recipients.split(','),msgRoot.as_string())smtp.sendmail(fromAddr,recipients.split(','), msgRoot.as_string()) smtp.quit()smtp.quit() & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;print("Blood Sugar Report has been sent to Doctor successfully") Chức năng #Custom để truy cập bảng Hive def fetchhivetable ():FetchHiveTable(): & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;fetch_sql="select * from hospital_db.patient_report" diagnosis_res_df=spark.sql(fetch_sql)diagnosis_res_df=spark.sql(fetch_sql) diagnosis_res_df.show()diagnosis_res_df.show() & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Call next custom function NotifyInEmail() to prepare HTML table and send email NotifyInEmail(diagnosis_res_df)NotifyInEmail(diagnosis_res_df) Chương trình #Main bắt đầu ở đây if__name__=="__main__":__name__=="__main__": appname="Blood_Sugar_Diagnosis_Results"appname="Blood_Sugar_Diagnosis_Results" & nbsp; & nbsp; & nbsp; & nbsp;#Creating Spark Session spark=SparkSession.builder.appName(appname).enableHiveSupport().getOrCreate()spark =SparkSession.builder.appName(appname).enableHiveSupport().getOrCreate() & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;print("Spark application name: "+ appname) & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp; & nbsp;#Call custom function FetchHiveTable() to generate and send blood sugar report FetchHiveTable()FetchHiveTable() spark.stop()spark.stop() exit(0)exit(0) |
Script shell để chạy chương trình pyspark & nbsp; => & nbsp; test_script.shsh=> test_script.sh
Hãy để viết một kịch bản shell để chạy chương trình pyspark này. Trong tập lệnh đó, chúng tôi đang thiết lập các biến môi trường Spark. Sau đó, chúng tôi đang đưa ra lệnh Submit Submit để thực hiện chương trình pyspark của chúng tôi Send_blood_sugar_report.py.send_blood_sugar_report.py.
#!/bin/bash Echo "Thông tin: Đặt các biến toàn cầu""Info: Setting global variables" Xuất Spark_major_version = 2SPARK_MAJOR_VERSION=2 Xuất Spark_Home =/usr/HDP/2.6.5.0-292/Spark2SPARK_HOME=/usr/hdp/2.6.5.0-292/spark2 Xuất đường dẫn = $ Spark_home/bin: $ PathPATH=$SPARK_HOME/bin:$PATH spark-submit/x/home/user_alex/test/send_blood_sugar_report.py-submit/x/home/user_alex/test/send_blood_sugar_report.py |
Thực hiện tập lệnh shell để chạy chương trình pyspark
Cuối cùng, chúng ta có thể chạy tập lệnh shell & nbsp; test_script.sh. Nó sẽ thực hiện chương trình pyspark để gửi báo cáo đường huyết cho bác sĩ.test_script.sh. It will execute Pyspark program to send blood sugar report to Doctor.
Đầu ra
Báo cáo trong email
Như chúng tôi đã trình bày dưới đây, báo cáo chứa các chi tiết bệnh nhân có lượng đường trong máu hơn 200 trong tháng 3 năm 2022.
Bài viết đề xuất
- Làm thế nào để truy cập vào một bảng tổ ong bằng pyspark?
- Làm thế nào để viết một chiếc DataFrame vào bảng Hive bằng cách sử dụng pyspark?
- Làm thế nào để đọc BigQuery Table bằng pyspark?