Chevron Left

查看所有社区故事

数据驱动的管道作为竞争优势

2021 年 11 月 9 日

作者

JP Urrutia

KPMG

photo of JP Urrutia

JP 是毕马威的数据工程师,该公司以在所有行业提供审计、税务和咨询专业服务而闻名,是四大会计组织之一。在业余时间,JP 会收藏黑胶唱片,烧烤和烹饪,阅读,创作,学习,并进行一些积极的活动。您可以在 领英推特 上找到 JP。

作为一名数据工程师,我一直致力于开发数据驱动的应用程序,为数据科学和商业智能提供动力。我希望构建能够增加价值并提供竞争优势的产品和服务,因为数据正成为智能决策的来源。

我了解到准确和实时的数据是关键驱动力。

我也看到过一些组织缺乏数据,对绩效、行业等没有真正的理解。他们倾向于基于不准确的或凭感觉做决策。

与此同时,那些利用数据的公司正在更好地了解他们的市场、业务和竞争对手地位。这种信息能够建立信心。它是一种竞争优势。

数据管道是如何工作的?

那么,一个组织如何才能使其数据保持最新并争取这一优势呢?

数据管道……分析成功的基石。

通常,数据驱动型公司会聘请数据工程师/架构师在其基础设施中实施作为数据管道的提取-转换-加载(ETL)工具。

但他们具体是如何做到的呢?

构建数据管道的步骤是什么?

数据管道将包括从源中提取数据、数据预处理、验证和数据目的地等几个步骤。我们来看一个简单的例子。

在这个用例中,我正在从一个黑胶唱片市场抓取数据,并希望对这些商品的价格进行分析。

ETL infographic

构建数据管道的过程

这里的第一步是访问 API 或数据库,并使用我编写的 Python 脚本提取我们正在寻找的数据。

这太棒了,因为我现在有数据了!

问题是它不一定是我想要的格式,所以我需要进行一些转换才能获得所需的这个数据集。

完成之后,我就可以将它加载到像 AWS S3 或 Azure Blob 这样的暂存区域进行数据存储。我称之为暂存区域,因为我想将它作为通往最终目的地的跳板。

为了确保可靠性,我将构建一些测试、警报和备份计划,以防出现问题或花费太长时间。最后,我的数据仓库将识别暂存区域中的新记录,并摄取新数据,为组织内的分析师和数据科学家维护一个新鲜的数据集。

砰!现在我的报告和机器学习模型都连接到这个最终数据源,数据以我选择的速度流入!管道已就位,已部署,我不再需要手动操作或摄取数据(希望如此)。

这只是一个数据源,但现在我可以寻找其他源,看看是否可以通过其他管道引入外部源,以增强我的数据并继续利用数据管道的竞争优势。请查看下面的代码,更深入地了解 ETL 代码。

import psycopg2
import csv
import boto3
import configparser
import os
import pandas as pd
from bs4 import BeautifulSoup
import requests
from time import time
from datetime import datetime

# config credentials from env
access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
bucket_name = 'discog-data'

# scrape data
startTime = time()

url = '...'
page = requests.get(url)

soup = BeautifulSoup(page.content, 'html.parser')

results = soup.find(id="pjax_container")

record_elements = results.find_all("tr", class_="shortcut_navigable")

item_list = []
price_list = []
sellers_list = []
total_price_list = []
link_list = []

for record_element in record_elements:

    item_description = (record_element.find("a", class_="item_description_title"))
    item_list.append(item_description.text)
    price = (record_element.find("span", class_="price"))
    price_list.append(price.text)
    seller = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and tag.text and '/seller/' in tag.get('href'))
    sellers_list.append(seller)
    total_price = (record_element.find("span", class_="converted_price"))

    total_price_list.append(total_price)

    record_info = record_element.find(lambda tag: tag.name == 'a' and tag.get('href') and '/sell/' in tag.get('href'))
    link = record_element.find("a", class_="item_description_title", href=True)
    link_list.append(link['href'])

# create cols from item description
artists = [str(item).split('-')[0].rstrip() for item in item_list]
albums = [str(item).split('-')[1].lstrip() for item in item_list]
album_class = [str(item[item.find("(")+1:item.find(")")]) for item in item_list]

total_price_list = [item.text if item is not None else '0' for item in total_price_list]

my_dict = {'item_description': item_list,
           'artists': artists,
           'album': albums,
           'album_class': album_class,
           'seller': sellers_list,
           'price': price_list,
           'total price': total_price_list}

filename = f'discogs_market_data_{datetime.now().strftime("%Y%m%d-%H%M")}.csv'
df = pd.DataFrame(my_dict)
df.to_csv(filename, index=False)

# load file to S3
s3 = boto3.client('s3',
                  aws_access_key_id=access_key_id,
                  aws_secret_access_key=aws_secret_access_key)
s3_file = filename
s3.upload_file(filename, bucket_name, s3_file)

print(f' execution time: {(time() - startTime)}')

作者

JP Urrutia

KPMG

photo of JP Urrutia

JP 是毕马威的数据工程师,该公司以在所有行业提供审计、税务和咨询专业服务而闻名,是四大会计组织之一。在业余时间,JP 会收藏黑胶唱片,烧烤和烹饪,阅读,创作,学习,并进行一些积极的活动。您可以在 领英推特 上找到 JP。

您可能也喜欢

这是我从创建数据管道中学到的

Srivamsi Sakirepalli

Lendingkart

将所有数据集中到一处的三个常用方法

Ali Baghshomali

Mentat Analytics

撰写社区故事的技巧

Metabot

Metabase

预测下一次点击

Ukrit Wattanavaekin

Metabase

您可能也喜欢

这是我从创建数据管道中学到的

Srivamsi Sakirepalli

Lendingkart

将所有数据集中到一处的三个常用方法

Ali Baghshomali

Mentat Analytics

撰写社区故事的技巧

Metabot

Metabase

预测下一次点击

Ukrit Wattanavaekin

Metabase

© . This site is unofficial and not affiliated with Metabase, Inc.