데이터 분석

[PySpark] 숙박업 분석 - (1) 데이터 수집 및 전처리

너굴맨_ 2022. 3. 22. 15:56

 

* 목표

Local Data 사이트에서 제공하는 숙박업 데이터를 다운 받아 데이터 분석 프레임 워크인 PySpark를 활용하여 데이터 분석을 진행하자

* 데이터 수집

분석 과정에 쓰일 데이터는 LocalDATA에서 제공하는 숙박업종 정보 데이터로 아래 사이트에서 데이터를 다운 받을 수 있습니다.

 

https://www.localdata.go.kr/devcenter/dataDown.do?menuNo=20001 

 

LOCALDATA - 지방행정인허가데이터개방:데이터다운로드

전체 데이터다운로드 전체 인허가 데이터에 대한 전체분,월 변동분, 일 변동분에 대한 자료를 제공받을 수 있습니다. 전체 다운로드 선택 업종다운로드 36개 그룹, 190개 업종에 대한 데이터를 업

www.localdata.go.kr

 


 

* 진행 과정

 

1. Colab에 Pyspark 설치
2. SparkSession 생성하기
3. 파일 읽기
4. 데이터 전처리
5. 데이터 parquet으로 저장하기

 

 

1. Colab 환경에서 PySpark 설치하기

!apt-get install openjdk-8-jdk-headless
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar -xvf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"
import findspark
findspark.init()

 

2. SparkSession 객체 생성

PySpark를 사용하려면 제일 먼저 SparkSession 객체를 생성해야 한다.

 

# 필요한 라이브러리
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import os
import pandas as pd

os.chdir("C:\\Users\\USER\\Desktop\\Dataset\\lodge_industry")

# SparkSession 객체 생성
spark = SparkSession\
        .builder\
        .appName('Python Spark SQL lodge industry')\
        .config('spark.some.config.option', 'some-value')\
        .getOrCreate()

 

- 3. csv 파일 읽기

위 사이트에서 제공 받은 데이터를 pyspark에 불러오기

#파일 읽기
lodge_df1 = spark.read.csv("fulldata_03_11_01_P_관광숙박업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df2 = spark.read.csv("fulldata_03_11_02_P_관광펜션업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df3 = spark.read.csv("fulldata_03_11_03_P_숙박업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df4 = spark.read.csv("fulldata_03_11_04_P_외국인관광도시민박업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df5 = spark.read.csv("fulldata_03_11_05_P_자동차야영장업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df6 = spark.read.csv("fulldata_03_11_06_P_한옥체험업.csv", encoding='cp949',inferSchema = True, header = True)
lodge_df7 = spark.read.csv("fulldata_03_11_07_P_일반야영장업.csv", encoding='cp949',inferSchema = True, header = True)

 

- Schema 구조 확인

# printSchema()로 스키마 구조 확인
lodge_df1.printSchema()​

- 데이터 확인하기

스키마 구조를 살펴봤으니 안에 있는 내용을 확인하자.

# lodge_df 데이터 확인
lodge_df1.show(5)

 

4. 데이터 전처리하기

- 필요한 칼럼 선택하기 ( select )

분석 목적에 부합하는 칼럼만 선택

 

<선택된 칼럼들>

번호,개방자치단체코드,사업자명,인허가일자,상세영업상태명,폐업일자,시설면적,데이터갱신일자,건물지상층수,건물소유구분명,

 

# 위의 칼럼만 담은 새로운 DataFrame 생성 -- Select
lodge_df1 = lodge_df1.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
)

lodge_df2 = lodge_df2.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
)

lodge_df3 = lodge_df3.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('소재지면적')
)

lodge_df4 = lodge_df4.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
)

lodge_df5 = lodge_df5.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
)

lodge_df6 = lodge_df6.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
)

lodge_df7 = lodge_df7.select(
    col('번호'),col('개방서비스명'),col('사업장명'), col('개방자치단체코드'), col('상세영업상태명'),col('인허가일자'),
    col('폐업일자'),col('데이터갱신일자') ,col('시설면적')
# df3 번만 시설면적이 아닌 소재지면적으로 표기되므로 변경
# 칼럼명 변경 및 기존 칼럼 삭제
lodge_df3 = lodge_df3.withColumn("시설면적", col('소재지면적'))
lodge_df3 = lodge_df3.drop("소재지면적")

- 하나의 데이터 프레임으로 합치기

# 하나의 df로 합치기. (unionAll)
lodge = lodge_df1.unionAll(lodge_df2)
lodge = lodge.unionAll(lodge_df3)
lodge = lodge.unionAll(lodge_df4)
lodge = lodge.unionAll(lodge_df5)
lodge = lodge.unionAll(lodge_df6)
lodge = lodge.unionAll(lodge_df7)

lodge.show()

- 결측치 확인 및 값 대체하기

데이터 속 Null 값을 먼저 확인하자.

# 결측치 확인
lodge.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in lodge.columns]).show()

Null 값이 있는 칼럼은 아래와 같이 처리


* 개방서비스명 - '숙박업'으로 대체 (제일 많은 비율을 차지)

* 사업자명 - '알수없음'으로 대체

* 개방자치단체코드 - '999'로 대체. 추후 알수없음으로 교체

* 상세영업상태명 - '영업중'으로 대체

* 인허가일자 - 데이터의 평균을 구한 뒤 근접한 날짜로 대체

* 폐업일자 - 데이터 기준 날짜로 변경 (2022.03.11)

* 시설면적 - '0'으로 대체. 추후 알수없음으로 교체

* 갱신일자 - 데이터 기준 날짜로 대체 (2022-03)

 

lodge.select(avg("인허가일자")).show()

lodge = lodge.na.fill('숙박업', subset=["개방서비스명"])
lodge = lodge.na.fill('알수없음', subset=["사업장명"])
lodge = lodge.na.fill(999, subset=["개방자치단체코드"])
lodge = lodge.na.fill('영업중', subset=["상세영업상태명"])
lodge = lodge.na.fill(19991231, subset=["인허가일자"])
lodge = lodge.na.fill(20220311, subset=["폐업일자"])
lodge = lodge.na.fill('0', subset=["시설면적"])
# 갱신일자
lodge.withColumn("갱신일자", lpad(col("데이터갱신일자"),7,""))
lodge = lodge.withColumn("갱신일자", lpad(col("데이터갱신일자"),7,""))

# 칼럼 삭제하기
lodge = lodge.drop("데이터갱신일자")

# na 값 대체하기
lodge = lodge.na.fill('2022-03', subset=["갱신일자"])
lodge = lodge.withColumn('면적', (regexp_replace('시설면적','[\s,]','')))
lodge = lodge.drop('시설면적')
lodge.show(50)

마지막으로 한번 더 결측값이 있는지 확인

lodge.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in lodge.columns]).show()

- 5. 데이터 Parquet으로 저장하기

Parquet 형식으로 위 데이터프레임을 저장하자.

 

# Parquet로 저장하기
lodge.write.parquet("lodge.parquet")
spark.stop()

만약에 위 코드를 실행 시 아래와 같은 오류가 발생한다면 Hadoop을 설치한 뒤 다시 진행을 해야한다.

Hadoop의 설치 방법은 아래의 사이트에서 자세히 알려주기 때문에 아래의 블로그 참조

https://codedragon.tistory.com/9582

 

Hadoop - install for windows (설치 및 설정하기)

Hadoop - install for windows 하둡 설치파일 압축해제 환경변수 추가하기 정상 설치 확인하기 HDFS configurations YARN configurations Initialize environment variables Format file system 설정 Start HDF..

codedragon.tistory.com

 

위 코드가 제대로 실행되면 경로에 아래와 같은 폴더가 생성된 것을 확인할 수 있다.

 

Reference

https://www.analyticsvidhya.com/blog/2020/11/a-must-read-guide-on-how-to-work-with-pyspark-on-google-colab-for-data-scientists/

 

PySpark Google Colab | Working With PySpark in Colab

Understand the integration of PySpark in google colab. Learn to work with PySpark dataframes on Google Colab to accomplish tasks.

www.analyticsvidhya.com

https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html

 

Spark SQL — PySpark 3.2.1 documentation

Computes the first argument into a string from a binary using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).

spark.apache.org

https://minsw.github.io/2021/02/02/Spark-The-Definitive-Guide-6%EC%9E%A5/

 

'Spark The Definitive Guide' 6장 - 데이터 타입 (비)공식 가이드북 · Look out

라떼 시절엔,, 가이드북이 하나면 든-든했다,,, 이말이야,,, 총총 @}---- '아파치 스파크' 미인증 비공식 가이드 북 [전원 증정] 50.00 페이지 포인트 (캐시 아이템 구매 가능) _ _ _ CHAPTER 6 다양한 데이

minsw.github.io

위 데이터 분석 과정에 대한 전반적인 내용은 아래 github 사이트에서 확인할 수 있습니다.

https://github.com/Han-Archives/Colab/blob/main/%5BPyspark%5D_%EC%88%99%EB%B0%95%EC%97%85_%EB%8D%B0%EC%9D%B4%ED%84%B0_%EB%B6%84%EC%84%9D.ipynb

 

GitHub - Han-Archives/Colab

Contribute to Han-Archives/Colab development by creating an account on GitHub.

github.com