PySpark RDD — основа PySpark

Apache Spark — один из лучших фреймворков для аналитики больших данных. Как только эта мощная технология интегрируется с простым, но эффективным языком, таким как Python, она дает нам чрезвычайно удобный и простой в использовании API под названием PySpark. В этой статье я собираюсь пролить свет на один из строительные блоки PySpark называется Устойчивый распределенный набор данных или более известный как PySpark СДР.

Почему РДД?

Итеративные распределенные вычисления, т. е. обработка данных в нескольких заданиях, требуют повторного использования и совместного использования данных между ними. До появления RDD такие фреймворки, как Hadoop, сталкивались с трудностями при обработке нескольких операций/заданий. Кроме того, требовалось стабильное и распределенное промежуточное хранилище данных, такое как HDFS или Amazon S3. Эти средства массовой информации для Обмен данными помог в выполнении различных вычислений, таких как логистическая регрессия, кластеризация K-средних, алгоритмы ранжирования страниц, специальные запросы и т. д. Но ничего не бывает бесплатно, совместное использование данных приводит к медленной обработке данных из-за множественных операций ввода-вывода, таких как репликация и сериализация. Этот сценарий изображен ниже:

Shared-Memory.png

Таким образом, возникла потребность в чем-то, что могло бы решить проблему множественных операций ввода-вывода за счет совместного использования данных и уменьшить их количество. Это где СДР точно вписался в картинку.

Что такое PySpark RDD?

Устойчивые распределенные наборы данных (RDD) — это абстракция распределенной памяти, которая помогает программисту выполнять вычисления в памяти на больших кластерах отказоустойчивым образом.

RDD считаются основа PySpark. Это один из пионеров фундаментальной структуры данных без схемы, которая может обрабатывать как структурированные, так и неструктурированные данные. Совместное использование данных в памяти делает RDD 10-100x Быстрее чем сеть и общий доступ к диску.

Распределенная-память.png

Теперь вам может быть интересно, как он работает. Что ж, данные в RDD разбиты на куски на основе ключа. RDD отличаются высокой отказоустойчивостью, т. е. они способны быстро восстанавливаться после любых проблем, поскольку одни и те же фрагменты данных реплицируются между несколькими узлами-исполнителями. Таким образом, даже если один исполнительный узел выйдет из строя, другой все равно будет обрабатывать данные. Это позволяет вам очень быстро выполнять функциональные расчеты с вашим набором данных, используя возможности нескольких узлов.

Разделы-768x343.png

Более того, как только вы создадите RDD, он станет неизменный. Под неизменяемым я подразумеваю объект, состояние которого нельзя изменить после его создания, но его можно преобразовать.

Прежде чем я перейду к этому руководству по PySpark RDD, позвольте мне рассказать еще о нескольких интригующих функциях PySpark.

Особенности RDD

PySpark-RDD-Features.png

  1. Вычисления в памяти : повышает производительность на порядок.
  2. Ленивая оценка : Все преобразования в RDD ленивы, т. е. не вычисляют свои результаты сразу.
  3. Отказоустойчивой : RDD отслеживают информацию о происхождении данных для автоматического восстановления потерянных данных.
  4. неизменность : данные можно создавать или извлекать в любое время, и после определения их значение нельзя изменить.
  5. Разделение : это основная единица параллелизма в PySpark RDD.
  6. Упорство : пользователи могут повторно использовать RDD PySpark и выбирать для них стратегию хранения.
  7. Грубые операции : эти операции применяются ко всем элементам в наборах данных с помощью карт, фильтров или группировок по операциям.

В следующем разделе руководства PySpark RDD я познакомлю вас с различными операциями, предлагаемыми PySpark RDD.

Операции RDD в PySpark

RDD поддерживает два типа операций, а именно:

  1. Преобразования : это операции, которые применяются к RDD для создания нового RDD. Преобразования осуществляются по принципу Ленивые оценки (что означает, что выполнение не начнется, пока действие не будет запущено). Это позволяет вам выполнять операции в любое время, просто вызывая действие над данными. Вот некоторые из преобразований, предоставляемых RDD:
  • карта
  • квартираКарта
  • фильтр
  • отчетливый
  • уменьшить по ключу
  • картаРазделы
  • Сортировать по
  1. Действия : действия — это операции, которые применяются к RDD, чтобы дать указание Apache Spark применить вычисление и передать результат обратно драйверу. Некоторые действия включают в себя:
  • собирать
  • собирать как карту
  • уменьшать
  • countByKey/countByValue
  • брать
  • первый

Позвольте мне помочь вам создать RDD в PySpark и применить к ним несколько операций.

Создание и отображение RDD

myRDD = sc.parallelize([('JK', 22), ('V', 24), ('Jimin',24), ('RM', 25), ('J-Hope', 25), ('Suga', 26), ('Jin', 27)])
myRDD.take(7)

вывод-1-3.png

Чтение данных из текстового файла и отображение первых 4 элементов

New_RDD = sc.textFile("file:///home/edureka/Desktop/Sample")
New_RDD.take(4)

выход-2-1.png

Изменение минимального количества разделов и сопоставление данных из списка строк в список списков

CSV_RDD = (sc.textFile("file:///home/edureka/Downloads/fifa_players.csv", minPartitions= 4).map(lambda element: element.split("\t")))
CSV_RDD.take(3)

выход-3-1.png

Подсчет общего количества строк в RDD

CSV_RDD.count()

новый.png

Создание функции для преобразования данных в нижний регистр и их разделения

def Func(lines):
lines = lines.lower()
lines = lines.split()
return lines
Split_rdd = New_RDD.map(Func)
Split_rdd.take(5)

выход-4-1.png

Создание нового RDD с плоскими данными и фильтрация «стоп-слов» из всего RDD

stopwords = ['a','all','the','as','is','am','an','and','be','been','from','had','I','I’d','why','with']
RDD = New_RDD.flatMap(Func)
RDD1 = RDD.filter(lambda x: x not in stopwords)
RDD1.take(4)

выход-5-1.png

Фильтрация слов, начинающихся с ‘c’

import re
filteredRDD = RDD.filter(lambda x: x.startswith('c'))
filteredRDD.distinct().take(50)

выход-6-1.png

Группировка данных по ключу и последующая сортировка

rdd_mapped = RDD.map(lambda x: (x,1))
rdd_grouped = rdd_mapped.groupByKey()
rdd_frequency = rdd_grouped.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)
rdd_frequency.take(10)

выход-8.png

Создание RDD с парой ключ-значение

a = sc.parallelize([('a',2),('b',3)])
b = sc.parallelize([('a',9),('b',7),('c',10)])

Выполнение операции соединения на RDD

c = a.join(b)
c.collect()

выход-9.png

Создание RDD и выполнение лямбда-функции для получения суммы элементов в RDD

num_rdd = sc.parallelize(range(1,5000))
num_rdd.reduce(lambda x,y: x+y)

вывод-10.png

Использование преобразования ReduceByKey для сокращения данных

data_keydata_key = sc.parallelize([('a', 4),('b', 3),('c', 2),('a', 8),('d', 2),('b', 1),('d', 3)],4)
data_keydata_key.reduceByKey(lambda x, y: x + y).collect()

вывод-11.png

Сохранение данных в текстовом файле

RDD3.saveAsTextFile("file:///home/edureka/Desktop/newoutput.txt")

Сортировка данных по ключу

test = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(test).sortByKey(True, 1).collect()

2018-08-10-15_59_30-Window.png

Выполнение операций над множествами

##Creating two new RDDs rdd_a = sc.parallelize([1,2,3,4])
rdd_b = sc.parallelize([3,4,5,6])

Перекресток

rdd_a.intersection(rdd_b).collect()

вывод-12.png

вычитание

rdd_a.subtract(rdd_b).collect()

вывод-13.png

декартовский

rdd_a.cartesian(rdd_b).collect()

вывод-14.png

Союз

rdd_a.union(rdd_b).collect()

выход-8-1.png

Надеюсь, вы уже знакомы с RDD PySpark. Итак, давайте углубимся и посмотрим, как вы можете использовать эти RDD для решения реального варианта использования.

Вариант использования PySpark RDD

Page-Ranking-528x288.png

Постановка задачи

Вы должны рассчитать рейтинг страницы набора веб-страниц на основе проиллюстрированной системы веб-страниц. Ниже приведена диаграмма, представляющая четыре веб-страницы: Amazon, Google, Wikipedia и Youtube в нашей системе. Для простоты доступа назовем их a,b,c и d соответственно. Здесь веб-страница «а» имеет исходящие ссылки на страницы b, c и d. Точно так же страница «b» имеет внешнюю ссылку на страницы d и c. Веб-страница «c» имеет внешнюю ссылку на страницу b, а страница «d» имеет внешнюю ссылку на страницы a и c.

Веб-страница-Система-1-528x283.png

Решение

Чтобы решить эту проблему, мы реализуем алгоритм ранжирования страниц, разработанный Сергей Брин а также Ларри Пейдж. Этот алгоритм помогает определить рейтинг конкретной веб-страницы в группе веб-страниц. Чем выше рейтинг страницы, тем выше она будет отображаться в списке результатов поиска. Таким образом, будет иметь большую актуальность.

Вклад в рейтинг страницы определяется по следующей формуле:

Страница-Вклад-Формула-528x214.png

Позвольте мне разбить его для вас:

PRt+1(Пи) = Рейтинг страницы сайта

PRt(Pj) = Рейтинг страницы входящей ссылки

С(Pj) = Количество ссылок на этой странице

В нашей постановке задачи показано, что веб-страница «а» имеет три исходящие ссылки. Итак, согласно алгоритму, вклад страницы а в ранжирование страницы d равен PR(a)/3. Теперь нам нужно рассчитать вклад страницы b в страницу d. Страница b имеет две исходящие ссылки: первая на страницу c, а вторая на страницу d. Следовательно, вклад страницы b равен PR(b)/2.

Таким образом, рейтинг страницы d будет обновляться следующим образом, где s известен как коэффициент демпфирования:

PR(d) = 1 – s + s × (PR(a)/3 + PR(b)/2)

Давайте теперь выполним это, используя RDD PySpark.

##Creating Nested Lists of Web Pages with Outbound Links
pageLinks = [['a', ['b','c','d']],
['c', ['b']],['b', ['d','c']],['d', ['a','c']]] ##Initializing Rank #1 to all the webpages
pageRanks = [['a',1],['c',1],['b',1],['d',1]] ##Defining the number of iterations for running the page rank
###It will return the contribution to the page rank for the list of URIs
def rankContribution(uris, rank):
numberOfUris = len(uris)
rankContribution = float(rank) / numberOfUris
newrank =[]
for uri in uris:
newrank.append((uri, rankContribution))
return newrank ##Creating paired RDDs of link data
pageLinksRDD = sc.parallelize(pageLinks, 2)
pageLinksRDD.collect()

вебранк-выход-1.png

##Creating the paired RDD of our rank data pageRanksRDD = sc.parallelize(pageRanks, 2)
pageRanksRDD.collect()

веб-вывод-2.png

##Defining the number of iterations and the damping factor, s
numIter = 20
s = 0.85 ##Creating a Loop for Updating Page Rank
for i in range(numIter):
linksRank = pageLinksRDD.join(pageRanksRDD)
contributedRDD = linksRank.flatMap(lambda x : rankContribution(x[1][0],x[1][1]))
sumRanks = contributedRDD.reduceByKey(lambda v1,v2 : v1+v2)
pageRanksRDD = sumRanks.map(lambda x : (x[0],(1-s)+s*x[1])) pageRanksRDD.collect()

выход-final.png

Это дает нам результат, который ‘c’ имеет самый высокий рейтинг страницы, за которым следуют ‘a’, ‘d’ и ‘b’.

На этом мы подошли к концу этого PySpark RDD. Надеюсь, это помогло повысить ценность ваших знаний. Если вы хотите узнать больше о PySpark, вы можете продолжить и читать подобные блоги здесь

Похожие записи

Добавить комментарий

Ваш адрес email не будет опубликован. Обязательные поля помечены *