BigData

Apache Spark로 집계하기 클러스터 컴퓨팅으로 집계 해결 (2)

IT오이시이 2017. 6. 14. 14:07
728x90


Aggregating with Apache Spark(2)

Solving aggregation with cluster computing


http://www.itworld.com/article/3184109/analytics/aggregating-with-apache-spark.html?page=2


MapReduce 관련 문제

MapReduce 솔루션은 확장 성을 달성하지만 확장 성은 상대적입니다. 우리는 1 조 개 이상의 피커를위한 꽃의 수를 찾기 위해 애플리케이션을 확장 할 수있었습니다. 그러나 우리가 선택한 꽃의 수 사이의 표준 편차를 찾는 것과 같은 또 다른 연산을 수행하기를 원한다면, 또는이 수의 평균 또는 모드? 각 계산에 대해 새로운 MapReduce 프로그램을 작성해야합니다.

모든 MapReduce 애플리케이션은 입력 데이터를 개별적으로 읽고 출력을 HDFS에 다시 씁니다. 결과적으로 MapReduce는 반복적 인 쿼리 또는 결과 나 입력에 대한 반복 실행을 허용하지 않는 패러다임 인  주기 데이터 흐름 모델 을 위해 만들어졌습니다 . 이것은 모든 솔루션에있어 항상 날개에 대기중인 또 다른 문제가 있기 때문에 제한적입니다.

–– ADVERTISEMENT ––

디스크 I / O가 매우 비싸고 대역폭이 제한되어 있으므로 HDFS에서 대용량 데이터 세트를 읽는 것이 반드시 필요합니다. 이상적으로는, 범용 컴퓨터에 파티션 된 메모리 내 구조가 필요하기 때문에 디스크에서 다시로드하지 않고도 반복 쿼리를 수행 할 수 있습니다.

Spark로 집계하기

우리가 문제 성명을 해결하는 여러 가지 방법을 시험해 볼 때 요구 사항 목록이 늘어나고 있습니다. Listing 6과 7의 코드는 길이가 104 라인이지만 아직 연산이나 계산 체인을 만들지 않습니다. 보다 복합적인 코드 외에도 응집력, 느슨한 결합 및 직교성과 같은 기능을 지원하는 언어가 필요합니다. 상대적으로 적은 수의 기본 구조를 조합하여 제어 및 데이터 구조를 구축 할 수 있습니다 언어의

자바 8은 언어에 기능적인 기능을 추가했지만 스칼라는 본질적으로 기능적이다. 이 섹션에서는 Apache Spark의 스칼라 프로그래밍이 테이블에 어떤 영향을 주는지 살펴 보겠습니다. 우리는 Spark에 대한 경험이 있고 Spark가 이미 설치되어 있고 개발 환경에 설치되어 있다고 가정합니다.

Spark와 MapReduce 비교

MapReduce가 Hadoop의 프로그래밍 모델과 마찬가지로 DAG (Direct Acyclic Graph)는 Spark의 프로그래밍 모델입니다. MapReduce에는 데이터 흐름을 생성하는 데 필요한 유연성이 없다는 점은 이미 언급했습니다. MapReduce에서는지도, 셔플러 및 축소 작업을 만듭니다. 맵은 개별 블록에서 실행되고, 출력은 순환 버퍼로 유출 된 다음 쉐이프되고 정렬되어 감속기가 선택됩니다. a ChainMapper또는 a를 사용한다고해도 ChainReducer모델은 [MAP+ / REDUCE MAP*]입니다. +여기서는 하나 이상의 관련 작업을 *나타내며 0 이상을 나타냅니다.

결과적으로 map->  shuffle->reduce->reduce반복적 인 알고리즘의 중요한 요구 사항 임에도 불구 하고 우리는 실행할 수있는 모델을 가질 수 없습니다 . 반복적 인 것을 설정하고 싶다면 유일한 옵션은 map->shuffle->reduce첫 번째 작업 으로 계획 하고 그 다음에 작업 을 계획 하는 것 map-> shuffle->reduce입니다. MapReduce의 작업은 상호 배타적이므로 첫 번째 작업은 두 번째 작업이 수행되는지 여부를 모릅니다. 이것은 그림 7에 표시되어 있습니다. 파란색 선은 map작업을 나타내고 검은 색 선은 reduce작업을 나타냅니다. 우리는 배제 shuffle하고 sort간결하게했습니다.

jw sparkaggregate fig7Ravishankar Nair

그림 7. Hadoop MapReduce 대 Spark DAG

이제 Spark에서 같은 솔루션을 분석해 보겠습니다. 그림 7의 오른쪽에 표시된 데이터 흐름을 볼 수 있습니다. 먼저 그림 7의 Spark 솔루션에 대한 리듀서 (즉, 파란색 원이 없음) 사이에 맵이 없음에 유의하십시오. 즉, 첫 번째 MapReduce의 출력은 직접 명백한 map과정 없이 제 2 세트의 감속기에 공급된다 . 둘째, 작업 간에는 HDFS 파일 시스템이 없습니다. 대신 Spark는 데이터가 데이터를 수용 할 수있는 한 메모리를 활용하므로 디스크 I / O 비용이 크게 절감됩니다. 가장 중요한 것은 STAGE 1이라고 표시된 다이어그램의 노란색 원 1->2->4과 3->5상호 배타적 인 부분을 확인하십시오. 이것들은 단순한 변환 일 수 있습니다. 가장 중요한 점은 스파크가 여러 단계를 실행 파이프 라인에 지능적으로 연결하여 병렬로 실행해야하는 단계를 결정할 수 있다는 것입니다.

클러스터 컴퓨팅으로 집계 해결

클러스터 컴퓨팅은 Spark을 집계 문제를 해결할 수있는 탁월한 선택입니다. 앞서 언급했듯이 Spark는 HDFS에 의존하기보다는 메모리를 활용합니다. 메모리에 저장된 객체는 RDD 또는 복원 된 분산 데이터 세트 라고 하며 Spark 시스템의 마력입니다.

RDD는 개별 메모리 단위 모음을위한 포인터입니다. 여러 RDD를 결합하면 전체 개체가 Spark에 저장됩니다. RDD에 변환 또는 작업을 적용하면 클러스터의 모든 개별 파티션이 영향을받습니다. 개별 단위에 대한 손실은 해당 단위의 생성과 관련된 계보 그래프에 의해 재구성 될 수 있습니다. 계통 그래프 평면 스파크 개별 RDD 단위 또는 파티션을 생성하도록 사용된다. 또한 RDD는 불변이며, 동작 (터미널 연산)이 실행될 때까지만 평가됩니다 (느린 평가). RDD는 모든 유형의 데이터를 처리 할 수 ​​있으므로 Spark가 널리 사용됩니다. 그러나 한 가지 단점이 있습니다. RDD에 대한 작업은 사용 가능한 작업의 수와 유형을 제한하는 모든 종류의 데이터에 적용 할 수 있도록 일반 사항이어야합니다.

Listing 8은 가장 간단한 형태로 Spark의 집계 함수를 정의한 것이다. 사용하기 전에 aggregate몇 개의 파티션을 사용할 지 지정해야합니다. 초기 RDD를 만들 때 파티션 수를 지정합니다.

Listing 8. 집계 함수의 정의


def aggregate( an initial value)(an intra-partition sequence operation)(an inter-partition combination operation)

이제이 집계 모델을 정원 예제로 되돌려 보겠습니다. JavaWorld 가든에는 남쪽, 북쪽, 동쪽 및 서쪽에 4 개의 사분면이 있음을 상기하십시오. 우리는 각 사분면마다 하나씩 네 개의 파티션을 사용합니다. Listing 1로 돌아가서 Scala에서 첫 번째 줄 (꽃의 수를 나열)을 작성하는 방법은 다음과 같다.

Listing 9. Spark를 사용하는 집계


val flowers =
     sc.parallelize(List(11,12,13,24,25, 26, 35,36,37, 24,15,16),4)

두 번째 매개 변수 인 4는 Spark 클러스터에서 사용 가능한 파티션 수를 나타냅니다.

이제 문제 성명과 관련된 데이터를 시각화하는 것이 쉽습니다. 11, 12 및 13은 정원의 남쪽 사분면에서 각 사람이 고른 꽃의 수입니다. 숫자 24, 25 및 26은 북쪽 사분면에서 나온 것입니다. 35, 36 및 37은 서쪽 구역에서 왔으며; 24, 25, 16은 동쪽에서 온 것입니다. 각 사분면은 Spark 클러스터의 한 노드에 해당합니다.

다음으로, 문제 문장을 두 부분으로 나눕니다. 문제의 첫 번째 부분은 각 사분면에서 선택한 꽃의 총 수를 집계하는 것입니다. 이것이 Listing 8의 파티션 내부 시퀀스 집계이다. 문제의 두 번째 부분은 파티션 전반에 걸쳐 이러한 개별 집계를 합산하는 것이다. 이것이 파티션 간 집계입니다.

따라서 파티션 내 시퀀스 집계 결과를 찾아 보겠습니다. 각 꽃 피커는 처음에는 빈 양동이가있는 정원으로갑니다. 0의 시작 값이됩니다.

남쪽 정원 : 11 + 12 + 13 = 36 
북쪽 정원 : 24 + 25 + 26 = 75 
서쪽 정원 : 35 + 36 + 37 = 108 
동쪽 정원 : 24 + 25 +16 = 65

다음으로 파티션 간 집계 결과를 계산합니다.

남쪽 + 북쪽 + 서쪽 + 이스트 사이드 = 36 + 75 + 108 + 65 = 284

RDD에 저장된 합계는 모든 종류의 변환 또는 다른 작업을 위해 더 이상 사용 및 처리 될 수 있으며 나중에 반복적으로 사용할 수 있습니다. 이제 코드의 마지막 부분을 작성합니다. Scala의이 한 행은 위의 두 집계를 수행합니다.

Listing 10. Scala를 사용한 복잡한 집계


val sum = flowers.aggregate(0)(_+_, _+_)

12 개의 각 버킷에서 초기 값으로 0부터 시작합니다. 첫 번째 _+_는 파티션 내 합계이며 정원의 각 사분면에서 각 선택기가 선택한 꽃의 총 개수를 더합니다. 두 번째 _+_는 각 사분면의 총 합계를 집계하는 파티션 간 합계입니다.

집계가 작동 reduce하려면 초기 값 다음에 두 가지 함수 가 필요 합니다. 초기 값이 0이 아니면 어떻게 될까요? 예를 들어, 5 인 경우, 그 수는 각 파티션 내 집계 W 파티션 간 집계에 추가됩니다. 따라서 첫 번째 계산은 다음과 같습니다.

남쪽 정원 : 11 + 12 + 13 = 36 + 5 = 41 
북쪽 정원 : 24 + 25 + 26 = 75 + 5 = 80 
서쪽 정원 : 35 + 36 + 37 = 108 + 5 = 113 
Eastside 정원 : 24 + 25 +16 = 65 + 5 = 70

다음은 초기 값이 5 인 파티션 간 집계 계산입니다.

남쪽 + 북쪽 + 서쪽 + 이스트 사이드 + 5 = 41 + 80 + 113 + 70 = 309

축전지와 집계

개념을 더 자세히 설명하기 위해 정원의 각 사분면에있는 꽃의 최대 개수를 알아 내고 합계를 집계하려고한다고 가정합니다. 파티션 내부 기능을 약간 변경하면됩니다.

Listing 11. 사분면 용 누산기


val sumofmaximums = flowers.aggregate(0)(Math.max(_,_), _+_)

정원 전체에서 한 사람이 선택할 수있는 최대 꽃 수를 찾고 싶다면 어떻게해야할까요? 우리가 할 수있는 일은 :

목록 12. 정원용 축전지


val maximum = flowers.aggregate(0)(Math.max(_,_), Math.max(_,_))

이 예제에서 사용 된 초기 값을 누적 기 (accumulator )라고하는데 ,이 경우 누락 된 값은 파티션간에 반복 된 다음 최종 결과를 위해 전파됩니다.

튜플을 가진 집계

마지막 예제에서는 원하는만큼 초기 값을 사용할 수 있다고 가정 해 봅시다. 이 경우 우리는 정원의 각 사분면에있는 모든 꽃 피커 중에서 평균 꽃 수를 찾는 문제를 다음과 같이 해결할 수 있습니다.

Listing 13. 튜플


val flowersandpickers = flowers.aggregate((0,0)) (
        (acc, value) => (acc._1 + value, acc_.2 +1),
         (acc1, acc2) => acc1._1 + acc2._1, acc1._2 + acc2._2)
)

이 예에서는 reduce집계 내에 적용된 함수가 교환 가능하고 연관성이 있어야합니다. 시퀀싱 또는 결합 작업에 대한 실행 순서가 없어야합니다. 초기 값은 튜플 또는 쌍을 나타내는 두 개의 0 입니다. 첫 번째 0은 선택한 꽃의 총 수에 대한 초기 값입니다 (바구니에 0 개의 꽃으로 시작하기 때문에). 두 번째 0은 피커 당 고른 꽃의 평균 합을 찾기 위해 사용하는 초기 값입니다 (왜냐하면 당신이 선택한 꽃이 0부터 시작되기 때문입니다). 시퀀스 내 집합은 각 사분면의 꽃 수를 더합니다. 동시에, 바구니 당 하나의 꽃 따기를 추가했음을 나타내는 1을 더합니다. 파티션 간 조합 기능은 각 사분면에서 꽃 수와 꽃 따기 수를 더합니다. 평균을 찾으려면 다음을 작성하십시오.

Listing 14. 튜플을 사용하여 평균화하기


val avg = flowersandpickers._1/ flowersandpickers._2.toDouble

비교를 위해 다음은 Scala 대신 Python으로 동등한 코드를 작성하는 방법입니다.

Listing 15. 파이썬으로 집계하기


 flowersandpickers = sc.parallelize([11,12,13,24,25, 26, 35,36,37, 24,15,16],4).aggregate(
  (0, 0),
  (lambda acc, value: (acc[0] + value, acc[1] + 1)),
  (lambda acc1, acc2: (acc1[0] + acc2[0], acc1[1] + acc2[1])))
  avg = flowersandpickers[0]/ float(flowersandpickers[1])

결론적으로

3 노드 스파크 클러스터는 약 14 배의 속도로 MapReduce를 능가하며, DAG 실행 패턴은 향후 반복을 위해 RDD를 재사용 할 수있게합니다. Spark 솔루션을 Java로 프로그래밍 할 수 있지만 스칼라 코드는 훨씬 효율적입니다. 누적 기와 튜플을 포함한 전체 프로그램은 단지 10 줄의 코드 일뿐입니다.

728x90
반응형