-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy path: coffea-test.py
165 lines (124 loc) · 5.19 KB
/
coffea-test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import uproot
from coffea.nanoevents import NanoEventsFactory, BaseSchema
import awkward as ak
from coffea import hist, processor
# Work around a hang/performance issue with the default XRootD source by
# forcing the multithreaded handler:
# https://github.com/scikit-hep/uproot4/issues/122
uproot.open.defaults["xrootd_handler"] = uproot.source.xrootd.MultithreadedXRootDSource
# register our candidate behaviors so ak.zip(with_name="PtEtaPhiMCandidate")
# below produces records with Lorentz-vector arithmetic (muon1 + muon2)
from coffea.nanoevents.methods import candidate
ak.behavior.update(candidate.behavior)
class MyProcessor(processor.ProcessorABC):
    """Di-muon invariant-mass analysis.

    Selects events containing exactly two muons whose charges cancel,
    adds the two muon four-vectors, and fills a histogram of the pair's
    invariant mass (60 bins over 60-120 GeV), keyed by dataset name.
    """

    def __init__(self):
        # Accumulator carries a per-dataset processed-event count ("sumw")
        # and the di-muon mass histogram.
        self._accumulator = processor.dict_accumulator({
            "sumw": processor.defaultdict_accumulator(float),
            "mass": hist.Hist(
                "Events",
                hist.Cat("dataset", "Dataset"),
                # Raw string: "\m" is not a valid escape sequence and
                # raises a SyntaxWarning on recent Pythons; the raw form
                # is byte-identical to what the original produced.
                hist.Bin("mass", r"$m_{\mu\mu}$ [GeV]", 60, 60, 120),
            ),
        })

    @property
    def accumulator(self):
        """Empty accumulator template cloned (via .identity()) per chunk."""
        return self._accumulator

    def process(self, events):
        """Process one chunk of events and return a filled accumulator."""
        # Note: This is required to ensure that behaviors are registered
        # when running this code in a remote task (workers do not share
        # the interpreter that imported this module).
        ak.behavior.update(candidate.behavior)
        output = self.accumulator.identity()

        dataset = events.metadata['dataset']
        muons = ak.zip({
            "pt": events.Muon_pt,
            "eta": events.Muon_eta,
            "phi": events.Muon_phi,
            "mass": events.Muon_mass,
            "charge": events.Muon_charge,
        }, with_name="PtEtaPhiMCandidate")

        # Exactly two muons whose charges cancel.
        # NOTE(review): ak.sum is called without an explicit axis here;
        # confirm it reduces per event (axis=1) rather than over the
        # whole array, otherwise the charge requirement is all-or-nothing.
        cut = (ak.num(muons) == 2) & (ak.sum(muons.charge) == 0)

        # add first and second muon in every event together
        dimuon = muons[cut][:, 0] + muons[cut][:, 1]

        # sumw counts all events in the chunk, not just those passing the cut.
        output["sumw"][dataset] += len(events)
        output["mass"].fill(
            dataset=dataset,
            mass=dimuon.mass,
        )
        return output

    def postprocess(self, accumulator):
        """No post-merge transformation is needed; return as-is."""
        return accumulator
###############################################################
# Collect and display setup info.
###############################################################

print("------------------------------------------------")
print("Example Coffea Analysis with Work Queue Executor")
print("------------------------------------------------")

import shutil
import getpass
import os.path

# Conda environment tarball shipped to each worker, and the cctools
# wrapper script that unpacks and activates it there.
wq_env_tarball = "coffea-env.tar.gz"
wq_wrapper_path = shutil.which('python_package_run')
# Unique manager name so workers can locate this run via the catalog.
wq_master_name = "coffea-wq-{}".format(getpass.getuser())

# Fix: shutil.which returns None when the tool is not on PATH, and the
# string concatenation below would then fail with an opaque TypeError.
# Fail early with an actionable message instead.
if wq_wrapper_path is None:
    raise RuntimeError(
        "python_package_run was not found in PATH; "
        "install cctools so the worker environment wrapper is available."
    )

print("Master Name: -N " + wq_master_name)
print("Environment: " + wq_env_tarball)
print("Wrapper Path: " + wq_wrapper_path)
print("------------------------------------------------")
###############################################################
# Sample data sources come from CERN opendata.
###############################################################

# Both files live under the same opendata EOS directory, so build the
# URLs from a single base path.
fileset = {
    'DoubleMuon': [
        'root://eospublic.cern.ch//eos/root-eos/cms_opendata_2012_nanoaod/' + fname
        for fname in (
            'Run2012B_DoubleMuParked.root',
            'Run2012C_DoubleMuParked.root',
        )
    ],
}
###############################################################
# Configuration of the Work Queue Executor
###############################################################

# Start from the options shared by every coffea executor, then layer on
# the Work Queue specific settings group by group.
work_queue_executor_args = {
    'compression': 1,
    'schema': BaseSchema,
    'skipbadfiles': False,  # maxchunks only works while this stays False
}

# Per-task resource allocation. With 'auto', the values below act as
# upper bounds and the resource monitor shrinks requests to what tasks
# actually consume.
work_queue_executor_args.update({
    'resources-mode': 'auto',
    'resource-monitor': True,
    'cores': 2,       # cores per task
    'disk': 2000,     # MB of disk per task
    'memory': 2000,   # MB of memory per task
})

# How workers locate this manager.
work_queue_executor_args.update({
    'master-name': wq_master_name,
    'port': 9123,  # zero would let the manager pick a free port
})

# Worker-side environment: the named tarball is transferred to each
# worker and activated through the wrapper script.
work_queue_executor_args.update({
    'environment-file': wq_env_tarball,
    'wrapper': wq_wrapper_path,
})

# Debugging knobs.
work_queue_executor_args.update({
    'print-stdout': True,          # echo task output when non-empty
    'verbose': False,              # per-task submit/complete notes
    'debug-log': 'coffea-wq.log',  # verbose manager-side log file
})
###############################################################
# Run the analysis via run_uproot_job.
###############################################################

import time

tstart = time.time()
output = processor.run_uproot_job(
    fileset,
    treename='Events',
    processor_instance=MyProcessor(),
    executor=processor.work_queue_executor,
    executor_args=work_queue_executor_args,
    chunksize=100000,
    # Change this to None for a large run:
    maxchunks=4,
)
elapsed = time.time() - tstart

# Fix: `elapsed` was computed but never reported; print it so the timing
# measurement actually serves a purpose.
print("Elapsed time: {:.1f} s".format(elapsed))
print(output)

# Expected output:
# {'sumw': defaultdict_accumulator(<class 'float'>, {'DoubleMuon': 400224.0}), 'mass': <Hist (dataset,mass) instance at 0x7f4e02708460>}
if output['sumw']['DoubleMuon'] == 400224.0:
    print("Output is correct.")
else:
    raise RuntimeError("Incorrect output value!")