-
Notifications
You must be signed in to change notification settings - Fork 0
/
slack_pipeline.py
79 lines (56 loc) · 2.07 KB
/
slack_pipeline.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
"""Pipeline to load slack into duckdb."""
from typing import List
import dlt
from pendulum import datetime
from slack import slack_source
def load_all_resources(start_date) -> None:
    """Run the "slack" pipeline over the Slack source without any channel selection.

    The data is written to the BigQuery dataset ``slack_community_backup`` and
    the load info returned by the run is printed.

    Args:
        start_date: Forwarded unchanged to ``slack_source`` as its
            ``start_date`` argument (the ``__main__`` block passes a
            ``pendulum`` datetime).
    """
    # The pipeline is created before the source on purpose: keep this order.
    bigquery_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="bigquery",
        dataset_name="slack_community_backup",
    )

    slack_data = slack_source(page_size=1000, start_date=start_date)
    # NOTE(review): presumably enables dlt root-key propagation to nested
    # tables - confirm against the dlt source documentation.
    slack_data.root_key = True

    # Uncomment the following line to also load the access_logs resource. It is
    # not selected by default because it is only available on paid accounts.
    # slack_data.access_logs.selected = True

    print(bigquery_pipeline.run(slack_data))
def select_resource(selected_channels: List[str]) -> None:
    """Load the channel list and the given Slack channels into DuckDB.

    The date window is fixed in this example (2023-09-01 to 2023-09-08); it is
    not supplied by the caller. The load info returned by the run is printed.

    Args:
        selected_channels: Names of the Slack channels to load. They are
            passed to ``slack_source`` as its channel filter and are also used
            as the names of the resources to run.
    """
    pipeline = dlt.pipeline(
        pipeline_name="slack", destination="duckdb", dataset_name="slack_data"
    )

    # Select resources from the caller's channels instead of a hard-coded
    # "1-announcements", so the filter given to slack_source and the resources
    # that are actually run can never disagree.
    # NOTE(review): assumes slack_source names each per-channel resource after
    # its channel, as the previous hard-coded "1-announcements" implied.
    source = slack_source(
        page_size=20,
        selected_channels=selected_channels,
        start_date=datetime(2023, 9, 1),
        end_date=datetime(2023, 9, 8),
    ).with_resources("channels", *selected_channels)

    load_info = pipeline.run(source)
    print(load_info)
def get_users() -> None:
    """Run the "slack" pipeline restricted to the ``users`` resource.

    The data is written to the DuckDB dataset ``slack_data`` and the load info
    returned by the run is printed.
    """
    duckdb_pipeline = dlt.pipeline(
        pipeline_name="slack",
        destination="duckdb",
        dataset_name="slack_data",
    )

    full_source = slack_source(page_size=20)
    users_only = full_source.with_resources("users")

    print(duckdb_pipeline.run(users_only))
if __name__ == "__main__":
    # Add your desired resources to the list...
    # resources = ["access_logs", "conversations", "conversations_history"]

    # Default run: load every resource, starting from 2000-01-01.
    backfill_start = datetime(2000, 1, 1)
    load_all_resources(start_date=backfill_start)

    # Other entry points; uncomment the one you want to run:
    # select_resource(selected_channels=["dlt-github-ci"])
    # select_resource(selected_channels=["1-announcements"])
    # get_users()