Chapter 5 - Structured API Basic Operations

Spark DataType - Python DataType

| Spark data type | Python data type | Creation/access API |
| --- | --- | --- |
| ByteType | int, long (-2^7 ~ 2^7-1) | ByteType() |
| ShortType | int, long (-2^15 ~ 2^15-1) | ShortType() |
| IntegerType | int, long | IntegerType() |
| LongType | long (-2^63 ~ 2^63-1) | LongType() |
| FloatType | float | FloatType() |
| DoubleType | float | DoubleType() |
| DecimalType | decimal.Decimal | DecimalType() |
| StringType | string | StringType() |
| BinaryType | bytearray | BinaryType() |
| BooleanType | bool | BooleanType() |
| TimestampType | datetime.datetime | TimestampType() |
| DateType | datetime.date | DateType() |
| ArrayType | list, tuple, array | ArrayType(elementType) |
| MapType | dict | MapType(keyType, valueType) |
| StructType | list, tuple | StructType(fields) |
| StructField | the Python type of the field's dataType | StructField(name, dataType) |
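
These constructor APIs compose into hand-written schemas. A minimal sketch for the flight data used in the examples below (the nullable flags are illustrative assumptions):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# hand-built schema for the flight summary data (nullability chosen for illustration)
manualSchema = StructType([
    StructField("DEST_COUNTRY_NAME", StringType(), True),
    StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
    StructField("count", LongType(), False),
])
df = spark.read.format("dataFormat").schema(manualSchema).load("$dataPath")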

Expressions

An expression is a function that takes one or more columns as input, resolves them, and applies the result to each record; it is written as expr("someCol") or col("someCol").

from pyspark.sql.functions import expr

expr("((((someCol) + 5) * 200) - 6) < otherCol")
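
The same expression can also be built from column objects; col-based arithmetic and the expr string compile to the same logical plan:

from pyspark.sql.functions import col

# equivalent to expr("((((someCol) + 5) * 200) - 6) < otherCol")
(((col("someCol") + 5) * 200) - 6) < col("otherCol")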

Accessing DataFrame columns

from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").getOrCreate()  # create a Spark session

spark.read.format("dataFormat").load("$dataPath").columns  # list the DataFrame's column names

Registering a temporary view for SQL

df = spark.read.format("dataFormat").load("$dataPath")
df.createOrReplaceTempView("dfTable")  # register a temporary view
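
Once registered, the view can be queried with spark.sql, which returns the result as a new DataFrame:

spark.sql("SELECT * FROM dfTable LIMIT 2").show()  # query the temporary view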

select and selectExpr

Python Code:

df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

SQL Code:

SELECT  DEST_COUNTRY_NAME
       ,ORIGIN_COUNTRY_NAME
  FROM  dfTable
 LIMIT  2

Python Code:

from pyspark.sql.functions import col, expr

df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
df.select(col("DEST_COUNTRY_NAME").alias("destination")).show(2)
df.selectExpr("DEST_COUNTRY_NAME as destination").show(2)

SQL Code:

SELECT DEST_COUNTRY_NAME AS destination
  FROM dfTable
 LIMIT 2;

Python Code:

df.selectExpr(
   "*",
   "(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry"
).show(2)

SQL Code:

SELECT *
      ,(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry
 FROM  dfTable
LIMIT  2
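
selectExpr also accepts aggregate functions over the whole DataFrame:

df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)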

Adding columns

Python Code:

from pyspark.sql.functions import lit
df.withColumn("numberOne", lit(1)).show(2)  # lit wraps a literal value in a Column

SQL Code:

SELECT *, 1 as numberOne
  FROM dfTable
 LIMIT 2

Python Code:

df.withColumn("withinCountry", expr("DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME")).show(2)

SQL Code:

SELECT *
      ,(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry
 FROM  dfTable
LIMIT  2

Renaming columns

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest")  # returns a new DataFrame with the column renamed

Dropping columns

df.drop("ORIGIN_COUNTRY_NAME")
df.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")  # multiple columns can be dropped at once

Changing a column's data type

from pyspark.sql.functions import col
df.withColumn("count2", col("count").cast("string"))  # cast the long count column to string
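
printSchema confirms the change; the new column appears with string type:

df.withColumn("count2", col("count").cast("string")).printSchema()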

Filtering records

Python Code:

from pyspark.sql.functions import col
df.filter(col("count") < 2).show(2)  # column-object condition
df.where("count < 2").show(2)        # equivalent SQL-string condition

SQL Code:

SELECT *
  FROM dfTable
 WHERE count < 2
 LIMIT 2

Python Code:

from pyspark.sql.functions import col
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia").show(2)

SQL Code:

SELECT *
  FROM dfTable
 WHERE count < 2
   AND ORIGIN_COUNTRY_NAME <> 'Croatia'
 LIMIT 2

Distinct

Python Code:

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()

SQL Code:

SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME))
  FROM dfTable
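
distinct() with no arguments behaves the same as dropDuplicates() across the selected columns:

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").dropDuplicates().count()  # same count as distinct()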

Random sampling

seed = 5
withReplacement = False  # sample without replacement
fraction = 0.5           # expected fraction of rows in the sample

df.sample(withReplacement, fraction, seed).count()

Random splits

seed = 5
train_set, test_set = df.randomSplit([0.75, 0.25], seed)  # approximately 75/25 train/test split
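
randomSplit normalizes the weights if they do not sum to 1, and rows are assigned probabilistically, so the 75/25 proportions are approximate. A quick sanity check:

# the two splits always add up to the original row count
print(train_set.count(), test_set.count(), df.count())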

Appending rows (union)

from pyspark.sql import Row

schema = df.schema
newRows = [
    Row("New Country", "Other Country", 5),
    Row("New Country2", "Other Country2", 1),
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

# union matches columns by position, so the schemas must line up
df.union(newDF)\
  .where("count = 1")\
  .where("ORIGIN_COUNTRY_NAME != 'United States'")\
  .show()

Sorting records

from pyspark.sql.functions import asc, desc, col
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").shw(5)
df.orderBy(col("count").asc(), col("DEST_COUNTRY_NAME").desc()).show(5)
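
For optimization it can be useful to sort within each partition before another set of transformations; sortWithinPartitions does this without a global sort:

df.sortWithinPartitions("count")  # sorts rows inside each partition only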

Limiting rows

from pyspark.sql.functions import desc
df.orderBy(desc("count")).limit(6).show()  # note: expr("count desc") would parse as an alias, not a descending sort



Reference

Spark: The Definitive Guide (스파크 완벽 가이드)
