Chapter 7 - 집계 연산
포스트
취소

Chapter 7 - 집계 연산

Chapter 7 - 집계 연산

집계 함수

함수내용
count로우 수 카운트
countDistinct고유 레코드 수
approx_count_distinct근사치 고유 레코드 수 반환(오차있음)
first첫번째 값 반환
last마지막 값 반환
min최소값
max최대값
sum칼럼의 레코드 합산
sumDistinct고윳값 합산
avg, mean평균
var_pop, stddev_pop분산, 표준편차
skewness비대칭도
kurtosis첨도
corr피어슨 상관계수
covar_pop표본 공분산
covar_samp모공분산
collect_set칼럼 내 고유 데이터 모음
collect_list칼럼 내 데이터 리스트 모음
agg복합 데이터 집계

그룹화

1
df.groupBy("InvoiceNo", "CustomerId").count().show()

agg를 통해 여러 집계 처리를 한 번에 지정할 수 있다.

1
2
3
4
5
6
from pyspark.sql.functions import count, expr

df.groupBy("InvoiceNo").agg(
   count("Quantity").alias("quan"),
   expr("count(Quantity)")
).show()

칼럼을 키로, 수행할 집계 함수의 문자열을 값으로 하는 맵 타입을 사용해 정의할 수 있다.

1
2
3
from pyspark.sql.functions import expr
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
  .show()

윈도우 함수

윈도우 함수를 집계에 사용할 수 있다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
from pyspark.sql.window import Window
from pyspark.sql.functions import desc

windowSpec = Window\
    .partitionBy("CustomerId", "date")\
    .orderBy(desc("Quantity"))\
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

## 윈도우 별 최대 구매 개수
from pyspark.sql.functions import max, col

maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)

## 모든 고객에 대한 최대 구매 수량을 가진 날짜 산출
from pyspark.sql.functions import dense_rank, rank

purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)

## 윈도우 값 확인
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
  .select(
    col("CustomerId"),
    col("date"),
    col("Quantity"),
    purchaseRank.alias("quantityRank"),
    purchaseDenseRank.alias("quantityDenseRank"),
    maxPurchaseQuantity.alias("maxPurchaseQuantity")
  ).show()

롤업

롤업은 group by 스타일의 다양한 연산을 수행할 수 있는 다차원 집계 기능이다.

1
2
3
4
5
6
7
rolledUpDf = dfNoNull.rollup("Date", "Country").agg(sum("Quantiyu"))\
    .selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
    .orderBy("Date")
rolledUpDf.show()

rolledUpDf.where("Country IS NULL").show()
rolledUpDf.where("Date IS NULL").show()

큐브는 롤업을 고차원적으로 사용할 수 있게 한다.
큐브는 요소들을 계층적으로 다루는 대신 모든 차원에 대해 동일한 작업을 수행한다.
즉, 전체 기간에 대해 날짜와 국가별 결과를 얻을 수 있다.

1
2
3
4
from pyspark.sql.functions import sum

dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
  .select("Date", "Country", "sum(Quantity)").orderBy("Date").show()

큐브와 롤업을 사용하다 보면 집계 수준에 따라 쉽게 필터링하기 위해 집계수준을 조회하는 경우가 필요하다. 이때, grouping_id를 사용한다.

피벗을 사용해 로우를 칼럼으로 변환할 수 있다.

1
2
3
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()

pivoted.where("date > '2011-12-05'").select("date", "`USA_sum(Quantity)`").show()



Reference

스파크 완벽 가이드

이 기사는 저작권자의 CC BY 4.0 라이센스를 따릅니다.