Программирование PySpark | Кодементор

Python и Apache Spark — самые популярные словечки в индустрии аналитики. Apache Spark — это популярная платформа с открытым исходным кодом, которая обеспечивает молниеносную обработку данных и поддерживает различные языки, такие как Scala, Python, Java и R. Затем все зависит от ваших языковых предпочтений и объема работы. В этой статье по программированию PySpark я буду говорить о Spark с Python, чтобы продемонстрировать, как Python использует функциональные возможности Apache Spark.

PySpark — это совместная работа Apache Spark и Python.

Апач Спарк — это среда кластерных вычислений с открытым исходным кодом, построенная на скорости, простоте использования и потоковой аналитике, в то время как Питон это универсальный язык программирования высокого уровня. Он предоставляет широкий спектр библиотек и в основном используется для машинного обучения и потоковой аналитики в реальном времени.

Другими словами, это API Python для Spark, который позволяет использовать простоту Python и мощь Apache Spark для управления большими данными.

PySpark.png

Вам может быть интересно, почему я выбрал Python для работы со Spark, когда доступны другие языки. Чтобы ответить на этот вопрос, я перечислил несколько преимуществ Python:

  • Python очень прост в изучении и реализации.
  • Он предоставляет простой и всеобъемлющий API.
  • С Python читабельность кода, обслуживание и знакомство намного лучше.
  • Он предоставляет различные варианты визуализации данных, что затруднительно при использовании Scala или Java.
  • Python поставляется с широким спектром библиотек, таких как numpy, pandas, scikit-learn, seaborn, matplotlib и т. д.
  • Он поддерживается огромным и активным сообществом.

Теперь, когда вы знаете о преимуществах программирования PySpark, давайте просто углубимся в основы PySpark.

Устойчивые распределенные наборы данных (RDD)

СДР являются строительными блоками любого приложения Spark. RDD означает:

  • Устойчивый: Он отказоустойчив и способен восстанавливать данные в случае сбоя.
  • Распространено: Данные распределяются между несколькими узлами в кластере.
  • Набор данных: Сбор секционированных данных со значениями.

Это слой абстрагированных данных над распределенной коллекцией. Он неизменен по своей природе и следует ленивое преобразование.

С RDD можно выполнять два типа операций:

  1. Преобразования: Эти операции применяются для создания нового RDD.
  2. Действия: Эти операции применяются к RDD, чтобы указать Apache Spark применить вычисления и передать результат обратно драйверу.

кадр данных

Датафрейм в PySpark — это распределенный набор структурированных или полуструктурированных данных. Эти данные в Dataframe хранятся в строках под именованными столбцами, что аналогично таблицам реляционной базы данных или листам Excel.

Он также имеет некоторые общие атрибуты с RDD, такие как неизменяемость по своей природе, следует ленивым вычислениям и распространяется по своей природе. Он поддерживает широкий спектр форматов, таких как JSON, CSV, TXT и многие другие. Кроме того, вы можете загрузить его из существующих RDD или программно указав схему.

PySpark SQL

PySpark SQL — это модуль абстракции более высокого уровня по сравнению с ядром PySpark. Он в основном используется для обработки структурированных и полуструктурированных наборов данных. Он также предоставляет оптимизированный API, который может считывать данные из различных источников данных, содержащих файлы разных форматов. Таким образом, с PySpark вы можете обрабатывать данные, используя SQL, а также HiveQL. Благодаря этой функции PySparkSQL постепенно набирает популярность среди программистов баз данных и пользователей Apache Hive.

Потоковая передача PySpark

PySpark Streaming — это масштабируемая, отказоустойчивая система, которая следует пакетной парадигме RDD. В основном он работает с мини-пакетами или пакетными интервалами, которые могут варьироваться от 500 мс до больших окон интервалов.

При этом Spark Streaming получает непрерывный поток входных данных из таких источников, как Apache Flume, Kinesis, Kafka, сокеты TCP и т. д. Эти потоковые данные затем внутренне разбиваются на несколько более мелких пакетов на основе пакетный интервал и отправлены в Spark Engine. Spark Engine обрабатывает эти пакеты данных, используя сложные алгоритмы, выраженные функциями высокого уровня, такими как отображение, сокращение, объединение и окно. После завершения обработки обработанные пакеты затем отправляются в базы данных, файловые системы и живые информационные панели.

Pyspark-streaming.png

Ключевой абстракцией для потоковой передачи Spark является дискретизированный поток (DStream). DStreams построены на RDD, что позволяет разработчикам Spark работать в одном контексте RDD и пакетов для решения проблем с потоковой передачей. Более того, Spark Streaming также интегрируется с MLlib, SQL, DataFrames и GraphX, что расширяет ваши возможности. Будучи высокоуровневым API, Spark Streaming обеспечивает отказоустойчивую семантику «точно один раз» для операций с отслеживанием состояния.

ПРИМЕЧАНИЕ : семантика «точно один раз» означает, что события будут обрабатываться «ровно один раз» всеми операторами в потоковом приложении, даже если произойдет какой-либо сбой.

На приведенной ниже диаграмме представлены основные компоненты Spark Streaming.

Spark-Streaming-Components.png

Как видите, данные поступают в Spark Stream из различных источников, таких как Kafka, Flume, Twitter, ZeroMQ, Kinesis, TCP-сокеты и многих других. Далее эти данные обрабатываются с использованием сложных алгоритмов, выраженных высокоуровневыми функциями, такими как карта, уменьшение, объединение и окно. Наконец, эти обработанные данные передаются в различные файловые системы, базы данных и интерактивные информационные панели для дальнейшего использования.

Я надеюсь, что это дало вам четкое представление о том, как работает PySpark Streaming. Давайте теперь перейдем к последней, но самой заманчивой теме этой статьи о программировании PySpark, т. е. к машинному обучению.

Машинное обучение

Как вы уже знаете, Python — это зрелый язык, который с незапамятных времен активно используется в науке о данных и машинном обучении. В PySpark машинное обучение облегчается библиотекой Python под названием MLlib (библиотека машинного обучения). Это не что иное, как оболочка над PySpark Core, которая выполняет анализ данных с использованием алгоритмов машинного обучения, таких как классификация, кластеризация, линейная регрессия и некоторые другие.

Одной из заманчивых особенностей машинного обучения с PySpark является то, что оно работает в распределенных системах и обладает высокой масштабируемостью.

MLlib предоставляет три основные функции машинного обучения с помощью PySpark:

  1. Подготовка данных: Это предоставляет различные функции, такие как извлечение, преобразование, выбор, хеширование и т. д.
  2. Алгоритмы машинного обучения: Он использует некоторые популярные и продвинутые алгоритмы регрессии, классификации и кластеризации для машинного обучения.
  3. Утилиты: Он имеет статистические методы, такие как тестирование хи-квадрат, описательная статистика, линейная алгебра и методы оценки моделей.

Позвольте мне показать вам, как реализовать машинное обучение, используя классификация через логистическую регрессию.

Здесь я проведу простой прогнозный анализ данных проверки продуктов питания в Чикаго.

##Importing the required libraries
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import * ##creating a RDD by importing and parsing the input data
def csvParse(s):
import csv
from StringIO import StringIO
sio = StringIO(s)
value = csv.reader(sio).next()
sio.close()
return value food_inspections = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_Chicago_data.csv')\
.map(csvParse) ##Display data format
food_inspections.take(1)

вывод-1-1.png

#Structuring the data
schema = StructType([
StructField("id", IntegerType(), False),
StructField("name", StringType(), False),
StructField("results", StringType(), False),
StructField("violations", StringType(), True)])
#creating a dataframe and a temporary table (Results) required for the predictive analysis. ##sqlContext is used to perform transformations on structured data
ins_df = spark.createDataFrame(food_inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])) , schema)
ins_df.registerTempTable('Count_Results')
ins_df.show()

вывод-2.png

##Let's now understand our dataset
#show the distinct values in the results column
result_data = ins_df.select('results').distinct().show()

вывод-3.png

##converting the existing dataframe into a new dataframe ###each inspection is represented as a label-violations pair. ####Here 0.0 represents a failure, 1.0 represents a success, and -1.0 represents some results besides those two
def label_Results(s):
if s == 'Fail':
return 0.0
elif s == 'Pass with Conditions' or s == 'Pass':
return 1.0
else:
return -1.0
ins_label = UserDefinedFunction(label_Results, DoubleType())
labeled_Data = ins_df.select(ins_label(ins_df.results).alias('label'), ins_df.violations).where('label >= 0')
labeled_Data.take(1)

выход-4.png

##Creating a logistic regression model from the input dataframe
tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(labeled_Data)
## Evaluating with Test Data test_Data = sc.textFile('file:////home/edureka/Downloads/Food_Inspections_test.csv')\
.map(csvParse) \
.map(lambda l: (int(l[0]), l[1], l[12], l[13]))
test_df = spark.createDataFrame(test_Data, schema).where("results="Fail" OR results="Pass" OR results="Pass with Conditions"")
predict_Df = model.transform(test_df)
predict_Df.registerTempTable('Predictions')
predict_Df.columns

выход-5.png

## Printing 1st row
predict_Df.take(1)

вывод-6.png

## Predicting the final result
numOfSuccess = predict_Df.where("""(prediction = 0 AND results="Fail") OR
(prediction = 1 AND (results="Pass" OR
results="Pass with Conditions"))""").count()
numOfInspections = predict_Df.count()
print "There were", numOfInspections, "inspections and there were", numOfSuccess, "successful predictions"
print "This is a", str((float(numOfSuccess) / float(numOfInspections)) * 100) + "%", "success rate"

выход-7.png

На этом мы подошли к концу этого блога о программировании PySpark. Надеюсь, это помогло повысить ценность ваших знаний.

Если вы нашли этот блог по программированию PySpark интересным, вы можете продолжить и читать подобные блоги здесь.

Есть к нам вопрос? Пожалуйста, укажите это в разделе комментариев, и мы свяжемся с вами.

Похожие записи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *