Chevron Left

查看所有社区故事

数据驱动管道作为竞争优势

2021年11月9日

贡献者

JP Urrutia

KPMG

photo of JP Urrutia

JP是KPMG的数据工程师,KPMG是一家在所有行业提供审计、税务和咨询专业服务的公司,也是四大会计师事务所之一。在业余时间,你可以找到JP在他的黑胶唱片收藏、烧烤和烹饪、阅读、创作、学习和做一些积极的事情。你可以在领英推特上找到JP。

作为一个数据工程师,我始终想要开发数据驱动的应用程序,以推动数据科学和商业智能。我想构建具有价值并提供竞争优势的产品和服务,因为数据正成为智能决策的来源。

我了解到准确和实时的数据是关键驱动力。

我也见过一些组织缺失数据,并且对性能、行业等没有真正的理解。他们往往基于不准确的信息或者他们感觉正确的事情来做决定。

与此同时,那些利用数据的公司正在更好地理解他们的市场、业务和竞争对手的位置。这种类型的信息增强了信心。这是一项竞争优势。

数据管道是如何工作的?

那么,一个组织如何保持其数据最新并努力实现这一优势呢?

数据管道……是数据分析成功的基础。

一般来说,数据驱动型公司会雇佣数据工程师/架构师,在他们的基础设施中实施数据管道,即提取-转换-加载(ETL)工具。

但他们具体是如何做的呢?

构建数据管道的步骤有哪些?

数据管道将包括几个步骤,包括从源提取数据、数据预处理、验证和数据的目的地。让我们来看一个简单的例子。

在这个用例中,我从一家黑胶唱片市场抓取数据,并想对这些商品的价格进行分析。

ETL infographic

构建数据管道的过程

这里的第一个步骤是通过我制作的Python脚本来调用API或数据库,并提取所需的数据。

太棒了,因为我现在有数据了!

问题是它不是我所需要的格式,所以我需要进行一些转换以获得所需的dataset。

完成这些后,我可以将其加载到一个临时区域,如AWS S3或Azure Blob进行数据存储。我称之为临时区域,因为我希望将其作为最终目的地的垫脚石。

为了确保其可靠性,我想要建立一些测试、警报和备份计划,以防出现错误或耗时过长的情况。最后,我的数据仓库将识别临时区域中的新记录,并摄取新数据,以保持组织内分析师和数据科学家的新鲜dataset。

砰!现在,我的报告和机器学习模型连接到了这个最终数据源,它以我所选择的速度提供数据!管道已经到位,部署完成,我不再需要手动接触它们或摄取数据(希望如此)。

这只是一个数据源,但现在我可以寻找其他来源,看看是否可以通过其他管道引入外部数据源,以增强我的数据并继续构建数据管道的竞争优势。查看下面的代码,以深入了解ETL代码。

import psycopg2
import csv
import boto3
import configparser
import os
import pandas as pd
from bs4 import BeautifulSoup
import requests
from time import time
from datetime import datetime

# config credentials from env
access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
bucket_name = 'discog-data'

# scrape data
startTime = time()

url = '...'
page = requests.get(url)

soup = BeautifulSoup(page.content, 'html.parser')

results = soup.find(id="pjax_container")

record_elements = results.find_all("tr", class_="shortcut_navigable")

item_list = []
price_list = []
sellers_list = []
total_price_list = []
link_list = []

for record_element in record_elements:

    item_description = (record_element.find("a", class_="item_description_title"))
    item_list.append(item_description.text)
    price = (record_element.find("span", class_="price"))
    price_list.append(price.text)
    seller = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and tag.text and '/seller/' in tag.get('href'))
    sellers_list.append(seller)
    total_price = (record_element.find("span", class_="converted_price"))

    total_price_list.append(total_price)

    record_info = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and '/sell/' in tag.get('href'))
    link = record_element.find("a", class_="item_description_title", href=True)
    link_list.append(link['href'])

# create cols from item description
artists = [str(item).split('-')[0].rstrip() for item in item_list]
albums = [str(item).split('-')[1].lstrip() for item in item_list]
album_class = [str(item[item.find("(")+1:item.find(")")]) for item in item_list]

total_price_list = [item.text if item is not None else '0' for item in total_price_list]

my_dict = {'item_description': item_list,
           'artists': artists,
           'album': albums,
           'album_class': album_class,
           'seller': sellers_list,
           'price': price_list,
           'total price': total_price_list}

filename = f'discogs_market_data_{datetime.now().strftime("%Y%m%d-%H%M")}.csv'
df = pd.DataFrame(my_dict)
df.to_csv(filename, index=False)

# load file to S3
s3 = boto3.client('s3',
                  aws_access_key_id=access_key_id,
                  aws_secret_access_key=aws_secret_access_key)
s3_file = filename
s3.upload_file(filename, bucket_name, s3_file)

print(f' execution time: {(time() - startTime)}')

贡献者

JP Urrutia

KPMG

photo of JP Urrutia

JP是KPMG的数据工程师,KPMG是一家在所有行业提供审计、税务和咨询专业服务的公司,也是四大会计师事务所之一。在业余时间,你可以找到JP在他的黑胶唱片收藏、烧烤和烹饪、阅读、创作、学习和做一些积极的事情。你可以在领英推特上找到JP。

你可能还喜欢

从创建数据管道中学到的东西

Srivamsi Sakirepalli

Lendingkart

将所有数据集中在一起的三种常见方法

Ali Baghshomali

Mentat Analytics

撰写社区故事的技巧

Metabot

Metabase

预测下一次点击

Ukrit Wattanavaekin

Metabase

你可能还喜欢

从创建数据管道中学到的东西

Srivamsi Sakirepalli

Lendingkart

将所有数据集中在一起的三种常见方法

Ali Baghshomali

Mentat Analytics

撰写社区故事的技巧

Metabot

Metabase

预测下一次点击

Ukrit Wattanavaekin

Metabase